use super::fs::SimFs;
use super::recovery::{
batch_items, fold, is_canonical_refusal, recovered_user_events, verify_hash_chain, Op, OpPlan,
Violation, FNV_OFFSET, KIND,
};
use super::seed_from_env;
use crate::coordinate::Coordinate;
use crate::store::fault::{FaultInjector, InjectionPoint};
use crate::store::{Store, StoreConfig, StoreError, SyncMode};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
/// Entity name that `Boundary::SingleAppendFrame` refuses to match, so single appends
/// written for this entity never consume the one-shot crash.
///
/// NOTE(review): presumably the store's own internal lifecycle/bookkeeping entity —
/// confirm against the `Store` implementation.
const LIFECYCLE_ENTITY: &str = "batpak:store";
/// A durability boundary at which `FaultMode::CrashBeforeFsync` arms its one-shot crash.
///
/// Each variant corresponds to one store `InjectionPoint`; see `Boundary::matches`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum Boundary {
    /// Matches `InjectionPoint::SingleAppendWritten` for any entity other than
    /// `LIFECYCLE_ENTITY`.
    SingleAppendFrame,
    /// Matches `InjectionPoint::BatchCommitWritten`.
    BatchCommitMarker,
    /// Matches `InjectionPoint::BatchPrePublish` (after a batch fsync, before publish).
    BatchPostFsyncPrePublish,
    /// Matches `InjectionPoint::SegmentRotationCreate`.
    SegmentRotationCreate,
}

impl Boundary {
    /// Every boundary, in the order the recovery matrix visits them.
    pub(crate) const ALL: [Boundary; 4] = [
        Boundary::SingleAppendFrame,
        Boundary::BatchCommitMarker,
        Boundary::BatchPostFsyncPrePublish,
        Boundary::SegmentRotationCreate,
    ];

    /// Stable textual name of the boundary; `parse` accepts exactly these strings.
    ///
    /// This is the single source of truth for boundary names. `parse` is derived from it,
    /// so the two directions cannot drift apart.
    pub(crate) fn as_str(self) -> &'static str {
        match self {
            Boundary::SingleAppendFrame => "SingleAppendFrame",
            Boundary::BatchCommitMarker => "BatchCommitMarker",
            Boundary::BatchPostFsyncPrePublish => "BatchPostFsyncPrePublish",
            Boundary::SegmentRotationCreate => "SegmentRotationCreate",
        }
    }

    /// Inverse of `as_str`: returns the boundary whose name is exactly `raw`.
    ///
    /// Returns `None` for any unrecognised (including empty) name. Matching is exact and
    /// case-sensitive.
    pub(crate) fn parse(raw: &str) -> Option<Self> {
        Self::ALL
            .iter()
            .copied()
            .find(|boundary| boundary.as_str() == raw)
    }

    /// Per-variant constant that `FaultMode::token` folds into the cell digest.
    fn token(self) -> u64 {
        match self {
            Boundary::SingleAppendFrame => 0xB0_01,
            Boundary::BatchCommitMarker => 0xB0_02,
            Boundary::BatchPostFsyncPrePublish => 0xB0_03,
            Boundary::SegmentRotationCreate => 0xB0_04,
        }
    }

    /// Whether `point` is the injection point this boundary targets.
    fn matches(self, point: &InjectionPoint) -> bool {
        match self {
            Boundary::SingleAppendFrame => {
                // Lifecycle-entity appends are excluded so they cannot trigger the crash.
                matches!(point, InjectionPoint::SingleAppendWritten { entity } if entity != LIFECYCLE_ENTITY)
            }
            Boundary::BatchCommitMarker => {
                matches!(point, InjectionPoint::BatchCommitWritten { .. })
            }
            Boundary::BatchPostFsyncPrePublish => {
                matches!(point, InjectionPoint::BatchPrePublish { .. })
            }
            Boundary::SegmentRotationCreate => {
                matches!(point, InjectionPoint::SegmentRotationCreate { .. })
            }
        }
    }
}
/// Fault model applied to one cell of the recovery matrix.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum FaultMode {
    /// Plain crash: no injector, fsync-drop rate 0, held to the no-loss rule.
    HonestDiskCrash,
    /// Lying disk: `SimFs` receives `one_in` (clamped to at least 1) as its fsync-drop rate.
    LyingDiskFsyncDrop { one_in: u32 },
    /// One-shot crash injected at the first injection point matching `boundary`.
    CrashBeforeFsync { boundary: Boundary },
}

impl FaultMode {
    /// Digest token identifying this mode: a per-variant tag folded with its parameter.
    fn token(self) -> u64 {
        let (tag, detail) = match self {
            FaultMode::HonestDiskCrash => (0xF0_DE_00, 0),
            FaultMode::LyingDiskFsyncDrop { one_in } => (0xF0_DE_01, u64::from(one_in)),
            FaultMode::CrashBeforeFsync { boundary } => (0xF0_DE_02, boundary.token()),
        };
        fold(tag, detail)
    }

    /// True only for the lying-disk variant, which is exempt from the no-loss rule.
    fn is_lying_disk(self) -> bool {
        match self {
            FaultMode::LyingDiskFsyncDrop { .. } => true,
            FaultMode::HonestDiskCrash | FaultMode::CrashBeforeFsync { .. } => false,
        }
    }

    /// Fsync-drop rate handed to `SimFs::new`: `one_in` clamped to >= 1 for the lying
    /// disk, and 0 for every other mode.
    fn fsync_drop_one_in(self) -> u32 {
        if let FaultMode::LyingDiskFsyncDrop { one_in } = self {
            std::cmp::max(one_in, 1)
        } else {
            0
        }
    }
}
/// How a cell's post-crash reopen was classified.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum Classification {
/// Reopen succeeded and surfaced at least one user event.
CommittedPrefix,
/// Reopen succeeded but surfaced no user events.
RolledBack,
/// Reopen was refused with an error that `is_canonical_refusal` accepts.
CanonicalRefusal,
}
/// Result of one cell run that satisfied every recovery property.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) struct CellOutcome {
/// Fingerprint folding the seed, fault mode, driven operations and (on a successful
/// reopen) the recovered events' hash tokens.
pub(crate) digest: u64,
/// How the reopen was classified.
pub(crate) classification: Classification,
/// User events visible after reopen (0 when the reopen was a canonical refusal).
pub(crate) recovered_visible: usize,
/// Events acknowledged durable by the last successful sync before the crash.
pub(crate) durable_acked: usize,
}
/// A recovery property violated while driving or reopening one matrix cell.
///
/// Rendered for humans by its `Display` impl. The derives let it appear in
/// `assert_eq!`, `expect` and `unwrap_err` diagnostics and be compared in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum CellViolation {
    /// On a non-lying disk, recovery surfaced fewer events than a successful sync had
    /// acknowledged durable.
    LostDurableCommit { durable: usize, recovered: usize },
    /// Recovery surfaced more events than the driver ever attempted to append.
    UndeadEvent { recovered: usize, appended: usize },
    /// Hash-chain verification rejected the recovered event at this global sequence.
    BrokenHashChain { global_sequence: u64 },
    /// Reopen (or harness setup) failed with an error that is not a canonical refusal.
    NonCanonicalReopen(String),
}
impl From<Violation> for CellViolation {
fn from(v: Violation) -> Self {
match v {
Violation::BrokenHashChain { global_sequence } => {
CellViolation::BrokenHashChain { global_sequence }
}
Violation::LostDurableCommit { durable, recovered } => {
CellViolation::LostDurableCommit { durable, recovered }
}
Violation::UndeadEvent {
recovered,
appended,
} => CellViolation::UndeadEvent {
recovered,
appended,
},
Violation::NonCanonicalReopen(reason) => CellViolation::NonCanonicalReopen(reason),
}
}
}
impl std::fmt::Display for CellViolation {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::LostDurableCommit { durable, recovered } => write!(
f,
"lost acknowledged-durable commit: recovered {recovered} visible < {durable} durable \
(honest disk)"
),
Self::UndeadEvent {
recovered,
appended,
} => write!(
f,
"undead/invented event: recovered {recovered} visible > {appended} appended"
),
Self::BrokenHashChain { global_sequence } => write!(
f,
"broken hash chain at recovered event global_sequence={global_sequence}"
),
Self::NonCanonicalReopen(reason) => write!(f, "non-canonical reopen: {reason}"),
}
}
}
/// Runs one matrix cell and renders any violation as a replayable message that names
/// the seed and fault mode.
pub(crate) fn run(seed: u64, steps: usize, mode: FaultMode) -> Result<CellOutcome, String> {
    match drive(seed, steps, mode) {
        Ok(outcome) => Ok(outcome),
        Err(violation) => Err(format!(
            "B3 violation (seed={seed}, mode={mode:?}): {violation}"
        )),
    }
}
/// Counters accumulated by `drive_ops` while replaying the op plan up to the crash.
struct DriveStats {
/// Events the plan tried to append, successful or not; recovery may never exceed this.
attempted: usize,
/// Successfully appended events as of the last successful sync.
durable_acked: usize,
/// Running digest over the driven operations.
digest: u64,
}
/// Drives one cell end to end and classifies the result.
///
/// Steps:
/// 1. Drive the seeded plan on a `SimFs` until the crash.
/// 2. Reopen the directory with a default config.
/// 3. Classify the reopen, or accept a canonical refusal.
fn drive(seed: u64, steps: usize, mode: FaultMode) -> Result<CellOutcome, CellViolation> {
    /// Reports a harness setup failure through the reopen variant.
    fn setup_failure<E: ToString>(err: E) -> CellViolation {
        CellViolation::NonCanonicalReopen(err.to_string())
    }

    let dir = tempfile::tempdir().map_err(setup_failure)?;
    let sim_fs = Arc::new(SimFs::new(seed, mode.fsync_drop_one_in()));
    let coord = Coordinate::new("entity:b3", "scope:recovery").map_err(setup_failure)?;
    let plan = OpPlan::seeded(seed, steps);
    let start_digest = fold(fold(FNV_OFFSET, seed), mode.token());
    let stats = drive_until_crash(&dir, &sim_fs, &coord, &plan, mode, start_digest)?;

    // Reopen with a plain config; no simulated filesystem or injector is attached here.
    let reopened = Store::open(StoreConfig::new(dir.path()));
    match reopened {
        Ok(store) => classify_open(&store, &stats, mode),
        Err(refusal) if is_canonical_refusal(&refusal) => Ok(CellOutcome {
            digest: fold(stats.digest, 0xCA11_AB1E),
            classification: Classification::CanonicalRefusal,
            recovered_visible: 0,
            durable_acked: stats.durable_acked,
        }),
        Err(unexpected) => Err(CellViolation::NonCanonicalReopen(format!("{unexpected:?}"))),
    }
}
/// Whether `err` should be treated as the simulated process death.
///
/// Matches an injected fault, and also any failed batch.
/// NOTE(review): presumably an injected fault inside `append_batch` surfaces wrapped
/// as `BatchFailed` — confirm against the store.
fn is_injected_crash(err: &StoreError) -> bool {
    let injected = matches!(err, StoreError::FaultInjected(_));
    injected || matches!(err, StoreError::BatchFailed { .. })
}
/// Replays `plan` against `store` and returns the accumulated `DriveStats`.
///
/// Counters:
/// - `attempted`: every event the plan tried to append, including failed ones. It is
///   an upper bound on what recovery may surface.
/// - `durable_acked`: the number of successfully appended events as of the last
///   successful `sync`. It is the floor a non-lying disk must preserve.
///
/// The digest folds in attempt counts and successful syncs, so identical plans yield
/// identical tokens.
///
/// An error recognised by `is_injected_crash` is the simulated process death, whether
/// it comes from an append, a batch, or a sync. The loop stops immediately so no
/// further work is driven past the crash. Any other error is deliberately tolerated
/// and the plan continues.
fn drive_ops(store: &Store, coord: &Coordinate, plan: &OpPlan, digest: u64) -> DriveStats {
    let mut attempted = 0usize;
    let mut succeeded = 0usize;
    let mut durable_acked = 0usize;
    let mut digest = digest;
    for (idx, op) in plan.ops.iter().enumerate() {
        match op {
            Op::Append => {
                let payload = serde_json::json!({ "seq": attempted, "step": idx });
                attempted += 1;
                digest = fold(digest, attempted as u64);
                match store.append(coord, KIND, &payload) {
                    Ok(_) => succeeded += 1,
                    Err(e) if is_injected_crash(&e) => break,
                    Err(_) => {}
                }
            }
            Op::Batch(n) => {
                let items = batch_items(coord, attempted, *n);
                attempted += *n as usize;
                digest = fold(fold(digest, 0xBA7C), attempted as u64);
                match store.append_batch(items) {
                    Ok(_) => succeeded += *n as usize,
                    Err(e) if is_injected_crash(&e) => break,
                    Err(_) => {}
                }
            }
            Op::Sync => match Store::sync(store) {
                Ok(_) => {
                    // Everything appended successfully so far is now acknowledged durable.
                    durable_acked = succeeded;
                    digest = fold(digest, 0x5_4_C);
                }
                // A simulated crash surfaced by sync ends the run exactly like one from
                // an append. Continuing would keep driving a "dead" process.
                Err(e) if is_injected_crash(&e) => break,
                Err(_) => {}
            },
        }
    }
    DriveStats {
        attempted,
        durable_acked,
        digest,
    }
}
/// Store configuration used while driving a cell.
///
/// All I/O is routed through `sim_fs`, the every-N-events sync threshold is set to
/// 1,000,000, and the sync mode is `SyncAll`.
/// NOTE(review): the large threshold presumably leaves durability to explicit
/// `Op::Sync` steps — confirm.
fn driven_config(dir: &tempfile::TempDir, sim_fs: &Arc<SimFs>) -> StoreConfig {
    let fs = Arc::clone(sim_fs) as Arc<dyn crate::store::platform::fs::StoreFs>;
    let config = StoreConfig::new(dir.path()).with_fs(fs);
    config
        .with_sync_every_n_events(1_000_000)
        .with_sync_mode(SyncMode::SyncAll)
}
/// Opens a store on `sim_fs`, drives `plan` through it, then simulates a crash.
///
/// For `CrashBeforeFsync`, the store is first opened and cleanly closed once with no
/// injector armed. Only the second open carries the one-shot injector. The crash is
/// modelled by abandoning the store without shutdown and then calling `sim_fs.crash()`.
fn drive_until_crash(
    dir: &tempfile::TempDir,
    sim_fs: &Arc<SimFs>,
    coord: &Coordinate,
    plan: &OpPlan,
    mode: FaultMode,
    digest: u64,
) -> Result<DriveStats, CellViolation> {
    /// Reports a store open/close failure through the reopen variant.
    fn open_failure<E: ToString>(err: E) -> CellViolation {
        CellViolation::NonCanonicalReopen(err.to_string())
    }

    let injector = match mode {
        FaultMode::HonestDiskCrash | FaultMode::LyingDiskFsyncDrop { .. } => None,
        FaultMode::CrashBeforeFsync { boundary } => {
            let warmup = Store::open(driven_config(dir, sim_fs)).map_err(open_failure)?;
            warmup.close().map_err(open_failure)?;
            Some(injector_for(boundary))
        }
    };

    let config = driven_config(dir, sim_fs).with_fault_injector(injector);
    let store = Store::open(config).map_err(open_failure)?;
    let stats = drive_ops(&store, coord, plan, digest);

    // Simulated power loss: skip the orderly shutdown, then let the simulated
    // filesystem apply its crash model.
    store.abandon_without_shutdown();
    sim_fs.crash();
    Ok(stats)
}
/// A `FaultInjector` that fires exactly once, at the first injection point matching
/// `boundary`.
struct OneShotInjector {
    /// Durability boundary this injector targets.
    boundary: Boundary,
    /// `true` until the injector has fired; atomically cleared when it does.
    armed: AtomicBool,
}

impl FaultInjector for OneShotInjector {
    /// Returns an injected fault the first time a matching point is seen, `None` otherwise.
    fn check(&self, point: InjectionPoint) -> Option<StoreError> {
        // Non-matching points short-circuit and never touch `armed`. The swap hands
        // the single shot to exactly one caller that observes `true`.
        let fire = self.boundary.matches(&point) && self.armed.swap(false, Ordering::SeqCst);
        fire.then(|| {
            StoreError::FaultInjected(format!(
                "B3: simulated crash at durability boundary at {point:?}"
            ))
        })
    }
}
/// Builds an armed one-shot injector for `boundary`, erased to the trait object the
/// store config expects.
fn injector_for(boundary: Boundary) -> Arc<dyn FaultInjector> {
    let one_shot = OneShotInjector {
        armed: AtomicBool::new(true),
        boundary,
    };
    Arc::new(one_shot)
}
/// Checks the recovery properties on a successfully reopened store and classifies it.
///
/// Checks, in order:
/// 1. No loss: skipped only on a lying disk.
/// 2. No invented events.
/// 3. Hash-chain integrity.
///
/// On success, the recovered count, the durable count and every recovered event's
/// hash token are folded into the digest.
fn classify_open(
    store: &Store,
    stats: &DriveStats,
    mode: FaultMode,
) -> Result<CellOutcome, CellViolation> {
    let recovered = recovered_user_events(store);
    let visible = recovered.len();

    let honours_fsync = !mode.is_lying_disk();
    if honours_fsync && visible < stats.durable_acked {
        return Err(CellViolation::LostDurableCommit {
            durable: stats.durable_acked,
            recovered: visible,
        });
    }
    if stats.attempted < visible {
        return Err(CellViolation::UndeadEvent {
            recovered: visible,
            appended: stats.attempted,
        });
    }
    verify_hash_chain(&recovered)?;

    let seed = fold(fold(stats.digest, visible as u64), stats.durable_acked as u64);
    let digest = recovered
        .iter()
        .fold(seed, |acc, ev| fold(acc, ev.event_hash_token));
    let classification = match visible {
        0 => Classification::RolledBack,
        _ => Classification::CommittedPrefix,
    };
    Ok(CellOutcome {
        digest,
        classification,
        recovered_visible: visible,
        durable_acked: stats.durable_acked,
    })
}
/// Every fault mode in the recovery matrix.
///
/// Order: the honest crash, the two lying-disk drop rates, then one `CrashBeforeFsync`
/// per boundary in `Boundary::ALL` order.
pub(crate) fn all_modes() -> Vec<FaultMode> {
    let fixed = [
        FaultMode::HonestDiskCrash,
        FaultMode::LyingDiskFsyncDrop { one_in: 2 },
        FaultMode::LyingDiskFsyncDrop { one_in: 5 },
    ];
    let boundary_crashes = Boundary::ALL
        .iter()
        .map(|&boundary| FaultMode::CrashBeforeFsync { boundary });
    fixed.iter().copied().chain(boundary_crashes).collect()
}
/// Public classification of how a recovery-matrix cell reopened after its crash.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecoveredClass {
/// The reopened store surfaced at least one user event.
CommittedPrefix,
/// The reopened store surfaced no user events.
RolledBack,
/// The reopen was refused with a recognised canonical error.
CanonicalRefusal,
}
impl From<Classification> for RecoveredClass {
    /// Mirrors the crate-internal classification onto the public enum, one variant for one.
    fn from(class: Classification) -> Self {
        match class {
            Classification::RolledBack => Self::RolledBack,
            Classification::CanonicalRefusal => Self::CanonicalRefusal,
            Classification::CommittedPrefix => Self::CommittedPrefix,
        }
    }
}
/// One row of the recovery matrix: the outcome of a single fault mode for a given seed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MatrixCell {
/// Human-readable fault-mode label, e.g. `"honest-disk-crash"`.
pub mode: String,
/// How the reopen after the crash was classified.
pub class: RecoveredClass,
/// Fingerprint of the cell's driven operations and recovered state.
pub digest: u64,
/// User events visible after reopen.
pub recovered_visible: usize,
/// Events acknowledged durable by the last successful sync before the crash.
pub durable_acked: usize,
}
/// Human-readable, per-mode label used as `MatrixCell::mode`.
fn mode_label(mode: FaultMode) -> String {
    match mode {
        FaultMode::CrashBeforeFsync { boundary } => format!("crash-before-fsync@{boundary:?}"),
        FaultMode::LyingDiskFsyncDrop { one_in } => {
            format!("lying-disk-fsync-drop-1-in-{one_in}")
        }
        FaultMode::HonestDiskCrash => String::from("honest-disk-crash"),
    }
}
pub fn run_recovery_matrix(seed: u64, steps: usize) -> Result<Vec<MatrixCell>, String> {
all_modes()
.into_iter()
.map(|mode| {
run(seed, steps, mode).map(|o| MatrixCell {
mode: mode_label(mode),
class: o.classification.into(),
digest: o.digest,
recovered_visible: o.recovered_visible,
durable_acked: o.durable_acked,
})
})
.collect()
}
/// Seed to replay the recovery matrix with; delegates to `seed_from_env(default)`.
///
/// NOTE(review): presumably returns an environment-supplied seed when one is set and
/// `default` otherwise — confirm against `seed_from_env`.
pub fn matrix_replay_seed(default: u64) -> u64 {
seed_from_env(default)
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashSet;

    #[test]
    fn every_mode_recovers_legally() {
        for mode in all_modes() {
            let result = run(0x5EED_B301, 64, mode);
            assert!(
                result.is_ok(),
                "mode {mode:?} must recover legally: {result:?}"
            );
        }
    }

    #[test]
    fn same_seed_same_classification_per_mode() {
        for mode in all_modes() {
            let a = run(0x5EED_B302, 48, mode).expect("legal");
            let b = run(0x5EED_B302, 48, mode).expect("legal");
            assert_eq!(
                a, b,
                "PROPERTY: identical (seed, mode={mode:?}) must recover identically"
            );
        }
    }

    #[test]
    fn lying_disk_relaxes_no_loss_but_not_undead() {
        let mode = FaultMode::LyingDiskFsyncDrop { one_in: 2 };
        assert!(
            mode.is_lying_disk(),
            "lying disk must relax the no-loss rule"
        );
        let outcome = run(0x5EED_B303, 64, mode).expect("legal under lying disk");
        assert!(
            outcome.recovered_visible <= 64 * 3,
            "no undead events even under a lying disk"
        );
    }

    #[test]
    fn honest_disk_preserves_durable_prefix() {
        let outcome = run(0x5EED_B304, 64, FaultMode::HonestDiskCrash).expect("legal");
        assert!(
            outcome.recovered_visible >= outcome.durable_acked,
            "SACRED RULE: honest disk never loses an acknowledged-durable commit"
        );
    }

    #[test]
    fn sim_parent_dir_sync_fail_closed_model_honest_deferral() {
        assert!(
            Boundary::ALL.contains(&Boundary::SegmentRotationCreate),
            "PROPERTY: segment-rotation-create boundary exercises the parent-dir-sync window"
        );
    }

    /// `parse` must invert `as_str` for every boundary and reject anything else.
    #[test]
    fn boundary_names_round_trip() {
        for boundary in Boundary::ALL {
            assert_eq!(
                Boundary::parse(boundary.as_str()),
                Some(boundary),
                "PROPERTY: parse(as_str(b)) == b for {boundary:?}"
            );
        }
        assert_eq!(Boundary::parse("NotABoundary"), None);
        assert_eq!(Boundary::parse(""), None);
        assert_eq!(Boundary::parse("singleappendframe"), None);
    }

    /// Matrix cells are identified by label, so every mode must get a distinct one.
    #[test]
    fn matrix_mode_labels_are_unique() {
        let modes = all_modes();
        let labels: HashSet<String> = modes.iter().map(|&mode| mode_label(mode)).collect();
        assert_eq!(
            labels.len(),
            modes.len(),
            "PROPERTY: every fault mode has a distinct matrix label"
        );
    }

    /// The line-continued lost-commit message renders as one line with a single space.
    #[test]
    fn lost_commit_message_is_single_line() {
        let msg = CellViolation::LostDurableCommit {
            durable: 5,
            recovered: 3,
        }
        .to_string();
        assert_eq!(
            msg,
            "lost acknowledged-durable commit: recovered 3 visible < 5 durable (honest disk)"
        );
    }
}