use super::fs::SimFs;
use super::seed_from_env;
use crate::coordinate::{Coordinate, Region};
use crate::event::EventKind;
use crate::store::{AppendOptions, BatchAppendItem, CausationRef, Store, StoreConfig, SyncMode};
use std::sync::Arc;
/// 64-bit FNV-1a offset basis: the starting value of every digest built with `fold`.
pub(crate) const FNV_OFFSET: u64 = 0xcbf2_9ce4_8422_2325;
/// 64-bit FNV prime: the per-byte (wrapping) multiplier applied by `fold`.
const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;
/// Event kind stamped on every user event the harness appends (single and batched).
pub(crate) const KIND: EventKind = EventKind::custom(0xC, 2);
/// Mixes the eight little-endian bytes of `token` into `digest`, FNV-1a style:
/// each byte is xor-ed in and the running value is then multiplied by `FNV_PRIME`
/// with wrapping arithmetic.
///
/// The result depends on the order in which tokens are folded, so a chain of
/// `fold` calls acts as a cheap, deterministic replay fingerprint.
pub(crate) fn fold(digest: u64, token: u64) -> u64 {
    token
        .to_le_bytes()
        .iter()
        .fold(digest, |acc, &byte| (acc ^ u64::from(byte)).wrapping_mul(FNV_PRIME))
}
/// A recovery invariant that a seeded crash/reopen cycle failed to uphold.
///
/// Derives `Debug`/`Clone`/`PartialEq`/`Eq` so a `Result<_, Violation>` can be
/// `unwrap`ped or `expect`ed, and so tests can assert on the exact violation
/// rather than only on its rendered message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum Violation {
    /// Fewer user events were visible after reopen than had been acknowledged
    /// durable by a successful sync before the crash.
    LostDurableCommit {
        /// Events acknowledged durable before the crash.
        durable: usize,
        /// User events visible after reopen.
        recovered: usize,
    },
    /// More user events were visible after reopen than were ever successfully appended.
    UndeadEvent {
        /// User events visible after reopen.
        recovered: usize,
        /// Events whose append (single or batch) returned success.
        appended: usize,
    },
    /// A recovered event's `prev_hash` does not match the `event_hash` of the event
    /// recovered immediately before it.
    BrokenHashChain {
        /// Global sequence of the first event whose link does not match.
        global_sequence: u64,
    },
    /// Any other harness failure. This covers setup errors (temp dir, coordinate,
    /// initial open) and reopen errors that are not accepted canonical refusals.
    /// It carries the underlying error's text.
    NonCanonicalReopen(String),
}
impl std::fmt::Display for Violation {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::LostDurableCommit { durable, recovered } => write!(
f,
"lost acknowledged-durable commit: recovered {recovered} visible < {durable} durable"
),
Self::UndeadEvent {
recovered,
appended,
} => write!(
f,
"undead/invented event: recovered {recovered} visible > {appended} appended"
),
Self::BrokenHashChain { global_sequence } => write!(
f,
"broken hash chain at recovered event global_sequence={global_sequence}"
),
Self::NonCanonicalReopen(reason) => {
write!(f, "non-canonical reopen failure: {reason}")
}
}
}
}
/// What one seeded crash/reopen cycle observed.
///
/// Equality compares all three fields. Two runs are therefore "identical" only
/// when they folded the same digest and saw the same counts.
#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) struct RecoveryOutcome {
    /// FNV-1a fingerprint of the op trace and of the recovered events.
    pub(crate) digest: u64,
    /// User events visible after reopen (0 when reopen was canonically refused).
    pub(crate) recovered_visible: usize,
    /// Events acknowledged durable by a successful sync before the crash.
    pub(crate) durable_acked: usize,
}
/// Runs one seeded crash/recovery scenario of `steps` ops.
///
/// Any `Violation` is rendered into a message that names the seed, so the failure
/// can be replayed.
pub(crate) fn run(seed: u64, steps: usize) -> Result<RecoveryOutcome, String> {
    match drive(seed, steps) {
        Ok(outcome) => Ok(outcome),
        Err(violation) => Err(format!("DST violation (seed={seed}): {violation}")),
    }
}
/// Runs the seeded op plan against a store backed by the simulated filesystem,
/// abandons the store without a shutdown, and crashes the simulated filesystem.
/// It then reopens the directory with a plain `StoreConfig` (no `with_fs` override)
/// and audits what survived.
///
/// # Errors
/// Returns a `Violation` in these cases:
/// - fewer user events are visible than were acknowledged durable;
/// - more user events are visible than were appended;
/// - the recovered hash chain is broken;
/// - setup or reopen fails with an error that `is_canonical_refusal` does not accept.
fn drive(seed: u64, steps: usize) -> Result<RecoveryOutcome, Violation> {
    let dir = tempfile::tempdir().map_err(|e| Violation::NonCanonicalReopen(e.to_string()))?;
    let sim_fs = Arc::new(SimFs::new(seed, 0));
    let coord = Coordinate::new("entity:dst", "scope:recovery")
        .map_err(|e| Violation::NonCanonicalReopen(e.to_string()))?;
    let plan = OpPlan::seeded(seed, steps);
    let mut digest = fold(FNV_OFFSET, seed);

    // Phase 1: run the plan on the simulated fs, then abandon and crash.
    let (appended, durable_acked) = {
        let config = StoreConfig::new(dir.path())
            .with_fs(Arc::clone(&sim_fs) as Arc<dyn crate::store::platform::fs::StoreFs>)
            .with_sync_every_n_events(1_000_000)
            .with_sync_mode(SyncMode::SyncAll);
        let store =
            Store::open(config).map_err(|e| Violation::NonCanonicalReopen(e.to_string()))?;
        let (ops_appended, ops_durable, ops_digest) =
            run_op_plan(&store, &coord, &plan, digest)?;
        digest = ops_digest;
        store.abandon_without_shutdown();
        sim_fs.crash();
        (ops_appended, ops_durable)
    };

    // Phase 2: reopen; a recognised refusal is itself a legal outcome.
    let store = match Store::open(StoreConfig::new(dir.path())) {
        Ok(store) => store,
        Err(error) if is_canonical_refusal(&error) => {
            return Ok(RecoveryOutcome {
                digest: fold(digest, 0xCA11_AB1E),
                recovered_visible: 0,
                durable_acked,
            });
        }
        Err(other) => return Err(Violation::NonCanonicalReopen(format!("{other:?}"))),
    };

    let recovered = recovered_user_events(&store);
    let recovered_visible = recovered.len();
    match recovered_visible {
        n if n < durable_acked => {
            return Err(Violation::LostDurableCommit {
                durable: durable_acked,
                recovered: n,
            });
        }
        n if n > appended => {
            return Err(Violation::UndeadEvent {
                recovered: n,
                appended,
            });
        }
        _ => {}
    }
    verify_hash_chain(&recovered)?;

    let digest = recovered.iter().fold(
        fold(fold(digest, recovered_visible as u64), durable_acked as u64),
        |acc, ev| fold(acc, ev.event_hash_token),
    );
    Ok(RecoveryOutcome {
        digest,
        recovered_visible,
        durable_acked,
    })
}
/// A user event read back after reopen, reduced to the fields the audit needs.
pub(crate) struct RecoveredEvent {
    /// Global sequence number reported by the store; events are audited in ascending order of it.
    pub(crate) global_sequence: u64,
    /// Predecessor hash recorded in this event's hash chain; `verify_hash_chain`
    /// compares it to the previous recovered event's `event_hash`.
    pub(crate) prev_hash: [u8; 32],
    /// This event's own hash from its hash chain.
    pub(crate) event_hash: [u8; 32],
    /// First 8 bytes of `event_hash` read as a little-endian `u64`, folded into the digest.
    pub(crate) event_hash_token: u64,
}
/// One step of a seeded workload, executed by `run_op_plan`.
#[derive(Copy, Clone)]
pub(crate) enum Op {
    /// Append a single event.
    Append,
    /// Append up to the given number of events in one batch.
    Batch(u32),
    /// Sync the store; on success everything appended so far counts as durable.
    Sync,
}
/// A deterministic workload: the ops to run, in order.
pub(crate) struct OpPlan {
    /// Ops executed front to back by `run_op_plan`.
    pub(crate) ops: Vec<Op>,
}
impl OpPlan {
    /// Builds a plan of `steps` ops that is fully determined by `seed`.
    ///
    /// Each step draws one `u32` and reduces it mod 6:
    /// - 0–2 yield `Append`;
    /// - 3–4 yield a `Batch` whose size (1–3) comes from a second draw;
    /// - 5 yields `Sync`.
    pub(crate) fn seeded(seed: u64, steps: usize) -> Self {
        let mut rng = fastrand::Rng::with_seed(seed);
        let ops = (0..steps)
            .map(|_| {
                let roll = rng.u32(..) % 6;
                if roll <= 2 {
                    Op::Append
                } else if roll <= 4 {
                    Op::Batch(1 + rng.u32(..) % 3)
                } else {
                    Op::Sync
                }
            })
            .collect();
        Self { ops }
    }
}
/// Executes `plan` against `store` and tracks two counts:
/// - how many events were successfully appended;
/// - how many of those were acknowledged durable by a successful sync.
///
/// Each successful op is also folded into `digest`.
///
/// Failed ops are skipped without error: they change neither the counts nor the
/// digest.
///
/// Returns `(appended, durable_acked, digest)`. In the current implementation the
/// `Result` is always `Ok`.
pub(crate) fn run_op_plan(
    store: &Store,
    coord: &Coordinate,
    plan: &OpPlan,
    mut digest: u64,
) -> Result<(usize, usize, u64), Violation> {
    let mut appended = 0usize;
    let mut durable_acked = 0usize;
    for (idx, op) in plan.ops.iter().enumerate() {
        match *op {
            Op::Append => {
                let payload = serde_json::json!({ "seq": appended, "step": idx });
                if store.append(coord, KIND, &payload).is_ok() {
                    appended += 1;
                    digest = fold(digest, appended as u64);
                }
            }
            Op::Batch(n) => {
                let items = batch_items(coord, appended, n);
                // `batch_items` silently drops items it fails to build, so count what
                // was actually submitted rather than trusting `n`. Over-counting would
                // inflate `appended`/`durable_acked` and surface as a spurious
                // `LostDurableCommit` after recovery.
                let submitted = items.len();
                if store.append_batch(items).is_ok() {
                    appended += submitted;
                    digest = fold(fold(digest, 0xBA7C), appended as u64);
                }
            }
            Op::Sync => {
                if Store::sync(store).is_ok() {
                    // Everything appended so far is now acknowledged durable.
                    durable_acked = appended;
                    digest = fold(digest, 0x5_4_C);
                }
            }
        }
    }
    Ok((appended, durable_acked, digest))
}
/// Builds up to `n` batch items for `coord`. Their `"seq"` payloads are numbered
/// consecutively starting at `base`.
///
/// Items whose construction fails are silently omitted, so the result may contain
/// fewer than `n` entries.
pub(crate) fn batch_items(coord: &Coordinate, base: usize, n: u32) -> Vec<BatchAppendItem> {
    let mut items = Vec::new();
    for offset in 0..n {
        let payload = serde_json::json!({ "seq": base + offset as usize });
        if let Ok(item) = BatchAppendItem::new(
            coord.clone(),
            KIND,
            &payload,
            AppendOptions::default(),
            CausationRef::None,
        ) {
            items.push(item);
        }
    }
    items
}
/// Reads every event in `store` and drops the `SYSTEM_OPEN_COMPLETED` and
/// `SYSTEM_CLOSE_COMPLETED` events. The remaining events are returned sorted by
/// `global_sequence`.
pub(crate) fn recovered_user_events(store: &Store) -> Vec<RecoveredEvent> {
    let mut events: Vec<RecoveredEvent> = Vec::new();
    for entry in store.query(&Region::all()) {
        let is_system_marker = matches!(
            entry.event_kind(),
            EventKind::SYSTEM_OPEN_COMPLETED | EventKind::SYSTEM_CLOSE_COMPLETED
        );
        if is_system_marker {
            continue;
        }
        let chain = entry.hash_chain();
        // The digest token is the hash's first 8 bytes, read little-endian.
        let mut token_bytes = [0u8; 8];
        token_bytes.copy_from_slice(&chain.event_hash[..8]);
        events.push(RecoveredEvent {
            global_sequence: entry.global_sequence(),
            prev_hash: chain.prev_hash,
            event_hash: chain.event_hash,
            event_hash_token: u64::from_le_bytes(token_bytes),
        });
    }
    events.sort_by_key(|e| e.global_sequence);
    events
}
/// Checks that every event's `prev_hash` equals the `event_hash` of the event just
/// before it in `events`.
///
/// The first event's `prev_hash` is not checked. An empty or single-event slice
/// always passes.
///
/// # Errors
/// Returns `Violation::BrokenHashChain` carrying the `global_sequence` of the first
/// event whose link does not match.
pub(crate) fn verify_hash_chain(events: &[RecoveredEvent]) -> Result<(), Violation> {
    match events
        .windows(2)
        .find(|pair| pair[1].prev_hash != pair[0].event_hash)
    {
        Some(pair) => Err(Violation::BrokenHashChain {
            global_sequence: pair[1].global_sequence,
        }),
        None => Ok(()),
    }
}
/// Returns `true` when `error` is one of the `StoreError` variants listed below,
/// which the harness accepts as a legal refusal to reopen:
/// - segment/frame/CRC corruption;
/// - a malformed data directory;
/// - a future-version mmap or idempotency artifact.
///
/// Returns `false` for any other error; `drive` reports those as `NonCanonicalReopen`.
pub(crate) fn is_canonical_refusal(error: &crate::store::StoreError) -> bool {
    use crate::store::StoreError as E;
    matches!(
        error,
        E::CorruptSegment { .. }
            | E::CorruptFrame { .. }
            | E::CrcMismatch { .. }
            | E::DataDirMalformed { .. }
            | E::MmapFutureVersion { .. }
            | E::IdempotencyFutureVersion { .. }
    )
}
pub fn run_seeded_recovery(seed: u64, steps: usize) -> Result<RecoveryOutcomePublic, String> {
run(seed, steps).map(|o| RecoveryOutcomePublic {
digest: o.digest,
recovered_visible: o.recovered_visible,
durable_acked: o.durable_acked,
})
}
/// Returns the seed to use for a recovery replay.
///
/// Delegates to `seed_from_env`, passing `default` through. See that function for
/// how (presumably via an environment variable) an override is resolved.
pub fn recovery_replay_seed(default: u64) -> u64 {
    seed_from_env(default)
}
/// Public counterpart of the crate-internal recovery outcome, returned by
/// `run_seeded_recovery`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct RecoveryOutcomePublic {
    /// Replay fingerprint of the op trace and of the recovered events.
    pub digest: u64,
    /// User events visible after reopen (0 when reopen was canonically refused).
    pub recovered_visible: usize,
    /// Events acknowledged durable by a successful sync before the crash.
    pub durable_acked: usize,
}
/// Torn-publish variant of `drive`. It runs these steps in order:
/// 1. Runs the seeded plan on the simulated filesystem.
/// 2. Explicitly syncs, so every appended event is acknowledged durable.
/// 3. Arms a `PersistTemp` fault before `close`; the debug assertion below expects
///    `close` to fail because of it.
/// 4. Crashes the simulated filesystem.
/// 5. Audits the reopened store with the same checks as `drive`.
///
/// The durable count used for the audit comes from the explicit final sync, not
/// from the plan's own `Sync` ops.
#[cfg(test)]
fn drive_torn_publish(seed: u64, steps: usize) -> Result<RecoveryOutcome, Violation> {
    use super::fs::CrashOp;
    use crate::store::platform::fs::StoreFs;
    let dir = tempfile::tempdir().map_err(|e| Violation::NonCanonicalReopen(e.to_string()))?;
    let sim_fs = Arc::new(SimFs::new(seed, 0));
    let coord = Coordinate::new("entity:dst", "scope:torn-publish")
        .map_err(|e| Violation::NonCanonicalReopen(e.to_string()))?;
    let plan = OpPlan::seeded(seed, steps);
    let mut digest = fold(FNV_OFFSET, seed);
    let appended;
    let durable_acked;
    {
        // Phase 1: write through the simulated filesystem.
        let config = StoreConfig::new(dir.path())
            .with_fs(Arc::clone(&sim_fs) as Arc<dyn StoreFs>)
            .with_sync_every_n_events(1_000_000)
            .with_sync_mode(SyncMode::SyncAll);
        let store =
            Store::open(config).map_err(|e| Violation::NonCanonicalReopen(e.to_string()))?;
        // The plan's own durable count is ignored; the explicit sync below supersedes it.
        let (a, _plan_durable, run_digest) = run_op_plan(&store, &coord, &plan, digest)?;
        appended = a;
        digest = run_digest;
        // Sync everything, so the entire appended prefix must survive the crash.
        Store::sync(&store).map_err(|e| Violation::NonCanonicalReopen(e.to_string()))?;
        durable_acked = appended;
        // Same token `run_op_plan` folds for a successful `Sync` op.
        digest = fold(digest, 0x5_4_C);
        // NOTE(review): the meaning of the `1` argument is defined by
        // `SimFs::arm_fault_on` (occurrence index or count?) — confirm there.
        sim_fs.arm_fault_on(CrashOp::PersistTemp, 1);
        let close_result = store.close();
        // Only checked in debug builds: a release build does not verify that the
        // armed fault actually made `close` fail.
        debug_assert!(
            close_result.is_err(),
            "the armed PersistTemp fault must tear a cold-start artifact publish during close"
        );
        drop(close_result);
        sim_fs.crash();
    }
    // Phase 2: reopen with a plain `StoreConfig` (no `with_fs` override) and audit.
    let reopen = Store::open(StoreConfig::new(dir.path()));
    let store = match reopen {
        Ok(store) => store,
        Err(error) if is_canonical_refusal(&error) => {
            // A recognised refusal is a legal outcome with nothing visible.
            digest = fold(digest, 0xCA11_AB1E);
            return Ok(RecoveryOutcome {
                digest,
                recovered_visible: 0,
                durable_acked,
            });
        }
        Err(other) => return Err(Violation::NonCanonicalReopen(format!("{other:?}"))),
    };
    let recovered = recovered_user_events(&store);
    let recovered_visible = recovered.len();
    if recovered_visible < durable_acked {
        return Err(Violation::LostDurableCommit {
            durable: durable_acked,
            recovered: recovered_visible,
        });
    }
    if recovered_visible > appended {
        return Err(Violation::UndeadEvent {
            recovered: recovered_visible,
            appended,
        });
    }
    verify_hash_chain(&recovered)?;
    // Fingerprint the counts and every recovered event hash, in sequence order.
    digest = fold(fold(digest, recovered_visible as u64), durable_acked as u64);
    for ev in &recovered {
        digest = fold(digest, ev.event_hash_token);
    }
    Ok(RecoveryOutcome {
        digest,
        recovered_visible,
        durable_acked,
    })
}
/// Test-only counterpart of `run` for the torn-publish scenario. Any `Violation` is
/// rendered into a message that names the seed.
#[cfg(test)]
fn run_torn_publish(seed: u64, steps: usize) -> Result<RecoveryOutcome, String> {
    match drive_torn_publish(seed, steps) {
        Ok(outcome) => Ok(outcome),
        Err(violation) => Err(format!(
            "torn-publish DST violation (seed={seed}): {violation}"
        )),
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Runs the crash/reopen scenario, failing the test on any violation.
    fn legal_recovery(seed: u64, steps: usize) -> RecoveryOutcome {
        run(seed, steps).expect("legal recovery")
    }

    /// Runs the torn-publish scenario, failing the test on any violation.
    fn legal_torn_publish(seed: u64, steps: usize) -> RecoveryOutcome {
        run_torn_publish(seed, steps).expect("legal torn-publish recovery")
    }

    #[test]
    fn same_seed_recovers_identically() {
        const SEED: u64 = 0x5EED_0001;
        let first = legal_recovery(SEED, 48);
        let second = legal_recovery(SEED, 48);
        assert_eq!(
            first, second,
            "PROPERTY: identical seeds must recover the same state with the same digest"
        );
    }

    #[test]
    fn different_seeds_diverge() {
        let left = legal_recovery(0x5EED_0002, 48);
        let right = legal_recovery(0x5EED_0003, 48);
        assert_ne!(
            left.digest, right.digest,
            "PROPERTY: distinct seeds should (almost surely) diverge in recovered digest"
        );
    }

    #[test]
    fn recovery_preserves_durable_prefix_and_legality() {
        let RecoveryOutcome {
            recovered_visible,
            durable_acked,
            ..
        } = legal_recovery(0x5EED_0004, 64);
        assert!(
            recovered_visible >= durable_acked,
            "PROPERTY: every acknowledged-durable commit must survive the crash"
        );
    }

    #[test]
    fn torn_publish_same_seed_recovers_identically() {
        const SEED: u64 = 0x7091_0001;
        let first = legal_torn_publish(SEED, 48);
        let second = legal_torn_publish(SEED, 48);
        assert_eq!(
            first, second,
            "PROPERTY: identical seeds must recover the same state with the same digest \
             after a torn cold-start artifact publish"
        );
    }

    #[test]
    fn torn_publish_preserves_durable_prefix_via_full_scan() {
        let RecoveryOutcome {
            recovered_visible,
            durable_acked,
            ..
        } = legal_torn_publish(0x7091_0002, 64);
        assert!(
            recovered_visible >= durable_acked,
            "PROPERTY: a torn cold-start artifact is an optimization, not a correctness \
             dependency — every acknowledged-durable commit must survive it"
        );
    }
}