use crate::engine::array::store::ArrayStore;
use crate::engine::array::wal::{ArrayDeletePayload, ArrayPutCell, ArrayPutPayload};
use crate::engine::array::write::{stamp_delete_cells, stamp_put_cells};
use nodedb_array::ArrayError;
#[derive(Debug, thiserror::Error)]
pub enum RecoveryError {
#[error(transparent)]
Array(#[from] ArrayError),
#[error(transparent)]
Engine(#[from] crate::engine::array::engine::ArrayEngineError),
#[error("recovery: unknown array {array}")]
UnknownArray { array: String },
}
#[derive(Debug, Default)]
pub struct RecoveryStats {
pub puts_applied: usize,
pub puts_skipped: usize,
pub deletes_applied: usize,
pub deletes_skipped: usize,
}
pub enum RecoveryRecord {
Put {
lsn: u64,
payload: ArrayPutPayload,
},
Delete {
lsn: u64,
payload: ArrayDeletePayload,
},
Flush {
lsn: u64,
array: nodedb_array::types::ArrayId,
},
}
pub struct Recovery<'a> {
store: &'a mut ArrayStore,
pub stats: RecoveryStats,
}
impl<'a> Recovery<'a> {
pub fn new(store: &'a mut ArrayStore) -> Self {
Self {
store,
stats: RecoveryStats::default(),
}
}
pub fn apply_record(&mut self, rec: RecoveryRecord) -> Result<(), RecoveryError> {
let durable = self.store.manifest().durable_lsn;
match rec {
RecoveryRecord::Put { lsn, payload } => {
if lsn <= durable {
self.stats.puts_skipped += 1;
return Ok(());
}
let cells: Vec<ArrayPutCell> = payload.cells;
debug_assert!(
cells.iter().all(|c| c.system_from_ms > 0),
"recovery: ArrayPutCell.system_from_ms must be the leader-stamped \
value (> 0), not a default — check that the WAL decoder does not \
zero-fill this field on version mismatch"
);
stamp_put_cells(self.store, cells, lsn)?;
self.stats.puts_applied += 1;
}
RecoveryRecord::Delete { lsn, payload } => {
if lsn <= durable {
self.stats.deletes_skipped += 1;
return Ok(());
}
debug_assert!(
payload.cells.iter().all(|c| c.system_from_ms > 0),
"recovery: ArrayDeleteCell.system_from_ms must be the leader-stamped \
value (> 0), not a default"
);
stamp_delete_cells(self.store, payload.cells, lsn)?;
self.stats.deletes_applied += 1;
}
RecoveryRecord::Flush { lsn, .. } => {
let m = self.store.manifest_mut();
m.durable_lsn = m.durable_lsn.max(lsn);
}
}
Ok(())
}
}