use std::collections::{HashMap, HashSet};
use fsqlite_types::ecs::{
BloomFilter, ManifestSegment, ObjectId, SymbolRecord, reconstruct_systematic_happy_path,
source_symbol_count,
};
use fsqlite_types::{CommitMarker, CommitSeq, PageNumber};
use tracing::{debug, error, info, warn};
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CapsuleDecodeOutcome {
Systematic,
Repaired { repair_symbols_used: u32 },
Failed { reason: String },
}
/// Decodes a capsule's symbol records, preferring the systematic fast path.
///
/// First tries [`reconstruct_systematic_happy_path`]. Only if that fails is the
/// caller-supplied `fallback_decode` (e.g. a full GF(256) / RaptorQ decoder)
/// invoked, and then exactly once, with the same `records`.
///
/// # Returns
/// - [`CapsuleDecodeOutcome::Systematic`] when the fast path succeeds; the
///   fallback is never called.
/// - [`CapsuleDecodeOutcome::Repaired`] when the fallback succeeds. The
///   decoded bytes are discarded; `repair_symbols_used` is the number of
///   records whose ESI is `>= K`, where `K` is the source-symbol count derived
///   from the first record's OTI. It is `0` when `K` cannot be derived (no
///   records, or the OTI is rejected) or does not fit in `u32`.
/// - [`CapsuleDecodeOutcome::Failed`] when the fallback also fails; `reason`
///   combines the fast-path error and the fallback error.
pub fn decode_capsule_symbol_records<F>(
    object_id: ObjectId,
    records: &[SymbolRecord],
    mut fallback_decode: F,
) -> CapsuleDecodeOutcome
where
    F: FnMut(&[SymbolRecord]) -> std::result::Result<Vec<u8>, String>,
{
    // K (the source-symbol count) comes from the first record's OTI; `None`
    // when there are no records or the OTI is rejected.
    let k_known = records
        .first()
        .and_then(|record| source_symbol_count(record.oti).ok());
    // Log field keeps the historical `0` placeholder for "unknown".
    let k_required = k_known.unwrap_or(0);
    match reconstruct_systematic_happy_path(records) {
        Ok(_) => {
            debug!(
                object_id = %object_id,
                systematic_ok = true,
                decode_invoked = false,
                symbols_available = records.len(),
                k_required,
                "capsule read-path selection"
            );
            CapsuleDecodeOutcome::Systematic
        }
        Err(reason) => {
            debug!(
                object_id = %object_id,
                systematic_ok = false,
                decode_invoked = true,
                symbols_available = records.len(),
                k_required,
                reason = %reason,
                "capsule read-path selection"
            );
            info!(
                object_id = %object_id,
                symbols_available = records.len(),
                k_required,
                reason = %reason,
                "systematic run unavailable; invoking decode fallback"
            );
            match fallback_decode(records) {
                Ok(_) => {
                    // ESIs >= K are repair symbols. Without a usable K we cannot
                    // classify records, so report 0 instead of treating every
                    // record as a repair symbol (which `K = 0` would do).
                    let repair_symbols_used = k_known
                        .and_then(|k| u32::try_from(k).ok())
                        .map_or(0_u32, |required| {
                            let count = records
                                .iter()
                                .filter(|record| record.esi >= required)
                                .count();
                            u32::try_from(count).unwrap_or(u32::MAX)
                        });
                    CapsuleDecodeOutcome::Repaired {
                        repair_symbols_used,
                    }
                }
                Err(fallback_err) => CapsuleDecodeOutcome::Failed {
                    reason: format!("{reason}; fallback decode failed: {fallback_err}"),
                },
            }
        }
    }
}
/// A replayed commit whose capsule could not be decoded during recovery.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DurabilityViolation {
    /// Commit sequence of the marker whose capsule failed to decode.
    pub commit_seq: CommitSeq,
    /// Object id of the undecodable capsule.
    pub capsule_object_id: ObjectId,
    /// Failure reason reported by the capsule decoder.
    pub reason: String,
}
/// Reference to a checkpoint recorded in a [`RootManifest`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CheckpointRef {
    /// Commit sequence of the checkpoint; returned by
    /// `NativeRecovery::locate_checkpoint`.
    pub commit_seq: CommitSeq,
    /// Object id of the checkpoint's manifest object.
    pub manifest_object_id: ObjectId,
}
/// Final report produced by `NativeRecovery::finalize`.
#[derive(Debug, Clone)]
pub struct RecoverySummary {
    /// ECS epoch of the loaded root manifest (0 if none was loaded).
    pub ecs_epoch: u64,
    /// Commit sequence of the last replayed marker (`CommitSeq::ZERO` if none).
    pub commit_seq_recovered: CommitSeq,
    /// Number of commit markers replayed.
    pub markers_replayed: u64,
    /// Number of capsules whose decode outcome was `Repaired`.
    pub capsules_repaired: u32,
    /// Capsules that could not be decoded at all.
    pub violations: Vec<DurabilityViolation>,
    /// Caller-supplied recovery duration, in milliseconds.
    pub duration_ms: u64,
}
/// Root of the ECS durability state, loaded in recovery step 1.
#[derive(Debug, Clone)]
pub struct RootManifest {
    /// ECS epoch number, reported in the recovery summary.
    pub ecs_epoch: u64,
    /// Most recent checkpoint, if any.
    pub latest_checkpoint: Option<CheckpointRef>,
    /// Manifest segment. NOTE(review): not consulted by `NativeRecovery`'s
    /// steps shown here — confirm where it is used.
    pub manifest: ManifestSegment,
}
/// Step-by-step driver for native (ECS) crash recovery.
///
/// Typical sequence: `load_root_manifest` → `locate_checkpoint` →
/// `replay_markers` → `finalize`.
#[derive(Debug)]
pub struct NativeRecovery {
    /// Root manifest loaded in step 1, if any.
    root_manifest: Option<RootManifest>,
    /// Markers replayed so far, in replay order.
    replayed_markers: Vec<CommitMarker>,
    /// Decode outcome of each replayed marker, paired with its commit sequence.
    decode_outcomes: Vec<(CommitSeq, CapsuleDecodeOutcome)>,
    /// Commit sequence of the most recently replayed marker.
    recovered_tip: CommitSeq,
    /// Durability violations found while replaying.
    violations: Vec<DurabilityViolation>,
}
impl NativeRecovery {
    /// Creates an empty recovery driver: no manifest loaded, nothing replayed,
    /// and the recovered tip at [`CommitSeq::ZERO`].
    #[must_use]
    pub fn new() -> Self {
        Self {
            root_manifest: None,
            replayed_markers: Vec::new(),
            decode_outcomes: Vec::new(),
            recovered_tip: CommitSeq::ZERO,
            violations: Vec::new(),
        }
    }

    /// Recovery step 1: installs `manifest` (replacing any previously loaded
    /// one) and logs its epoch and whether it references a checkpoint.
    pub fn load_root_manifest(&mut self, manifest: RootManifest) {
        info!(
            ecs_epoch = manifest.ecs_epoch,
            has_checkpoint = manifest.latest_checkpoint.is_some(),
            "recovery step 1: loaded RootManifest"
        );
        self.root_manifest = Some(manifest);
    }

    /// Recovery step 2: returns the commit sequence of the latest checkpoint in
    /// the loaded root manifest, or [`CommitSeq::ZERO`] when no manifest is
    /// loaded or it has no checkpoint.
    #[must_use]
    pub fn locate_checkpoint(&self) -> CommitSeq {
        let seq = self
            .root_manifest
            .as_ref()
            .and_then(|rm| rm.latest_checkpoint.as_ref())
            .map_or(CommitSeq::ZERO, |cp| cp.commit_seq);
        debug!(
            checkpoint_seq = seq.get(),
            "recovery step 2: checkpoint located"
        );
        seq
    }

    /// Recovery steps 3-4: replays `markers` in the order given.
    ///
    /// `decode_capsule` is called once per marker with the marker's capsule
    /// object id. Every marker, including those whose capsule failed to decode,
    /// is recorded as replayed and moves the recovered tip to its commit
    /// sequence. Failed decodes are additionally recorded as
    /// [`DurabilityViolation`]s and logged at `error` level.
    ///
    /// NOTE(review): markers are not filtered against
    /// [`locate_checkpoint`](Self::locate_checkpoint) here; presumably the
    /// caller passes only post-checkpoint markers.
    pub fn replay_markers<F>(&mut self, markers: &[CommitMarker], mut decode_capsule: F)
    where
        F: FnMut(ObjectId) -> CapsuleDecodeOutcome,
    {
        info!(
            marker_count = markers.len(),
            "recovery step 3-4: scanning marker stream"
        );
        for marker in markers {
            let outcome = decode_capsule(marker.capsule_object_id);
            match &outcome {
                CapsuleDecodeOutcome::Systematic => {
                    debug!(
                        commit_seq = marker.commit_seq.get(),
                        "capsule decoded via systematic fast path"
                    );
                }
                CapsuleDecodeOutcome::Repaired {
                    repair_symbols_used,
                } => {
                    info!(
                        commit_seq = marker.commit_seq.get(),
                        repair_symbols_used, "capsule decoded via RaptorQ repair"
                    );
                }
                CapsuleDecodeOutcome::Failed { reason } => {
                    error!(
                        commit_seq = marker.commit_seq.get(),
                        reason = reason.as_str(),
                        "DURABILITY CONTRACT VIOLATED: capsule undecodable — unrecoverable corruption"
                    );
                    self.violations.push(DurabilityViolation {
                        commit_seq: marker.commit_seq,
                        capsule_object_id: marker.capsule_object_id,
                        reason: reason.clone(),
                    });
                }
            }
            self.decode_outcomes.push((marker.commit_seq, outcome));
            self.replayed_markers.push(marker.clone());
            self.recovered_tip = marker.commit_seq;
        }
    }

    /// Consumes the driver and produces the final [`RecoverySummary`].
    ///
    /// Also enters a `wal_recovery` tracing span, logs a completion event, and
    /// records frames replayed / corrupted (violations + repaired) / repaired
    /// in the global WAL recovery metrics. Counts that do not fit the target
    /// integer width saturate instead of wrapping.
    #[must_use]
    pub fn finalize(self, duration_ms: u64) -> RecoverySummary {
        let capsules_repaired = self
            .decode_outcomes
            .iter()
            .filter(|(_, o)| matches!(o, CapsuleDecodeOutcome::Repaired { .. }))
            .count();
        let corrupted_frames = self.violations.len() + capsules_repaired;
        let ecs_epoch = self.root_manifest.as_ref().map_or(0, |rm| rm.ecs_epoch);
        let span = tracing::span!(
            tracing::Level::INFO,
            "wal_recovery",
            frames_replayed = self.replayed_markers.len(),
            corrupted_frames = corrupted_frames,
            repaired_frames = capsules_repaired,
        );
        let _guard = span.enter();
        info!(
            ecs_epoch,
            commit_seq_recovered = self.recovered_tip.get(),
            markers_replayed = self.replayed_markers.len(),
            capsules_repaired,
            violations = self.violations.len(),
            duration_ms,
            "recovery complete"
        );
        // Saturating conversions: an oversized count is clamped, never wrapped.
        let frames_replayed_u64 = u64::try_from(self.replayed_markers.len()).unwrap_or(u64::MAX);
        let corrupted_u64 = u64::try_from(corrupted_frames).unwrap_or(u64::MAX);
        let repaired_u64 = u64::try_from(capsules_repaired).unwrap_or(u64::MAX);
        crate::metrics::GLOBAL_WAL_RECOVERY_METRICS.record_recovery(
            frames_replayed_u64,
            corrupted_u64,
            repaired_u64,
        );
        RecoverySummary {
            ecs_epoch,
            commit_seq_recovered: self.recovered_tip,
            markers_replayed: frames_replayed_u64,
            capsules_repaired: u32::try_from(capsules_repaired).unwrap_or(u32::MAX),
            violations: self.violations,
            duration_ms,
        }
    }

    /// Commit sequence of the most recently replayed marker
    /// ([`CommitSeq::ZERO`] before any replay).
    #[must_use]
    pub const fn recovered_tip(&self) -> CommitSeq {
        self.recovered_tip
    }

    /// Whether any replayed capsule failed to decode.
    #[must_use]
    pub fn has_violations(&self) -> bool {
        !self.violations.is_empty()
    }
}
impl Default for NativeRecovery {
    /// Equivalent to [`NativeRecovery::new`].
    fn default() -> Self {
        Self::new()
    }
}
/// The ordered phases of a compaction saga: Mark → Compact → Publish → Retire.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompactionPhase {
    /// Computing the set of live (reachable) objects.
    Mark,
    /// Writing replacement segments and syncing them and the locator.
    Compact,
    /// Ready to publish the replacement segments.
    Publish,
    /// Published; old segments are retired as reader leases allow.
    Retire,
}
impl std::fmt::Display for CompactionPhase {
    /// Writes the lowercase phase name, e.g. `mark`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Mark => "mark",
            Self::Compact => "compact",
            Self::Publish => "publish",
            Self::Retire => "retire",
        };
        f.write_str(label)
    }
}
/// Discretized compaction-MDP state, used as the policy lookup key.
///
/// [`CompactionPolicy::new`] precomputes actions for `space_amp_bucket` in
/// `0..=3` and `read_regime` / `write_regime` / `compaction_debt` in `0..=2`;
/// other values are answered by the same default rule
/// (see [`CompactionPolicy::recommend`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct CompactionMdpState {
    /// Space-amplification bucket, e.g. from
    /// [`CompactionMdpState::bucket_for_space_amp`].
    pub space_amp_bucket: u8,
    /// Read-load regime bucket; not used by the default policy rule.
    pub read_regime: u8,
    /// Write-load regime bucket; `>= 2` lowers the default compaction rate.
    pub write_regime: u8,
    /// Compaction-debt bucket; `>= 2` forces compaction in the default rule.
    pub compaction_debt: u8,
}
impl CompactionMdpState {
    /// Maps a space-amplification ratio to a bucket:
    /// `< 1.5` → 0, `[1.5, 2.0)` → 1, `[2.0, 3.0)` → 2, otherwise 3.
    ///
    /// NaN fails every comparison and therefore lands in bucket 3.
    #[must_use]
    pub fn bucket_for_space_amp(space_amp: f64) -> u8 {
        if space_amp < 1.5 {
            0
        } else if space_amp < 2.0 {
            1
        } else if space_amp < 3.0 {
            2
        } else {
            3
        }
    }
}
/// What the compaction policy recommends for a state.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompactionAction {
    /// Do not compact now.
    Defer,
    /// Compact immediately, throttled to `rate_limit`.
    CompactNow { rate_limit: CompactionRateLimit },
}
/// Throttle level for an immediate compaction.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompactionRateLimit {
    Low,
    Medium,
    High,
}
/// One recorded compaction decision in the policy's evidence ledger.
#[derive(Debug, Clone)]
pub struct EvidenceLedgerEntry {
    /// Caller-supplied timestamp, in nanoseconds.
    pub timestamp_ns: u64,
    /// State the decision was made in.
    pub state: CompactionMdpState,
    /// Action that was taken.
    pub action: CompactionAction,
    /// Short justification for the decision.
    pub reason: &'static str,
}
/// Table-driven compaction policy with an append-only evidence ledger.
#[derive(Debug)]
pub struct CompactionPolicy {
    /// Precomputed (and possibly overridden) action per state.
    lookup: HashMap<CompactionMdpState, CompactionAction>,
    /// Recorded decisions, in insertion order.
    evidence: Vec<EvidenceLedgerEntry>,
}
impl CompactionPolicy {
    /// Builds the policy table from the default rule for every state in the
    /// grid described on [`CompactionMdpState`], with an empty ledger.
    #[must_use]
    pub fn new() -> Self {
        let mut lookup = HashMap::new();
        for space_amp in 0..=3u8 {
            for read_regime in 0..=2u8 {
                for write_regime in 0..=2u8 {
                    for debt in 0..=2u8 {
                        let state = CompactionMdpState {
                            space_amp_bucket: space_amp,
                            read_regime,
                            write_regime,
                            compaction_debt: debt,
                        };
                        let action = Self::default_policy(space_amp, write_regime, debt);
                        lookup.insert(state, action);
                    }
                }
            }
        }
        Self {
            lookup,
            evidence: Vec::new(),
        }
    }

    /// Default rule, in priority order:
    /// debt ≥ 2 → compact at Medium; space-amp bucket ≥ 2 → compact at Low
    /// under heavy writes (regime ≥ 2) else High; space-amp ≥ 1 with debt ≥ 1
    /// → compact at Low; otherwise defer.
    fn default_policy(space_amp: u8, write_regime: u8, debt: u8) -> CompactionAction {
        if debt >= 2 {
            return CompactionAction::CompactNow {
                rate_limit: CompactionRateLimit::Medium,
            };
        }
        if space_amp >= 2 {
            let rate_limit = if write_regime >= 2 {
                CompactionRateLimit::Low
            } else {
                CompactionRateLimit::High
            };
            return CompactionAction::CompactNow { rate_limit };
        }
        if space_amp >= 1 && debt >= 1 {
            return CompactionAction::CompactNow {
                rate_limit: CompactionRateLimit::Low,
            };
        }
        CompactionAction::Defer
    }

    /// Returns the recommended action for `state`.
    ///
    /// Entries in the table (the precomputed defaults plus any
    /// [`override_action`](Self::override_action)) take precedence. States
    /// outside the precomputed grid fall back to the same default rule that
    /// built the table rather than unconditionally deferring, so e.g. a
    /// `compaction_debt` of 3 still compacts.
    #[must_use]
    pub fn recommend(&self, state: &CompactionMdpState) -> CompactionAction {
        self.lookup.get(state).copied().unwrap_or_else(|| {
            Self::default_policy(
                state.space_amp_bucket,
                state.write_regime,
                state.compaction_debt,
            )
        })
    }

    /// Appends a decision to the evidence ledger.
    pub fn record_decision(
        &mut self,
        timestamp_ns: u64,
        state: CompactionMdpState,
        action: CompactionAction,
        reason: &'static str,
    ) {
        self.evidence.push(EvidenceLedgerEntry {
            timestamp_ns,
            state,
            action,
            reason,
        });
    }

    /// All recorded decisions, oldest first.
    #[must_use]
    pub fn evidence_ledger(&self) -> &[EvidenceLedgerEntry] {
        &self.evidence
    }

    /// Sets (or replaces) the action recommended for `state`.
    pub fn override_action(&mut self, state: CompactionMdpState, action: CompactionAction) {
        self.lookup.insert(state, action);
    }
}
impl Default for CompactionPolicy {
    /// Equivalent to [`CompactionPolicy::new`].
    fn default() -> Self {
        Self::new()
    }
}
/// A storage segment and the objects it contains.
#[derive(Debug, Clone)]
pub struct SegmentRef {
    /// Identifier of the segment itself.
    pub segment_id: ObjectId,
    /// Objects stored in this segment.
    pub object_ids: Vec<ObjectId>,
    /// Segment size in bytes.
    pub size_bytes: u64,
}
/// A reader's hold on segments; leased segments are not retirable.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ReaderLease {
    /// Lease identifier.
    pub lease_id: u64,
    /// Segments this lease keeps alive.
    pub segment_ids: Vec<ObjectId>,
}
/// Four-phase compaction saga: Mark → Compact → Publish → Retire.
///
/// Publishing is the point of no return: cancelling before it discards the
/// temporary new segments, cancelling after it requires a rollback.
#[derive(Debug)]
// The saga tracks several independent durability flags (synced / published /
// cancelled), hence the many bools.
#[allow(clippy::struct_excessive_bools)]
pub struct CompactionSaga {
    /// Current phase.
    phase: CompactionPhase,
    /// Object ids found reachable in the Mark phase.
    live_set: HashSet<ObjectId>,
    /// Bloom filter over pseudo page numbers derived from `live_set` ids.
    live_bloom: Option<BloomFilter>,
    /// Segments being compacted away.
    old_segments: Vec<SegmentRef>,
    /// Replacement segments recorded in the Compact phase.
    new_segments: Vec<SegmentRef>,
    /// Whether the new segments have been fdatasynced.
    new_segments_synced: bool,
    /// Whether the new locator has been fdatasynced.
    new_locator_synced: bool,
    /// Whether publish has succeeded.
    published: bool,
    /// Active reader leases, which block retirement of their segments.
    reader_leases: Vec<ReaderLease>,
    /// Old segment ids retired so far.
    retired_segments: Vec<ObjectId>,
    /// Whether `cancel` has been called.
    cancelled: bool,
    /// Caller-supplied space amplification before compaction.
    space_amp_before: f64,
    /// Space amplification estimate after compaction.
    space_amp_after: f64,
}
impl CompactionSaga {
    /// Starts a saga over `old_segments` in the [`CompactionPhase::Mark`] phase.
    ///
    /// `space_amp_before` is kept for [`summary`](Self::summary);
    /// `space_amp_after` starts at `0.0` until [`compact`](Self::compact)
    /// sets it.
    #[must_use]
    pub fn new(old_segments: Vec<SegmentRef>, space_amp_before: f64) -> Self {
        info!(
            old_segment_count = old_segments.len(),
            space_amp_before, "compaction saga initiated"
        );
        Self {
            phase: CompactionPhase::Mark,
            live_set: HashSet::new(),
            live_bloom: None,
            old_segments,
            new_segments: Vec::new(),
            new_segments_synced: false,
            new_locator_synced: false,
            published: false,
            reader_leases: Vec::new(),
            retired_segments: Vec::new(),
            cancelled: false,
            space_amp_before,
            space_amp_after: 0.0,
        }
    }

    /// Current saga phase.
    #[must_use]
    pub const fn phase(&self) -> CompactionPhase {
        self.phase
    }

    /// Whether [`cancel`](Self::cancel) has been called.
    #[must_use]
    pub const fn is_cancelled(&self) -> bool {
        self.cancelled
    }

    /// Whether [`publish`](Self::publish) has succeeded.
    #[must_use]
    pub const fn is_published(&self) -> bool {
        self.published
    }

    /// Cancels the saga and reports the compensation the caller must perform.
    ///
    /// Before publish, the new segments are dropped and
    /// [`CompactionCompensation::TempSegmentsDiscarded`] is returned. After
    /// publish nothing is undone here and
    /// [`CompactionCompensation::RollbackRequired`] is returned. A cancelled
    /// saga can no longer be published.
    pub fn cancel(&mut self) -> CompactionCompensation {
        self.cancelled = true;
        if self.published {
            warn!("compaction cancelled after publish — rollback required");
            CompactionCompensation::RollbackRequired
        } else {
            info!(
                phase = %self.phase,
                "compaction cancelled before publish — temp segments discarded"
            );
            self.new_segments.clear();
            CompactionCompensation::TempSegmentsDiscarded
        }
    }

    /// Phase 1 (Mark): replaces the live set with `reachable_ids`, builds a
    /// bloom filter over them, and advances to [`CompactionPhase::Compact`].
    ///
    /// # Panics
    /// Panics if the saga is not in [`CompactionPhase::Mark`].
    pub fn mark(&mut self, reachable_ids: impl IntoIterator<Item = ObjectId>) {
        assert_eq!(self.phase, CompactionPhase::Mark, "must be in Mark phase");
        self.live_set.clear();
        self.live_set.extend(reachable_ids);
        // Bloom capacity is a u32; saturate rather than silently truncate.
        let count = u32::try_from(self.live_set.len()).unwrap_or(u32::MAX);
        let mut bloom = BloomFilter::new(count.max(1), 0.001);
        for id in &self.live_set {
            // The filter is keyed by PageNumber, so each id is projected onto a
            // pseudo page number from its first four bytes, clamped to >= 1
            // (presumably because PageNumber rejects 0).
            let bytes = id.as_bytes();
            let pseudo_page = u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]);
            if let Some(pn) = PageNumber::new(pseudo_page.max(1)) {
                bloom.insert(pn);
            }
        }
        self.live_bloom = Some(bloom);
        debug!(
            live_count = self.live_set.len(),
            "compaction phase 1 (mark) complete"
        );
        self.phase = CompactionPhase::Compact;
    }

    /// Whether `id` was marked live.
    #[must_use]
    pub fn is_live(&self, id: &ObjectId) -> bool {
        self.live_set.contains(id)
    }

    /// Number of live objects found by [`mark`](Self::mark).
    #[must_use]
    pub fn live_count(&self) -> usize {
        self.live_set.len()
    }

    /// Phase 2 (Compact): records the replacement segments.
    ///
    /// Does not advance the phase; the saga moves to Publish when the locator
    /// sync is recorded via [`mark_locator_synced`](Self::mark_locator_synced).
    /// NOTE(review): `space_amp_after` is set to a fixed `1.0` whenever the new
    /// segments have a non-zero total size, not to a measured ratio.
    ///
    /// # Panics
    /// Panics if the saga is not in [`CompactionPhase::Compact`].
    pub fn compact(&mut self, new_segments: Vec<SegmentRef>) {
        assert_eq!(
            self.phase,
            CompactionPhase::Compact,
            "must be in Compact phase"
        );
        let new_total_size: u64 = new_segments.iter().map(|s| s.size_bytes).sum();
        self.new_segments = new_segments;
        if new_total_size > 0 {
            self.space_amp_after = 1.0;
        }
        debug!(
            new_segment_count = self.new_segments.len(),
            new_total_size, "compaction phase 2 (compact) segments created"
        );
    }

    /// Records that the new segments have been fdatasynced.
    pub fn mark_segments_synced(&mut self) {
        self.new_segments_synced = true;
        debug!("compaction phase 2: new segments fdatasynced");
    }

    /// Records that the new locator has been fdatasynced and, if the saga is in
    /// the Compact phase, advances it to [`CompactionPhase::Publish`].
    ///
    /// In any other phase only the flag is set, so a late or repeated call
    /// (e.g. after publish) cannot rewind the saga.
    pub fn mark_locator_synced(&mut self) {
        self.new_locator_synced = true;
        debug!("compaction phase 2: new locator fdatasynced");
        if self.phase == CompactionPhase::Compact {
            self.phase = CompactionPhase::Publish;
        }
    }

    /// Phase 3 (Publish): the point of no return.
    ///
    /// Returns `false` without publishing if the saga was cancelled or if the
    /// segments or locator have not been synced; otherwise marks the saga
    /// published, advances to [`CompactionPhase::Retire`], and returns `true`.
    ///
    /// # Panics
    /// Panics if the saga is not in [`CompactionPhase::Publish`].
    pub fn publish(&mut self) -> bool {
        assert_eq!(
            self.phase,
            CompactionPhase::Publish,
            "must be in Publish phase"
        );
        if self.cancelled {
            // cancel() already discarded the new segments; publishing now
            // would commit to data that no longer exists.
            warn!("cannot publish: compaction saga was cancelled");
            return false;
        }
        if !self.new_segments_synced || !self.new_locator_synced {
            warn!("cannot publish: segments or locator not yet synced");
            return false;
        }
        self.published = true;
        info!(
            space_amp_before = self.space_amp_before,
            space_amp_after = self.space_amp_after,
            "compaction phase 3 (publish) complete — point of no return"
        );
        self.phase = CompactionPhase::Retire;
        true
    }

    /// Replaces the set of active reader leases.
    pub fn register_reader_leases(&mut self, leases: Vec<ReaderLease>) {
        self.reader_leases = leases;
    }

    /// Old segments that may be retired now.
    ///
    /// Empty before publish. After publish, these are the old segments that are
    /// neither held by a registered reader lease nor already retired.
    #[must_use]
    pub fn retirable_segments(&self) -> Vec<ObjectId> {
        if !self.published {
            return Vec::new();
        }
        let leased_segments: HashSet<&ObjectId> = self
            .reader_leases
            .iter()
            .flat_map(|lease| &lease.segment_ids)
            .collect();
        let already_retired: HashSet<&ObjectId> = self.retired_segments.iter().collect();
        self.old_segments
            .iter()
            .filter(|seg| {
                !leased_segments.contains(&seg.segment_id)
                    && !already_retired.contains(&seg.segment_id)
            })
            .map(|seg| seg.segment_id)
            .collect()
    }

    /// Phase 4 (Retire): retires every currently retirable old segment and
    /// returns the ids retired by this call.
    ///
    /// May be called repeatedly as leases are released; a segment is only ever
    /// retired once.
    ///
    /// # Panics
    /// Panics if the saga is not in [`CompactionPhase::Retire`] or has not
    /// been published.
    pub fn retire(&mut self) -> Vec<ObjectId> {
        assert_eq!(
            self.phase,
            CompactionPhase::Retire,
            "must be in Retire phase"
        );
        assert!(self.published, "must publish before retiring");
        let retirable = self.retirable_segments();
        self.retired_segments.extend_from_slice(&retirable);
        if retirable.is_empty() {
            debug!("compaction phase 4 (retire): no segments retirable yet (leases active)");
        } else {
            info!(
                retired_count = retirable.len(),
                "compaction phase 4 (retire): old segments retired"
            );
        }
        retirable
    }

    /// Snapshot of the saga's progress.
    ///
    /// `dead_objects` counts object-id entries in the old segments that are
    /// not in the live set (once per occurrence).
    #[must_use]
    pub fn summary(&self) -> CompactionSummary {
        let dead_count = self
            .old_segments
            .iter()
            .flat_map(|s| &s.object_ids)
            .filter(|id| !self.live_set.contains(id))
            .count();
        CompactionSummary {
            space_amp_before: self.space_amp_before,
            space_amp_after: self.space_amp_after,
            live_objects: self.live_set.len(),
            dead_objects: dead_count,
            old_segments: self.old_segments.len(),
            new_segments: self.new_segments.len(),
            retired_segments: self.retired_segments.len(),
            published: self.published,
            cancelled: self.cancelled,
        }
    }
}
/// Snapshot of a compaction saga's progress, from `CompactionSaga::summary`.
#[derive(Debug, Clone)]
pub struct CompactionSummary {
    /// Caller-supplied space amplification before compaction.
    pub space_amp_before: f64,
    /// Space amplification estimate after compaction (`0.0` until compacted).
    pub space_amp_after: f64,
    /// Number of objects in the live set.
    pub live_objects: usize,
    /// Object-id entries in the old segments that are not live.
    pub dead_objects: usize,
    /// Number of old segments.
    pub old_segments: usize,
    /// Number of replacement segments recorded.
    pub new_segments: usize,
    /// Number of entries recorded as retired.
    pub retired_segments: usize,
    /// Whether publish succeeded.
    pub published: bool,
    /// Whether the saga was cancelled.
    pub cancelled: bool,
}
/// Compensation the caller must perform after cancelling a compaction saga.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompactionCompensation {
    /// Cancelled before publish; the saga dropped its new segments and
    /// nothing was published.
    TempSegmentsDiscarded,
    /// Cancelled after publish; the saga undid nothing, so the publish must be
    /// rolled back by the caller.
    RollbackRequired,
}
#[cfg(test)]
mod tests {
use super::*;
use fsqlite_types::Oti;
use fsqlite_types::ecs::SymbolRecordFlags;
fn make_oid(seed: u8) -> ObjectId {
ObjectId::from_bytes([seed; 16])
}
fn make_marker(seq: u64, capsule_seed: u8) -> CommitMarker {
CommitMarker::new(
CommitSeq::new(seq),
1_700_000_000_000_000_000 + seq * 1_000_000,
make_oid(capsule_seed),
make_oid(capsule_seed.wrapping_add(0x80)),
if seq > 1 {
Some(make_oid(capsule_seed.wrapping_sub(1)))
} else {
None
},
)
}
fn make_segment(id_seed: u8, object_seeds: &[u8], size: u64) -> SegmentRef {
SegmentRef {
segment_id: make_oid(id_seed),
object_ids: object_seeds.iter().map(|&s| make_oid(s)).collect(),
size_bytes: size,
}
}
fn make_capsule_symbol_records(
object_id: ObjectId,
source_symbols: u32,
symbol_size: u32,
repair_symbols: u32,
) -> (Vec<SymbolRecord>, Vec<u8>) {
let symbol_size_usize = usize::try_from(symbol_size).expect("symbol size fits usize");
let oti = Oti {
f: u64::from(source_symbols).saturating_mul(u64::from(symbol_size)),
al: 4,
t: symbol_size,
z: 1,
n: 1,
};
let mut records = Vec::new();
let mut expected = Vec::new();
for esi in 0..source_symbols {
let mut payload = Vec::with_capacity(symbol_size_usize);
let esi_low = u8::try_from(esi & 0xFF).expect("masked to u8");
for idx in 0..symbol_size_usize {
let idx_low = u8::try_from(idx & 0xFF).expect("masked to u8");
payload.push(esi_low.wrapping_mul(7) ^ idx_low);
}
expected.extend_from_slice(&payload);
let flags = if esi == 0 {
SymbolRecordFlags::SYSTEMATIC_RUN_START
} else {
SymbolRecordFlags::empty()
};
records.push(SymbolRecord::new(object_id, oti, esi, payload, flags));
}
for repair in 0..repair_symbols {
let esi = source_symbols.saturating_add(repair);
let payload = vec![0xA5; symbol_size_usize];
records.push(SymbolRecord::new(
object_id,
oti,
esi,
payload,
SymbolRecordFlags::empty(),
));
}
(records, expected)
}
#[test]
fn test_native_recovery_from_genesis() {
let mut recovery = NativeRecovery::new();
let manifest = RootManifest {
ecs_epoch: 1,
latest_checkpoint: None,
manifest: ManifestSegment::new(Vec::new()),
};
recovery.load_root_manifest(manifest);
assert_eq!(recovery.locate_checkpoint(), CommitSeq::ZERO);
let markers: Vec<_> = (1_u64..=5)
.map(|i| make_marker(i, u8::try_from(i).expect("marker id fits in u8")))
.collect();
recovery.replay_markers(&markers, |_| CapsuleDecodeOutcome::Systematic);
let summary = recovery.finalize(42);
assert_eq!(summary.ecs_epoch, 1);
assert_eq!(summary.commit_seq_recovered, CommitSeq::new(5));
assert_eq!(summary.markers_replayed, 5);
assert_eq!(summary.capsules_repaired, 0);
assert!(summary.violations.is_empty());
assert_eq!(summary.duration_ms, 42);
}
#[test]
fn test_native_recovery_from_checkpoint() {
let mut recovery = NativeRecovery::new();
let manifest = RootManifest {
ecs_epoch: 2,
latest_checkpoint: Some(CheckpointRef {
commit_seq: CommitSeq::new(100),
manifest_object_id: make_oid(0xCC),
}),
manifest: ManifestSegment::new(vec![(1, 100, make_oid(0xDD))]),
};
recovery.load_root_manifest(manifest);
assert_eq!(recovery.locate_checkpoint(), CommitSeq::new(100));
let markers: Vec<_> = (101_u64..=105)
.map(|i| make_marker(i, u8::try_from(i % 256).expect("i % 256 fits in u8")))
.collect();
recovery.replay_markers(&markers, |_| CapsuleDecodeOutcome::Systematic);
let summary = recovery.finalize(100);
assert_eq!(summary.commit_seq_recovered, CommitSeq::new(105));
assert_eq!(summary.markers_replayed, 5);
}
#[test]
fn test_native_recovery_repair() {
let mut recovery = NativeRecovery::new();
let manifest = RootManifest {
ecs_epoch: 1,
latest_checkpoint: None,
manifest: ManifestSegment::new(Vec::new()),
};
recovery.load_root_manifest(manifest);
let markers: Vec<_> = (1_u64..=3)
.map(|i| make_marker(i, u8::try_from(i).expect("marker id fits in u8")))
.collect();
recovery.replay_markers(&markers, |oid| {
if oid == make_oid(2) {
CapsuleDecodeOutcome::Repaired {
repair_symbols_used: 3,
}
} else {
CapsuleDecodeOutcome::Systematic
}
});
let summary = recovery.finalize(50);
assert_eq!(summary.capsules_repaired, 1);
assert!(summary.violations.is_empty());
}
#[test]
fn test_native_recovery_contract_violation() {
let mut recovery = NativeRecovery::new();
let manifest = RootManifest {
ecs_epoch: 1,
latest_checkpoint: None,
manifest: ManifestSegment::new(Vec::new()),
};
recovery.load_root_manifest(manifest);
let markers: Vec<_> = (1_u64..=3)
.map(|i| make_marker(i, u8::try_from(i).expect("marker id fits in u8")))
.collect();
recovery.replay_markers(&markers, |oid| {
if oid == make_oid(2) {
CapsuleDecodeOutcome::Failed {
reason: "insufficient symbols: 2 of 5 needed".to_owned(),
}
} else {
CapsuleDecodeOutcome::Systematic
}
});
assert!(recovery.has_violations());
let summary = recovery.finalize(50);
assert_eq!(summary.violations.len(), 1);
assert_eq!(summary.violations[0].commit_seq, CommitSeq::new(2));
assert_eq!(summary.violations[0].capsule_object_id, make_oid(2));
}
#[test]
fn test_happy_path_read_no_gf256() {
let capsule_id = make_oid(0x90);
let (records, _expected) = make_capsule_symbol_records(capsule_id, 50, 64, 5);
let fallback_invocations = std::cell::Cell::new(0_u32);
let outcome = decode_capsule_symbol_records(capsule_id, &records, |_| {
fallback_invocations.set(fallback_invocations.get().saturating_add(1));
Err("fallback should not run on happy path".to_owned())
});
assert!(matches!(outcome, CapsuleDecodeOutcome::Systematic));
assert_eq!(
fallback_invocations.get(),
0,
"GF(256) fallback decode must not run when systematic run is intact"
);
}
#[test]
fn test_fallback_on_missing_symbol() {
let capsule_id = make_oid(0x91);
let (mut records, _) = make_capsule_symbol_records(capsule_id, 50, 64, 5);
records.retain(|record| record.esi != 5);
let fallback_invocations = std::cell::Cell::new(0_u32);
let fallback_payload = vec![0xC1; 50 * 64];
let outcome = decode_capsule_symbol_records(capsule_id, &records, |_| {
fallback_invocations.set(fallback_invocations.get().saturating_add(1));
Ok(fallback_payload.clone())
});
assert_eq!(fallback_invocations.get(), 1);
assert!(matches!(
outcome,
CapsuleDecodeOutcome::Repaired {
repair_symbols_used: _
}
));
}
#[test]
fn test_fallback_on_corruption() {
let capsule_id = make_oid(0x92);
let (mut records, _) = make_capsule_symbol_records(capsule_id, 50, 64, 5);
let idx = records
.iter()
.position(|record| record.esi == 3)
.expect("ESI 3 present");
records[idx].symbol_data[0] ^= 0x11;
let fallback_invocations = std::cell::Cell::new(0_u32);
let fallback_payload = vec![0xD2; 50 * 64];
let outcome = decode_capsule_symbol_records(capsule_id, &records, |_| {
fallback_invocations.set(fallback_invocations.get().saturating_add(1));
Ok(fallback_payload.clone())
});
assert_eq!(fallback_invocations.get(), 1);
assert!(matches!(
outcome,
CapsuleDecodeOutcome::Repaired {
repair_symbols_used: _
}
));
}
#[test]
fn test_e2e_systematic_symbol_read_path_no_decode() {
let capsule_id = make_oid(0x93);
let (records, _expected) = make_capsule_symbol_records(capsule_id, 64, 512, 8);
let decode_invocations = std::cell::Cell::new(0_u32);
let intact = decode_capsule_symbol_records(capsule_id, &records, |_| {
decode_invocations.set(decode_invocations.get().saturating_add(1));
Err("fallback should not run for intact systematic run".to_owned())
});
assert!(matches!(intact, CapsuleDecodeOutcome::Systematic));
assert_eq!(decode_invocations.get(), 0);
let mut corrupted = records;
let corrupt_idx = corrupted
.iter()
.position(|record| record.esi == 7)
.expect("ESI 7 present");
corrupted[corrupt_idx].symbol_data[13] ^= 0xFF;
let repaired = decode_capsule_symbol_records(capsule_id, &corrupted, |_| {
decode_invocations.set(decode_invocations.get().saturating_add(1));
Ok(vec![0xAB; 64 * 512])
});
assert!(matches!(
repaired,
CapsuleDecodeOutcome::Repaired {
repair_symbols_used: _
}
));
assert_eq!(
decode_invocations.get(),
1,
"fallback decode should run exactly once after corruption"
);
}
#[test]
fn test_compaction_identifies_live() {
let old = vec![make_segment(0x01, &[0x10, 0x20, 0x30, 0x40, 0x50], 5000)];
let mut saga = CompactionSaga::new(old, 2.5);
let live = vec![make_oid(0x10), make_oid(0x30), make_oid(0x50)];
saga.mark(live);
assert_eq!(saga.live_count(), 3);
assert!(saga.is_live(&make_oid(0x10)));
assert!(!saga.is_live(&make_oid(0x20)));
assert!(saga.is_live(&make_oid(0x30)));
assert!(!saga.is_live(&make_oid(0x40)));
assert!(saga.is_live(&make_oid(0x50)));
}
#[test]
fn test_compaction_discards_dead() {
let old = vec![make_segment(0x01, &[0x10, 0x20, 0x30, 0x40, 0x50], 5000)];
let mut saga = CompactionSaga::new(old, 2.5);
saga.mark(vec![make_oid(0x10), make_oid(0x30), make_oid(0x50)]);
let new_seg = make_segment(0x02, &[0x10, 0x30, 0x50], 3000);
saga.compact(vec![new_seg]);
let summary = saga.summary();
assert_eq!(summary.live_objects, 3);
assert_eq!(summary.dead_objects, 2); }
#[test]
fn test_compaction_two_phase_publish() {
let old = vec![make_segment(0x01, &[0x10, 0x20], 2000)];
let mut saga = CompactionSaga::new(old, 2.0);
saga.mark(vec![make_oid(0x10)]);
saga.compact(vec![make_segment(0x02, &[0x10], 1000)]);
assert!(!saga.is_published());
saga.mark_segments_synced();
saga.mark_locator_synced();
assert!(saga.publish());
assert!(saga.is_published());
}
#[test]
fn test_compaction_crash_before_publish() {
let old = vec![make_segment(0x01, &[0x10, 0x20], 2000)];
let mut saga = CompactionSaga::new(old.clone(), 2.0);
saga.mark(vec![make_oid(0x10)]);
saga.compact(vec![make_segment(0x02, &[0x10], 1000)]);
let comp = saga.cancel();
assert_eq!(comp, CompactionCompensation::TempSegmentsDiscarded);
assert!(!saga.is_published());
assert_eq!(old.len(), 1);
}
#[test]
fn test_compaction_crash_after_publish() {
let old = vec![make_segment(0x01, &[0x10, 0x20], 2000)];
let mut saga = CompactionSaga::new(old, 2.0);
saga.mark(vec![make_oid(0x10)]);
saga.compact(vec![make_segment(0x02, &[0x10], 1000)]);
saga.mark_segments_synced();
saga.mark_locator_synced();
saga.publish();
let comp = saga.cancel();
assert_eq!(comp, CompactionCompensation::RollbackRequired);
}
#[test]
fn test_compaction_reader_leases() {
let old = vec![
make_segment(0x01, &[0x10], 1000),
make_segment(0x02, &[0x20], 1000),
];
let mut saga = CompactionSaga::new(old, 2.0);
saga.mark(vec![make_oid(0x10), make_oid(0x20)]);
saga.compact(vec![make_segment(0x03, &[0x10, 0x20], 2000)]);
saga.mark_segments_synced();
saga.mark_locator_synced();
saga.publish();
saga.register_reader_leases(vec![ReaderLease {
lease_id: 1,
segment_ids: vec![make_oid(0x01)],
}]);
let retirable = saga.retirable_segments();
assert_eq!(retirable.len(), 1);
assert_eq!(retirable[0], make_oid(0x02));
saga.register_reader_leases(Vec::new());
let retirable = saga.retirable_segments();
assert_eq!(retirable.len(), 2);
}
#[test]
fn test_compaction_space_reclaimed() {
let old = vec![make_segment(0x01, &[0x10, 0x20, 0x30, 0x40], 4000)];
let mut saga = CompactionSaga::new(old, 3.0);
saga.mark(vec![make_oid(0x10), make_oid(0x30)]);
saga.compact(vec![make_segment(0x02, &[0x10, 0x30], 2000)]);
let summary = saga.summary();
assert!(
summary.space_amp_after < 2.0,
"space_amp_after ({}) should be < 2.0",
summary.space_amp_after
);
}
#[test]
fn test_compaction_saga_compensation() {
let old = vec![make_segment(0x01, &[0x10], 1000)];
let mut saga = CompactionSaga::new(old.clone(), 2.0);
let comp = saga.cancel();
assert_eq!(comp, CompactionCompensation::TempSegmentsDiscarded);
let mut saga = CompactionSaga::new(old.clone(), 2.0);
saga.mark(vec![make_oid(0x10)]);
let comp = saga.cancel();
assert_eq!(comp, CompactionCompensation::TempSegmentsDiscarded);
let mut saga = CompactionSaga::new(old.clone(), 2.0);
saga.mark(vec![make_oid(0x10)]);
saga.compact(vec![make_segment(0x02, &[0x10], 500)]);
let comp = saga.cancel();
assert_eq!(comp, CompactionCompensation::TempSegmentsDiscarded);
let mut saga = CompactionSaga::new(old, 2.0);
saga.mark(vec![make_oid(0x10)]);
saga.compact(vec![make_segment(0x02, &[0x10], 500)]);
saga.mark_segments_synced();
saga.mark_locator_synced();
saga.publish();
let comp = saga.cancel();
assert_eq!(comp, CompactionCompensation::RollbackRequired);
}
#[test]
fn test_compaction_mdp_policy() {
let policy = CompactionPolicy::new();
let idle = CompactionMdpState {
space_amp_bucket: 0,
read_regime: 0,
write_regime: 0,
compaction_debt: 0,
};
assert_eq!(policy.recommend(&idle), CompactionAction::Defer);
let high_amp = CompactionMdpState {
space_amp_bucket: 2,
read_regime: 1,
write_regime: 1,
compaction_debt: 0,
};
assert_eq!(
policy.recommend(&high_amp),
CompactionAction::CompactNow {
rate_limit: CompactionRateLimit::High,
}
);
let heavy_write = CompactionMdpState {
space_amp_bucket: 3,
read_regime: 0,
write_regime: 2,
compaction_debt: 0,
};
assert_eq!(
policy.recommend(&heavy_write),
CompactionAction::CompactNow {
rate_limit: CompactionRateLimit::Low,
}
);
let high_debt = CompactionMdpState {
space_amp_bucket: 0,
read_regime: 0,
write_regime: 0,
compaction_debt: 2,
};
assert_eq!(
policy.recommend(&high_debt),
CompactionAction::CompactNow {
rate_limit: CompactionRateLimit::Medium,
}
);
assert_eq!(CompactionMdpState::bucket_for_space_amp(1.0), 0);
assert_eq!(CompactionMdpState::bucket_for_space_amp(1.7), 1);
assert_eq!(CompactionMdpState::bucket_for_space_amp(2.5), 2);
assert_eq!(CompactionMdpState::bucket_for_space_amp(5.0), 3);
}
#[test]
fn test_compaction_evidence_ledger() {
let mut policy = CompactionPolicy::new();
assert!(policy.evidence_ledger().is_empty());
let state = CompactionMdpState {
space_amp_bucket: 2,
read_regime: 1,
write_regime: 0,
compaction_debt: 0,
};
let action = policy.recommend(&state);
policy.record_decision(
1_700_000_000_000_000_000,
state,
action,
"space_amp exceeded threshold",
);
assert_eq!(policy.evidence_ledger().len(), 1);
assert_eq!(policy.evidence_ledger()[0].state.space_amp_bucket, 2);
assert_eq!(
policy.evidence_ledger()[0].reason,
"space_amp exceeded threshold"
);
policy.override_action(state, CompactionAction::Defer);
let new_action = policy.recommend(&state);
assert_eq!(new_action, CompactionAction::Defer);
policy.record_decision(
1_700_000_001_000_000_000,
state,
new_action,
"BOCPD regime shift: deferred during write burst",
);
assert_eq!(policy.evidence_ledger().len(), 2);
}
#[test]
fn test_compaction_phase_display() {
assert_eq!(format!("{}", CompactionPhase::Mark), "mark");
assert_eq!(format!("{}", CompactionPhase::Compact), "compact");
assert_eq!(format!("{}", CompactionPhase::Publish), "publish");
assert_eq!(format!("{}", CompactionPhase::Retire), "retire");
}
#[test]
fn test_recovery_has_violations_and_recovered_tip() {
let mut recovery = NativeRecovery::new();
assert!(!recovery.has_violations());
assert_eq!(recovery.recovered_tip(), CommitSeq::ZERO);
let manifest = RootManifest {
ecs_epoch: 1,
latest_checkpoint: None,
manifest: ManifestSegment::new(Vec::new()),
};
recovery.load_root_manifest(manifest);
let markers = vec![make_marker(1, 0x01), make_marker(2, 0x02)];
recovery.replay_markers(&markers, |oid| {
if oid == make_oid(0x02) {
CapsuleDecodeOutcome::Failed {
reason: "corrupt".into(),
}
} else {
CapsuleDecodeOutcome::Systematic
}
});
assert!(recovery.has_violations());
assert_eq!(recovery.recovered_tip(), CommitSeq::new(2));
}
#[test]
fn test_retirable_segments_empty_before_publish() {
let saga = CompactionSaga::new(vec![make_segment(0x10, &[0x20, 0x21], 1000)], 2.5);
assert!(saga.retirable_segments().is_empty());
assert!(!saga.is_published());
}
#[test]
fn test_cancel_after_publish_requires_rollback() {
let old_segs = vec![make_segment(0x10, &[0x20], 500)];
let mut saga = CompactionSaga::new(old_segs, 2.0);
saga.mark(vec![make_oid(0x20)]);
saga.compact(vec![make_segment(0x30, &[0x20], 400)]);
saga.mark_segments_synced();
saga.mark_locator_synced();
assert!(saga.publish());
assert!(saga.is_published());
let compensation = saga.cancel();
assert_eq!(compensation, CompactionCompensation::RollbackRequired);
assert!(saga.is_cancelled());
}
#[test]
fn test_cancel_before_publish_discards_temp_segments() {
let old_segs = vec![make_segment(0x10, &[0x20], 500)];
let mut saga = CompactionSaga::new(old_segs, 2.0);
saga.mark(vec![make_oid(0x20)]);
saga.compact(vec![make_segment(0x30, &[0x20], 400)]);
let compensation = saga.cancel();
assert_eq!(compensation, CompactionCompensation::TempSegmentsDiscarded);
assert!(saga.is_cancelled());
assert!(!saga.is_published());
}
#[test]
fn test_bucket_for_space_amp_exact_boundaries() {
assert_eq!(CompactionMdpState::bucket_for_space_amp(0.0), 0);
assert_eq!(CompactionMdpState::bucket_for_space_amp(1.499_999), 0);
assert_eq!(CompactionMdpState::bucket_for_space_amp(1.5), 1);
assert_eq!(CompactionMdpState::bucket_for_space_amp(1.999_999), 1);
assert_eq!(CompactionMdpState::bucket_for_space_amp(2.0), 2);
assert_eq!(CompactionMdpState::bucket_for_space_amp(2.999_999), 2);
assert_eq!(CompactionMdpState::bucket_for_space_amp(3.0), 3);
assert_eq!(CompactionMdpState::bucket_for_space_amp(100.0), 3);
}
#[test]
fn test_native_recovery_default_equals_new() {
    // `Default` and `new()` must both yield an empty recovery state:
    // no root manifest, tip at zero, no recorded violations.
    for recovery in [NativeRecovery::new(), NativeRecovery::default()] {
        assert!(recovery.root_manifest.is_none());
        assert_eq!(recovery.recovered_tip(), CommitSeq::ZERO);
        assert!(!recovery.has_violations());
    }
}
#[test]
fn test_compaction_policy_default_equals_new() {
    let via_new = CompactionPolicy::new();
    let via_default = CompactionPolicy::default();

    // Neither constructor should start with any recorded evidence.
    assert!(via_new.evidence_ledger().is_empty());
    assert!(via_default.evidence_ledger().is_empty());

    // Both policies must make the same recommendation for a sample state.
    let probe_state = CompactionMdpState {
        space_amp_bucket: 2,
        read_regime: 0,
        write_regime: 0,
        compaction_debt: 0,
    };
    let recommended_by_new = via_new.recommend(&probe_state);
    let recommended_by_default = via_default.recommend(&probe_state);
    assert_eq!(recommended_by_new, recommended_by_default);
}
#[test]
fn test_compaction_summary_full_lifecycle_with_retire() {
    // Two old segments hold objects 0x10, 0x20 and 0x30. Only 0x10 and 0x30
    // are marked live, and they are rewritten into one new segment.
    let old = vec![
        make_segment(0x01, &[0x10, 0x20], 2000),
        make_segment(0x02, &[0x30], 1000),
    ];
    let mut saga = CompactionSaga::new(old, 3.0);
    saga.mark(vec![make_oid(0x10), make_oid(0x30)]);
    saga.compact(vec![make_segment(0x03, &[0x10, 0x30], 1500)]);
    saga.mark_segments_synced();
    saga.mark_locator_synced();
    // Assert the publish result explicitly. If publish were refused, the
    // retire/summary assertions below would fail with a misleading message.
    assert!(
        saga.publish(),
        "publish must succeed once segments and locator are synced"
    );
    // No reader leases are registered, so both old segments are expected
    // to be retired.
    saga.register_reader_leases(Vec::new());
    let retired = saga.retire();
    assert_eq!(retired.len(), 2);

    let summary = saga.summary();
    assert_eq!(summary.live_objects, 2);
    assert_eq!(summary.dead_objects, 1);
    assert_eq!(summary.old_segments, 2);
    assert_eq!(summary.new_segments, 1);
    assert_eq!(summary.retired_segments, 2);
    assert!(summary.published);
    assert!(!summary.cancelled);
    assert!((summary.space_amp_before - 3.0).abs() < f64::EPSILON);
}
#[test]
fn capsule_decode_outcome_debug_clone_eq_all_variants() {
    let sys = CapsuleDecodeOutcome::Systematic;
    let rep = CapsuleDecodeOutcome::Repaired {
        repair_symbols_used: 3,
    };
    let fail = CapsuleDecodeOutcome::Failed {
        reason: "corrupt".into(),
    };

    // Clone must round-trip every variant.
    assert_eq!(sys.clone(), sys);
    assert_eq!(rep.clone(), rep);
    assert_eq!(fail.clone(), fail);

    // Every pair of distinct variants must compare unequal...
    assert_ne!(sys, rep);
    assert_ne!(sys, fail);
    assert_ne!(rep, fail);
    // ...and the variant payloads must take part in equality.
    assert_ne!(
        rep,
        CapsuleDecodeOutcome::Repaired {
            repair_symbols_used: 4,
        }
    );
    assert_ne!(
        fail,
        CapsuleDecodeOutcome::Failed {
            reason: "truncated".into(),
        }
    );

    // Debug output names the variant and includes its payload.
    let dbg = format!("{fail:?}");
    assert!(dbg.contains("Failed"));
    assert!(dbg.contains("corrupt"));
}
#[test]
fn compaction_mdp_state_hash_distinguishes_fields() {
    use std::collections::HashSet;
    let base = CompactionMdpState {
        space_amp_bucket: 0,
        read_regime: 0,
        write_regime: 0,
        compaction_debt: 0,
    };
    // Each variant differs from `base` in exactly one field. This shows that
    // every field (not just the first two) takes part in Eq/Hash.
    let variants = [
        CompactionMdpState {
            space_amp_bucket: 1,
            ..base
        },
        CompactionMdpState {
            read_regime: 1,
            ..base
        },
        CompactionMdpState {
            write_regime: 1,
            ..base
        },
        CompactionMdpState {
            compaction_debt: 1,
            ..base
        },
    ];

    let mut set = HashSet::new();
    set.insert(base);
    set.extend(variants);
    assert_eq!(set.len(), 1 + variants.len());
    assert!(set.contains(&base));
    for state in &variants {
        assert!(set.contains(state));
    }

    // Re-inserting an equal value must be recognised as a duplicate.
    assert!(!set.insert(base));
    assert_eq!(set.len(), 1 + variants.len());
}
#[test]
fn compaction_action_and_rate_limit_debug_clone_copy_eq() {
    let deferred = CompactionAction::Defer;
    let immediate = CompactionAction::CompactNow {
        rate_limit: CompactionRateLimit::Medium,
    };
    assert_ne!(deferred, immediate);

    // `CompactionAction` is `Copy`, so the source stays usable afterwards.
    let duplicate = deferred;
    assert_eq!(duplicate, deferred);

    let rendered = format!("{immediate:?}");
    for needle in ["CompactNow", "Medium"] {
        assert!(rendered.contains(needle));
    }

    // Every rate limit is `Copy` and compares equal to its copy.
    let all_limits = [
        CompactionRateLimit::Low,
        CompactionRateLimit::Medium,
        CompactionRateLimit::High,
    ];
    all_limits.iter().for_each(|limit| {
        let copied_limit = *limit;
        assert_eq!(copied_limit, *limit);
    });
}
#[test]
fn reader_lease_debug_clone_eq_hash() {
    use std::collections::HashSet;
    let lease = ReaderLease {
        lease_id: 42,
        segment_ids: vec![make_oid(0x01), make_oid(0x02)],
    };

    // Debug output names the type and includes the lease id.
    let debug_text = format!("{lease:?}");
    for needle in ["ReaderLease", "42"] {
        assert!(debug_text.contains(needle));
    }

    assert_eq!(lease.clone(), lease);

    // A clone stored in a hash set must be found by the original.
    let lease_set: HashSet<ReaderLease> = std::iter::once(lease.clone()).collect();
    assert!(lease_set.contains(&lease));
}
#[test]
fn durability_violation_debug_clone_eq() {
    let violation = DurabilityViolation {
        commit_seq: CommitSeq::new(7),
        capsule_object_id: make_oid(0xAA),
        reason: "bad checksum".into(),
    };
    assert_eq!(violation.clone(), violation);

    // Debug output names the type and carries the reason text.
    let debug_text = format!("{violation:?}");
    for needle in ["DurabilityViolation", "bad checksum"] {
        assert!(debug_text.contains(needle));
    }

    // Differing only in commit sequence must break equality.
    let later_violation = DurabilityViolation {
        commit_seq: CommitSeq::new(8),
        capsule_object_id: make_oid(0xAA),
        reason: "bad checksum".into(),
    };
    assert_ne!(violation, later_violation);
}
#[test]
fn checkpoint_ref_debug_clone_eq() {
    let checkpoint = CheckpointRef {
        commit_seq: CommitSeq::new(42),
        manifest_object_id: make_oid(0xBB),
    };
    assert_eq!(checkpoint.clone(), checkpoint);
    assert!(format!("{checkpoint:?}").contains("CheckpointRef"));

    // Same manifest, different commit sequence: must not compare equal.
    let next_checkpoint = CheckpointRef {
        commit_seq: CommitSeq::new(43),
        manifest_object_id: make_oid(0xBB),
    };
    assert_ne!(checkpoint, next_checkpoint);
}
#[test]
fn compaction_summary_debug_clone_fields() {
    let s = CompactionSummary {
        space_amp_before: 2.5,
        space_amp_after: 1.1,
        live_objects: 100,
        dead_objects: 50,
        old_segments: 3,
        new_segments: 1,
        retired_segments: 2,
        published: true,
        cancelled: false,
    };
    let cloned = s.clone();

    // Every field must survive the clone. Clone is an exact copy, so floats
    // are compared bit-for-bit rather than with a tolerance.
    assert_eq!(
        cloned.space_amp_before.to_bits(),
        s.space_amp_before.to_bits()
    );
    assert_eq!(
        cloned.space_amp_after.to_bits(),
        s.space_amp_after.to_bits()
    );
    assert_eq!(cloned.live_objects, 100);
    assert_eq!(cloned.dead_objects, 50);
    assert_eq!(cloned.old_segments, 3);
    assert_eq!(cloned.new_segments, 1);
    assert_eq!(cloned.retired_segments, 2);
    assert!(cloned.published);
    assert!(!cloned.cancelled);

    let dbg = format!("{s:?}");
    assert!(dbg.contains("CompactionSummary"));
}
#[test]
fn segment_ref_debug_clone_fields() {
    let seg = SegmentRef {
        segment_id: make_oid(0x10),
        object_ids: vec![make_oid(0x20), make_oid(0x30)],
        size_bytes: 4096,
    };
    let cloned = seg.clone();
    assert_eq!(cloned.segment_id, make_oid(0x10));
    // Check the cloned ids themselves (and their order), not just the count.
    // A length check alone would pass if the ids were corrupted.
    assert_eq!(cloned.object_ids, vec![make_oid(0x20), make_oid(0x30)]);
    assert_eq!(cloned.object_ids, seg.object_ids);
    assert_eq!(cloned.size_bytes, 4096);

    let dbg = format!("{seg:?}");
    assert!(dbg.contains("SegmentRef"));
}
}