#![allow(dead_code)]
pub mod snapshot;
use std::fmt;
use std::fs::{self, File};
use std::io::{self, Read, Seek, SeekFrom};
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, Instant};
use parking_lot::Mutex;
use xxhash_rust::xxh3::xxh3_64;
use crate::storage::FileManifest;
use crate::write::memtable::{MemTable, MemTableError};
/// Size of a WAL frame header: three little-endian `u32` fields
/// (payload length, header length, payload checksum).
const FRAME_HEADER_LEN: usize = 4 + 4 + 4;
/// Granularity (4 KiB) that WAL frames are padded to on disk; also used to
/// round the tail-replay start offset down.
const ALIGNMENT: usize = 4 * 1024;
/// Configuration for a [`RecoveryManager`] run.
#[derive(Clone, Debug)]
pub struct RecoveryOptions {
    /// Write-ahead log to replay.
    pub wal_path: PathBuf,
    /// Manifest that is copied to `manifest_target` when the target does not
    /// exist yet.
    pub manifest_source: PathBuf,
    /// Location where recovery makes the manifest available.
    pub manifest_target: PathBuf,
    /// Size limit handed to each memtable created during replay.
    pub memtable_limit_bytes: u64,
    /// When set, only the last this-many bytes of the WAL are replayed (the
    /// start offset is rounded down to `ALIGNMENT`).
    pub wal_tail_bytes: Option<u64>,
}

impl RecoveryOptions {
    /// Default per-memtable limit: 512 MiB.
    const DEFAULT_MEMTABLE_LIMIT_BYTES: u64 = 512 << 20;

    /// Creates options that replay `wal_path` and use `manifest` as both the
    /// manifest source and the manifest target. The default memtable limit
    /// applies, and the whole WAL is replayed.
    pub fn new<P: Into<PathBuf>>(wal_path: P, manifest: P) -> Self {
        let manifest_target: PathBuf = manifest.into();
        let manifest_source = manifest_target.clone();
        Self {
            wal_path: wal_path.into(),
            manifest_source,
            manifest_target,
            memtable_limit_bytes: Self::DEFAULT_MEMTABLE_LIMIT_BYTES,
            wal_tail_bytes: None,
        }
    }

    /// Redirects the manifest target; `manifest_source` is left unchanged.
    pub fn with_manifest_target<P: Into<PathBuf>>(self, path: P) -> Self {
        Self {
            manifest_target: path.into(),
            ..self
        }
    }

    /// Sets the per-memtable limit. Zero is clamped up to 1 byte.
    pub fn with_memtable_limit(self, limit: u64) -> Self {
        Self {
            memtable_limit_bytes: limit.max(1),
            ..self
        }
    }

    /// Restricts replay to the last `bytes` of the WAL, or replays all of it
    /// when `None`.
    pub fn with_wal_tail(self, bytes: Option<u64>) -> Self {
        Self {
            wal_tail_bytes: bytes,
            ..self
        }
    }
}
/// Where recovery left the manifest, and whether a copy was made to get it there.
#[derive(Clone, Debug)]
pub struct ManifestRemap {
/// The `manifest_source` path from the options that drove this recovery.
pub source: PathBuf,
/// Manifest path to open after recovery; recovery sets this to the configured target.
pub active: PathBuf,
/// Bytes copied from `source` to the target, or 0 when no copy was needed.
pub bytes_copied: u64,
/// `true` when the target was created by copying `source` during this recovery.
pub updated: bool,
}
/// Summary of a completed [`RecoveryManager::recover`] call.
pub struct RecoveryOutcome {
    /// Memtables populated during WAL replay, in the order they were created.
    pub memtables: Vec<Arc<MemTable>>,
    /// Where the manifest ended up and whether it had to be copied.
    pub manifest: ManifestRemap,
    /// Number of WAL records applied to the memtables.
    pub replayed_entries: usize,
    /// WAL frames skipped because of a checksum mismatch or a decode failure.
    pub corrupted_frames: usize,
    /// WAL bytes after the last complete frame that were not replayed.
    pub truncated_bytes: u64,
    /// Time the recovery call took, measured from its start.
    pub elapsed: Duration,
}

impl fmt::Debug for RecoveryOutcome {
    // Memtables are summarised by count instead of being formatted one by one.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let RecoveryOutcome {
            memtables,
            manifest,
            replayed_entries,
            corrupted_frames,
            truncated_bytes,
            elapsed,
        } = self;
        let memtable_count = memtables.len();
        let mut out = f.debug_struct("RecoveryOutcome");
        out.field("memtables", &memtable_count)
            .field("manifest", manifest)
            .field("replayed_entries", replayed_entries)
            .field("corrupted_frames", corrupted_frames)
            .field("truncated_bytes", truncated_bytes)
            .field("elapsed", elapsed);
        out.finish()
    }
}
/// Errors that abort crash recovery.
#[derive(Debug, thiserror::Error)]
pub enum RecoveryError {
/// A filesystem, read, or seek operation failed.
#[error("io error: {0}")]
Io(#[from] io::Error),
/// Neither the manifest target nor the manifest source exists; carries the source path.
#[error("manifest not found at {0}")]
MissingManifest(PathBuf),
/// The manifest target's parent directory could not be created. Carries the
/// underlying error's message; the original `io::Error` is not kept.
#[error("failed to prepare manifest directory: {0}")]
ManifestDir(String),
/// A WAL frame was malformed or its payload could not be read; `offset` is
/// the file offset at which that frame starts.
#[error("malformed wal frame at offset {offset}: {reason}")]
WalCorruption { offset: u64, reason: String },
/// A memtable operation failed while applying a replayed WAL record.
#[error("memtable error during replay: {0}")]
MemTable(#[from] MemTableError),
}
pub struct RecoveryManager {
options: RecoveryOptions,
payload: Mutex<Vec<u8>>,
}
impl RecoveryManager {
pub fn new(options: RecoveryOptions) -> Self {
Self {
options,
payload: Mutex::new(Vec::with_capacity(4096)),
}
}
pub fn recover(&self) -> Result<RecoveryOutcome, RecoveryError> {
let start = Instant::now();
let manifest = self.remap_manifest()?;
let replay = self.replay_wal()?;
Ok(RecoveryOutcome {
memtables: replay.memtables,
manifest,
replayed_entries: replay.entries,
corrupted_frames: replay.corrupted_frames,
truncated_bytes: replay.truncated_bytes,
elapsed: start.elapsed(),
})
}
fn remap_manifest(&self) -> Result<ManifestRemap, RecoveryError> {
let source = &self.options.manifest_source;
let target = &self.options.manifest_target;
let (active, bytes_copied, updated) = if target.exists() {
(target.clone(), 0, false)
} else if source.exists() {
if let Some(parent) = target.parent() {
fs::create_dir_all(parent)
.map_err(|err| RecoveryError::ManifestDir(err.to_string()))?;
}
let copied = fs::copy(source, target)?;
(target.clone(), copied as u64, true)
} else {
return Err(RecoveryError::MissingManifest(source.clone()));
};
Ok(ManifestRemap {
source: source.clone(),
active,
bytes_copied,
updated,
})
}
fn replay_wal(&self) -> Result<WalReplay, RecoveryError> {
let path = &self.options.wal_path;
if !path.exists() {
return Ok(WalReplay::empty());
}
let mut file = File::open(path)?;
let mut start_offset = 0u64;
let wal_len = file.metadata()?.len();
if let Some(tail) = self.options.wal_tail_bytes {
if wal_len > tail {
let offset = wal_len - tail;
let aligned = offset - (offset % ALIGNMENT as u64);
start_offset = aligned;
file.seek(SeekFrom::Start(aligned))?;
}
}
let mut memtable = Arc::new(MemTable::with_limit(self.options.memtable_limit_bytes));
let mut tables = vec![Arc::clone(&memtable)];
let mut entries = 0usize;
let mut corrupted_frames = 0usize;
let mut truncated_bytes = 0u64;
let mut last_good = start_offset;
loop {
let frame_offset = file.stream_position()?;
match read_frame(frame_offset, &mut file, &self.payload) {
Ok(Some(frame)) => {
last_good = frame_offset + frame.total_len as u64;
if frame.valid_checksum {
match decode_entries(&frame.payload) {
Ok(decoded) => {
for record in decoded {
loop {
match apply_record(&memtable, &record) {
Ok(()) => {
entries += 1;
break;
}
Err(MemTableError::Backpressure) => {
memtable = Arc::new(MemTable::with_limit(
self.options.memtable_limit_bytes,
));
tables.push(Arc::clone(&memtable));
}
Err(other) => return Err(other.into()),
}
}
}
}
Err(reason) => {
corrupted_frames += 1;
eprintln!("wal decode error at frame {frame_offset}: {reason}");
}
}
} else {
corrupted_frames += 1;
}
}
Ok(None) => break,
Err(err) => {
return Err(err);
}
}
}
truncated_bytes = truncated_bytes.max(wal_len.saturating_sub(last_good));
Ok(WalReplay {
memtables: tables,
entries,
corrupted_frames,
truncated_bytes,
})
}
}
/// Raw result of replaying the WAL, before it is folded into a
/// [`RecoveryOutcome`].
struct WalReplay {
    /// Memtables in the order they were created.
    memtables: Vec<Arc<MemTable>>,
    /// Number of records applied.
    entries: usize,
    /// Frames skipped because of checksum or decode failures.
    corrupted_frames: usize,
    /// Bytes past the last complete frame.
    truncated_bytes: u64,
}

impl WalReplay {
    /// A replay that applied nothing: a single memtable from `MemTable::new()`
    /// and every counter at zero.
    ///
    /// NOTE(review): `MemTable::new()` does not receive the configured
    /// `memtable_limit_bytes`; confirm that is intended for callers of this.
    fn empty() -> Self {
        let fresh = Arc::new(MemTable::new());
        WalReplay {
            memtables: vec![fresh],
            entries: 0,
            corrupted_frames: 0,
            truncated_bytes: 0,
        }
    }
}
/// One WAL frame as returned by `read_frame`.
struct WalFrame {
/// Copy of the frame's payload bytes.
payload: Vec<u8>,
/// Bytes the frame occupies in the file: header plus payload, rounded up to `ALIGNMENT`.
total_len: usize,
/// Whether the header checksum equals the low 32 bits of the payload's xxh3-64 hash.
valid_checksum: bool,
}
/// Reads one WAL frame from the current position of `file`.
///
/// Header layout (little-endian `u32`s): payload length, header length (must
/// equal `FRAME_HEADER_LEN`), and the low 32 bits of the payload's xxh3-64
/// hash. The payload follows, and the frame is then skipped forward so its
/// total size is a multiple of `ALIGNMENT`.
///
/// `offset` is the frame's starting file offset and is used only in error
/// reports. `scratch` is a reusable payload buffer; its lock is held while
/// the payload is read.
///
/// Returns `Ok(None)` at the end of the log in any of these cases:
/// - EOF inside the header;
/// - an all-zero header;
/// - EOF before the full payload arrives (a torn final write).
///
/// A torn payload is treated like a torn header so that replay stops there
/// and reports the remainder as truncated, instead of failing recovery
/// outright. A checksum mismatch is reported via `WalFrame::valid_checksum`,
/// not as an error.
///
/// # Errors
/// - `WalCorruption` for an unexpected header-length field or a non-EOF error
///   while reading the payload.
/// - `Io` for other header read errors or a failed seek.
fn read_frame(
    offset: u64,
    file: &mut File,
    scratch: &Mutex<Vec<u8>>,
) -> Result<Option<WalFrame>, RecoveryError> {
    let mut header = [0u8; FRAME_HEADER_LEN];
    match file.read_exact(&mut header) {
        Ok(()) => {}
        Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
        Err(err) => return Err(RecoveryError::Io(err)),
    }
    let payload_len = u32::from_le_bytes([header[0], header[1], header[2], header[3]]) as usize;
    let frame_len = u32::from_le_bytes([header[4], header[5], header[6], header[7]]) as usize;
    let checksum = u32::from_le_bytes([header[8], header[9], header[10], header[11]]);
    // A zeroed header marks unwritten (e.g. preallocated) space at the end of the log.
    if payload_len == 0 && frame_len == 0 && checksum == 0 {
        return Ok(None);
    }
    if frame_len != FRAME_HEADER_LEN {
        return Err(RecoveryError::WalCorruption {
            offset,
            reason: format!("unexpected frame header length {frame_len}"),
        });
    }

    let mut payload = scratch.lock();
    payload.clear();
    // Read through `take` instead of pre-sizing the buffer. A corrupt length
    // field (up to 4 GiB) then costs only as much memory as the file actually
    // holds, and a short read shows up as a byte count rather than an error.
    let read = file
        .by_ref()
        .take(payload_len as u64)
        .read_to_end(&mut *payload)
        .map_err(|err| RecoveryError::WalCorruption {
            offset,
            reason: format!("incomplete frame payload: {err}"),
        })?;
    if read < payload_len {
        return Ok(None);
    }

    let computed = (xxh3_64(&payload) & 0xffff_ffff) as u32;
    let unpadded = payload_len + FRAME_HEADER_LEN;
    let frame_total = align_up(unpadded, ALIGNMENT);
    if frame_total > unpadded {
        // Skip the alignment padding; seeking past EOF is harmless because the
        // next header read then reports EOF.
        file.seek(SeekFrom::Current((frame_total - unpadded) as i64))?;
    }
    Ok(Some(WalFrame {
        payload: payload.to_vec(),
        total_len: frame_total,
        valid_checksum: checksum == computed,
    }))
}
/// Decodes the records packed back-to-back into one WAL frame payload.
///
/// Each record is laid out as:
/// - 1 byte: tombstone flag (non-zero means delete)
/// - 4 bytes LE: key length
/// - 4 bytes LE: value length
/// - 8 bytes: skipped; not interpreted by this decoder
/// - key bytes, then value bytes
///
/// An empty payload decodes to an empty list.
///
/// # Errors
/// Returns a human-readable reason when a record header is cut short or a
/// record's key/value would run past the end of `payload`. Length arithmetic
/// is checked, so corrupt length fields produce an error instead of
/// overflowing `usize` on 32-bit targets and then panicking on a slice index.
fn decode_entries(payload: &[u8]) -> Result<Vec<WalRecord>, String> {
    // tombstone (1) + key_len (4) + value_len (4) + skipped field (8)
    const RECORD_HEADER_LEN: usize = 17;
    // Callers only pass offsets inside an already length-checked header.
    let read_len = |at: usize| {
        u32::from_le_bytes([payload[at], payload[at + 1], payload[at + 2], payload[at + 3]])
            as usize
    };

    let mut cursor = 0usize;
    let mut entries = Vec::new();
    while cursor < payload.len() {
        if payload.len() - cursor < RECORD_HEADER_LEN {
            return Err("truncated wal entry".into());
        }
        let tombstone = payload[cursor] != 0;
        let key_len = read_len(cursor + 1);
        let value_len = read_len(cursor + 5);
        let body_start = cursor + RECORD_HEADER_LEN;
        let value_end = body_start
            .checked_add(key_len)
            .and_then(|end| end.checked_add(value_len))
            .filter(|&end| end <= payload.len())
            .ok_or_else(|| String::from("wal entry extends beyond frame"))?;
        // Cannot overflow: it is bounded by `value_end`, which was checked above.
        let key_end = body_start + key_len;
        entries.push(WalRecord {
            tombstone,
            key: payload[body_start..key_end].to_vec(),
            value: payload[key_end..value_end].to_vec(),
        });
        cursor = value_end;
    }
    Ok(entries)
}
fn apply_record(table: &Arc<MemTable>, record: &WalRecord) -> Result<(), MemTableError> {
if record.tombstone {
table.delete(&record.key)?;
} else {
table.put(&record.key, record.value.clone())?;
}
Ok(())
}
/// Rounds `len` up to the nearest multiple of `align`.
///
/// `align` must be non-zero; the remainder operation panics otherwise.
fn align_up(len: usize, align: usize) -> usize {
    match len % align {
        0 => len,
        remainder => len + (align - remainder),
    }
}
/// A single key/value mutation decoded from a WAL frame payload.
#[derive(Clone, Debug)]
struct WalRecord {
/// `true` for a delete; `apply_record` then uses only the key.
tombstone: bool,
/// Key bytes.
key: Vec<u8>,
/// Value bytes for a put (may be empty).
value: Vec<u8>,
}
/// Opens the manifest that recovery left active (`remap.active`).
///
/// # Errors
/// Returns whatever `FileManifest::open` reports for that path.
pub fn open_remapped_manifest(
    remap: &ManifestRemap,
) -> Result<FileManifest, crate::storage::StorageError> {
    let active_path = &remap.active;
    FileManifest::open(active_path)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Aligned lengths must be returned unchanged; everything else rounds up
    /// to the next multiple of the alignment.
    #[test]
    fn align_up_rounds_correctly() {
        let cases = [(0, 0), (1, 4096), (4096, 4096), (4097, 8192)];
        for (input, expected) in cases {
            assert_eq!(align_up(input, 4096), expected, "align_up({input}, 4096)");
        }
    }
}