Skip to main content

coding_agent_search/
crash_replay.rs

//! Deterministic crash/replay harness for state-machine proof tests.
//!
//! The harness is intentionally small and data-only: production code exposes
//! named checkpoints, tests simulate a crash at each checkpoint, then restart
//! and verify invariants. The resulting report can be saved as a JSON artifact
//! for later replay or review.
7
8use serde::{Deserialize, Serialize};
9use std::collections::BTreeSet;
10use std::error::Error;
11use std::fmt;
12use std::fs;
13use std::io;
14use std::path::Path;
15
/// Schema version stamped into every report built by `replay_named_checkpoints`, and the
/// only version `CrashReplayReport::validate` accepts.
pub const CRASH_REPLAY_SCHEMA_VERSION: &str = "1";
17
18#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
19pub struct CrashReplayCheckpoint {
20    pub id: String,
21    pub ordinal: u32,
22    pub description: String,
23}
24
25impl CrashReplayCheckpoint {
26    pub fn new(ordinal: u32, id: impl Into<String>, description: impl Into<String>) -> Self {
27        Self {
28            id: id.into(),
29            ordinal,
30            description: description.into(),
31        }
32    }
33}
34
35#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
36#[serde(rename_all = "snake_case")]
37pub enum CrashReplayPhase {
38    AdvanceToCheckpoint,
39    InjectCrash,
40    Restart,
41    CheckInvariants,
42}
43
44#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
45pub struct CrashReplayEvent {
46    pub checkpoint_id: String,
47    pub phase: CrashReplayPhase,
48    pub ok: bool,
49    pub detail: String,
50}
51
52impl CrashReplayEvent {
53    fn ok(
54        checkpoint: &CrashReplayCheckpoint,
55        phase: CrashReplayPhase,
56        detail: impl Into<String>,
57    ) -> Self {
58        Self {
59            checkpoint_id: checkpoint.id.clone(),
60            phase,
61            ok: true,
62            detail: detail.into(),
63        }
64    }
65
66    fn failed(
67        checkpoint: &CrashReplayCheckpoint,
68        phase: CrashReplayPhase,
69        detail: impl Into<String>,
70    ) -> Self {
71        Self {
72            checkpoint_id: checkpoint.id.clone(),
73            phase,
74            ok: false,
75            detail: detail.into(),
76        }
77    }
78}
79
80#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
81pub struct CrashReplayInvariant {
82    pub checkpoint_id: String,
83    pub name: String,
84    pub passed: bool,
85    pub detail: String,
86}
87
88impl CrashReplayInvariant {
89    pub fn passed(
90        checkpoint: &CrashReplayCheckpoint,
91        name: impl Into<String>,
92        detail: impl Into<String>,
93    ) -> Self {
94        Self {
95            checkpoint_id: checkpoint.id.clone(),
96            name: name.into(),
97            passed: true,
98            detail: detail.into(),
99        }
100    }
101
102    pub fn failed(
103        checkpoint: &CrashReplayCheckpoint,
104        name: impl Into<String>,
105        detail: impl Into<String>,
106    ) -> Self {
107        Self {
108            checkpoint_id: checkpoint.id.clone(),
109            name: name.into(),
110            passed: false,
111            detail: detail.into(),
112        }
113    }
114}
115
116#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
117#[serde(rename_all = "snake_case")]
118pub enum CrashReplayVerdict {
119    Clean,
120    Failed,
121}
122
123#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
124pub struct CrashReplayReport {
125    pub schema_version: String,
126    pub scenario_id: String,
127    pub state_machine: String,
128    pub verdict: CrashReplayVerdict,
129    pub checkpoints: Vec<CrashReplayCheckpoint>,
130    pub events: Vec<CrashReplayEvent>,
131    pub invariants: Vec<CrashReplayInvariant>,
132}
133
134impl CrashReplayReport {
135    pub fn validate(&self) -> Result<(), CrashReplayValidationError> {
136        if self.schema_version != CRASH_REPLAY_SCHEMA_VERSION {
137            return Err(CrashReplayValidationError::UnsupportedSchemaVersion {
138                expected: CRASH_REPLAY_SCHEMA_VERSION,
139                actual: self.schema_version.clone(),
140            });
141        }
142        if self.scenario_id.trim().is_empty() {
143            return Err(CrashReplayValidationError::EmptyScenarioId);
144        }
145        if self.state_machine.trim().is_empty() {
146            return Err(CrashReplayValidationError::EmptyStateMachine);
147        }
148        if self.checkpoints.is_empty() {
149            return Err(CrashReplayValidationError::NoCheckpoints);
150        }
151        if self.verdict == CrashReplayVerdict::Clean && self.invariants.is_empty() {
152            return Err(CrashReplayValidationError::CleanReportWithoutInvariants);
153        }
154
155        let mut checkpoint_ids = BTreeSet::new();
156        let mut previous_ordinal = None;
157        for (index, checkpoint) in self.checkpoints.iter().enumerate() {
158            if checkpoint.id.trim().is_empty() {
159                return Err(CrashReplayValidationError::EmptyCheckpointId { index });
160            }
161            if checkpoint.description.trim().is_empty() {
162                return Err(CrashReplayValidationError::EmptyCheckpointDescription { index });
163            }
164            if let Some(previous) = previous_ordinal
165                && checkpoint.ordinal <= previous
166            {
167                return Err(CrashReplayValidationError::NonMonotoneCheckpointOrdinal {
168                    index,
169                    previous,
170                    current: checkpoint.ordinal,
171                });
172            }
173            if !checkpoint_ids.insert(checkpoint.id.as_str()) {
174                return Err(CrashReplayValidationError::DuplicateCheckpointId {
175                    index,
176                    checkpoint_id: checkpoint.id.clone(),
177                });
178            }
179            previous_ordinal = Some(checkpoint.ordinal);
180        }
181
182        let mut checked_checkpoints = BTreeSet::new();
183        for (index, event) in self.events.iter().enumerate() {
184            if event.checkpoint_id.trim().is_empty() {
185                return Err(CrashReplayValidationError::EmptyEventCheckpointId { index });
186            }
187            if !checkpoint_ids.contains(event.checkpoint_id.as_str()) {
188                return Err(CrashReplayValidationError::UnknownEventCheckpoint {
189                    index,
190                    checkpoint_id: event.checkpoint_id.clone(),
191                });
192            }
193            if event.detail.trim().is_empty() {
194                return Err(CrashReplayValidationError::EmptyEventDetail { index });
195            }
196            if event.ok && event.phase == CrashReplayPhase::CheckInvariants {
197                checked_checkpoints.insert(event.checkpoint_id.as_str());
198            }
199        }
200
201        let mut invariant_checkpoints = BTreeSet::new();
202        for (index, invariant) in self.invariants.iter().enumerate() {
203            if invariant.checkpoint_id.trim().is_empty() {
204                return Err(CrashReplayValidationError::EmptyInvariantCheckpointId { index });
205            }
206            if !checkpoint_ids.contains(invariant.checkpoint_id.as_str()) {
207                return Err(CrashReplayValidationError::UnknownInvariantCheckpoint {
208                    index,
209                    checkpoint_id: invariant.checkpoint_id.clone(),
210                });
211            }
212            if invariant.name.trim().is_empty() {
213                return Err(CrashReplayValidationError::EmptyInvariantName { index });
214            }
215            if invariant.detail.trim().is_empty() {
216                return Err(CrashReplayValidationError::EmptyInvariantDetail { index });
217            }
218            if invariant.passed {
219                invariant_checkpoints.insert(invariant.checkpoint_id.as_str());
220            }
221        }
222        if self.verdict == CrashReplayVerdict::Clean
223            && (self.events.iter().any(|event| !event.ok)
224                || self.invariants.iter().any(|invariant| !invariant.passed))
225        {
226            return Err(CrashReplayValidationError::CleanReportContainsFailure);
227        }
228        if self.verdict == CrashReplayVerdict::Clean {
229            if self.events.is_empty() {
230                return Err(CrashReplayValidationError::CleanReportWithoutEvents);
231            }
232            for checkpoint in &self.checkpoints {
233                if !checked_checkpoints.contains(checkpoint.id.as_str()) {
234                    return Err(
235                        CrashReplayValidationError::CleanReportMissingCheckpointEvent {
236                            checkpoint_id: checkpoint.id.clone(),
237                        },
238                    );
239                }
240                if !invariant_checkpoints.contains(checkpoint.id.as_str()) {
241                    return Err(
242                        CrashReplayValidationError::CleanReportMissingCheckpointInvariant {
243                            checkpoint_id: checkpoint.id.clone(),
244                        },
245                    );
246                }
247            }
248        }
249
250        Ok(())
251    }
252
253    pub fn save_json(&self, path: &Path) -> Result<(), CrashReplayIoError> {
254        self.validate()?;
255        if let Some(parent) = path
256            .parent()
257            .filter(|parent| !parent.as_os_str().is_empty())
258        {
259            fs::create_dir_all(parent)?;
260        }
261        let json = serde_json::to_vec_pretty(self)?;
262        fs::write(path, json)?;
263        Ok(())
264    }
265
266    pub fn load_json(path: &Path) -> Result<Self, CrashReplayIoError> {
267        let bytes = fs::read(path)?;
268        let report: Self = serde_json::from_slice(&bytes)?;
269        report.validate()?;
270        Ok(report)
271    }
272}
273
/// Error reported by the caller-supplied replay closures (state creation, advance, restart).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CrashReplayError {
    /// What was being attempted when the failure happened.
    pub action: String,
    /// Description of what went wrong.
    pub detail: String,
}

impl CrashReplayError {
    /// Creates an error from an action label and a failure description.
    pub fn new(action: impl Into<String>, detail: impl Into<String>) -> Self {
        Self {
            action: action.into(),
            detail: detail.into(),
        }
    }

    /// Creates an error whose detail is the `Display` rendering of `error`.
    ///
    /// Only the rendered text is kept; the original error value is not retained.
    pub fn from_error(action: impl Into<String>, error: impl fmt::Display) -> Self {
        Self::new(action, error.to_string())
    }
}

impl fmt::Display for CrashReplayError {
    /// Renders as `<action>: <detail>`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}: {}", self.action, self.detail)
    }
}

impl Error for CrashReplayError {}
300
/// Reason a `CrashReplayReport` failed validation.
///
/// Every `index` field is the zero-based position of the offending element in the
/// corresponding report vector (`checkpoints`, `events` or `invariants`).
///
/// Derives `Clone`, `PartialEq` and `Eq` so callers can compare validation results
/// directly (for example with `assert_eq!`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CrashReplayValidationError {
    /// The report's schema version differs from the supported one.
    UnsupportedSchemaVersion {
        /// Version this code supports.
        expected: &'static str,
        /// Version found in the report.
        actual: String,
    },
    /// `scenario_id` is empty or whitespace-only.
    EmptyScenarioId,
    /// `state_machine` is empty or whitespace-only.
    EmptyStateMachine,
    /// The report lists no checkpoints.
    NoCheckpoints,
    /// A checkpoint id is empty or whitespace-only.
    EmptyCheckpointId {
        index: usize,
    },
    /// A checkpoint description is empty or whitespace-only.
    EmptyCheckpointDescription {
        index: usize,
    },
    /// A checkpoint reuses an id that appeared earlier in the list.
    DuplicateCheckpointId {
        index: usize,
        checkpoint_id: String,
    },
    /// A checkpoint's ordinal is not strictly greater than its predecessor's.
    NonMonotoneCheckpointOrdinal {
        index: usize,
        /// Ordinal of the preceding checkpoint.
        previous: u32,
        /// Ordinal of the offending checkpoint.
        current: u32,
    },
    /// The verdict is clean but the report has no invariants.
    CleanReportWithoutInvariants,
    /// The verdict is clean but the report has no events.
    CleanReportWithoutEvents,
    /// The verdict is clean but some event or invariant is marked failed.
    CleanReportContainsFailure,
    /// The verdict is clean but a checkpoint has no successful invariant-check event.
    CleanReportMissingCheckpointEvent {
        checkpoint_id: String,
    },
    /// The verdict is clean but a checkpoint has no passing invariant.
    CleanReportMissingCheckpointInvariant {
        checkpoint_id: String,
    },
    /// An event's checkpoint id is empty or whitespace-only.
    EmptyEventCheckpointId {
        index: usize,
    },
    /// An event references a checkpoint id that is not in the report.
    UnknownEventCheckpoint {
        index: usize,
        checkpoint_id: String,
    },
    /// An event's detail is empty or whitespace-only.
    EmptyEventDetail {
        index: usize,
    },
    /// An invariant's checkpoint id is empty or whitespace-only.
    EmptyInvariantCheckpointId {
        index: usize,
    },
    /// An invariant references a checkpoint id that is not in the report.
    UnknownInvariantCheckpoint {
        index: usize,
        checkpoint_id: String,
    },
    /// An invariant's name is empty or whitespace-only.
    EmptyInvariantName {
        index: usize,
    },
    /// An invariant's detail is empty or whitespace-only.
    EmptyInvariantDetail {
        index: usize,
    },
}
358
359impl fmt::Display for CrashReplayValidationError {
360    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
361        match self {
362            Self::UnsupportedSchemaVersion { expected, actual } => {
363                write!(
364                    f,
365                    "unsupported crash replay schema version {actual}; expected {expected}"
366                )
367            }
368            Self::EmptyScenarioId => write!(f, "crash replay scenario_id cannot be empty"),
369            Self::EmptyStateMachine => write!(f, "crash replay state_machine cannot be empty"),
370            Self::NoCheckpoints => write!(f, "crash replay report must include checkpoints"),
371            Self::EmptyCheckpointId { index } => {
372                write!(f, "crash replay checkpoint #{index} has an empty id")
373            }
374            Self::EmptyCheckpointDescription { index } => write!(
375                f,
376                "crash replay checkpoint #{index} has an empty description"
377            ),
378            Self::DuplicateCheckpointId {
379                index,
380                checkpoint_id,
381            } => write!(
382                f,
383                "crash replay checkpoint #{index} duplicates checkpoint id {checkpoint_id}"
384            ),
385            Self::NonMonotoneCheckpointOrdinal {
386                index,
387                previous,
388                current,
389            } => write!(
390                f,
391                "crash replay checkpoint #{index} ordinal {current} must be greater than previous ordinal {previous}"
392            ),
393            Self::CleanReportWithoutInvariants => {
394                write!(f, "clean crash replay report must include invariants")
395            }
396            Self::CleanReportWithoutEvents => {
397                write!(f, "clean crash replay report must include events")
398            }
399            Self::CleanReportContainsFailure => {
400                write!(
401                    f,
402                    "clean crash replay report contains failed events or invariants"
403                )
404            }
405            Self::CleanReportMissingCheckpointEvent { checkpoint_id } => write!(
406                f,
407                "clean crash replay report has no successful invariant-check event for checkpoint {checkpoint_id}"
408            ),
409            Self::CleanReportMissingCheckpointInvariant { checkpoint_id } => write!(
410                f,
411                "clean crash replay report has no passing invariant for checkpoint {checkpoint_id}"
412            ),
413            Self::EmptyEventCheckpointId { index } => {
414                write!(f, "crash replay event #{index} has an empty checkpoint id")
415            }
416            Self::UnknownEventCheckpoint {
417                index,
418                checkpoint_id,
419            } => write!(
420                f,
421                "crash replay event #{index} references unknown checkpoint {checkpoint_id}"
422            ),
423            Self::EmptyEventDetail { index } => {
424                write!(f, "crash replay event #{index} has an empty detail")
425            }
426            Self::EmptyInvariantCheckpointId { index } => write!(
427                f,
428                "crash replay invariant #{index} has an empty checkpoint id"
429            ),
430            Self::UnknownInvariantCheckpoint {
431                index,
432                checkpoint_id,
433            } => write!(
434                f,
435                "crash replay invariant #{index} references unknown checkpoint {checkpoint_id}"
436            ),
437            Self::EmptyInvariantName { index } => {
438                write!(f, "crash replay invariant #{index} has an empty name")
439            }
440            Self::EmptyInvariantDetail { index } => {
441                write!(f, "crash replay invariant #{index} has an empty detail")
442            }
443        }
444    }
445}
446
447impl Error for CrashReplayValidationError {}
448
449#[derive(Debug)]
450pub enum CrashReplayIoError {
451    Io(io::Error),
452    Json(serde_json::Error),
453    Validation(CrashReplayValidationError),
454}
455
456impl fmt::Display for CrashReplayIoError {
457    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
458        match self {
459            Self::Io(err) => write!(f, "crash replay I/O error: {err}"),
460            Self::Json(err) => write!(f, "crash replay JSON error: {err}"),
461            Self::Validation(err) => write!(f, "crash replay validation error: {err}"),
462        }
463    }
464}
465
466impl Error for CrashReplayIoError {
467    fn source(&self) -> Option<&(dyn Error + 'static)> {
468        match self {
469            Self::Io(err) => Some(err),
470            Self::Json(err) => Some(err),
471            Self::Validation(err) => Some(err),
472        }
473    }
474}
475
476impl From<io::Error> for CrashReplayIoError {
477    fn from(err: io::Error) -> Self {
478        Self::Io(err)
479    }
480}
481
482impl From<serde_json::Error> for CrashReplayIoError {
483    fn from(err: serde_json::Error) -> Self {
484        Self::Json(err)
485    }
486}
487
488impl From<CrashReplayValidationError> for CrashReplayIoError {
489    fn from(err: CrashReplayValidationError) -> Self {
490        Self::Validation(err)
491    }
492}
493
494pub fn replay_named_checkpoints<S, MakeState, Advance, Restart, Check>(
495    scenario_id: impl Into<String>,
496    state_machine: impl Into<String>,
497    mut checkpoints: Vec<CrashReplayCheckpoint>,
498    mut make_state: MakeState,
499    mut advance_to_checkpoint: Advance,
500    mut restart: Restart,
501    mut check_invariants: Check,
502) -> CrashReplayReport
503where
504    MakeState: FnMut() -> Result<S, CrashReplayError>,
505    Advance: FnMut(&mut S, &CrashReplayCheckpoint) -> Result<(), CrashReplayError>,
506    Restart: FnMut(&mut S) -> Result<(), CrashReplayError>,
507    Check: FnMut(&S, &CrashReplayCheckpoint) -> Vec<CrashReplayInvariant>,
508{
509    checkpoints.sort_by_key(|checkpoint| checkpoint.ordinal);
510    let mut report = CrashReplayReport {
511        schema_version: CRASH_REPLAY_SCHEMA_VERSION.to_string(),
512        scenario_id: scenario_id.into(),
513        state_machine: state_machine.into(),
514        verdict: CrashReplayVerdict::Clean,
515        checkpoints: checkpoints.clone(),
516        events: Vec::new(),
517        invariants: Vec::new(),
518    };
519
520    if checkpoints.is_empty() {
521        report.verdict = CrashReplayVerdict::Failed;
522        return report;
523    }
524
525    for checkpoint in checkpoints {
526        let mut state = match make_state() {
527            Ok(state) => state,
528            Err(err) => {
529                report.verdict = CrashReplayVerdict::Failed;
530                report.events.push(CrashReplayEvent::failed(
531                    &checkpoint,
532                    CrashReplayPhase::AdvanceToCheckpoint,
533                    format!("failed creating fresh state: {err}"),
534                ));
535                continue;
536            }
537        };
538
539        match advance_to_checkpoint(&mut state, &checkpoint) {
540            Ok(()) => report.events.push(CrashReplayEvent::ok(
541                &checkpoint,
542                CrashReplayPhase::AdvanceToCheckpoint,
543                "advanced to checkpoint",
544            )),
545            Err(err) => {
546                report.verdict = CrashReplayVerdict::Failed;
547                report.events.push(CrashReplayEvent::failed(
548                    &checkpoint,
549                    CrashReplayPhase::AdvanceToCheckpoint,
550                    err.to_string(),
551                ));
552                continue;
553            }
554        }
555
556        report.events.push(CrashReplayEvent::ok(
557            &checkpoint,
558            CrashReplayPhase::InjectCrash,
559            "simulated process stop at named checkpoint",
560        ));
561
562        match restart(&mut state) {
563            Ok(()) => report.events.push(CrashReplayEvent::ok(
564                &checkpoint,
565                CrashReplayPhase::Restart,
566                "restart action completed",
567            )),
568            Err(err) => {
569                report.verdict = CrashReplayVerdict::Failed;
570                report.events.push(CrashReplayEvent::failed(
571                    &checkpoint,
572                    CrashReplayPhase::Restart,
573                    err.to_string(),
574                ));
575                continue;
576            }
577        }
578
579        let invariants = check_invariants(&state, &checkpoint);
580        if invariants.is_empty() {
581            report.verdict = CrashReplayVerdict::Failed;
582            report.events.push(CrashReplayEvent::failed(
583                &checkpoint,
584                CrashReplayPhase::CheckInvariants,
585                "checkpoint produced no invariants",
586            ));
587            continue;
588        }
589
590        let failed = invariants.iter().any(|invariant| !invariant.passed);
591        if failed {
592            report.verdict = CrashReplayVerdict::Failed;
593        }
594        report.events.push(CrashReplayEvent {
595            checkpoint_id: checkpoint.id.clone(),
596            phase: CrashReplayPhase::CheckInvariants,
597            ok: !failed,
598            detail: format!("{} invariant(s) evaluated", invariants.len()),
599        });
600        report.invariants.extend(invariants);
601    }
602
603    report
604}
605
606#[cfg(test)]
607mod tests {
608    use super::*;
609    use crate::policy_registry::{
610        PolicyControllerStatus, PolicyFallbackState, policy_registry_snapshot,
611    };
612    use crate::search::policy::{
613        CHUNKING_STRATEGY_VERSION, SEMANTIC_SCHEMA_VERSION, SemanticPolicy,
614    };
615    use crate::search::semantic_manifest::{
616        ArtifactRecord, BuildCheckpoint, SemanticManifest, TierKind,
617    };
618    use serde_json::{Value, json};
619    use std::path::PathBuf;
620    use tempfile::TempDir;
621
622    #[derive(Debug)]
623    struct SemanticReplayState {
624        temp_dir: TempDir,
625        loaded: Option<SemanticManifest>,
626    }
627
628    impl SemanticReplayState {
629        fn data_dir(&self) -> &Path {
630            self.temp_dir.path()
631        }
632    }
633
634    fn semantic_checkpoint() -> BuildCheckpoint {
635        BuildCheckpoint {
636            tier: TierKind::Fast,
637            embedder_id: "fnv1a-384".to_string(),
638            last_offset: 8,
639            docs_embedded: 13,
640            conversations_processed: 2,
641            total_conversations: 5,
642            db_fingerprint: "semantic-fp".to_string(),
643            schema_version: SEMANTIC_SCHEMA_VERSION,
644            chunking_version: CHUNKING_STRATEGY_VERSION,
645            saved_at_ms: 1_700_000_000_000,
646            last_message_id: None,
647        }
648    }
649
650    fn semantic_artifact() -> ArtifactRecord {
651        ArtifactRecord {
652            tier: TierKind::Fast,
653            embedder_id: "fnv1a-384".to_string(),
654            model_revision: "hash".to_string(),
655            schema_version: SEMANTIC_SCHEMA_VERSION,
656            chunking_version: CHUNKING_STRATEGY_VERSION,
657            dimension: 384,
658            doc_count: 13,
659            conversation_count: 5,
660            db_fingerprint: "semantic-fp".to_string(),
661            index_path: "vector_index/fast.fsvi".to_string(),
662            size_bytes: 4096,
663            started_at_ms: 1_700_000_000_000,
664            completed_at_ms: 1_700_000_060_000,
665            ready: true,
666        }
667    }
668
669    #[test]
670    fn semantic_manifest_state_machine_replays_checkpoint_and_publish_crashes() {
671        let checkpoints = vec![
672            CrashReplayCheckpoint::new(
673                10,
674                "semantic_checkpoint_saved",
675                "semantic checkpoint persisted before artifact publish",
676            ),
677            CrashReplayCheckpoint::new(
678                20,
679                "semantic_artifact_published",
680                "semantic artifact published and checkpoint cleared",
681            ),
682        ];
683
684        let report =
685            replay_named_checkpoints(
686                "semantic-manifest-save-restart",
687                "semantic_manifest",
688                checkpoints,
689                || {
690                    Ok(SemanticReplayState {
691                        temp_dir: tempfile::tempdir()
692                            .map_err(|err| CrashReplayError::from_error("create tempdir", err))?,
693                        loaded: None,
694                    })
695                },
696                |state, checkpoint| {
697                    let mut manifest = SemanticManifest::default();
698                    manifest.refresh_backlog(5, "semantic-fp");
699                    manifest.save_checkpoint(semantic_checkpoint());
700                    if checkpoint.id == "semantic_artifact_published" {
701                        manifest.publish_artifact(semantic_artifact());
702                    }
703                    manifest
704                        .save(state.data_dir())
705                        .map_err(|err| CrashReplayError::from_error("save semantic manifest", err))
706                },
707                |state| {
708                    state.loaded = SemanticManifest::load(state.data_dir()).map_err(|err| {
709                        CrashReplayError::from_error("load semantic manifest", err)
710                    })?;
711                    Ok(())
712                },
713                |state, checkpoint| {
714                    let mut invariants = Vec::new();
715                    let Some(manifest) = &state.loaded else {
716                        return vec![CrashReplayInvariant::failed(
717                            checkpoint,
718                            "semantic_manifest_loaded",
719                            "manifest did not load after restart",
720                        )];
721                    };
722
723                    invariants.push(CrashReplayInvariant::passed(
724                        checkpoint,
725                        "semantic_manifest_loaded",
726                        "manifest loaded after restart",
727                    ));
728                    match checkpoint.id.as_str() {
729                        "semantic_checkpoint_saved" => {
730                            invariants.push(if manifest.checkpoint.is_some()
731                            && manifest.fast_tier.is_none()
732                        {
733                            CrashReplayInvariant::passed(
734                                checkpoint,
735                                "checkpoint_without_torn_artifact",
736                                "restart sees resumable checkpoint and no half-published artifact",
737                            )
738                        } else {
739                            CrashReplayInvariant::failed(
740                                checkpoint,
741                                "checkpoint_without_torn_artifact",
742                                format!(
743                                    "checkpoint={:?} fast_tier={:?}",
744                                    manifest.checkpoint, manifest.fast_tier
745                                ),
746                            )
747                        });
748                        }
749                        "semantic_artifact_published" => {
750                            invariants.push(if manifest.checkpoint.is_none()
751                            && manifest.fast_tier.as_ref().is_some_and(|artifact| artifact.ready)
752                        {
753                            CrashReplayInvariant::passed(
754                                checkpoint,
755                                "published_artifact_clears_checkpoint",
756                                "restart sees ready artifact and no stale matching checkpoint",
757                            )
758                        } else {
759                            CrashReplayInvariant::failed(
760                                checkpoint,
761                                "published_artifact_clears_checkpoint",
762                                format!(
763                                    "checkpoint={:?} fast_tier={:?}",
764                                    manifest.checkpoint, manifest.fast_tier
765                                ),
766                            )
767                        });
768                        }
769                        _ => invariants.push(CrashReplayInvariant::failed(
770                            checkpoint,
771                            "known_checkpoint",
772                            "unexpected semantic checkpoint",
773                        )),
774                    }
775                    invariants
776                },
777            );
778
779        assert_eq!(report.verdict, CrashReplayVerdict::Clean);
780        assert_eq!(report.checkpoints.len(), 2);
781        assert_eq!(report.invariants.len(), 4);
782        assert!(
783            report.validate().is_ok(),
784            "semantic replay report should validate: {report:?}"
785        );
786    }
787
788    #[derive(Debug)]
789    struct PolicyReplayState {
790        pipeline: Value,
791        semantic_available: bool,
792        semantic_fallback_mode: Option<&'static str>,
793        snapshot_statuses: Vec<(String, PolicyControllerStatus, PolicyFallbackState)>,
794    }
795
796    fn policy_pipeline_fixture(mode: &str, reason: &str) -> Value {
797        json!({
798            "pipeline_channel_size": 128,
799            "pipeline_max_message_bytes_in_flight": 1048576,
800            "page_prep_workers": 12,
801            "staged_merge_workers": 4,
802            "staged_shard_builders": 8,
803            "controller_mode": "auto",
804            "controller_restore_clear_samples": 3,
805            "controller_restore_hold_ms": 5000,
806            "controller_loadavg_high_watermark_1m": 1.75,
807            "controller_loadavg_low_watermark_1m": 0.75,
808            "runtime": {
809                "controller_mode": mode,
810                "controller_reason": reason
811            }
812        })
813    }
814
/// Replays the policy registry through two named checkpoints and checks the
/// controller snapshot recomputed after each restart.
///
/// Two invariants are recorded per checkpoint: the controller ids come back as
/// `lexical_rebuild_pipeline`, `semantic_search` in that order, and the
/// controller targeted by the checkpoint reports `Fallback` / `Conservative`.
#[test]
fn policy_registry_state_machine_replays_deterministic_controller_snapshots() {
    let semantic_fallback = CrashReplayCheckpoint::new(
        10,
        "semantic_fallback_snapshot",
        "semantic controller reports lexical fallback",
    );
    let lexical_throttle = CrashReplayCheckpoint::new(
        20,
        "lexical_throttle_snapshot",
        "lexical rebuild controller reports pressure fallback",
    );

    let report = replay_named_checkpoints(
        "policy-registry-recompute-restart",
        "policy_registry",
        vec![semantic_fallback, lexical_throttle],
        // Initial state: steady pipeline, semantic search available, no snapshot yet.
        || {
            Ok(PolicyReplayState {
                pipeline: policy_pipeline_fixture("steady", "pipeline settings active"),
                semantic_available: true,
                semantic_fallback_mode: None,
                snapshot_statuses: Vec::new(),
            })
        },
        // Advance: apply the input change that the named checkpoint stands for.
        |state, checkpoint| match checkpoint.id.as_str() {
            "semantic_fallback_snapshot" => {
                state.semantic_available = false;
                state.semantic_fallback_mode = Some("lexical");
                Ok(())
            }
            "lexical_throttle_snapshot" => {
                state.pipeline =
                    policy_pipeline_fixture("throttled", "load pressure reduced workers");
                Ok(())
            }
            _ => Err(CrashReplayError::new(
                "advance policy checkpoint",
                "unknown checkpoint",
            )),
        },
        // Restart: recompute the registry snapshot from the current inputs and
        // keep only (id, status, fallback state) for each controller.
        |state| {
            let policy = SemanticPolicy::compiled_defaults();
            let snapshot = policy_registry_snapshot(
                &policy,
                state.semantic_available,
                state.semantic_fallback_mode,
                &state.pipeline,
            );

            let mut statuses = Vec::new();
            for controller in snapshot.controllers {
                statuses.push((
                    controller.controller_id,
                    controller.status,
                    controller.fallback_state,
                ));
            }
            state.snapshot_statuses = statuses;
            Ok(())
        },
        // Invariants: evaluated against the snapshot stored by the restart step.
        |state, checkpoint| {
            let observed_ids: Vec<_> = state
                .snapshot_statuses
                .iter()
                .map(|(controller_id, _, _)| controller_id.as_str())
                .collect();
            let ids_invariant = if observed_ids == ["lexical_rebuild_pipeline", "semantic_search"]
            {
                CrashReplayInvariant::passed(
                    checkpoint,
                    "controller_ids_sorted",
                    "controller ids are deterministic and sorted",
                )
            } else {
                CrashReplayInvariant::failed(
                    checkpoint,
                    "controller_ids_sorted",
                    format!("unexpected controller ids: {observed_ids:?}"),
                )
            };

            // Each checkpoint targets exactly one controller; "unknown" matches
            // nothing, which makes the fallback invariant below fail.
            let expected_controller = match checkpoint.id.as_str() {
                "semantic_fallback_snapshot" => "semantic_search",
                "lexical_throttle_snapshot" => "lexical_rebuild_pipeline",
                _ => "unknown",
            };
            let reported = state
                .snapshot_statuses
                .iter()
                .find(|(controller_id, _, _)| controller_id == expected_controller);
            let is_conservative_fallback = matches!(
                reported,
                Some((
                    _,
                    PolicyControllerStatus::Fallback,
                    PolicyFallbackState::Conservative,
                ))
            );
            let fallback_invariant = if is_conservative_fallback {
                CrashReplayInvariant::passed(
                    checkpoint,
                    "conservative_fallback_reported",
                    "checkpoint recompute reports conservative fallback",
                )
            } else {
                CrashReplayInvariant::failed(
                    checkpoint,
                    "conservative_fallback_reported",
                    format!("unexpected controller status: {reported:?}"),
                )
            };

            vec![ids_invariant, fallback_invariant]
        },
    );

    assert_eq!(report.verdict, CrashReplayVerdict::Clean);
    let validation = report.validate();
    assert!(
        validation.is_ok(),
        "policy replay report should validate: {report:?}"
    );
}
938
939    #[derive(Debug)]
940    struct LexicalPublishFixtureState {
941        temp_dir: TempDir,
942        live_path: PathBuf,
943        staged_path: PathBuf,
944        backup_path: PathBuf,
945    }
946
947    impl LexicalPublishFixtureState {
948        fn new() -> Result<Self, CrashReplayError> {
949            let temp_dir = tempfile::tempdir()
950                .map_err(|err| CrashReplayError::from_error("create tempdir", err))?;
951            let live_path = temp_dir.path().join("live-generation.txt");
952            let staged_path = temp_dir.path().join("staged-generation.txt");
953            let backup_path = temp_dir.path().join("live-generation.bak");
954            fs::write(&live_path, "old-generation")
955                .map_err(|err| CrashReplayError::from_error("seed live generation", err))?;
956            Ok(Self {
957                temp_dir,
958                live_path,
959                staged_path,
960                backup_path,
961            })
962        }
963
964        fn write_staged(&self) -> Result<(), CrashReplayError> {
965            fs::write(&self.staged_path, "new-generation")
966                .map_err(|err| CrashReplayError::from_error("write staged generation", err))
967        }
968
969        fn park_live(&self) -> Result<(), CrashReplayError> {
970            fs::rename(&self.live_path, &self.backup_path)
971                .map_err(|err| CrashReplayError::from_error("park live generation", err))
972        }
973
974        fn publish_staged(&self) -> Result<(), CrashReplayError> {
975            fs::rename(&self.staged_path, &self.live_path)
976                .map_err(|err| CrashReplayError::from_error("publish staged generation", err))
977        }
978    }
979
/// Replays a crash at each step of the park-and-swap publish sequence and
/// checks that the restart step leaves the expected generation at the live
/// path.
///
/// The expected live content is `old-generation` for the `staged_written` and
/// `live_parked` checkpoints and `new-generation` for `staged_published`.
#[test]
fn lexical_publish_fixture_replays_park_and_swap_crash_windows() {
    let staged_written = CrashReplayCheckpoint::new(
        10,
        "staged_written",
        "staged generation exists before live path is touched",
    );
    let live_parked = CrashReplayCheckpoint::new(
        20,
        "live_parked",
        "live generation has been parked but staged is not yet live",
    );
    let staged_published = CrashReplayCheckpoint::new(
        30,
        "staged_published",
        "staged generation has been promoted to live",
    );

    let report = replay_named_checkpoints(
        "lexical-publish-fixture-restart",
        "lexical_publish",
        vec![staged_written, live_parked, staged_published],
        LexicalPublishFixtureState::new,
        // Advance: stage the new generation, then run as much of the swap as
        // the checkpoint name implies.
        |state, checkpoint| {
            state.write_staged()?;
            match checkpoint.id.as_str() {
                "staged_written" => Ok(()),
                "live_parked" => state.park_live(),
                "staged_published" => {
                    state.park_live()?;
                    state.publish_staged()
                }
                _ => Err(CrashReplayError::new(
                    "advance lexical publish checkpoint",
                    "unknown checkpoint",
                )),
            }
        },
        // Restart: if the live file is missing but a parked backup exists, move
        // the backup back into place.
        |state| {
            let live_missing = !state.live_path.exists();
            let needs_restore = live_missing && state.backup_path.exists();
            if needs_restore {
                fs::rename(&state.backup_path, &state.live_path)
                    .map_err(|source| CrashReplayError::from_error("restore parked live", source))
            } else {
                Ok(())
            }
        },
        |state, checkpoint| {
            // A read failure becomes `None`, which can never equal the expectation.
            let live = fs::read_to_string(&state.live_path).ok();
            let expected = match checkpoint.id.as_str() {
                "staged_written" | "live_parked" => "old-generation",
                "staged_published" => "new-generation",
                _ => "unknown",
            };

            let root_invariant = if state.temp_dir.path().exists() {
                CrashReplayInvariant::passed(
                    checkpoint,
                    "fixture_root_retained",
                    "fixture root remains available for artifact inspection",
                )
            } else {
                CrashReplayInvariant::failed(
                    checkpoint,
                    "fixture_root_retained",
                    "fixture root disappeared before invariant checks",
                )
            };

            let generation_invariant = match live.as_deref() {
                Some(found) if found == expected => CrashReplayInvariant::passed(
                    checkpoint,
                    "live_generation_is_old_or_new",
                    format!("live generation recovered as {expected}"),
                ),
                _ => CrashReplayInvariant::failed(
                    checkpoint,
                    "live_generation_is_old_or_new",
                    format!("expected {expected}, got {live:?}"),
                ),
            };

            vec![root_invariant, generation_invariant]
        },
    );

    assert_eq!(report.verdict, CrashReplayVerdict::Clean);
    let validation = report.validate();
    assert!(
        validation.is_ok(),
        "lexical publish replay report should validate: {report:?}"
    );
}
1077
1078    #[derive(Debug)]
1079    struct BackupRecoveryFixtureState {
1080        temp_dir: TempDir,
1081        canonical_db: PathBuf,
1082        backup_dir: PathBuf,
1083        manifest: Option<Value>,
1084    }
1085
1086    impl BackupRecoveryFixtureState {
1087        fn new() -> Result<Self, CrashReplayError> {
1088            let temp_dir = tempfile::tempdir()
1089                .map_err(|err| CrashReplayError::from_error("create tempdir", err))?;
1090            let canonical_db = temp_dir.path().join("cass.db");
1091            let backup_dir = temp_dir.path().join("backup");
1092            fs::write(&canonical_db, "canonical-main")
1093                .map_err(|err| CrashReplayError::from_error("seed canonical db", err))?;
1094            fs::write(temp_dir.path().join("cass.db-wal"), "canonical-wal")
1095                .map_err(|err| CrashReplayError::from_error("seed canonical wal", err))?;
1096            fs::create_dir_all(&backup_dir)
1097                .map_err(|err| CrashReplayError::from_error("create backup dir", err))?;
1098            Ok(Self {
1099                temp_dir,
1100                canonical_db,
1101                backup_dir,
1102                manifest: None,
1103            })
1104        }
1105
1106        fn copy_main(&self) -> Result<(), CrashReplayError> {
1107            fs::copy(&self.canonical_db, self.backup_dir.join("cass.db"))
1108                .map(|_| ())
1109                .map_err(|err| CrashReplayError::from_error("copy backup main", err))
1110        }
1111
1112        fn copy_wal_and_manifest(&self) -> Result<(), CrashReplayError> {
1113            fs::copy(
1114                self.temp_dir.path().join("cass.db-wal"),
1115                self.backup_dir.join("cass.db-wal"),
1116            )
1117            .map_err(|err| CrashReplayError::from_error("copy backup wal", err))?;
1118            let manifest = json!({
1119                "schema_version": 1,
1120                "complete": true,
1121                "files": ["cass.db", "cass.db-wal"],
1122            });
1123            let bytes = serde_json::to_vec_pretty(&manifest)
1124                .map_err(|err| CrashReplayError::from_error("encode backup manifest", err))?;
1125            fs::write(self.backup_dir.join("manifest.json"), bytes)
1126                .map_err(|err| CrashReplayError::from_error("write backup manifest", err))
1127        }
1128    }
1129
/// Replays crashes during backup bundle creation and checks what a restart
/// may conclude from the files left behind.
///
/// Invariants recorded per checkpoint:
/// * the canonical database still contains `canonical-main`;
/// * `backup_main_copied`: no manifest was found, so the partial bundle is not
///   marked complete;
/// * `backup_manifest_written`: the manifest says `complete: true`, its `files`
///   array is exactly `["cass.db", "cass.db-wal"]`, and both listed files exist
///   in the backup directory.
#[test]
fn backup_recovery_fixture_replays_incomplete_and_complete_bundle_crashes() {
    let checkpoints = vec![
        CrashReplayCheckpoint::new(
            10,
            "backup_main_copied",
            "backup main file copied before bundle manifest exists",
        ),
        CrashReplayCheckpoint::new(
            20,
            "backup_manifest_written",
            "backup sidecars and manifest mark the bundle complete",
        ),
    ];

    let report = replay_named_checkpoints(
        "backup-recovery-fixture-restart",
        "backup_recovery",
        checkpoints,
        BackupRecoveryFixtureState::new,
        // Advance: the main file is copied for both checkpoints; only the
        // second one goes on to copy the WAL and write the manifest.
        |state, checkpoint| {
            state.copy_main()?;
            match checkpoint.id.as_str() {
                "backup_main_copied" => {}
                "backup_manifest_written" => {
                    state.copy_wal_and_manifest()?;
                }
                _ => {
                    return Err(CrashReplayError::new(
                        "advance backup recovery checkpoint",
                        "unknown checkpoint",
                    ));
                }
            }
            Ok(())
        },
        // Restart: load the manifest if one exists; a missing manifest is
        // recorded as `None` rather than treated as an error.
        |state| {
            let manifest_path = state.backup_dir.join("manifest.json");
            state.manifest = if manifest_path.exists() {
                let bytes = fs::read(&manifest_path)
                    .map_err(|err| CrashReplayError::from_error("read backup manifest", err))?;
                Some(serde_json::from_slice(&bytes).map_err(|err| {
                    CrashReplayError::from_error("parse backup manifest", err)
                })?)
            } else {
                None
            };
            Ok(())
        },
        |state, checkpoint| {
            let canonical = fs::read_to_string(&state.canonical_db).ok();
            let mut invariants = vec![if canonical.as_deref() == Some("canonical-main") {
                CrashReplayInvariant::passed(
                    checkpoint,
                    "canonical_db_preserved",
                    "restart did not replace the canonical DB from an incomplete backup",
                )
            } else {
                CrashReplayInvariant::failed(
                    checkpoint,
                    "canonical_db_preserved",
                    format!("unexpected canonical DB content: {canonical:?}"),
                )
            }];

            match checkpoint.id.as_str() {
                "backup_main_copied" => {
                    invariants.push(if state.manifest.is_none() {
                        CrashReplayInvariant::passed(
                            checkpoint,
                            "partial_backup_not_marked_complete",
                            "main-only backup has no manifest and is not advertised recoverable",
                        )
                    } else {
                        CrashReplayInvariant::failed(
                            checkpoint,
                            "partial_backup_not_marked_complete",
                            format!("unexpected manifest: {:?}", state.manifest),
                        )
                    });
                }
                "backup_manifest_written" => {
                    let manifest = state.manifest.as_ref();
                    let complete = manifest
                        .and_then(|manifest| manifest.get("complete"))
                        .and_then(Value::as_bool)
                        == Some(true);
                    // Compare entry by entry without filtering: a non-string
                    // entry maps to `None` and must make the list mismatch
                    // instead of being silently skipped.
                    let files_match = manifest
                        .and_then(|manifest| manifest.get("files"))
                        .and_then(Value::as_array)
                        .is_some_and(|files| {
                            files
                                .iter()
                                .map(Value::as_str)
                                .eq([Some("cass.db"), Some("cass.db-wal")])
                        });
                    // Both files named by the manifest must be present in the
                    // bundle, not just the WAL sidecar.
                    let main_exists = state.backup_dir.join("cass.db").exists();
                    let wal_exists = state.backup_dir.join("cass.db-wal").exists();
                    invariants.push(if complete && files_match && main_exists && wal_exists {
                        CrashReplayInvariant::passed(
                            checkpoint,
                            "complete_backup_manifest_matches_sidecars",
                            "complete manifest is present only with expected sidecars",
                        )
                    } else {
                        CrashReplayInvariant::failed(
                            checkpoint,
                            "complete_backup_manifest_matches_sidecars",
                            format!(
                                "complete={complete} files_match={files_match} main_exists={main_exists} wal_exists={wal_exists}"
                            ),
                        )
                    });
                }
                _ => invariants.push(CrashReplayInvariant::failed(
                    checkpoint,
                    "known_backup_checkpoint",
                    "unexpected backup checkpoint",
                )),
            }
            invariants
        },
    );

    assert_eq!(report.verdict, CrashReplayVerdict::Clean);
    assert!(
        report.validate().is_ok(),
        "backup recovery replay report should validate: {report:?}"
    );
}
1264
/// Saves a single-checkpoint report as JSON under a nested artifact path, loads
/// it back, and asserts the loaded report equals the one that was saved.
#[test]
fn crash_replay_report_round_trips_as_artifact_manifest()
-> Result<(), Box<dyn std::error::Error>> {
    let scratch = tempfile::tempdir()?;
    // The intermediate directories do not exist when `save_json` is called.
    let artifact_path = scratch
        .path()
        .join("artifacts/crash-replay/crash-replay-report.json");

    let only_checkpoint = CrashReplayCheckpoint::new(
        1,
        "only_checkpoint",
        "single checkpoint for artifact round-trip",
    );
    // Every step is a no-op; only the shape of the resulting report matters.
    let report = replay_named_checkpoints(
        "artifact-round-trip",
        "harness",
        vec![only_checkpoint],
        || Ok(()),
        |_, _| Ok(()),
        |_| Ok(()),
        |_, checkpoint| {
            let invariant = CrashReplayInvariant::passed(
                checkpoint,
                "round_trip_invariant",
                "round-trip invariant passed",
            );
            vec![invariant]
        },
    );

    report.save_json(&artifact_path)?;
    let loaded = CrashReplayReport::load_json(&artifact_path)?;
    assert_eq!(loaded, report);

    Ok(())
}
1299
/// Hand-builds three reports that claim a `Clean` verdict and asserts that
/// `validate` rejects each with the matching error variant:
///
/// 1. a failed invariant -> `CleanReportContainsFailure`;
/// 2. two checkpoints sharing an id -> `DuplicateCheckpointId`;
/// 3. a passing invariant with no `CheckInvariants` event ->
///    `CleanReportMissingCheckpointEvent`.
#[test]
fn crash_replay_validation_rejects_untrustworthy_clean_reports() {
    let checkpoint = CrashReplayCheckpoint::new(1, "checkpoint", "validation checkpoint");

    // Case 1: Clean verdict alongside a failed invariant.
    let check_event = CrashReplayEvent {
        checkpoint_id: checkpoint.id.clone(),
        phase: CrashReplayPhase::CheckInvariants,
        ok: true,
        detail: "checked".to_string(),
    };
    let failing_invariant = CrashReplayInvariant::failed(
        &checkpoint,
        "must_not_fail",
        "intentional validation failure",
    );
    let report = CrashReplayReport {
        schema_version: CRASH_REPLAY_SCHEMA_VERSION.to_string(),
        scenario_id: "bad-clean-report".to_string(),
        state_machine: "harness".to_string(),
        verdict: CrashReplayVerdict::Clean,
        checkpoints: vec![checkpoint.clone()],
        events: vec![check_event],
        invariants: vec![failing_invariant],
    };
    let failure_outcome = report.validate();
    assert!(matches!(
        failure_outcome,
        Err(CrashReplayValidationError::CleanReportContainsFailure)
    ));

    // Case 2: same report with a second checkpoint that reuses the id under a
    // different ordinal.
    let mut duplicate_checkpoint = checkpoint.clone();
    duplicate_checkpoint.ordinal = 2;
    let mut duplicate_report = report.clone();
    duplicate_report.checkpoints = vec![checkpoint.clone(), duplicate_checkpoint];
    let duplicate_outcome = duplicate_report.validate();
    assert!(matches!(
        duplicate_outcome,
        Err(CrashReplayValidationError::DuplicateCheckpointId { .. })
    ));

    // Case 3: the invariant passes, but the only event is an advance step.
    let mut missing_check_event_report = report;
    missing_check_event_report.events = vec![CrashReplayEvent {
        checkpoint_id: checkpoint.id.clone(),
        phase: CrashReplayPhase::AdvanceToCheckpoint,
        ok: true,
        detail: "advanced".to_string(),
    }];
    missing_check_event_report.invariants = vec![CrashReplayInvariant::passed(
        &checkpoint,
        "passing_but_unchecked",
        "invariant exists but no check event proves it ran",
    )];
    let missing_event_outcome = missing_check_event_report.validate();
    assert!(matches!(
        missing_event_outcome,
        Err(CrashReplayValidationError::CleanReportMissingCheckpointEvent { .. })
    ));
}
1359}