use std::collections::HashMap;
use std::io;
use std::path::{Path, PathBuf};
use std::time::Instant;
use super::wal::{Lsn, RankRegime, WalConfig, WalError, WalReader, WalRecord};
use log::warn;
/// Errors that can be reported while recovering state from the write-ahead log.
#[derive(Debug)]
pub enum RecoveryError {
    /// An underlying I/O operation failed.
    Io(io::Error),
    /// The WAL could not be decoded; the payload describes the problem.
    CorruptedWal(String),
    /// No checkpoint was found in the WAL.
    NoCheckpoint,
    /// A checkpoint at `lsn` was rejected for the given `reason`.
    InvalidCheckpoint {
        lsn: Lsn,
        reason: String,
    },
    /// Transaction `tx_id` is in an inconsistent state, described by `reason`.
    TransactionInconsistency {
        tx_id: u64,
        reason: String,
    },
    /// Recovery could not be completed; the payload describes why.
    RecoveryFailed(String),
}
impl std::fmt::Display for RecoveryError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
RecoveryError::Io(e) => write!(f, "I/O error: {}", e),
RecoveryError::CorruptedWal(msg) => write!(f, "Corrupted WAL: {}", msg),
RecoveryError::NoCheckpoint => write!(f, "No checkpoint found in WAL"),
RecoveryError::InvalidCheckpoint { lsn, reason } => {
write!(f, "Invalid checkpoint at LSN {}: {}", lsn, reason)
}
RecoveryError::TransactionInconsistency { tx_id, reason } => {
write!(f, "Transaction {} inconsistency: {}", tx_id, reason)
}
RecoveryError::RecoveryFailed(msg) => write!(f, "Recovery failed: {}", msg),
}
}
}
impl std::error::Error for RecoveryError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
RecoveryError::Io(e) => Some(e),
_ => None,
}
}
}
impl From<io::Error> for RecoveryError {
fn from(e: io::Error) -> Self {
RecoveryError::Io(e)
}
}
impl From<WalError> for RecoveryError {
    /// Translates a WAL-layer error into the matching recovery error.
    ///
    /// Decoding problems become [`RecoveryError::CorruptedWal`], a missing
    /// WAL becomes [`RecoveryError::NoCheckpoint`], I/O errors are passed
    /// through, and the remaining variants become
    /// [`RecoveryError::RecoveryFailed`] with a descriptive message.
    fn from(err: WalError) -> Self {
        match err {
            WalError::Io(inner) => Self::Io(inner),
            WalError::CorruptedRecord(detail) => Self::CorruptedWal(detail),
            WalError::UnexpectedEof => Self::CorruptedWal(String::from("Unexpected EOF")),
            WalError::InvalidRecordType(kind) => {
                Self::CorruptedWal(format!("Invalid record type: {}", kind))
            }
            WalError::AlreadyExists => Self::RecoveryFailed(String::from("WAL already exists")),
            WalError::NotFound => Self::NoCheckpoint,
            WalError::ParentNotFound(missing) => {
                let shown = missing.display();
                Self::RecoveryFailed(format!("Parent directory not found: {}", shown))
            }
            WalError::InvalidRegimeStamp(detail) => {
                Self::RecoveryFailed(format!("Invalid WAL regime stamp: {}", detail))
            }
        }
    }
}
/// Result alias used throughout recovery; the error type is [`RecoveryError`].
pub type Result<T> = std::result::Result<T, RecoveryError>;
/// A single logical mutation reconstructed from a WAL record.
///
/// Every variant carries the LSN of the WAL record it came from and the
/// term (key bytes) it applies to.
#[derive(Debug, Clone)]
pub enum RecoveredOperation {
    /// Insert `term`, optionally with an associated value.
    Insert {
        lsn: Lsn,
        term: Vec<u8>,
        value: Option<Vec<u8>>,
    },
    /// Remove `term`.
    Remove {
        lsn: Lsn,
        term: Vec<u8>,
    },
    /// Add `delta` to the counter stored at `term`.
    ///
    /// `result` holds the logged post-increment value when the source record
    /// carried one, and `None` otherwise.
    Increment {
        lsn: Lsn,
        term: Vec<u8>,
        delta: i64,
        result: Option<i64>,
    },
    /// Insert-or-overwrite `term` with `value`.
    Upsert {
        lsn: Lsn,
        term: Vec<u8>,
        value: Vec<u8>,
    },
    /// Compare-and-swap on `term`; `success` records the logged outcome.
    CompareAndSwap {
        lsn: Lsn,
        term: Vec<u8>,
        new_value: Vec<u8>,
        success: bool,
    },
}
impl RecoveredOperation {
    /// Returns the LSN of the WAL record this operation was recovered from.
    pub fn lsn(&self) -> Lsn {
        // Every variant has an `lsn` field, so this pattern is irrefutable.
        let (Self::Insert { lsn, .. }
        | Self::Remove { lsn, .. }
        | Self::Increment { lsn, .. }
        | Self::Upsert { lsn, .. }
        | Self::CompareAndSwap { lsn, .. }) = self;
        *lsn
    }

    /// Returns the term (key bytes) this operation targets.
    pub fn term(&self) -> &[u8] {
        // Every variant has a `term` field, so this pattern is irrefutable.
        let (Self::Insert { term, .. }
        | Self::Remove { term, .. }
        | Self::Increment { term, .. }
        | Self::Upsert { term, .. }
        | Self::CompareAndSwap { term, .. }) = self;
        term
    }
}
/// Reconciles recovered WAL records when one rank regime applies to all LSNs.
///
/// Thin wrapper around [`reconcile_lww_with_regime`] that answers
/// `rank_regime` for every LSN.
pub fn reconcile_lww(
    recovered_ops: Vec<(Lsn, WalRecord)>,
    loaded_from_disk: bool,
    checkpoint_lsn: Lsn,
    rank_regime: RankRegime,
) -> Vec<RecoveredOperation> {
    let uniform_regime = move |_lsn: Lsn| rank_regime;
    reconcile_lww_with_regime(
        recovered_ops,
        loaded_from_disk,
        checkpoint_lsn,
        uniform_regime,
    )
}
/// Orders recovered WAL records for replay using their commit ranks.
///
/// * `CommitRank` records supply a generation for the data record at
///   `data_lsn`; if several name the same `data_lsn`, the last one wins.
/// * When `loaded_from_disk` is true and `checkpoint_lsn > 0`, records with
///   `lsn <= checkpoint_lsn` are skipped.
/// * A record without a commit rank is ordered by its own LSN when
///   `regime_of(lsn)` is [`RankRegime::Owned`], and dropped when it is
///   [`RankRegime::Overlay`].
///
/// The surviving operations are returned sorted by `(generation, lsn)`; the
/// sort is stable, so operations from one record keep their relative order.
pub fn reconcile_lww_with_regime<R: Fn(Lsn) -> RankRegime>(
    recovered_ops: Vec<(Lsn, WalRecord)>,
    loaded_from_disk: bool,
    checkpoint_lsn: Lsn,
    regime_of: R,
) -> Vec<RecoveredOperation> {
    // First pass: index commit ranks by the data LSN they refer to.
    let commit_ranks: HashMap<Lsn, u64> = recovered_ops
        .iter()
        .filter_map(|(_marker_lsn, record)| match record {
            WalRecord::CommitRank {
                data_lsn,
                generation,
                ..
            } => Some((*data_lsn, *generation)),
            _ => None,
        })
        .collect();

    let covered_by_checkpoint =
        |lsn: Lsn| loaded_from_disk && checkpoint_lsn > 0 && lsn <= checkpoint_lsn;

    // Second pass: tag every surviving operation with its ordering key.
    let mut ordered: Vec<(u64, Lsn, RecoveredOperation)> =
        Vec::with_capacity(recovered_ops.len());
    for (lsn, record) in recovered_ops {
        if covered_by_checkpoint(lsn) {
            continue;
        }
        let generation = if let Some(&commit_seq) = commit_ranks.get(&lsn) {
            commit_seq
        } else {
            // The regime is only consulted for records without a rank.
            match regime_of(lsn) {
                RankRegime::Owned => lsn,
                RankRegime::Overlay => continue,
            }
        };
        ordered.extend(
            recovered_operations_from_record(lsn, record)
                .into_iter()
                .map(|op| (generation, lsn, op)),
        );
    }

    ordered.sort_by_key(|&(generation, lsn, _)| (generation, lsn));
    ordered.into_iter().map(|(_, _, op)| op).collect()
}
/// Expands one WAL record into the operations it stands for.
///
/// Single-key records yield one operation, batch records yield one operation
/// per entry (all sharing `lsn`), a failed compare-and-swap yields nothing,
/// and bookkeeping records (transaction markers, checkpoints, version and
/// commit-rank records) yield nothing.
pub fn recovered_operations_from_record(lsn: Lsn, record: WalRecord) -> Vec<RecoveredOperation> {
    match record {
        WalRecord::Insert { term, value } => vec![RecoveredOperation::Insert { lsn, term, value }],
        WalRecord::Remove { term } => vec![RecoveredOperation::Remove { lsn, term }],
        WalRecord::Increment {
            term,
            delta,
            result,
        } => {
            // Single increments carry the logged result value.
            let logged = Some(result);
            vec![RecoveredOperation::Increment {
                lsn,
                term,
                delta,
                result: logged,
            }]
        }
        WalRecord::Upsert { term, value } => vec![RecoveredOperation::Upsert { lsn, term, value }],
        // Only a successful compare-and-swap changed state.
        WalRecord::CompareAndSwap {
            term,
            new_value,
            success: true,
            ..
        } => vec![RecoveredOperation::CompareAndSwap {
            lsn,
            term,
            new_value,
            success: true,
        }],
        WalRecord::CompareAndSwap { success: false, .. } => Vec::new(),
        WalRecord::BatchInsert { entries } => {
            let mut ops = Vec::with_capacity(entries.len());
            for (term, value) in entries {
                ops.push(RecoveredOperation::Insert { lsn, term, value });
            }
            ops
        }
        WalRecord::BatchIncrement { entries } => {
            let mut ops = Vec::with_capacity(entries.len());
            for (term, delta) in entries {
                // Batch increments do not carry a per-entry result.
                ops.push(RecoveredOperation::Increment {
                    lsn,
                    term,
                    delta,
                    result: None,
                });
            }
            ops
        }
        WalRecord::BeginTx { .. }
        | WalRecord::CommitTx { .. }
        | WalRecord::AbortTx { .. }
        | WalRecord::Checkpoint { .. }
        | WalRecord::VersionUpdate { .. }
        | WalRecord::VersionDurable { .. }
        | WalRecord::VersionGc { .. }
        | WalRecord::CommitRank { .. } => Vec::new(),
    }
}
/// State of a transaction as observed while scanning the WAL.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TransactionState {
    /// A `BeginTx` was seen but no matching commit or abort yet.
    InProgress,
    /// A matching `CommitTx` was seen.
    Committed,
    /// A matching `AbortTx` was seen.
    Aborted,
}
/// Bookkeeping for a transaction whose outcome is not yet applied.
#[derive(Debug, Default)]
struct PendingTransaction {
    /// Last known state, if any has been recorded.
    state: Option<TransactionState>,
    /// Operations buffered for this transaction.
    operations: Vec<RecoveredOperation>,
    /// LSN of the record that opened the transaction, if known.
    begin_lsn: Option<Lsn>,
}
/// Counters collected during a recovery run.
#[derive(Debug, Clone, Default)]
pub struct RecoveryStats {
    /// Records visited during analysis, including one that failed to decode.
    pub records_scanned: u64,
    /// Records that decoded successfully during analysis.
    pub valid_records: u64,
    /// Records that failed to decode during analysis.
    pub corrupted_records: u64,
    /// Transactions for which a commit marker was seen.
    pub committed_transactions: u64,
    /// Transactions for which an abort marker was seen.
    pub aborted_transactions: u64,
    /// Transactions still in progress when the scan ended.
    pub incomplete_transactions: u64,
    /// Recovered operations other than removes (inserts, increments,
    /// upserts and compare-and-swaps).
    pub insert_operations: u64,
    /// Recovered remove operations.
    pub remove_operations: u64,
    /// Checkpoint LSN from the last checkpoint record seen, if any.
    pub checkpoint_lsn: Option<Lsn>,
    /// Wall-clock duration of the recovery, in milliseconds.
    pub duration_ms: u64,
}
/// How the store was brought up.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecoveryMode {
    /// Regular start-up; no recovery action was taken.
    Normal,
    /// State was rebuilt by replaying the WAL.
    RebuildFromWal,
    /// State was repaired in place.
    RepairInPlace,
    /// A new store was created.
    CreatedNew,
}

impl RecoveryMode {
    /// Returns `true` only for [`RecoveryMode::Normal`].
    pub fn is_normal(&self) -> bool {
        *self == Self::Normal
    }

    /// Returns `true` for every mode other than [`RecoveryMode::Normal`].
    pub fn recovered(&self) -> bool {
        *self != Self::Normal
    }
}
/// Summary of what happened when the store was opened.
#[derive(Debug, Clone)]
pub struct RecoveryReport {
    /// Which start-up path was taken.
    pub mode: RecoveryMode,
    /// Number of WAL records replayed.
    pub records_replayed: u64,
    /// Number of terms recovered.
    pub terms_recovered: u64,
    /// Path of the file found to be corrupted, if any.
    pub corrupted_file: Option<PathBuf>,
    /// Description of the corruption, if any.
    pub corruption_reason: Option<String>,
    /// Duration of the recovery, in milliseconds.
    pub duration_ms: u64,
    /// Archive segments used for the rebuild.
    pub archive_segments_used: Vec<PathBuf>,
}
impl RecoveryReport {
    /// Report for a regular start-up: mode `Normal`, zero counters, no
    /// corruption details and no archive segments.
    pub fn normal() -> Self {
        Self {
            mode: RecoveryMode::Normal,
            records_replayed: 0,
            terms_recovered: 0,
            corrupted_file: None,
            corruption_reason: None,
            duration_ms: 0,
            archive_segments_used: Vec::new(),
        }
    }

    /// Report for a freshly created store: identical to [`Self::normal`]
    /// except that the mode is `CreatedNew`.
    pub fn created_new() -> Self {
        Self {
            mode: RecoveryMode::CreatedNew,
            ..Self::normal()
        }
    }

    /// Report for a rebuild driven by WAL replay, recording the corrupted
    /// file, the reason, the replay counters, the segments used and the
    /// elapsed time.
    pub fn rebuild_from_wal(
        corrupted_file: PathBuf,
        corruption_reason: String,
        records_replayed: u64,
        terms_recovered: u64,
        archive_segments_used: Vec<PathBuf>,
        duration_ms: u64,
    ) -> Self {
        let corrupted_file = Some(corrupted_file);
        let corruption_reason = Some(corruption_reason);
        Self {
            mode: RecoveryMode::RebuildFromWal,
            records_replayed,
            terms_recovered,
            corrupted_file,
            corruption_reason,
            duration_ms,
            archive_segments_used,
        }
    }
}
/// Kinds of on-disk corruption that can be reported for a data file.
#[derive(Debug, Clone)]
pub enum CorruptionType {
    /// The file header is malformed; the payload describes how.
    InvalidHeader(String),
    /// An arena's stored checksum differs from the computed one.
    ArenaChecksum {
        arena_id: u32,
        expected: u32,
        found: u32,
    },
    /// The file is shorter than required.
    Truncated {
        expected: usize,
        actual: usize,
    },
    /// The root descriptor is malformed; the payload describes how.
    InvalidRootDescriptor(String),
    /// An I/O problem, carried as text.
    IoError(String),
}

impl std::fmt::Display for CorruptionType {
    /// Writes a one-line, human-readable description of the corruption.
    /// Checksums are shown in `0x`-prefixed hexadecimal.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::InvalidHeader(detail) => write!(f, "Invalid header: {detail}"),
            Self::ArenaChecksum {
                arena_id,
                expected,
                found,
            } => write!(
                f,
                "Arena {arena_id} checksum mismatch: expected {expected:#x}, found {found:#x}"
            ),
            Self::Truncated { expected, actual } => {
                write!(f, "File truncated: expected {expected} bytes, found {actual}")
            }
            Self::InvalidRootDescriptor(detail) => {
                write!(f, "Invalid root descriptor: {detail}")
            }
            Self::IoError(detail) => write!(f, "I/O error: {detail}"),
        }
    }
}
/// Outcome of a full recovery pass over the WAL.
#[derive(Debug)]
pub struct RecoveredState {
    /// Operations to replay, in the order recovery produced them.
    operations: Vec<RecoveredOperation>,
    /// One past the LSN of the last record read during redo (1 if none).
    pub next_lsn: Lsn,
    /// Counters gathered during the run.
    pub stats: RecoveryStats,
}
impl RecoveredState {
    /// Iterates over the recovered operations by reference, in replay order.
    pub fn operations(&self) -> impl Iterator<Item = &RecoveredOperation> {
        let ops = &self.operations;
        ops.iter()
    }

    /// Number of recovered operations.
    pub fn operation_count(&self) -> usize {
        let ops = &self.operations;
        ops.len()
    }

    /// Consumes the state and hands back the recovered operations.
    pub fn into_operations(self) -> Vec<RecoveredOperation> {
        let Self { operations, .. } = self;
        operations
    }
}
/// Drives recovery from a single WAL file.
pub struct RecoveryManager {
    /// Location of the WAL file to recover from.
    wal_path: PathBuf,
}
impl RecoveryManager {
    /// Creates a manager for the WAL at `wal_path`. The file is not touched
    /// until one of the recovery methods is called.
    pub fn new<P: AsRef<Path>>(wal_path: P) -> Self {
        Self {
            wal_path: PathBuf::from(wal_path.as_ref()),
        }
    }

    /// Reports whether the WAL holds at least one readable record.
    ///
    /// Returns `Ok(false)` when the file is missing, cannot be opened, is
    /// empty, or its first record fails to decode.
    pub fn needs_recovery(&self) -> Result<bool> {
        if !self.wal_path.exists() {
            return Ok(false);
        }
        let reader = match WalReader::new(&self.wal_path) {
            Ok(reader) => reader,
            Err(_) => return Ok(false),
        };
        let first = reader.iter().next();
        Ok(matches!(first, Some(Ok(_))))
    }

    /// Runs the analysis and redo phases and returns the recovered state.
    ///
    /// A missing WAL yields an empty state with `next_lsn == 1`.
    pub fn recover(&self) -> Result<RecoveredState> {
        let started = Instant::now();
        let mut stats = RecoveryStats::default();

        if !self.wal_path.exists() {
            return Ok(RecoveredState {
                operations: Vec::new(),
                next_lsn: 1,
                stats,
            });
        }

        let (checkpoint_lsn, transactions) = self.analysis_phase(&mut stats)?;
        stats.checkpoint_lsn = checkpoint_lsn;

        let (operations, next_lsn) = self.redo_phase(checkpoint_lsn, &transactions, &mut stats)?;
        stats.duration_ms = started.elapsed().as_millis() as u64;

        Ok(RecoveredState {
            operations,
            next_lsn,
            stats,
        })
    }

    /// Scans the WAL once to find the last checkpoint and the final state of
    /// every transaction that has a `BeginTx` record.
    ///
    /// Scanning stops at the first record that fails to decode. Commit and
    /// abort markers for transactions without a `BeginTx` are ignored.
    fn analysis_phase(
        &self,
        stats: &mut RecoveryStats,
    ) -> Result<(Option<Lsn>, HashMap<u64, TransactionState>)> {
        let reader = WalReader::new(&self.wal_path)?;
        let mut checkpoint_lsn: Option<Lsn> = None;
        let mut transactions: HashMap<u64, TransactionState> = HashMap::new();

        for item in reader.iter() {
            stats.records_scanned += 1;
            let record = match item {
                Ok((_lsn, record)) => {
                    stats.valid_records += 1;
                    record
                }
                Err(e) => {
                    stats.corrupted_records += 1;
                    warn!(
                        "Corrupted record during analysis; stopping at durable prefix: {:?}",
                        e
                    );
                    break;
                }
            };

            match record {
                WalRecord::Checkpoint {
                    checkpoint_lsn: cp_lsn,
                    ..
                } => checkpoint_lsn = Some(cp_lsn),
                WalRecord::BeginTx { tx_id } => {
                    transactions.insert(tx_id, TransactionState::InProgress);
                }
                WalRecord::CommitTx { tx_id } => {
                    if let Some(state) = transactions.get_mut(&tx_id) {
                        *state = TransactionState::Committed;
                        stats.committed_transactions += 1;
                    }
                }
                WalRecord::AbortTx { tx_id } => {
                    if let Some(state) = transactions.get_mut(&tx_id) {
                        *state = TransactionState::Aborted;
                        stats.aborted_transactions += 1;
                    }
                }
                _ => {}
            }
        }

        let still_open = transactions
            .values()
            .filter(|state| **state == TransactionState::InProgress)
            .count();
        stats.incomplete_transactions += still_open as u64;

        Ok((checkpoint_lsn, transactions))
    }

    /// Re-reads the WAL and collects the operations to replay.
    ///
    /// Operations logged while a transaction is open are buffered and only
    /// released when its `CommitTx` is seen; they are discarded on `AbortTx`
    /// or if the log ends first. Reading stops at the first record that
    /// fails to decode. The checkpoint and transaction-table arguments are
    /// currently unused.
    fn redo_phase(
        &self,
        _checkpoint_lsn: Option<Lsn>,
        _transactions: &HashMap<u64, TransactionState>,
        stats: &mut RecoveryStats,
    ) -> Result<(Vec<RecoveredOperation>, Lsn)> {
        let reader = WalReader::new(&self.wal_path)?;
        let mut operations: Vec<RecoveredOperation> = Vec::new();
        let mut next_lsn: Lsn = 1;
        // The most recently opened transaction; data records are attributed to it.
        let mut current_tx: Option<u64> = None;
        let mut pending_tx_ops: HashMap<u64, Vec<RecoveredOperation>> = HashMap::new();

        for item in reader.iter() {
            let (lsn, record) = match item {
                Ok(entry) => entry,
                Err(e) => {
                    warn!(
                        "Corrupted record during redo; stopping at durable prefix: {:?}",
                        e
                    );
                    break;
                }
            };
            next_lsn = lsn + 1;

            match record {
                WalRecord::BeginTx { tx_id } => {
                    current_tx = Some(tx_id);
                    pending_tx_ops.entry(tx_id).or_default();
                }
                WalRecord::CommitTx { tx_id } => {
                    operations.extend(pending_tx_ops.remove(&tx_id).unwrap_or_default());
                    current_tx = current_tx.filter(|open| *open != tx_id);
                }
                WalRecord::AbortTx { tx_id } => {
                    pending_tx_ops.remove(&tx_id);
                    current_tx = current_tx.filter(|open| *open != tx_id);
                }
                other => {
                    let ops = recovered_operations_from_record(lsn, other);
                    match current_tx {
                        Some(tx_id) => pending_tx_ops.entry(tx_id).or_default().extend(ops),
                        None => operations.extend(ops),
                    }
                }
            }
        }

        // Everything that is not a remove is counted as an insert.
        for op in &operations {
            let counter = match op {
                RecoveredOperation::Remove { .. } => &mut stats.remove_operations,
                RecoveredOperation::Insert { .. }
                | RecoveredOperation::Increment { .. }
                | RecoveredOperation::Upsert { .. }
                | RecoveredOperation::CompareAndSwap { .. } => &mut stats.insert_operations,
            };
            *counter += 1;
        }

        Ok((operations, next_lsn))
    }

    /// Runs [`Self::recover`] and feeds each recovered operation to
    /// `callback` in order, stopping at the first callback error.
    ///
    /// Returns the recovery statistics on success.
    pub fn recover_with_callback<F>(&self, mut callback: F) -> Result<RecoveryStats>
    where
        F: FnMut(RecoveredOperation) -> Result<()>,
    {
        let RecoveredState {
            operations, stats, ..
        } = self.recover()?;
        for op in operations {
            callback(op)?;
        }
        Ok(stats)
    }
}
/// Streams recovered operations from a WAL in caller-sized batches.
pub struct IncrementalRecovery {
    /// Source of WAL records.
    reader: WalReader,
    /// Transaction table; not read by the code in this file.
    #[allow(dead_code)]
    transactions: HashMap<u64, TransactionState>,
    /// Transaction currently open, if any.
    current_tx: Option<u64>,
    /// Operations buffered for the open transaction.
    pending_ops: Vec<RecoveredOperation>,
    /// One past the LSN of the last record read (1 before any read).
    next_lsn: Lsn,
    /// Set once a record fails to decode; no further batches are produced.
    stopped_at_corruption: bool,
    /// Not read by the code in this file.
    #[allow(dead_code)]
    analysis_complete: bool,
}
impl IncrementalRecovery {
    /// Upper bound on the capacity reserved up front for a batch.
    ///
    /// `max_ops` is caller-controlled; reserving it verbatim would panic
    /// with "capacity overflow" (or attempt a huge allocation) for values
    /// such as `usize::MAX`. The batch still grows on demand past this.
    const MAX_BATCH_PREALLOC: usize = 1024;

    /// Opens the WAL at `wal_path` for incremental recovery.
    ///
    /// # Errors
    /// Returns the converted WAL error if the reader cannot be created.
    pub fn new<P: AsRef<Path>>(wal_path: P) -> Result<Self> {
        let wal_reader = WalReader::new(wal_path)?;
        Ok(IncrementalRecovery {
            reader: wal_reader,
            transactions: HashMap::new(),
            current_tx: None,
            pending_ops: Vec::new(),
            next_lsn: 1,
            stopped_at_corruption: false,
            analysis_complete: false,
        })
    }

    /// Reads records until at least `max_ops` operations are collected, the
    /// log ends, or a record fails to decode.
    ///
    /// Returns `Ok(None)` when no operations were collected (including when
    /// `max_ops` is 0 or a corrupted record was seen earlier). A batch may
    /// exceed `max_ops` because a committed transaction is released whole.
    pub fn next_batch(&mut self, max_ops: usize) -> Result<Option<Vec<RecoveredOperation>>> {
        if self.stopped_at_corruption {
            return Ok(None);
        }
        let mut batch = Vec::with_capacity(max_ops.min(Self::MAX_BATCH_PREALLOC));
        while batch.len() < max_ops {
            match self.reader.next_record() {
                Some(Ok((lsn, record))) => {
                    self.next_lsn = lsn + 1;
                    if let Some(ops) = self.process_record(lsn, record)? {
                        batch.extend(ops);
                    }
                }
                Some(Err(_)) => {
                    // Stop at the durable prefix; later calls return `None`.
                    self.stopped_at_corruption = true;
                    break;
                }
                None => break,
            }
        }
        Ok((!batch.is_empty()).then_some(batch))
    }

    /// Applies one WAL record to the transaction-tracking state.
    ///
    /// Returns the operations that became ready to emit: the buffered
    /// operations on a matching `CommitTx`, or the record's own operations
    /// when no transaction is open. Returns `None` otherwise.
    fn process_record(
        &mut self,
        lsn: Lsn,
        record: WalRecord,
    ) -> Result<Option<Vec<RecoveredOperation>>> {
        match record {
            WalRecord::BeginTx { tx_id } => {
                // Opening a transaction discards anything still buffered.
                self.current_tx = Some(tx_id);
                self.pending_ops.clear();
                Ok(None)
            }
            WalRecord::CommitTx { tx_id } => {
                if self.current_tx == Some(tx_id) {
                    let ops = std::mem::take(&mut self.pending_ops);
                    self.current_tx = None;
                    Ok(Some(ops))
                } else {
                    Ok(None)
                }
            }
            WalRecord::AbortTx { tx_id } => {
                if self.current_tx == Some(tx_id) {
                    self.pending_ops.clear();
                    self.current_tx = None;
                }
                Ok(None)
            }
            record => {
                let ops = recovered_operations_from_record(lsn, record);
                if self.current_tx.is_some() {
                    self.pending_ops.extend(ops);
                    Ok(None)
                } else {
                    Ok((!ops.is_empty()).then_some(ops))
                }
            }
        }
    }

    /// One past the LSN of the last record read so far (1 before any read).
    pub fn next_lsn(&self) -> Lsn {
        self.next_lsn
    }
}
/// Replays recovered operations through `insert_fn` and returns how many
/// calls succeeded.
///
/// What is passed to `insert_fn(term, value)` per operation:
/// * `Insert` — the optional value as-is.
/// * `Remove` — `None`.
/// * `Increment` — the logged result (0 if absent) as 8 little-endian bytes.
/// * `Upsert` — the value.
/// * `CompareAndSwap` — the new value, only when `success` is true; failed
///   swaps are skipped and not counted.
///
/// # Errors
/// Stops at, and returns, the first error produced by `insert_fn`.
pub fn apply_to_trie<V, F>(
    operations: impl IntoIterator<Item = RecoveredOperation>,
    mut insert_fn: F,
) -> std::result::Result<usize, String>
where
    F: FnMut(&[u8], Option<&[u8]>) -> std::result::Result<(), String>,
{
    let mut applied = 0;
    for op in operations {
        match op {
            RecoveredOperation::Insert { term, value, .. } => {
                insert_fn(&term, value.as_deref())?;
            }
            RecoveredOperation::Remove { term, .. } => {
                insert_fn(&term, None)?;
            }
            RecoveredOperation::Increment { term, result, .. } => {
                let encoded = result.unwrap_or(0).to_le_bytes();
                insert_fn(&term, Some(&encoded))?;
            }
            RecoveredOperation::Upsert { term, value, .. } => {
                insert_fn(&term, Some(&value))?;
            }
            RecoveredOperation::CompareAndSwap {
                term,
                new_value,
                success,
                ..
            } => {
                if !success {
                    continue;
                }
                insert_fn(&term, Some(&new_value))?;
            }
        }
        applied += 1;
    }
    Ok(applied)
}
/// Inspects the 64-byte header of the file at `path` for signs of corruption.
///
/// Returns `Ok(None)` when the path does not exist, when the first eight
/// bytes equal the disk-manager magic, or when every header check passes.
/// Otherwise returns the first problem found:
/// * fewer than 64 bytes available — [`CorruptionType::Truncated`];
/// * first four bytes are neither `PART` nor `ARTC`, version byte is not 1
///   or 2, or (version 2) the CRC stored at bytes 32..36 does not match
///   bytes 0..32 — [`CorruptionType::InvalidHeader`].
///
/// With `check_arenas` set, the file size is additionally re-checked against
/// the 64-byte minimum; no per-arena verification happens here.
///
/// # Errors
/// Returns [`RecoveryError::Io`] if the file cannot be opened or read for a
/// reason other than hitting end-of-file inside the header.
pub fn detect_corruption<P: AsRef<Path>>(
    path: P,
    check_arenas: bool,
) -> std::result::Result<Option<CorruptionType>, RecoveryError> {
    use std::fs::File;
    use std::io::Read;

    const DISK_MANAGER_MAGIC: u64 = 0x5041_5254_0001_0000;

    let path = path.as_ref();
    if !path.exists() {
        return Ok(None);
    }

    let mut file = File::open(path).map_err(RecoveryError::Io)?;
    let mut header_bytes = [0u8; 64];
    if let Err(e) = file.read_exact(&mut header_bytes) {
        if e.kind() != std::io::ErrorKind::UnexpectedEof {
            return Err(RecoveryError::Io(e));
        }
        let actual_size = file.metadata().map(|m| m.len() as usize).unwrap_or(0);
        return Ok(Some(CorruptionType::Truncated {
            expected: 64,
            actual: actual_size,
        }));
    }

    let mut magic_bytes = [0u8; 8];
    magic_bytes.copy_from_slice(&header_bytes[0..8]);
    let magic_u64 = u64::from_le_bytes(magic_bytes);
    if magic_u64 == DISK_MANAGER_MAGIC {
        return Ok(None);
    }

    let magic_4 = &header_bytes[0..4];
    let known_magic = magic_4 == b"PART" || magic_4 == b"ARTC";
    if !known_magic {
        return Ok(Some(CorruptionType::InvalidHeader(format!(
            "Invalid magic: u64={:#018x} (bytes {:?})",
            magic_u64,
            &header_bytes[0..8]
        ))));
    }

    let version = header_bytes[4];
    if !(1..=2).contains(&version) {
        return Ok(Some(CorruptionType::InvalidHeader(format!(
            "Unsupported version: {}",
            version
        ))));
    }

    if version >= 2 {
        let mut stored = [0u8; 4];
        stored.copy_from_slice(&header_bytes[32..36]);
        let stored_checksum = u32::from_le_bytes(stored);
        let computed_checksum = crc32_header(&header_bytes[0..32]);
        if stored_checksum != computed_checksum {
            return Ok(Some(CorruptionType::InvalidHeader(format!(
                "Header checksum mismatch: stored {:#x}, computed {:#x}",
                stored_checksum, computed_checksum
            ))));
        }
    }

    if check_arenas {
        let file_size = file.metadata().map(|m| m.len()).unwrap_or(0);
        if file_size < 64 {
            return Ok(Some(CorruptionType::Truncated {
                expected: 64,
                actual: file_size as usize,
            }));
        }
    }

    Ok(None)
}
/// Computes a bitwise CRC-32 over `data` using the reflected polynomial
/// `0xEDB88320`, an all-ones initial value and a final bit inversion.
fn crc32_header(data: &[u8]) -> u32 {
    const POLY: u32 = 0xEDB88320;

    let remainder = data.iter().fold(u32::MAX, |acc, &byte| {
        // Fold the byte in, then shift out eight bits one at a time.
        (0..8).fold(acc ^ u32::from(byte), |bits, _| {
            let shifted = bits >> 1;
            if bits & 1 == 1 {
                shifted ^ POLY
            } else {
                shifted
            }
        })
    });
    !remainder
}
/// Lists the `.segment` files directly inside `archive_dir`, sorted by path.
///
/// Returns an empty list when the directory is missing or unreadable.
pub fn find_wal_archive_segments<P: AsRef<Path>>(archive_dir: P) -> Vec<PathBuf> {
    let dir: &Path = archive_dir.as_ref();
    find_wal_segments_in_dir(dir)
}
/// Lists the `.segment` files directly inside `pending_dir`, sorted by path.
///
/// Returns an empty list when the directory is missing or unreadable.
pub fn find_wal_pending_segments<P: AsRef<Path>>(pending_dir: P) -> Vec<PathBuf> {
    let dir: &Path = pending_dir.as_ref();
    find_wal_segments_in_dir(dir)
}
/// Collects every entry of `dir` whose extension is exactly `segment`,
/// sorted by path.
///
/// Best-effort: a missing or unreadable directory, and entries that cannot
/// be read, are silently skipped.
fn find_wal_segments_in_dir<P: AsRef<Path>>(dir: P) -> Vec<PathBuf> {
    let dir = dir.as_ref();
    if !dir.exists() {
        return Vec::new();
    }

    let entries = match std::fs::read_dir(dir) {
        Ok(entries) => entries,
        Err(_) => return Vec::new(),
    };

    let mut segments: Vec<PathBuf> = Vec::new();
    for entry in entries.flatten() {
        let candidate = entry.path();
        let is_segment = candidate.extension().and_then(|ext| ext.to_str()) == Some("segment");
        if is_segment {
            segments.push(candidate);
        }
    }
    segments.sort();
    segments
}
/// Gathers archived, pending and active WAL segments, ordered by first LSN.
///
/// Relative `archive_dir` / `pending_dir` paths are resolved against the
/// parent directory of `wal_path` (or `.` if it has none). The active WAL is
/// included only when it exists and is larger than a bare WAL header.
pub fn collect_all_wal_segments(
    wal_path: &Path,
    archive_dir: &Path,
    pending_dir: &Path,
) -> Vec<PathBuf> {
    let parent = wal_path.parent().unwrap_or(Path::new("."));
    let resolve = |dir: &Path| -> PathBuf {
        if dir.is_absolute() {
            dir.to_path_buf()
        } else {
            parent.join(dir)
        }
    };

    let mut all_segments = find_wal_segments_in_dir(resolve(archive_dir));
    all_segments.extend(find_wal_segments_in_dir(resolve(pending_dir)));

    let active_has_records = wal_path.exists()
        && std::fs::metadata(wal_path)
            .map_or(false, |meta| meta.len() > super::wal::WalHeader::SIZE as u64);
    if active_has_records {
        all_segments.push(wal_path.to_path_buf());
    }

    sort_segments_by_lsn(&mut all_segments);
    all_segments
}
/// Collects all WAL segments for a rebuild and, if the active WAL is among
/// them, moves it into the archive directory so it is retained.
///
/// The active WAL is renamed to
/// `wal_recovery_active_<pid>_<nanos>.segment` inside the (created if
/// needed) archive directory, and its entry in the returned list is updated
/// to the new path.
///
/// # Errors
/// Returns the I/O error if the archive directory cannot be created or the
/// rename fails.
pub fn collect_retained_wal_segments_for_rebuild(
    wal_path: &Path,
    config: &WalConfig,
    pending_dir: &Path,
) -> std::result::Result<Vec<PathBuf>, io::Error> {
    let mut segments = collect_all_wal_segments(wal_path, &config.archive_dir, pending_dir);

    let active_index = match segments.iter().position(|path| path == wal_path) {
        Some(index) => index,
        // The active WAL is not part of the set; nothing to retain.
        None => return Ok(segments),
    };

    let archive_dir = if config.archive_dir.is_absolute() {
        config.archive_dir.clone()
    } else {
        let parent = wal_path.parent().unwrap_or(Path::new("."));
        parent.join(&config.archive_dir)
    };
    std::fs::create_dir_all(&archive_dir)?;

    let nanos = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default()
        .as_nanos();
    let retained_name = format!(
        "wal_recovery_active_{}_{}.segment",
        std::process::id(),
        nanos
    );
    let retained_path = archive_dir.join(retained_name);

    std::fs::rename(wal_path, &retained_path)?;
    segments[active_index] = retained_path;
    Ok(segments)
}
/// Returns the LSN of the first record in the segment at `segment_path`.
///
/// Yields `None` when the segment cannot be opened, holds no records, or
/// its first record fails to decode.
pub fn get_segment_first_lsn(segment_path: &Path) -> Option<super::wal::Lsn> {
    let mut reader = super::wal::WalReader::new(segment_path).ok()?;
    let (first_lsn, _) = reader.next_record()?.ok()?;
    Some(first_lsn)
}
pub fn sort_segments_by_lsn(segments: &mut [PathBuf]) {
segments.sort_by(|a, b| {
let lsn_a = get_segment_first_lsn(a);
let lsn_b = get_segment_first_lsn(b);
match (lsn_a, lsn_b) {
(Some(a), Some(b)) => a.cmp(&b),
(Some(_), None) => std::cmp::Ordering::Less,
(None, Some(_)) => std::cmp::Ordering::Greater,
(None, None) => a.cmp(b), }
});
}
/// Replays every record of `segments`, in the given order, through `apply_fn`.
///
/// Segments that cannot be opened are skipped. Replay stops entirely at the
/// first record that fails to decode or the first operation `apply_fn`
/// rejects; both are logged and still yield `Ok`.
///
/// Returns `(records_replayed, terms_recovered)`: records read successfully
/// and operations applied successfully.
pub fn rebuild_from_wal_segments<F>(
    segments: &[PathBuf],
    mut apply_fn: F,
) -> std::result::Result<(u64, u64), RecoveryError>
where
    F: FnMut(RecoveredOperation) -> std::result::Result<(), String>,
{
    let mut records_replayed: u64 = 0;
    let mut terms_recovered: u64 = 0;

    'segments: for segment_path in segments {
        let reader = match WalReader::new(segment_path) {
            Ok(reader) => reader,
            Err(_) => continue,
        };

        for item in reader.iter() {
            let (lsn, record) = match item {
                Ok(entry) => entry,
                Err(e) => {
                    warn!(
                        "Corrupted WAL segment record during rebuild; stopping at durable prefix: {:?}",
                        e
                    );
                    break 'segments;
                }
            };
            records_replayed += 1;

            for op in recovered_operations_from_record(lsn, record) {
                match apply_fn(op) {
                    Ok(()) => terms_recovered += 1,
                    Err(error) => {
                        warn!(
                            "Recovered operation failed during rebuild; stopping at durable prefix: {}",
                            error
                        );
                        break 'segments;
                    }
                }
            }
        }
    }

    Ok((records_replayed, terms_recovered))
}
/// Rebuilds from `segments`, honouring each segment's rank regime.
///
/// If no segment header reports [`RankRegime::Overlay`] (unreadable headers
/// count as not Overlay), this defers to [`rebuild_from_wal_segments`].
/// Otherwise all records are loaded, tagged with their segment's regime
/// (`Owned` when the header is unreadable), reconciled through
/// [`reconcile_lww_with_regime`] and the resulting operations are applied.
///
/// Loading stops at the first record that fails to decode, and applying
/// stops at the first operation `apply_fn` rejects; both are logged.
///
/// # Errors
/// Returns [`RecoveryError::RecoveryFailed`] when the lowest LSN loaded is
/// greater than 1, i.e. the start of the log is missing.
pub fn rebuild_from_wal_segments_regime_aware<F>(
    segments: &[PathBuf],
    mut apply_fn: F,
) -> std::result::Result<(u64, u64), RecoveryError>
where
    F: FnMut(RecoveredOperation) -> std::result::Result<(), String>,
{
    let any_overlay = segments.iter().any(|seg| {
        matches!(
            WalReader::read_header(seg),
            Ok(header) if header.regime() == RankRegime::Overlay
        )
    });
    if !any_overlay {
        return rebuild_from_wal_segments(segments, apply_fn);
    }

    let mut all_records: Vec<(Lsn, WalRecord)> = Vec::new();
    let mut regime_by_lsn: HashMap<Lsn, RankRegime> = HashMap::new();

    'segments: for segment_path in segments {
        let seg_regime = match WalReader::read_header(segment_path) {
            Ok(header) => header.regime(),
            Err(_) => RankRegime::Owned,
        };
        let reader = match WalReader::new(segment_path) {
            Ok(reader) => reader,
            Err(_) => continue,
        };

        for item in reader.iter() {
            match item {
                Ok((lsn, record)) => {
                    regime_by_lsn.insert(lsn, seg_regime);
                    all_records.push((lsn, record));
                }
                Err(e) => {
                    warn!(
                        "Corrupted WAL segment record during rebuild; stopping at durable prefix: {:?}",
                        e
                    );
                    break 'segments;
                }
            }
        }
    }

    // Refuse to rebuild when the beginning of the log is missing.
    match all_records.iter().map(|(lsn, _)| *lsn).min() {
        Some(min_lsn) if min_lsn > 1 => {
            return Err(RecoveryError::RecoveryFailed(format!(
                "archive rebuild has a prefix gap: lowest WAL LSN is {min_lsn} (> 1) — the \
                 pre-{min_lsn} prefix was pruned while the base image is unavailable, so the \
                 archives cannot fully reconstruct the trie. Refusing a silent incomplete \
                 rebuild (RES-3)."
            )));
        }
        _ => {}
    }

    let records_replayed = all_records.len() as u64;
    let regime_for = |lsn: Lsn| match regime_by_lsn.get(&lsn) {
        Some(regime) => *regime,
        None => RankRegime::Owned,
    };
    let winners = reconcile_lww_with_regime(all_records, false, 0, regime_for);

    let mut terms_recovered: u64 = 0;
    for op in winners {
        match apply_fn(op) {
            Ok(()) => terms_recovered += 1,
            Err(error) => {
                warn!(
                    "Recovered operation failed during rebuild; stopping at durable prefix: {}",
                    error
                );
                break;
            }
        }
    }

    Ok((records_replayed, terms_recovered))
}
#[cfg(test)]
mod tests {
use super::super::wal::{WalRecord, WalWriter};
use super::*;
use tempfile::tempdir;
#[test]
fn reconcile_regime_aware_drops_overlay_orphans_keeps_owned_and_ranked() {
let insert = |term: &[u8]| WalRecord::Insert {
term: term.to_vec(),
value: None,
};
let commit_rank = |data_lsn: Lsn, term: &[u8], generation: u64| WalRecord::CommitRank {
data_lsn,
term: term.to_vec(),
generation,
};
let inserted = |ops: &[RecoveredOperation]| -> Vec<Vec<u8>> {
ops.iter()
.filter_map(|op| match op {
RecoveredOperation::Insert { term, .. } => Some(term.clone()),
_ => None,
})
.collect()
};
let winners = reconcile_lww_with_regime(vec![(1u64, insert(b"orphan"))], false, 0, |_| {
RankRegime::Overlay
});
assert!(
inserted(&winners).is_empty(),
"Overlay must drop an unranked orphan, got {:?}",
inserted(&winners)
);
let winners = reconcile_lww_with_regime(vec![(1u64, insert(b"kept"))], false, 0, |_| {
RankRegime::Owned
});
assert_eq!(
inserted(&winners),
vec![b"kept".to_vec()],
"Owned must keep an unranked record (in-order replay)"
);
let winners = reconcile_lww_with_regime(
vec![
(1u64, insert(b"acked")),
(2u64, commit_rank(1, b"acked", 7)),
],
false,
0,
|_| RankRegime::Overlay,
);
assert_eq!(
inserted(&winners),
vec![b"acked".to_vec()],
"Overlay must keep a ranked (acked) record"
);
let mixed = vec![
(1u64, insert(b"owned_unranked")), (2u64, insert(b"overlay_orphan")), (3u64, insert(b"overlay_acked")), (4u64, commit_rank(3, b"overlay_acked", 9)),
];
let regime_of = |lsn: Lsn| {
if lsn <= 1 {
RankRegime::Owned
} else {
RankRegime::Overlay
}
};
let mut surviving = inserted(&reconcile_lww_with_regime(mixed, false, 0, regime_of));
surviving.sort();
assert_eq!(
surviving,
vec![b"overlay_acked".to_vec(), b"owned_unranked".to_vec()],
"mixed archive must drop only the Overlay orphan"
);
}
#[test]
fn res3_rebuild_fails_loud_on_pruned_prefix_gap() {
use super::super::wal::{WalConfig, WalRecord, WalWriter};
use std::path::Path;
let dir = Path::new(env!("CARGO_MANIFEST_DIR")).join("target/test-tmp/res3_gap");
let _ = std::fs::remove_dir_all(&dir);
std::fs::create_dir_all(&dir).expect("scratch dir");
let wal_path = dir.join("wal.log");
let cfg = WalConfig::with_archive_dir(dir.join("archive"));
let writer = WalWriter::create(&wal_path).expect("create wal");
writer
.set_overlay_regime()
.expect("stamp Overlay on empty WAL");
for t in ["a", "b", "c"] {
writer
.append(WalRecord::Insert {
term: t.as_bytes().to_vec(),
value: None,
})
.expect("append");
}
writer.sync().expect("sync");
let seg1 = writer.rotate_to_archive(&cfg).expect("rotate1"); for t in ["d", "e", "f"] {
writer
.append(WalRecord::Insert {
term: t.as_bytes().to_vec(),
value: None,
})
.expect("append");
}
writer.sync().expect("sync");
let seg2 = writer.rotate_to_archive(&cfg).expect("rotate2");
std::fs::remove_file(&seg1).expect("prune seg1");
let result = rebuild_from_wal_segments_regime_aware(&[seg2], |_op| Ok(()));
assert!(
matches!(result, Err(RecoveryError::RecoveryFailed(_))),
"RES-3: a pruned-prefix Overlay archive must fail loud, got {result:?}"
);
}
#[test]
fn test_recovery_empty_wal() {
let dir = tempdir().expect("create tempdir");
let wal_path = dir.path().join("empty.wal");
let manager = RecoveryManager::new(&wal_path);
assert!(!manager.needs_recovery().expect("needs_recovery"));
let state = manager.recover().expect("recover");
assert_eq!(state.operation_count(), 0);
assert_eq!(state.next_lsn, 1);
}
#[test]
fn test_recovery_simple_operations() {
let dir = tempdir().expect("create tempdir");
let wal_path = dir.path().join("test.wal");
{
let writer = WalWriter::create(&wal_path).expect("create writer");
writer
.append(WalRecord::Insert {
term: b"hello".to_vec(),
value: None,
})
.expect("append insert");
writer
.append(WalRecord::Insert {
term: b"world".to_vec(),
value: Some(b"value".to_vec()),
})
.expect("append insert");
writer
.append(WalRecord::Remove {
term: b"hello".to_vec(),
})
.expect("append remove");
writer.sync().expect("sync");
}
let manager = RecoveryManager::new(&wal_path);
assert!(manager.needs_recovery().expect("needs_recovery"));
let state = manager.recover().expect("recover");
assert_eq!(state.operation_count(), 3);
let ops: Vec<_> = state.operations().collect();
match &ops[0] {
RecoveredOperation::Insert { term, value, .. } => {
assert_eq!(term, b"hello");
assert!(value.is_none());
}
_ => panic!("Expected insert"),
}
match &ops[1] {
RecoveredOperation::Insert { term, value, .. } => {
assert_eq!(term, b"world");
assert_eq!(value.as_deref(), Some(b"value".as_slice()));
}
_ => panic!("Expected insert"),
}
match &ops[2] {
RecoveredOperation::Remove { term, .. } => {
assert_eq!(term, b"hello");
}
_ => panic!("Expected remove"),
}
}
#[test]
fn test_recovery_with_transactions() {
let dir = tempdir().expect("create tempdir");
let wal_path = dir.path().join("tx.wal");
{
let writer = WalWriter::create(&wal_path).expect("create writer");
writer
.append(WalRecord::BeginTx { tx_id: 1 })
.expect("append begin");
writer
.append(WalRecord::Insert {
term: b"committed".to_vec(),
value: None,
})
.expect("append insert");
writer
.append(WalRecord::CommitTx { tx_id: 1 })
.expect("append commit");
writer
.append(WalRecord::BeginTx { tx_id: 2 })
.expect("append begin");
writer
.append(WalRecord::Insert {
term: b"aborted".to_vec(),
value: None,
})
.expect("append insert");
writer
.append(WalRecord::AbortTx { tx_id: 2 })
.expect("append abort");
writer
.append(WalRecord::BeginTx { tx_id: 3 })
.expect("append begin");
writer
.append(WalRecord::Insert {
term: b"incomplete".to_vec(),
value: None,
})
.expect("append insert");
writer.sync().expect("sync");
}
let manager = RecoveryManager::new(&wal_path);
let state = manager.recover().expect("recover");
assert_eq!(state.operation_count(), 1);
let ops: Vec<_> = state.into_operations();
match &ops[0] {
RecoveredOperation::Insert { term, .. } => {
assert_eq!(term, b"committed");
}
_ => panic!("Expected insert"),
}
}
#[test]
fn test_recovery_stats() {
let dir = tempdir().expect("create tempdir");
let wal_path = dir.path().join("stats.wal");
{
let writer = WalWriter::create(&wal_path).expect("create writer");
for i in 0..3 {
writer
.append(WalRecord::Insert {
term: format!("term{}", i).into_bytes(),
value: None,
})
.expect("append");
}
writer
.append(WalRecord::BeginTx { tx_id: 1 })
.expect("append");
writer
.append(WalRecord::Insert {
term: b"tx1".to_vec(),
value: None,
})
.expect("append");
writer
.append(WalRecord::Remove {
term: b"tx1".to_vec(),
})
.expect("append");
writer
.append(WalRecord::CommitTx { tx_id: 1 })
.expect("append");
writer.checkpoint(8).expect("checkpoint");
writer.sync().expect("sync");
}
let manager = RecoveryManager::new(&wal_path);
let state = manager.recover().expect("recover");
assert_eq!(state.operation_count(), 5);
assert_eq!(state.stats.committed_transactions, 1);
assert!(state.stats.checkpoint_lsn.is_some());
}
#[test]
fn test_incremental_recovery() {
let dir = tempdir().expect("create tempdir");
let wal_path = dir.path().join("incremental.wal");
{
let writer = WalWriter::create(&wal_path).expect("create writer");
for i in 0..10 {
writer
.append(WalRecord::Insert {
term: format!("term{}", i).into_bytes(),
value: None,
})
.expect("append");
}
writer.sync().expect("sync");
}
let mut recovery = IncrementalRecovery::new(&wal_path).expect("create recovery");
let mut total = 0;
while let Some(batch) = recovery.next_batch(3).expect("next_batch") {
total += batch.len();
}
assert_eq!(total, 10);
}
#[test]
fn test_recovery_with_callback() {
let dir = tempdir().expect("create tempdir");
let wal_path = dir.path().join("callback.wal");
{
let writer = WalWriter::create(&wal_path).expect("create writer");
for i in 0..5 {
writer
.append(WalRecord::Insert {
term: format!("term{}", i).into_bytes(),
value: None,
})
.expect("append");
}
writer.sync().expect("sync");
}
let mut collected = Vec::new();
let manager = RecoveryManager::new(&wal_path);
manager
.recover_with_callback(|op| {
collected.push(op);
Ok(())
})
.expect("recover_with_callback");
assert_eq!(collected.len(), 5);
}
/// Checks that `find_wal_pending_segments` returns exactly the three
/// `wal_pending_*.segment` files, in name order, and ignores `other.txt`.
#[test]
fn test_find_wal_pending_segments() {
    let tmp = tempdir().expect("create tempdir");
    let pending_dir = tmp.path().join("wal_pending");
    std::fs::create_dir_all(&pending_dir).expect("create pending dir");

    // The same names are used both to create the files and to check the result.
    let expected_names: Vec<String> = (0..3)
        .map(|n| format!("wal_pending_{:012}.segment", n * 1000))
        .collect();
    for name in &expected_names {
        std::fs::write(pending_dir.join(name), b"dummy").expect("write segment");
    }
    // A file that does not follow the segment naming scheme.
    std::fs::write(pending_dir.join("other.txt"), b"other").expect("write other");

    let segments = find_wal_pending_segments(&pending_dir);
    assert_eq!(segments.len(), 3);
    for (segment, expected) in segments.iter().zip(&expected_names) {
        assert!(
            segment.file_name().unwrap().to_str().unwrap() == expected.as_str(),
            "Expected {} but got {:?}",
            expected,
            segment
        );
    }
}
/// Builds two archive segments, two pending segments and an active WAL, then
/// checks that `collect_all_wal_segments` returns all five with the archive
/// segments first, the pending segments next and the active WAL last.
#[test]
fn test_collect_all_wal_segments() {
    // Advances the LSN counter `first_lsn - 1` times, then appends one insert.
    fn write_segment(path: &std::path::Path, first_lsn: u64) {
        let wal = WalWriter::create(path).expect("create segment WAL");
        (1..first_lsn).for_each(|_| {
            wal.allocate_lsn();
        });
        wal.append(WalRecord::Insert {
            term: format!("term-{first_lsn}").into_bytes(),
            value: None,
        })
        .expect("append segment record");
        wal.sync().expect("sync segment WAL");
    }

    let tmp = tempdir().expect("create tempdir");
    let root = tmp.path();
    let wal_path = root.join("test.wal");
    let archive_dir = root.join("wal_archive");
    let pending_dir = root.join("wal_pending");
    std::fs::create_dir_all(&archive_dir).expect("create archive dir");
    std::fs::create_dir_all(&pending_dir).expect("create pending dir");

    for n in 0..2u64 {
        let name = format!("wal_{:012}.segment", n * 1000);
        write_segment(&archive_dir.join(name), n + 1);
    }
    for n in 2..4u64 {
        let name = format!("wal_pending_{:012}.segment", n * 1000);
        write_segment(&pending_dir.join(name), n + 1);
    }

    {
        // Active WAL: four LSNs are allocated before the single record.
        let active = WalWriter::create(&wal_path).expect("create WAL");
        for _ in 1..5 {
            active.allocate_lsn();
        }
        active
            .append(WalRecord::Insert {
                term: b"test".to_vec(),
                value: None,
            })
            .expect("append");
        active.sync().expect("sync");
    }

    let segments = collect_all_wal_segments(
        &wal_path,
        std::path::Path::new("wal_archive"),
        std::path::Path::new("wal_pending"),
    );
    assert_eq!(segments.len(), 5);
    for archived in &segments[..2] {
        assert!(archived.to_string_lossy().contains("wal_archive"));
    }
    for pending in &segments[2..4] {
        assert!(pending.to_string_lossy().contains("wal_pending"));
    }
    assert_eq!(segments[4], wal_path);
}
/// Checks that `get_segment_first_lsn` reports LSN 1 for a freshly written
/// WAL and `None` for a path that does not exist.
#[test]
fn test_get_segment_first_lsn() {
    let tmp = tempdir().expect("create tempdir");
    let wal_path = tmp.path().join("test.wal");
    {
        let wal = WalWriter::create(&wal_path).expect("create WAL");
        (0..5).for_each(|n| {
            let term = format!("term{}", n).into_bytes();
            wal.append(WalRecord::Insert { term, value: None })
                .expect("append");
        });
        wal.sync().expect("sync");
    }

    assert_eq!(
        get_segment_first_lsn(&wal_path),
        Some(1),
        "First LSN should be 1"
    );

    let missing = std::path::Path::new("/nonexistent/path.wal");
    assert_eq!(get_segment_first_lsn(missing), None);
}
/// Creates three single-record segments in the order 10, 1, 5 and checks that
/// `sort_segments_by_lsn` reorders them to 1, 5, 10.
///
/// Unlike the earlier version, this also asserts the vector length and the
/// last position, so a sort that drops, duplicates or misplaces the
/// highest-LSN segment is caught.
#[test]
fn test_sort_segments_by_lsn() {
    // Allocates `first_lsn - 1` LSNs and then appends one insert record, so the
    // record is expected to be the first one in the file (at `first_lsn`).
    let write_segment = |path: &std::path::Path, first_lsn: u64, term: &[u8]| {
        let wal = WalWriter::create(path).expect("create WAL");
        for _ in 1..first_lsn {
            wal.allocate_lsn();
        }
        wal.append(WalRecord::Insert {
            term: term.to_vec(),
            value: None,
        })
        .expect("append");
        wal.sync().expect("sync");
    };

    let tmp = tempdir().expect("create tempdir");
    let wal_path_10 = tmp.path().join("wal_10.wal");
    let wal_path_1 = tmp.path().join("wal_1.wal");
    let wal_path_5 = tmp.path().join("wal_5.wal");
    write_segment(&wal_path_10, 10, b"at_10");
    write_segment(&wal_path_1, 1, b"at_1");
    write_segment(&wal_path_5, 5, b"at_5");

    // Deliberately unsorted input: 10, 1, 5.
    let mut segments = vec![wal_path_10.clone(), wal_path_1.clone(), wal_path_5.clone()];
    sort_segments_by_lsn(&mut segments);

    assert_eq!(segments.len(), 3, "Sorting must not add or drop segments");
    assert_eq!(
        segments[0], wal_path_1,
        "First should be segment with LSN 1"
    );
    assert_eq!(
        segments[1], wal_path_5,
        "Second should be segment with LSN 5"
    );
    assert_eq!(
        segments[2], wal_path_10,
        "Third should be segment with LSN 10"
    );
}
/// A `CommitTx` for a transaction that never began must not make recovery
/// fail; the two surrounding inserts are both recovered, in order.
#[test]
fn test_recovery_commit_unknown_tx() {
    let tmp = tempdir().expect("create tempdir");
    let wal_path = tmp.path().join("orphan_commit.wal");
    {
        let wal = WalWriter::create(&wal_path).expect("create writer");
        let insert = |term: &[u8]| WalRecord::Insert {
            term: term.to_vec(),
            value: None,
        };
        // Each record is paired with the message used if appending it fails.
        let script = vec![
            (insert(b"normal"), "append insert"),
            (WalRecord::CommitTx { tx_id: 99 }, "append orphan commit"),
            (insert(b"after_orphan"), "append insert"),
        ];
        for (record, failure) in script {
            wal.append(record).expect(failure);
        }
        wal.sync().expect("sync");
    }

    let manager = RecoveryManager::new(&wal_path);
    let state = manager.recover().expect("recover should succeed");
    assert_eq!(state.operation_count(), 2);

    let ops: Vec<_> = state.into_operations();
    if let RecoveredOperation::Insert { term, .. } = &ops[0] {
        assert_eq!(term, b"normal");
    } else {
        panic!("Expected Insert");
    }
    if let RecoveredOperation::Insert { term, .. } = &ops[1] {
        assert_eq!(term, b"after_orphan");
    } else {
        panic!("Expected Insert");
    }
}
/// An `AbortTx` for a transaction that never began must not make recovery
/// fail; both surrounding inserts are still counted.
#[test]
fn test_recovery_abort_unknown_tx() {
    let tmp = tempdir().expect("create tempdir");
    let wal_path = tmp.path().join("orphan_abort.wal");
    {
        let wal = WalWriter::create(&wal_path).expect("create writer");
        let insert = |term: &[u8]| WalRecord::Insert {
            term: term.to_vec(),
            value: None,
        };
        let script = vec![
            (insert(b"before"), "append insert"),
            (WalRecord::AbortTx { tx_id: 42 }, "append orphan abort"),
            (insert(b"after"), "append insert"),
        ];
        for (record, failure) in script {
            wal.append(record).expect(failure);
        }
        wal.sync().expect("sync");
    }

    let manager = RecoveryManager::new(&wal_path);
    let state = manager.recover().expect("recover should succeed");
    assert_eq!(state.operation_count(), 2);
}
/// A transaction that begins and commits without any operations is counted
/// as committed but contributes no operations; only the later insert is
/// recovered.
#[test]
fn test_recovery_commit_no_pending_ops() {
    let tmp = tempdir().expect("create tempdir");
    let wal_path = tmp.path().join("empty_tx.wal");
    {
        let wal = WalWriter::create(&wal_path).expect("create writer");
        let script = vec![
            (WalRecord::BeginTx { tx_id: 1 }, "append begin"),
            (WalRecord::CommitTx { tx_id: 1 }, "append commit"),
            (
                WalRecord::Insert {
                    term: b"non_tx".to_vec(),
                    value: None,
                },
                "append insert",
            ),
        ];
        for (record, failure) in script {
            wal.append(record).expect(failure);
        }
        wal.sync().expect("sync");
    }

    let manager = RecoveryManager::new(&wal_path);
    let state = manager.recover().expect("recover should succeed");
    assert_eq!(state.operation_count(), 1);
    assert_eq!(state.stats.committed_transactions, 1);

    let ops: Vec<_> = state.into_operations();
    if let RecoveredOperation::Insert { term, .. } = &ops[0] {
        assert_eq!(term, b"non_tx");
    } else {
        panic!("Expected Insert");
    }
}
/// Interleaves two transactions (tx 2 begins and commits while tx 1 is still
/// open) and expects three recovered operations and two committed
/// transactions.
#[test]
fn test_recovery_nested_transaction_ids() {
    let tmp = tempdir().expect("create tempdir");
    let wal_path = tmp.path().join("interleaved_tx.wal");
    {
        let wal = WalWriter::create(&wal_path).expect("create writer");
        let insert = |term: &[u8]| WalRecord::Insert {
            term: term.to_vec(),
            value: None,
        };
        let script = vec![
            (WalRecord::BeginTx { tx_id: 1 }, "append begin"),
            (insert(b"tx1_op1"), "append insert"),
            (WalRecord::BeginTx { tx_id: 2 }, "append begin"),
            (insert(b"tx2_op1"), "append insert"),
            (WalRecord::CommitTx { tx_id: 2 }, "append commit"),
            (insert(b"after_tx2_commit"), "append insert"),
            (WalRecord::CommitTx { tx_id: 1 }, "append commit"),
        ];
        for (record, failure) in script {
            wal.append(record).expect(failure);
        }
        wal.sync().expect("sync");
    }

    let manager = RecoveryManager::new(&wal_path);
    let state = manager.recover().expect("recover should succeed");
    assert_eq!(state.operation_count(), 3);
    assert_eq!(state.stats.committed_transactions, 2);
}
/// With tx 1 and tx 2 both open, committing tx 1 and then aborting tx 2 must
/// leave exactly one recovered operation: the `tx1_data` insert.
#[test]
fn test_recovery_tx_mismatch_on_commit() {
    let tmp = tempdir().expect("create tempdir");
    let wal_path = tmp.path().join("tx_mismatch_commit.wal");
    {
        let wal = WalWriter::create(&wal_path).expect("create writer");
        let insert = |term: &[u8]| WalRecord::Insert {
            term: term.to_vec(),
            value: None,
        };
        let script = vec![
            (WalRecord::BeginTx { tx_id: 1 }, "append begin"),
            (insert(b"tx1_data"), "append insert"),
            (WalRecord::BeginTx { tx_id: 2 }, "append begin"),
            (insert(b"tx2_data"), "append insert"),
            (WalRecord::CommitTx { tx_id: 1 }, "append commit"),
            (WalRecord::AbortTx { tx_id: 2 }, "append abort"),
        ];
        for (record, failure) in script {
            wal.append(record).expect(failure);
        }
        wal.sync().expect("sync");
    }

    let manager = RecoveryManager::new(&wal_path);
    let state = manager.recover().expect("recover should succeed");
    assert_eq!(state.operation_count(), 1);

    let ops: Vec<_> = state.into_operations();
    if let RecoveredOperation::Insert { term, .. } = &ops[0] {
        assert_eq!(term, b"tx1_data");
    } else {
        panic!("Expected Insert");
    }
}
/// With tx 1 and tx 2 both open, aborting tx 1 and then committing tx 2 must
/// leave exactly one recovered operation: the `tx2_data` insert.
#[test]
fn test_recovery_tx_mismatch_on_abort() {
    let tmp = tempdir().expect("create tempdir");
    let wal_path = tmp.path().join("tx_mismatch_abort.wal");
    {
        let wal = WalWriter::create(&wal_path).expect("create writer");
        let insert = |term: &[u8]| WalRecord::Insert {
            term: term.to_vec(),
            value: None,
        };
        let script = vec![
            (WalRecord::BeginTx { tx_id: 1 }, "append begin"),
            (insert(b"tx1_data"), "append insert"),
            (WalRecord::BeginTx { tx_id: 2 }, "append begin"),
            (insert(b"tx2_data"), "append insert"),
            (WalRecord::AbortTx { tx_id: 1 }, "append abort"),
            (WalRecord::CommitTx { tx_id: 2 }, "append commit"),
        ];
        for (record, failure) in script {
            wal.append(record).expect(failure);
        }
        wal.sync().expect("sync");
    }

    let manager = RecoveryManager::new(&wal_path);
    let state = manager.recover().expect("recover should succeed");
    assert_eq!(state.operation_count(), 1);

    let ops: Vec<_> = state.into_operations();
    if let RecoveredOperation::Insert { term, .. } = &ops[0] {
        assert_eq!(term, b"tx2_data");
    } else {
        panic!("Expected Insert");
    }
}
/// Committing the same transaction twice must not fail recovery and must not
/// duplicate its single insert.
#[test]
fn test_recovery_double_commit() {
    let tmp = tempdir().expect("create tempdir");
    let wal_path = tmp.path().join("double_commit.wal");
    {
        let wal = WalWriter::create(&wal_path).expect("create writer");
        let script = vec![
            (WalRecord::BeginTx { tx_id: 1 }, "append begin"),
            (
                WalRecord::Insert {
                    term: b"tx1_data".to_vec(),
                    value: None,
                },
                "append insert",
            ),
            (WalRecord::CommitTx { tx_id: 1 }, "append first commit"),
            (WalRecord::CommitTx { tx_id: 1 }, "append second commit"),
        ];
        for (record, failure) in script {
            wal.append(record).expect(failure);
        }
        wal.sync().expect("sync");
    }

    let manager = RecoveryManager::new(&wal_path);
    let state = manager.recover().expect("recover should succeed");
    assert_eq!(state.operation_count(), 1);
}
/// Aborting the same transaction twice must not fail recovery; the aborted
/// insert is discarded and only the later `after_abort` insert is recovered.
#[test]
fn test_recovery_double_abort() {
    let tmp = tempdir().expect("create tempdir");
    let wal_path = tmp.path().join("double_abort.wal");
    {
        let wal = WalWriter::create(&wal_path).expect("create writer");
        let insert = |term: &[u8]| WalRecord::Insert {
            term: term.to_vec(),
            value: None,
        };
        let script = vec![
            (WalRecord::BeginTx { tx_id: 1 }, "append begin"),
            (insert(b"tx1_data"), "append insert"),
            (WalRecord::AbortTx { tx_id: 1 }, "append first abort"),
            (WalRecord::AbortTx { tx_id: 1 }, "append second abort"),
            (insert(b"after_abort"), "append insert"),
        ];
        for (record, failure) in script {
            wal.append(record).expect(failure);
        }
        wal.sync().expect("sync");
    }

    let manager = RecoveryManager::new(&wal_path);
    let state = manager.recover().expect("recover should succeed");
    assert_eq!(state.operation_count(), 1);

    let ops: Vec<_> = state.into_operations();
    if let RecoveredOperation::Insert { term, .. } = &ops[0] {
        assert_eq!(term, b"after_abort");
    } else {
        panic!("Expected Insert");
    }
}
/// An abort that arrives after the same transaction already committed must
/// not undo the commit: the `tx1_data` insert is still recovered.
#[test]
fn test_recovery_commit_then_abort_same_tx() {
    let tmp = tempdir().expect("create tempdir");
    let wal_path = tmp.path().join("commit_then_abort.wal");
    {
        let wal = WalWriter::create(&wal_path).expect("create writer");
        let script = vec![
            (WalRecord::BeginTx { tx_id: 1 }, "append begin"),
            (
                WalRecord::Insert {
                    term: b"tx1_data".to_vec(),
                    value: None,
                },
                "append insert",
            ),
            (WalRecord::CommitTx { tx_id: 1 }, "append commit"),
            (WalRecord::AbortTx { tx_id: 1 }, "append abort"),
        ];
        for (record, failure) in script {
            wal.append(record).expect(failure);
        }
        wal.sync().expect("sync");
    }

    let manager = RecoveryManager::new(&wal_path);
    let state = manager.recover().expect("recover should succeed");
    assert_eq!(state.operation_count(), 1);

    let ops: Vec<_> = state.into_operations();
    if let RecoveredOperation::Insert { term, .. } = &ops[0] {
        assert_eq!(term, b"tx1_data");
    } else {
        panic!("Expected Insert");
    }
}
/// A commit that arrives after the same transaction already aborted must not
/// bring the aborted insert back: only the later `after` insert is recovered.
#[test]
fn test_recovery_abort_then_commit_same_tx() {
    let tmp = tempdir().expect("create tempdir");
    let wal_path = tmp.path().join("abort_then_commit.wal");
    {
        let wal = WalWriter::create(&wal_path).expect("create writer");
        let insert = |term: &[u8]| WalRecord::Insert {
            term: term.to_vec(),
            value: None,
        };
        let script = vec![
            (WalRecord::BeginTx { tx_id: 1 }, "append begin"),
            (insert(b"tx1_data"), "append insert"),
            (WalRecord::AbortTx { tx_id: 1 }, "append abort"),
            (WalRecord::CommitTx { tx_id: 1 }, "append commit"),
            (insert(b"after"), "append insert"),
        ];
        for (record, failure) in script {
            wal.append(record).expect(failure);
        }
        wal.sync().expect("sync");
    }

    let manager = RecoveryManager::new(&wal_path);
    let state = manager.recover().expect("recover should succeed");
    assert_eq!(state.operation_count(), 1);

    let ops: Vec<_> = state.into_operations();
    if let RecoveredOperation::Insert { term, .. } = &ops[0] {
        assert_eq!(term, b"after");
    } else {
        panic!("Expected Insert");
    }
}
/// Exercises the `Display` output of every `RecoveryError` variant and the
/// `Error::source` behaviour (only the `Io` variant exposes a source).
#[test]
fn test_recovery_error_display() {
    use std::error::Error;

    // Display: each variant's text carries its label and/or payload.
    let io_text = RecoveryError::Io(io::Error::new(io::ErrorKind::Other, "test")).to_string();
    assert!(io_text.contains("I/O error"));

    let corrupted_text = RecoveryError::CorruptedWal("test corruption".into()).to_string();
    assert!(corrupted_text.contains("Corrupted WAL"));
    assert!(corrupted_text.contains("test corruption"));

    let no_checkpoint_text = RecoveryError::NoCheckpoint.to_string();
    assert!(no_checkpoint_text.contains("No checkpoint"));

    let invalid_cp_text = RecoveryError::InvalidCheckpoint {
        lsn: 42,
        reason: "bad data".into(),
    }
    .to_string();
    assert!(invalid_cp_text.contains("42"));
    assert!(invalid_cp_text.contains("bad data"));

    let tx_text = RecoveryError::TransactionInconsistency {
        tx_id: 123,
        reason: "missing begin".into(),
    }
    .to_string();
    assert!(tx_text.contains("123"));
    assert!(tx_text.contains("missing begin"));

    let failed_text = RecoveryError::RecoveryFailed("general failure".into()).to_string();
    assert!(failed_text.contains("general failure"));

    // source(): present for the wrapped I/O error, absent otherwise.
    let with_source = RecoveryError::Io(io::Error::new(io::ErrorKind::Other, "test"));
    assert!(with_source.source().is_some());
    let without_source = RecoveryError::CorruptedWal("test".into());
    assert!(without_source.source().is_none());
}
/// Checks which `RecoveryError` variant each `WalError` variant converts to
/// through the `From<WalError>` implementation.
#[test]
fn test_recovery_from_wal_error() {
    use super::super::wal::WalError;

    assert!(matches!(
        RecoveryError::from(WalError::Io(io::Error::new(io::ErrorKind::Other, "test"))),
        RecoveryError::Io(_)
    ));
    assert!(matches!(
        RecoveryError::from(WalError::CorruptedRecord("bad record".into())),
        RecoveryError::CorruptedWal(_)
    ));
    assert!(matches!(
        RecoveryError::from(WalError::UnexpectedEof),
        RecoveryError::CorruptedWal(_)
    ));
    assert!(matches!(
        RecoveryError::from(WalError::InvalidRecordType(99)),
        RecoveryError::CorruptedWal(_)
    ));
    assert!(matches!(
        RecoveryError::from(WalError::AlreadyExists),
        RecoveryError::RecoveryFailed(_)
    ));
    assert!(matches!(
        RecoveryError::from(WalError::NotFound),
        RecoveryError::NoCheckpoint
    ));
    assert!(matches!(
        RecoveryError::from(WalError::ParentNotFound(PathBuf::from("/test"))),
        RecoveryError::RecoveryFailed(_)
    ));
}
/// A file that does not exist is not treated as corrupted:
/// `detect_corruption` succeeds and returns `None`.
#[test]
fn test_detect_corruption_missing_file() {
    let verdict = detect_corruption(Path::new("/nonexistent/path/to/file.art"), false)
        .expect("should succeed");
    assert!(
        verdict.is_none(),
        "Missing file should not report corruption"
    );
}
/// A 32-byte file must be reported as `Truncated`, with 64 bytes expected
/// and 32 bytes actually present.
#[test]
fn test_detect_corruption_truncated_file() {
    let tmp = tempdir().expect("create tempdir");
    let path = tmp.path().join("truncated.art");
    std::fs::write(&path, [0u8; 32]).expect("write truncated file");

    match detect_corruption(&path, false).expect("should succeed") {
        Some(CorruptionType::Truncated { expected, actual }) => {
            assert_eq!(expected, 64);
            assert_eq!(actual, 32);
        }
        Some(other) => panic!("Expected Truncated, got {:?}", other),
        None => panic!("Truncated file should report corruption"),
    }
}
/// A 128-byte file starting with `XXXX` must be reported as an
/// `InvalidHeader` whose message mentions "Invalid magic".
#[test]
fn test_detect_corruption_invalid_magic() {
    let tmp = tempdir().expect("create tempdir");
    let path = tmp.path().join("bad_magic.art");

    let mut image = [0u8; 128];
    image[..4].copy_from_slice(b"XXXX");
    std::fs::write(&path, &image[..]).expect("write file with bad magic");

    match detect_corruption(&path, false).expect("should succeed") {
        Some(CorruptionType::InvalidHeader(msg)) => assert!(
            msg.contains("Invalid magic"),
            "Message should mention magic: {}",
            msg
        ),
        Some(other) => panic!("Expected InvalidHeader, got {:?}", other),
        None => panic!("Invalid magic should report corruption"),
    }
}
/// A 128-byte file whose first eight bytes are the little-endian encoding of
/// `0x5041_5254_0001_0000` must not be reported as corrupted.
#[test]
fn test_detect_corruption_valid_disk_manager_magic() {
    const DISK_MANAGER_MAGIC: u64 = 0x5041_5254_0001_0000;

    let tmp = tempdir().expect("create tempdir");
    let path = tmp.path().join("valid_disk_magic.art");

    let mut image = [0u8; 128];
    image[..8].copy_from_slice(&DISK_MANAGER_MAGIC.to_le_bytes());
    std::fs::write(&path, &image[..]).expect("write file with valid magic");

    let verdict = detect_corruption(&path, false).expect("should succeed");
    assert!(
        verdict.is_none(),
        "Valid DiskManager magic should not report corruption"
    );
}
/// A 128-byte file starting with `PART` followed by version byte 1 must not
/// be reported as corrupted.
#[test]
fn test_detect_corruption_valid_part_magic() {
    let tmp = tempdir().expect("create tempdir");
    let path = tmp.path().join("valid_part_magic.art");

    let mut image = [0u8; 128];
    image[..4].copy_from_slice(b"PART");
    image[4] = 1; // version byte
    std::fs::write(&path, &image[..]).expect("write file with PART magic");

    let verdict = detect_corruption(&path, false).expect("should succeed");
    assert!(
        verdict.is_none(),
        "Valid PART magic should not report corruption"
    );
}
/// A `PART` header with version byte 0 must be reported as an
/// `InvalidHeader` whose message mentions "version".
#[test]
fn test_detect_corruption_invalid_version() {
    let tmp = tempdir().expect("create tempdir");
    let path = tmp.path().join("bad_version.art");

    let mut image = [0u8; 128];
    image[..4].copy_from_slice(b"PART");
    image[4] = 0; // version byte
    std::fs::write(&path, &image[..]).expect("write file with invalid version");

    match detect_corruption(&path, false).expect("should succeed") {
        Some(CorruptionType::InvalidHeader(msg)) => assert!(
            msg.contains("version"),
            "Message should mention version: {}",
            msg
        ),
        Some(other) => panic!("Expected InvalidHeader for version, got {:?}", other),
        None => panic!("Invalid version should report corruption"),
    }
}
/// A `PART` header with version byte 3 must be reported as an
/// `InvalidHeader` whose message mentions "version".
#[test]
fn test_detect_corruption_version_too_high() {
    let tmp = tempdir().expect("create tempdir");
    let path = tmp.path().join("future_version.art");

    let mut image = [0u8; 128];
    image[..4].copy_from_slice(b"PART");
    image[4] = 3; // version byte
    std::fs::write(&path, &image[..]).expect("write file with future version");

    match detect_corruption(&path, false).expect("should succeed") {
        Some(CorruptionType::InvalidHeader(msg)) => assert!(
            msg.contains("version"),
            "Message should mention version: {}",
            msg
        ),
        Some(other) => panic!("Expected InvalidHeader for version, got {:?}", other),
        None => panic!("Unsupported version should report corruption"),
    }
}
/// A version-2 `PART` header carrying `0xDEADBEEF` in bytes 32..36 must be
/// reported as an `InvalidHeader` whose message mentions "checksum".
#[test]
fn test_detect_corruption_v2_header_checksum_mismatch() {
    let tmp = tempdir().expect("create tempdir");
    let path = tmp.path().join("bad_checksum.art");

    let mut image = [0u8; 128];
    image[..4].copy_from_slice(b"PART");
    image[4] = 2; // version byte
    image[32..36].copy_from_slice(&0xDEADBEEFu32.to_le_bytes());
    std::fs::write(&path, &image[..]).expect("write file with bad checksum");

    match detect_corruption(&path, false).expect("should succeed") {
        Some(CorruptionType::InvalidHeader(msg)) => assert!(
            msg.contains("checksum"),
            "Message should mention checksum: {}",
            msg
        ),
        Some(other) => panic!("Expected InvalidHeader for checksum, got {:?}", other),
        None => panic!("Checksum mismatch should report corruption"),
    }
}
/// With the second argument of `detect_corruption` set to `true`, a 64-byte
/// file that carries the `0x5041_5254_0001_0000` magic must still pass.
#[test]
fn test_detect_corruption_with_arena_check_small_file() {
    const DISK_MANAGER_MAGIC: u64 = 0x5041_5254_0001_0000;

    let tmp = tempdir().expect("create tempdir");
    let path = tmp.path().join("small_with_arena_check.art");

    let mut image = [0u8; 64];
    image[..8].copy_from_slice(&DISK_MANAGER_MAGIC.to_le_bytes());
    std::fs::write(&path, &image[..]).expect("write small file");

    let verdict = detect_corruption(&path, true).expect("should succeed");
    assert!(
        verdict.is_none(),
        "Small but valid file should pass: {:?}",
        verdict
    );
}
/// `Normal` is the only mode that is "normal" and not "recovered"; every
/// other mode is the opposite on both predicates.
#[test]
fn test_recovery_mode_methods() {
    // Normal start-up: nothing was recovered.
    assert!(RecoveryMode::Normal.is_normal() && !RecoveryMode::Normal.recovered());

    // Every other mode counts as a recovery.
    assert!(!RecoveryMode::RebuildFromWal.is_normal() && RecoveryMode::RebuildFromWal.recovered());
    assert!(!RecoveryMode::RepairInPlace.is_normal() && RecoveryMode::RepairInPlace.recovered());
    assert!(!RecoveryMode::CreatedNew.is_normal() && RecoveryMode::CreatedNew.recovered());
}
/// Checks the fields populated by the three `RecoveryReport` constructors:
/// `normal`, `created_new` and `rebuild_from_wal`.
#[test]
fn test_recovery_report_constructors() {
    // normal(): empty report in normal mode.
    let report = RecoveryReport::normal();
    assert!(report.mode.is_normal());
    assert_eq!(report.records_replayed, 0);
    assert!(report.corrupted_file.is_none());
    assert!(report.archive_segments_used.is_empty());

    // created_new(): nothing replayed, mode is CreatedNew.
    let report = RecoveryReport::created_new();
    assert!(matches!(report.mode, RecoveryMode::CreatedNew));
    assert_eq!(report.records_replayed, 0);

    // rebuild_from_wal(): every argument shows up in the matching field.
    let archive_segments = vec![PathBuf::from("/archive/seg1.segment")];
    let report = RecoveryReport::rebuild_from_wal(
        PathBuf::from("/test/file.art"),
        "test corruption".to_string(),
        100,
        50,
        archive_segments,
        500,
    );
    assert!(matches!(report.mode, RecoveryMode::RebuildFromWal));
    assert_eq!(report.records_replayed, 100);
    assert_eq!(report.terms_recovered, 50);
    assert!(report.corrupted_file.is_some());
    assert!(report.corruption_reason.is_some());
    assert_eq!(report.archive_segments_used.len(), 1);
    assert_eq!(report.duration_ms, 500);
}
/// Checks that the `Display` text of each `CorruptionType` variant contains
/// its label and the payload values.
#[test]
fn test_corruption_type_display() {
    let header_text = CorruptionType::InvalidHeader("bad magic".to_string()).to_string();
    assert!(header_text.contains("Invalid header"));
    assert!(header_text.contains("bad magic"));

    // Checksums are expected to appear as lowercase hex.
    let arena_text = CorruptionType::ArenaChecksum {
        arena_id: 5,
        expected: 0x12345678,
        found: 0xDEADBEEF,
    }
    .to_string();
    assert!(arena_text.contains("Arena 5"));
    assert!(arena_text.contains("12345678"));
    assert!(arena_text.contains("deadbeef"));

    let truncated_text = CorruptionType::Truncated {
        expected: 1000,
        actual: 500,
    }
    .to_string();
    assert!(truncated_text.contains("truncated"));
    assert!(truncated_text.contains("1000"));
    assert!(truncated_text.contains("500"));

    let root_text = CorruptionType::InvalidRootDescriptor("bad root".to_string()).to_string();
    assert!(root_text.contains("Invalid root descriptor"));
    assert!(root_text.contains("bad root"));

    let io_text = CorruptionType::IoError("read failed".to_string()).to_string();
    assert!(io_text.contains("I/O error"));
    assert!(io_text.contains("read failed"));
}
/// Both segment finders return an empty list when the directory they are
/// pointed at does not exist.
#[test]
fn test_find_wal_segments_nonexistent_dir() {
    let missing_dir = Path::new("/nonexistent/wal_archive");

    assert!(
        find_wal_archive_segments(missing_dir).is_empty(),
        "Nonexistent dir should return empty vec"
    );
    assert!(
        find_wal_pending_segments(missing_dir).is_empty(),
        "Nonexistent pending dir should return empty vec"
    );
}
/// Sanity checks for `crc32_header`: zero for empty input, non-zero and
/// repeatable for `"test"`, and a different value for `"other"`.
#[test]
fn test_crc32_header_computation() {
    let empty: &[u8] = &[];
    assert_eq!(
        crc32_header(empty),
        0x00000000,
        "CRC of empty input should be 0"
    );

    let first = crc32_header(b"test");
    assert_ne!(first, 0, "CRC should not be zero for non-empty input");

    let repeated = crc32_header(b"test");
    assert_eq!(first, repeated, "Same input should produce same CRC");

    let different = crc32_header(b"other");
    assert_ne!(
        first, different,
        "Different input should produce different CRC"
    );
}
/// With no segments to read, `rebuild_from_wal_segments` succeeds, reports
/// zero records and zero terms, and never invokes the callback.
#[test]
fn test_rebuild_from_wal_segments_empty() {
    let segments = Vec::<PathBuf>::new();
    let mut callback_hits = 0;

    let (records, terms) = rebuild_from_wal_segments(&segments, |_op| {
        callback_hits += 1;
        Ok(())
    })
    .expect("should succeed");

    assert_eq!(records, 0);
    assert_eq!(terms, 0);
    assert_eq!(callback_hits, 0);
}
/// Writes one record of each single-term kind plus an empty transaction and a
/// checkpoint, then rebuilds from that segment.
///
/// Ten records are expected to be processed, while only six produce an
/// operation for the callback.
#[test]
fn test_rebuild_from_wal_segments_with_records() {
    let tmp = tempdir().expect("create tempdir");
    let wal_path = tmp.path().join("rebuild.wal");
    {
        let wal = WalWriter::create(&wal_path).expect("create WAL");
        // Each record is paired with the message used if appending it fails.
        let script = vec![
            (
                WalRecord::Insert {
                    term: b"insert1".to_vec(),
                    value: None,
                },
                "append insert",
            ),
            (
                WalRecord::Insert {
                    term: b"insert2".to_vec(),
                    value: Some(b"value".to_vec()),
                },
                "append insert with value",
            ),
            (
                WalRecord::Remove {
                    term: b"remove1".to_vec(),
                },
                "append remove",
            ),
            (
                WalRecord::Increment {
                    term: b"counter".to_vec(),
                    delta: 5,
                    result: 10,
                },
                "append increment",
            ),
            (
                WalRecord::Upsert {
                    term: b"upsert1".to_vec(),
                    value: b"new_value".to_vec(),
                },
                "append upsert",
            ),
            (
                WalRecord::CompareAndSwap {
                    term: b"cas1".to_vec(),
                    expected: None,
                    new_value: b"cas_value".to_vec(),
                    success: true,
                },
                "append successful CAS",
            ),
            (
                WalRecord::CompareAndSwap {
                    term: b"cas2".to_vec(),
                    expected: Some(b"wrong".to_vec()),
                    new_value: b"cas_value2".to_vec(),
                    success: false,
                },
                "append failed CAS",
            ),
            (WalRecord::BeginTx { tx_id: 1 }, "append begin"),
            (WalRecord::CommitTx { tx_id: 1 }, "append commit"),
        ];
        for (record, failure) in script {
            wal.append(record).expect(failure);
        }
        wal.checkpoint(9).expect("checkpoint");
        wal.sync().expect("sync");
    }

    let segments = vec![wal_path];
    let mut replayed = Vec::new();
    let (records, terms) = rebuild_from_wal_segments(&segments, |operation| {
        replayed.push(operation);
        Ok(())
    })
    .expect("should succeed");

    assert_eq!(records, 10, "Should process all records");
    assert_eq!(terms, 6, "Should recover 6 terms");
    assert_eq!(replayed.len(), 6, "Should have 6 operations");
}
/// Writes one `BatchInsert` (three entries) and one `BatchIncrement` (two
/// entries) and checks that rebuilding counts two records but hands five
/// individual operations to the callback.
#[test]
fn test_rebuild_from_wal_segments_batch_records() {
    let tmp = tempdir().expect("create tempdir");
    let wal_path = tmp.path().join("batch_rebuild.wal");
    {
        let wal = WalWriter::create(&wal_path).expect("create WAL");

        let inserts = WalRecord::BatchInsert {
            entries: vec![
                (b"batch1".to_vec(), None),
                (b"batch2".to_vec(), Some(b"val".to_vec())),
                (b"batch3".to_vec(), None),
            ],
        };
        wal.append(inserts).expect("append batch insert");

        let increments = WalRecord::BatchIncrement {
            entries: vec![(b"counter1".to_vec(), 1), (b"counter2".to_vec(), 2)],
        };
        wal.append(increments).expect("append batch increment");

        wal.sync().expect("sync");
    }

    let segments = vec![wal_path];
    let mut replayed = Vec::new();
    let (records, terms) = rebuild_from_wal_segments(&segments, |operation| {
        replayed.push(operation);
        Ok(())
    })
    .expect("should succeed");

    assert_eq!(records, 2, "Should process 2 batch records");
    assert_eq!(terms, 5, "Should recover 5 terms from batches");
    assert_eq!(replayed.len(), 5, "Should have 5 expanded operations");
}
}
#[cfg(test)]
mod reconcile_lww_tests {
    //! Tests for `reconcile_lww`: the order in which recovered WAL records are
    //! applied, with and without `CommitRank` records, under both rank regimes.

    use super::*;
    // Local alias for the fully-qualified regime type the calls below pass.
    use crate::persistent_artrie_core::wal::RankRegime as Regime;

    /// Insert record for `term` with no value.
    fn ins(term: &[u8]) -> WalRecord {
        let term = term.to_vec();
        WalRecord::Insert { term, value: None }
    }

    /// Remove record for `term`.
    fn rem(term: &[u8]) -> WalRecord {
        let term = term.to_vec();
        WalRecord::Remove { term }
    }

    /// `CommitRank` record assigning `generation` to the data record at
    /// `data_lsn` for `term`.
    fn rank(data_lsn: Lsn, term: &[u8], generation: u64) -> WalRecord {
        let term = term.to_vec();
        WalRecord::CommitRank {
            data_lsn,
            term,
            generation,
        }
    }

    /// Short kind tag and term bytes of a recovered operation.
    fn tagged(op: &RecoveredOperation) -> (&'static str, &[u8]) {
        match op {
            RecoveredOperation::Insert { term, .. } => ("ins", term.as_slice()),
            RecoveredOperation::Remove { term, .. } => ("rem", term.as_slice()),
            RecoveredOperation::Increment { term, .. } => ("inc", term.as_slice()),
            RecoveredOperation::Upsert { term, .. } => ("ups", term.as_slice()),
            RecoveredOperation::CompareAndSwap { term, .. } => ("cas", term.as_slice()),
        }
    }

    /// Flattens operations into `(kind, term)` pairs, preserving order.
    fn project(ops: &[RecoveredOperation]) -> Vec<(&'static str, Vec<u8>)> {
        ops.iter()
            .map(|op| {
                let (kind, term) = tagged(op);
                (kind, term.to_vec())
            })
            .collect()
    }

    /// Kind of the last operation in `ops` that touches `term`, if any.
    fn last_effect<'a>(ops: &'a [RecoveredOperation], term: &[u8]) -> Option<&'a str> {
        ops.iter()
            .rev()
            .map(tagged)
            .find(|(_, candidate)| *candidate == term)
            .map(|(kind, _)| kind)
    }

    /// Without any `CommitRank` record the output keeps plain LSN order.
    #[test]
    fn rankless_wal_applies_in_lsn_order() {
        let recovered = vec![
            (10, ins(b"a")),
            (20, ins(b"b")),
            (30, rem(b"a")),
            (40, ins(b"a")),
        ];
        let winners = reconcile_lww(recovered, false, 0, Regime::Owned);

        let expected = vec![
            ("ins", b"a".to_vec()),
            ("ins", b"b".to_vec()),
            ("rem", b"a".to_vec()),
            ("ins", b"a".to_vec()),
        ];
        assert_eq!(project(&winners), expected);
        assert_eq!(last_effect(&winners, b"a"), Some("ins"));
        assert_eq!(last_effect(&winners, b"b"), Some("ins"));
    }

    /// An insert without a rank next to a ranked remove: `Overlay` ends on the
    /// remove, while `Owned` ends on the insert.
    #[test]
    fn overlay_drops_unranked_orphan_no_resurrection() {
        let recovered = vec![(10, ins(b"t")), (12, rem(b"t")), (13, rank(12, b"t", 1))];

        let overlay = reconcile_lww(recovered.clone(), false, 0, Regime::Overlay);
        assert_eq!(
            last_effect(&overlay, b"t"),
            Some("rem"),
            "Overlay must DROP the unranked orphan ⇒ the term stays removed"
        );

        let owned = reconcile_lww(recovered, false, 0, Regime::Owned);
        assert_eq!(
            last_effect(&owned, b"t"),
            Some("ins"),
            "Owned KEEPS the orphan @lsn ⇒ it out-sorts the ranked remove ⇒ resurrection (control)"
        );
    }

    /// Generations override LSN order: the insert (gen 5) is applied after the
    /// remove (gen 2) even though its LSN is lower.
    #[test]
    fn commit_rank_reorders_against_lsn_s019() {
        let ranked = vec![
            (352, ins(b"s019")),
            (356, rem(b"s019")),
            (357, rank(356, b"s019", 2)),
            (358, rank(352, b"s019", 5)),
        ];
        let winners = reconcile_lww(ranked, false, 0, Regime::Owned);
        assert_eq!(
            project(&winners),
            vec![("rem", b"s019".to_vec()), ("ins", b"s019".to_vec())]
        );
        assert_eq!(
            last_effect(&winners, b"s019"),
            Some("ins"),
            "the Insert (gen 5) is applied last ⇒ final present, NOT lost"
        );

        // Control: the same data records without ranks end on the remove.
        let rankless = vec![(352, ins(b"s019")), (356, rem(b"s019"))];
        let rankless_winners = reconcile_lww(rankless, false, 0, Regime::Owned);
        assert_eq!(
            last_effect(&rankless_winners, b"s019"),
            Some("rem"),
            "rank-less LSN order applies the Remove last (the s019 loss)"
        );
    }

    /// Mirror image of the s019 case: the remove (gen 9) is applied after the
    /// insert (gen 3) even though its LSN is lower.
    #[test]
    fn commit_rank_reorders_against_lsn_resurrection_polarity() {
        let ranked = vec![
            (352, rem(b"s019")),
            (356, ins(b"s019")),
            (357, rank(356, b"s019", 3)),
            (358, rank(352, b"s019", 9)),
        ];
        let winners = reconcile_lww(ranked, false, 0, Regime::Owned);
        assert_eq!(
            project(&winners),
            vec![("ins", b"s019".to_vec()), ("rem", b"s019".to_vec())]
        );
        assert_eq!(
            last_effect(&winners, b"s019"),
            Some("rem"),
            "the Remove (gen 9) is applied last ⇒ final absent, no resurrection"
        );
    }

    /// With equal generations the record with the higher LSN is applied last.
    #[test]
    fn generation_ties_broken_by_lsn() {
        let tied = vec![
            (10, rem(b"x")),
            (20, ins(b"x")),
            (30, rank(10, b"x", 7)),
            (31, rank(20, b"x", 7)),
        ];
        let winners = reconcile_lww(tied, false, 0, Regime::Owned);
        assert_eq!(last_effect(&winners, b"x"), Some("ins"));
    }

    /// When the second argument is `true`, data records at or below the
    /// checkpoint LSN are skipped; when it is `false`, they are all kept.
    #[test]
    fn checkpoint_lsn_skips_data_records_at_or_below_watermark() {
        let recovered = vec![(5, ins(b"old")), (15, ins(b"new"))];

        let winners = reconcile_lww(recovered.clone(), true, 10, Regime::Owned);
        assert_eq!(project(&winners), vec![("ins", b"new".to_vec())]);

        let winners_no_disk = reconcile_lww(recovered, false, 10, Regime::Owned);
        assert_eq!(
            project(&winners_no_disk),
            vec![("ins", b"old".to_vec()), ("ins", b"new".to_vec())]
        );
    }

    /// Three batch increments of the same key all survive reconciliation,
    /// each as a delta (`result: None`).
    #[test]
    fn batch_increments_same_key_are_all_kept() {
        let bump = |term: &[u8], delta: i64| WalRecord::BatchIncrement {
            entries: vec![(term.to_vec(), delta)],
        };
        let recovered = vec![
            (10, bump(b"ctr", 1)),
            (20, bump(b"ctr", 1)),
            (30, bump(b"ctr", 1)),
        ];
        let winners = reconcile_lww(recovered, false, 0, Regime::Owned);
        assert_eq!(
            winners.len(),
            3,
            "all three accumulating increments must survive (sum = 3, not 1)"
        );
        assert!(winners.iter().all(|op| matches!(
            op,
            RecoveredOperation::Increment {
                result: None,
                delta: 1,
                ..
            }
        )));
    }

    /// A single `Increment` record recovers with `result: Some(..)` even when
    /// the result is 0, while a `BatchIncrement` entry recovers with
    /// `result: None`.
    #[test]
    fn d6_increment_outcome_distinguishes_absolute_zero_from_delta() {
        let absolute = recovered_operations_from_record(
            7,
            WalRecord::Increment {
                term: b"k".to_vec(),
                delta: -5,
                result: 0,
            },
        );
        assert!(
            matches!(
                absolute.as_slice(),
                [RecoveredOperation::Increment {
                    result: Some(0),
                    delta: -5,
                    ..
                }]
            ),
            "absolute-set-to-0 must recover as Some(0) (set), NOT None (delta-accumulate)"
        );

        let accumulated = recovered_operations_from_record(
            8,
            WalRecord::BatchIncrement {
                entries: vec![(b"k".to_vec(), 3)],
            },
        );
        assert!(
            matches!(
                accumulated.as_slice(),
                [RecoveredOperation::Increment {
                    result: None,
                    delta: 3,
                    ..
                }]
            ),
            "a BatchIncrement delta must recover as None (delta-accumulate)"
        );
    }

    /// Two different terms that are both invalid UTF-8 stay distinct.
    #[test]
    fn distinct_invalid_utf8_terms_do_not_collide() {
        let a = vec![0xFF, 0x01];
        let b = vec![0xFF, 0x02];
        let winners = reconcile_lww(
            vec![(10, ins(&a)), (20, ins(&b))],
            false,
            0,
            Regime::Owned,
        );

        let terms: std::collections::BTreeSet<Vec<u8>> =
            winners.iter().map(|op| op.term().to_vec()).collect();
        assert_eq!(terms.len(), 2, "two distinct raw-byte terms = two winners");
        assert!(terms.contains(&a) && terms.contains(&b));
    }

    /// A `CommitRank` with no matching data record yields no operation.
    #[test]
    fn commit_rank_record_is_membership_no_op() {
        let recovered = vec![(10, rank(99, b"ghost", 3))];
        let winners = reconcile_lww(recovered, false, 0, Regime::Owned);
        assert!(winners.is_empty(), "a lone CommitRank produces no operation");
    }
}