Skip to main content

coding_agent_search/
crash_replay.rs

1//! Deterministic crash/replay harness for state-machine proof tests.
2//!
3//! The harness is intentionally small and data-only: production code exposes
4//! named checkpoints, tests simulate a crash at each checkpoint, then restart
5//! and verify invariants. The resulting report can be saved as a JSON artifact
6//! for later replay or review.
7
8use serde::{Deserialize, Serialize};
9use std::collections::BTreeSet;
10use std::error::Error;
11use std::fmt;
12use std::fs;
13use std::io;
14use std::path::Path;
15
/// Schema version written into every [`CrashReplayReport`].
///
/// `CrashReplayReport::validate` rejects reports whose `schema_version`
/// differs from this value.
pub const CRASH_REPLAY_SCHEMA_VERSION: &str = "1";
17
/// A named point in a state machine's execution at which the harness
/// simulates a crash.
///
/// `replay_named_checkpoints` sorts checkpoints by `ordinal`, and
/// `CrashReplayReport::validate` requires ordinals to be strictly increasing.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct CrashReplayCheckpoint {
    /// Identifier referenced by events and invariants; must be non-blank and
    /// unique within a report.
    pub id: String,
    /// Replay-order sort key.
    pub ordinal: u32,
    /// Human-readable description; must be non-blank.
    pub description: String,
}
24
25impl CrashReplayCheckpoint {
26    pub fn new(ordinal: u32, id: impl Into<String>, description: impl Into<String>) -> Self {
27        Self {
28            id: id.into(),
29            ordinal,
30            description: description.into(),
31        }
32    }
33}
34
/// Step of the per-checkpoint replay cycle that a [`CrashReplayEvent`]
/// records. Serialized in `snake_case` (e.g. `inject_crash`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum CrashReplayPhase {
    /// Driving fresh state up to the checkpoint. `replay_named_checkpoints`
    /// also tags a failure to create the fresh state with this phase.
    AdvanceToCheckpoint,
    /// The simulated stop at the checkpoint.
    InjectCrash,
    /// Running the scenario's restart action after the simulated stop.
    Restart,
    /// Evaluating invariants after restart.
    CheckInvariants,
}
43
/// One recorded replay step, tied to a checkpoint by id.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct CrashReplayEvent {
    /// Id of the checkpoint this event belongs to; must name a checkpoint in
    /// the same report.
    pub checkpoint_id: String,
    /// Which replay step the event records.
    pub phase: CrashReplayPhase,
    /// Whether the step succeeded.
    pub ok: bool,
    /// Explanation of the outcome; must be non-blank.
    pub detail: String,
}
51
52impl CrashReplayEvent {
53    fn ok(
54        checkpoint: &CrashReplayCheckpoint,
55        phase: CrashReplayPhase,
56        detail: impl Into<String>,
57    ) -> Self {
58        Self {
59            checkpoint_id: checkpoint.id.clone(),
60            phase,
61            ok: true,
62            detail: detail.into(),
63        }
64    }
65
66    fn failed(
67        checkpoint: &CrashReplayCheckpoint,
68        phase: CrashReplayPhase,
69        detail: impl Into<String>,
70    ) -> Self {
71        Self {
72            checkpoint_id: checkpoint.id.clone(),
73            phase,
74            ok: false,
75            detail: detail.into(),
76        }
77    }
78}
79
/// Outcome of one named invariant evaluated for a checkpoint.
///
/// Invariants are typically produced by the `check_invariants` callback of
/// `replay_named_checkpoints` after the restart step.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct CrashReplayInvariant {
    /// Id of the checkpoint the invariant was evaluated for; must name a
    /// checkpoint in the same report.
    pub checkpoint_id: String,
    /// Invariant name; must be non-blank.
    pub name: String,
    /// Whether the invariant held.
    pub passed: bool,
    /// Explanation of the outcome; must be non-blank.
    pub detail: String,
}
87
88impl CrashReplayInvariant {
89    pub fn passed(
90        checkpoint: &CrashReplayCheckpoint,
91        name: impl Into<String>,
92        detail: impl Into<String>,
93    ) -> Self {
94        Self {
95            checkpoint_id: checkpoint.id.clone(),
96            name: name.into(),
97            passed: true,
98            detail: detail.into(),
99        }
100    }
101
102    pub fn failed(
103        checkpoint: &CrashReplayCheckpoint,
104        name: impl Into<String>,
105        detail: impl Into<String>,
106    ) -> Self {
107        Self {
108            checkpoint_id: checkpoint.id.clone(),
109            name: name.into(),
110            passed: false,
111            detail: detail.into(),
112        }
113    }
114}
115
/// Overall outcome of a replay run. Serialized in `snake_case`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum CrashReplayVerdict {
    /// No failures. `CrashReplayReport::validate` additionally requires a
    /// successful invariant-check event and a passing invariant for every
    /// checkpoint.
    Clean,
    /// At least one step or invariant failed, or the run could not proceed.
    Failed,
}
122
/// Serializable record of one crash/replay run. It can be saved as a JSON
/// artifact with `save_json` and reloaded with `load_json`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct CrashReplayReport {
    /// Must equal [`CRASH_REPLAY_SCHEMA_VERSION`].
    pub schema_version: String,
    /// Identifier of the replayed scenario; must be non-blank.
    pub scenario_id: String,
    /// Name of the state machine under test; must be non-blank.
    pub state_machine: String,
    /// Overall outcome of the run.
    pub verdict: CrashReplayVerdict,
    /// Checkpoints in replay order (strictly increasing ordinals, unique ids).
    pub checkpoints: Vec<CrashReplayCheckpoint>,
    /// Per-phase events recorded during the replay.
    pub events: Vec<CrashReplayEvent>,
    /// Invariant outcomes collected for the checkpoints.
    pub invariants: Vec<CrashReplayInvariant>,
}
133
134impl CrashReplayReport {
135    pub fn validate(&self) -> Result<(), CrashReplayValidationError> {
136        if self.schema_version != CRASH_REPLAY_SCHEMA_VERSION {
137            return Err(CrashReplayValidationError::UnsupportedSchemaVersion {
138                expected: CRASH_REPLAY_SCHEMA_VERSION,
139                actual: self.schema_version.clone(),
140            });
141        }
142        if self.scenario_id.trim().is_empty() {
143            return Err(CrashReplayValidationError::EmptyScenarioId);
144        }
145        if self.state_machine.trim().is_empty() {
146            return Err(CrashReplayValidationError::EmptyStateMachine);
147        }
148        if self.checkpoints.is_empty() {
149            return Err(CrashReplayValidationError::NoCheckpoints);
150        }
151        if self.verdict == CrashReplayVerdict::Clean && self.invariants.is_empty() {
152            return Err(CrashReplayValidationError::CleanReportWithoutInvariants);
153        }
154
155        let mut checkpoint_ids = BTreeSet::new();
156        let mut previous_ordinal = None;
157        for (index, checkpoint) in self.checkpoints.iter().enumerate() {
158            if checkpoint.id.trim().is_empty() {
159                return Err(CrashReplayValidationError::EmptyCheckpointId { index });
160            }
161            if checkpoint.description.trim().is_empty() {
162                return Err(CrashReplayValidationError::EmptyCheckpointDescription { index });
163            }
164            if let Some(previous) = previous_ordinal
165                && checkpoint.ordinal <= previous
166            {
167                return Err(CrashReplayValidationError::NonMonotoneCheckpointOrdinal {
168                    index,
169                    previous,
170                    current: checkpoint.ordinal,
171                });
172            }
173            if !checkpoint_ids.insert(checkpoint.id.as_str()) {
174                return Err(CrashReplayValidationError::DuplicateCheckpointId {
175                    index,
176                    checkpoint_id: checkpoint.id.clone(),
177                });
178            }
179            previous_ordinal = Some(checkpoint.ordinal);
180        }
181
182        let mut checked_checkpoints = BTreeSet::new();
183        for (index, event) in self.events.iter().enumerate() {
184            if event.checkpoint_id.trim().is_empty() {
185                return Err(CrashReplayValidationError::EmptyEventCheckpointId { index });
186            }
187            if !checkpoint_ids.contains(event.checkpoint_id.as_str()) {
188                return Err(CrashReplayValidationError::UnknownEventCheckpoint {
189                    index,
190                    checkpoint_id: event.checkpoint_id.clone(),
191                });
192            }
193            if event.detail.trim().is_empty() {
194                return Err(CrashReplayValidationError::EmptyEventDetail { index });
195            }
196            if event.ok && event.phase == CrashReplayPhase::CheckInvariants {
197                checked_checkpoints.insert(event.checkpoint_id.as_str());
198            }
199        }
200
201        let mut invariant_checkpoints = BTreeSet::new();
202        for (index, invariant) in self.invariants.iter().enumerate() {
203            if invariant.checkpoint_id.trim().is_empty() {
204                return Err(CrashReplayValidationError::EmptyInvariantCheckpointId { index });
205            }
206            if !checkpoint_ids.contains(invariant.checkpoint_id.as_str()) {
207                return Err(CrashReplayValidationError::UnknownInvariantCheckpoint {
208                    index,
209                    checkpoint_id: invariant.checkpoint_id.clone(),
210                });
211            }
212            if invariant.name.trim().is_empty() {
213                return Err(CrashReplayValidationError::EmptyInvariantName { index });
214            }
215            if invariant.detail.trim().is_empty() {
216                return Err(CrashReplayValidationError::EmptyInvariantDetail { index });
217            }
218            if invariant.passed {
219                invariant_checkpoints.insert(invariant.checkpoint_id.as_str());
220            }
221        }
222        if self.verdict == CrashReplayVerdict::Clean
223            && (self.events.iter().any(|event| !event.ok)
224                || self.invariants.iter().any(|invariant| !invariant.passed))
225        {
226            return Err(CrashReplayValidationError::CleanReportContainsFailure);
227        }
228        if self.verdict == CrashReplayVerdict::Clean {
229            if self.events.is_empty() {
230                return Err(CrashReplayValidationError::CleanReportWithoutEvents);
231            }
232            for checkpoint in &self.checkpoints {
233                if !checked_checkpoints.contains(checkpoint.id.as_str()) {
234                    return Err(
235                        CrashReplayValidationError::CleanReportMissingCheckpointEvent {
236                            checkpoint_id: checkpoint.id.clone(),
237                        },
238                    );
239                }
240                if !invariant_checkpoints.contains(checkpoint.id.as_str()) {
241                    return Err(
242                        CrashReplayValidationError::CleanReportMissingCheckpointInvariant {
243                            checkpoint_id: checkpoint.id.clone(),
244                        },
245                    );
246                }
247            }
248        }
249
250        Ok(())
251    }
252
253    pub fn save_json(&self, path: &Path) -> Result<(), CrashReplayIoError> {
254        self.validate()?;
255        if let Some(parent) = path
256            .parent()
257            .filter(|parent| !parent.as_os_str().is_empty())
258        {
259            fs::create_dir_all(parent)?;
260        }
261        let json = serde_json::to_vec_pretty(self)?;
262        fs::write(path, json)?;
263        Ok(())
264    }
265
266    pub fn load_json(path: &Path) -> Result<Self, CrashReplayIoError> {
267        let bytes = fs::read(path)?;
268        let report: Self = serde_json::from_slice(&bytes)?;
269        report.validate()?;
270        Ok(report)
271    }
272}
273
/// Error reported by a scenario callback (state construction, advance, or
/// restart). It is rendered as `"{action}: {detail}"`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CrashReplayError {
    /// What the callback was attempting, e.g. `"load semantic manifest"`.
    pub action: String,
    /// Why the attempt failed.
    pub detail: String,
}

impl CrashReplayError {
    /// Builds an error from an action label and a detail message.
    pub fn new(action: impl Into<String>, detail: impl Into<String>) -> Self {
        let action = action.into();
        let detail = detail.into();
        Self { action, detail }
    }

    /// Builds an error whose detail is the `Display` rendering of `error`.
    pub fn from_error(action: impl Into<String>, error: impl fmt::Display) -> Self {
        let detail = error.to_string();
        Self::new(action, detail)
    }
}

impl fmt::Display for CrashReplayError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self { action, detail } = self;
        write!(f, "{}: {}", action, detail)
    }
}

impl Error for CrashReplayError {}
300
/// Reason a [`CrashReplayReport`] failed `CrashReplayReport::validate`.
///
/// `index` fields are zero-based positions in the corresponding report list
/// (`checkpoints`, `events`, or `invariants`). The type is comparable, so
/// callers can assert on a specific failure, e.g.
/// `assert_eq!(report.validate(), Err(CrashReplayValidationError::NoCheckpoints))`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CrashReplayValidationError {
    /// `schema_version` is not the supported version.
    UnsupportedSchemaVersion {
        expected: &'static str,
        actual: String,
    },
    /// `scenario_id` is empty or whitespace.
    EmptyScenarioId,
    /// `state_machine` is empty or whitespace.
    EmptyStateMachine,
    /// The report lists no checkpoints.
    NoCheckpoints,
    /// A checkpoint id is empty or whitespace.
    EmptyCheckpointId {
        index: usize,
    },
    /// A checkpoint description is empty or whitespace.
    EmptyCheckpointDescription {
        index: usize,
    },
    /// A checkpoint id appears more than once.
    DuplicateCheckpointId {
        index: usize,
        checkpoint_id: String,
    },
    /// A checkpoint ordinal is not strictly greater than its predecessor's.
    NonMonotoneCheckpointOrdinal {
        index: usize,
        previous: u32,
        current: u32,
    },
    /// A `Clean` report has no invariants.
    CleanReportWithoutInvariants,
    /// A `Clean` report has no events.
    CleanReportWithoutEvents,
    /// A `Clean` report contains a failed event or invariant.
    CleanReportContainsFailure,
    /// A `Clean` report lacks a successful invariant-check event for a checkpoint.
    CleanReportMissingCheckpointEvent {
        checkpoint_id: String,
    },
    /// A `Clean` report lacks a passing invariant for a checkpoint.
    CleanReportMissingCheckpointInvariant {
        checkpoint_id: String,
    },
    /// An event's checkpoint id is empty or whitespace.
    EmptyEventCheckpointId {
        index: usize,
    },
    /// An event references a checkpoint id not present in the report.
    UnknownEventCheckpoint {
        index: usize,
        checkpoint_id: String,
    },
    /// An event detail is empty or whitespace.
    EmptyEventDetail {
        index: usize,
    },
    /// An invariant's checkpoint id is empty or whitespace.
    EmptyInvariantCheckpointId {
        index: usize,
    },
    /// An invariant references a checkpoint id not present in the report.
    UnknownInvariantCheckpoint {
        index: usize,
        checkpoint_id: String,
    },
    /// An invariant name is empty or whitespace.
    EmptyInvariantName {
        index: usize,
    },
    /// An invariant detail is empty or whitespace.
    EmptyInvariantDetail {
        index: usize,
    },
}

impl fmt::Display for CrashReplayValidationError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::UnsupportedSchemaVersion { expected, actual } => {
                write!(
                    f,
                    "unsupported crash replay schema version {actual}; expected {expected}"
                )
            }
            Self::EmptyScenarioId => write!(f, "crash replay scenario_id cannot be empty"),
            Self::EmptyStateMachine => write!(f, "crash replay state_machine cannot be empty"),
            Self::NoCheckpoints => write!(f, "crash replay report must include checkpoints"),
            Self::EmptyCheckpointId { index } => {
                write!(f, "crash replay checkpoint #{index} has an empty id")
            }
            Self::EmptyCheckpointDescription { index } => write!(
                f,
                "crash replay checkpoint #{index} has an empty description"
            ),
            Self::DuplicateCheckpointId {
                index,
                checkpoint_id,
            } => write!(
                f,
                "crash replay checkpoint #{index} duplicates checkpoint id {checkpoint_id}"
            ),
            Self::NonMonotoneCheckpointOrdinal {
                index,
                previous,
                current,
            } => write!(
                f,
                "crash replay checkpoint #{index} ordinal {current} must be greater than previous ordinal {previous}"
            ),
            Self::CleanReportWithoutInvariants => {
                write!(f, "clean crash replay report must include invariants")
            }
            Self::CleanReportWithoutEvents => {
                write!(f, "clean crash replay report must include events")
            }
            Self::CleanReportContainsFailure => {
                write!(
                    f,
                    "clean crash replay report contains failed events or invariants"
                )
            }
            Self::CleanReportMissingCheckpointEvent { checkpoint_id } => write!(
                f,
                "clean crash replay report has no successful invariant-check event for checkpoint {checkpoint_id}"
            ),
            Self::CleanReportMissingCheckpointInvariant { checkpoint_id } => write!(
                f,
                "clean crash replay report has no passing invariant for checkpoint {checkpoint_id}"
            ),
            Self::EmptyEventCheckpointId { index } => {
                write!(f, "crash replay event #{index} has an empty checkpoint id")
            }
            Self::UnknownEventCheckpoint {
                index,
                checkpoint_id,
            } => write!(
                f,
                "crash replay event #{index} references unknown checkpoint {checkpoint_id}"
            ),
            Self::EmptyEventDetail { index } => {
                write!(f, "crash replay event #{index} has an empty detail")
            }
            Self::EmptyInvariantCheckpointId { index } => write!(
                f,
                "crash replay invariant #{index} has an empty checkpoint id"
            ),
            Self::UnknownInvariantCheckpoint {
                index,
                checkpoint_id,
            } => write!(
                f,
                "crash replay invariant #{index} references unknown checkpoint {checkpoint_id}"
            ),
            Self::EmptyInvariantName { index } => {
                write!(f, "crash replay invariant #{index} has an empty name")
            }
            Self::EmptyInvariantDetail { index } => {
                write!(f, "crash replay invariant #{index} has an empty detail")
            }
        }
    }
}

impl Error for CrashReplayValidationError {}
448
449#[derive(Debug)]
450pub enum CrashReplayIoError {
451    Io(io::Error),
452    Json(serde_json::Error),
453    Validation(CrashReplayValidationError),
454}
455
456impl fmt::Display for CrashReplayIoError {
457    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
458        match self {
459            Self::Io(err) => write!(f, "crash replay I/O error: {err}"),
460            Self::Json(err) => write!(f, "crash replay JSON error: {err}"),
461            Self::Validation(err) => write!(f, "crash replay validation error: {err}"),
462        }
463    }
464}
465
466impl Error for CrashReplayIoError {
467    fn source(&self) -> Option<&(dyn Error + 'static)> {
468        match self {
469            Self::Io(err) => Some(err),
470            Self::Json(err) => Some(err),
471            Self::Validation(err) => Some(err),
472        }
473    }
474}
475
476impl From<io::Error> for CrashReplayIoError {
477    fn from(err: io::Error) -> Self {
478        Self::Io(err)
479    }
480}
481
482impl From<serde_json::Error> for CrashReplayIoError {
483    fn from(err: serde_json::Error) -> Self {
484        Self::Json(err)
485    }
486}
487
488impl From<CrashReplayValidationError> for CrashReplayIoError {
489    fn from(err: CrashReplayValidationError) -> Self {
490        Self::Validation(err)
491    }
492}
493
494pub fn replay_named_checkpoints<S, MakeState, Advance, Restart, Check>(
495    scenario_id: impl Into<String>,
496    state_machine: impl Into<String>,
497    mut checkpoints: Vec<CrashReplayCheckpoint>,
498    mut make_state: MakeState,
499    mut advance_to_checkpoint: Advance,
500    mut restart: Restart,
501    mut check_invariants: Check,
502) -> CrashReplayReport
503where
504    MakeState: FnMut() -> Result<S, CrashReplayError>,
505    Advance: FnMut(&mut S, &CrashReplayCheckpoint) -> Result<(), CrashReplayError>,
506    Restart: FnMut(&mut S) -> Result<(), CrashReplayError>,
507    Check: FnMut(&S, &CrashReplayCheckpoint) -> Vec<CrashReplayInvariant>,
508{
509    checkpoints.sort_by_key(|checkpoint| checkpoint.ordinal);
510    let mut report = CrashReplayReport {
511        schema_version: CRASH_REPLAY_SCHEMA_VERSION.to_string(),
512        scenario_id: scenario_id.into(),
513        state_machine: state_machine.into(),
514        verdict: CrashReplayVerdict::Clean,
515        checkpoints: checkpoints.clone(),
516        events: Vec::new(),
517        invariants: Vec::new(),
518    };
519
520    if checkpoints.is_empty() {
521        report.verdict = CrashReplayVerdict::Failed;
522        return report;
523    }
524
525    for checkpoint in checkpoints {
526        let mut state = match make_state() {
527            Ok(state) => state,
528            Err(err) => {
529                report.verdict = CrashReplayVerdict::Failed;
530                report.events.push(CrashReplayEvent::failed(
531                    &checkpoint,
532                    CrashReplayPhase::AdvanceToCheckpoint,
533                    format!("failed creating fresh state: {err}"),
534                ));
535                continue;
536            }
537        };
538
539        match advance_to_checkpoint(&mut state, &checkpoint) {
540            Ok(()) => report.events.push(CrashReplayEvent::ok(
541                &checkpoint,
542                CrashReplayPhase::AdvanceToCheckpoint,
543                "advanced to checkpoint",
544            )),
545            Err(err) => {
546                report.verdict = CrashReplayVerdict::Failed;
547                report.events.push(CrashReplayEvent::failed(
548                    &checkpoint,
549                    CrashReplayPhase::AdvanceToCheckpoint,
550                    err.to_string(),
551                ));
552                continue;
553            }
554        }
555
556        report.events.push(CrashReplayEvent::ok(
557            &checkpoint,
558            CrashReplayPhase::InjectCrash,
559            "simulated process stop at named checkpoint",
560        ));
561
562        match restart(&mut state) {
563            Ok(()) => report.events.push(CrashReplayEvent::ok(
564                &checkpoint,
565                CrashReplayPhase::Restart,
566                "restart action completed",
567            )),
568            Err(err) => {
569                report.verdict = CrashReplayVerdict::Failed;
570                report.events.push(CrashReplayEvent::failed(
571                    &checkpoint,
572                    CrashReplayPhase::Restart,
573                    err.to_string(),
574                ));
575                continue;
576            }
577        }
578
579        let invariants = check_invariants(&state, &checkpoint);
580        if invariants.is_empty() {
581            report.verdict = CrashReplayVerdict::Failed;
582            report.events.push(CrashReplayEvent::failed(
583                &checkpoint,
584                CrashReplayPhase::CheckInvariants,
585                "checkpoint produced no invariants",
586            ));
587            continue;
588        }
589
590        let failed = invariants.iter().any(|invariant| !invariant.passed);
591        if failed {
592            report.verdict = CrashReplayVerdict::Failed;
593        }
594        report.events.push(CrashReplayEvent {
595            checkpoint_id: checkpoint.id.clone(),
596            phase: CrashReplayPhase::CheckInvariants,
597            ok: !failed,
598            detail: format!("{} invariant(s) evaluated", invariants.len()),
599        });
600        report.invariants.extend(invariants);
601    }
602
603    report
604}
605
606#[cfg(test)]
607mod tests {
608    use super::*;
609    use crate::policy_registry::{
610        PolicyControllerStatus, PolicyFallbackState, policy_registry_snapshot,
611    };
612    use crate::search::policy::{
613        CHUNKING_STRATEGY_VERSION, SEMANTIC_SCHEMA_VERSION, SemanticPolicy,
614    };
615    use crate::search::semantic_manifest::{
616        ArtifactRecord, BuildCheckpoint, SemanticManifest, TierKind,
617    };
618    use serde_json::{Value, json};
619    use std::path::PathBuf;
620    use tempfile::TempDir;
621
622    #[derive(Debug)]
623    struct SemanticReplayState {
624        temp_dir: TempDir,
625        loaded: Option<SemanticManifest>,
626    }
627
628    impl SemanticReplayState {
629        fn data_dir(&self) -> &Path {
630            self.temp_dir.path()
631        }
632    }
633
634    fn semantic_checkpoint() -> BuildCheckpoint {
635        BuildCheckpoint {
636            tier: TierKind::Fast,
637            embedder_id: "fnv1a-384".to_string(),
638            last_offset: 8,
639            docs_embedded: 13,
640            conversations_processed: 2,
641            total_conversations: 5,
642            db_fingerprint: "semantic-fp".to_string(),
643            schema_version: SEMANTIC_SCHEMA_VERSION,
644            chunking_version: CHUNKING_STRATEGY_VERSION,
645            saved_at_ms: 1_700_000_000_000,
646        }
647    }
648
649    fn semantic_artifact() -> ArtifactRecord {
650        ArtifactRecord {
651            tier: TierKind::Fast,
652            embedder_id: "fnv1a-384".to_string(),
653            model_revision: "hash".to_string(),
654            schema_version: SEMANTIC_SCHEMA_VERSION,
655            chunking_version: CHUNKING_STRATEGY_VERSION,
656            dimension: 384,
657            doc_count: 13,
658            conversation_count: 5,
659            db_fingerprint: "semantic-fp".to_string(),
660            index_path: "vector_index/fast.fsvi".to_string(),
661            size_bytes: 4096,
662            started_at_ms: 1_700_000_000_000,
663            completed_at_ms: 1_700_000_060_000,
664            ready: true,
665        }
666    }
667
668    #[test]
669    fn semantic_manifest_state_machine_replays_checkpoint_and_publish_crashes() {
670        let checkpoints = vec![
671            CrashReplayCheckpoint::new(
672                10,
673                "semantic_checkpoint_saved",
674                "semantic checkpoint persisted before artifact publish",
675            ),
676            CrashReplayCheckpoint::new(
677                20,
678                "semantic_artifact_published",
679                "semantic artifact published and checkpoint cleared",
680            ),
681        ];
682
683        let report =
684            replay_named_checkpoints(
685                "semantic-manifest-save-restart",
686                "semantic_manifest",
687                checkpoints,
688                || {
689                    Ok(SemanticReplayState {
690                        temp_dir: tempfile::tempdir()
691                            .map_err(|err| CrashReplayError::from_error("create tempdir", err))?,
692                        loaded: None,
693                    })
694                },
695                |state, checkpoint| {
696                    let mut manifest = SemanticManifest::default();
697                    manifest.refresh_backlog(5, "semantic-fp");
698                    manifest.save_checkpoint(semantic_checkpoint());
699                    if checkpoint.id == "semantic_artifact_published" {
700                        manifest.publish_artifact(semantic_artifact());
701                    }
702                    manifest
703                        .save(state.data_dir())
704                        .map_err(|err| CrashReplayError::from_error("save semantic manifest", err))
705                },
706                |state| {
707                    state.loaded = SemanticManifest::load(state.data_dir()).map_err(|err| {
708                        CrashReplayError::from_error("load semantic manifest", err)
709                    })?;
710                    Ok(())
711                },
712                |state, checkpoint| {
713                    let mut invariants = Vec::new();
714                    let Some(manifest) = &state.loaded else {
715                        return vec![CrashReplayInvariant::failed(
716                            checkpoint,
717                            "semantic_manifest_loaded",
718                            "manifest did not load after restart",
719                        )];
720                    };
721
722                    invariants.push(CrashReplayInvariant::passed(
723                        checkpoint,
724                        "semantic_manifest_loaded",
725                        "manifest loaded after restart",
726                    ));
727                    match checkpoint.id.as_str() {
728                        "semantic_checkpoint_saved" => {
729                            invariants.push(if manifest.checkpoint.is_some()
730                            && manifest.fast_tier.is_none()
731                        {
732                            CrashReplayInvariant::passed(
733                                checkpoint,
734                                "checkpoint_without_torn_artifact",
735                                "restart sees resumable checkpoint and no half-published artifact",
736                            )
737                        } else {
738                            CrashReplayInvariant::failed(
739                                checkpoint,
740                                "checkpoint_without_torn_artifact",
741                                format!(
742                                    "checkpoint={:?} fast_tier={:?}",
743                                    manifest.checkpoint, manifest.fast_tier
744                                ),
745                            )
746                        });
747                        }
748                        "semantic_artifact_published" => {
749                            invariants.push(if manifest.checkpoint.is_none()
750                            && manifest.fast_tier.as_ref().is_some_and(|artifact| artifact.ready)
751                        {
752                            CrashReplayInvariant::passed(
753                                checkpoint,
754                                "published_artifact_clears_checkpoint",
755                                "restart sees ready artifact and no stale matching checkpoint",
756                            )
757                        } else {
758                            CrashReplayInvariant::failed(
759                                checkpoint,
760                                "published_artifact_clears_checkpoint",
761                                format!(
762                                    "checkpoint={:?} fast_tier={:?}",
763                                    manifest.checkpoint, manifest.fast_tier
764                                ),
765                            )
766                        });
767                        }
768                        _ => invariants.push(CrashReplayInvariant::failed(
769                            checkpoint,
770                            "known_checkpoint",
771                            "unexpected semantic checkpoint",
772                        )),
773                    }
774                    invariants
775                },
776            );
777
778        assert_eq!(report.verdict, CrashReplayVerdict::Clean);
779        assert_eq!(report.checkpoints.len(), 2);
780        assert_eq!(report.invariants.len(), 4);
781        assert!(
782            report.validate().is_ok(),
783            "semantic replay report should validate: {report:?}"
784        );
785    }
786
787    #[derive(Debug)]
788    struct PolicyReplayState {
789        pipeline: Value,
790        semantic_available: bool,
791        semantic_fallback_mode: Option<&'static str>,
792        snapshot_statuses: Vec<(String, PolicyControllerStatus, PolicyFallbackState)>,
793    }
794
795    fn policy_pipeline_fixture(mode: &str, reason: &str) -> Value {
796        json!({
797            "pipeline_channel_size": 128,
798            "pipeline_max_message_bytes_in_flight": 1048576,
799            "page_prep_workers": 12,
800            "staged_merge_workers": 4,
801            "staged_shard_builders": 8,
802            "controller_mode": "auto",
803            "controller_restore_clear_samples": 3,
804            "controller_restore_hold_ms": 5000,
805            "controller_loadavg_high_watermark_1m": 1.75,
806            "controller_loadavg_low_watermark_1m": 0.75,
807            "runtime": {
808                "controller_mode": mode,
809                "controller_reason": reason
810            }
811        })
812    }
813
814    #[test]
815    fn policy_registry_state_machine_replays_deterministic_controller_snapshots() {
816        let checkpoints = vec![
817            CrashReplayCheckpoint::new(
818                10,
819                "semantic_fallback_snapshot",
820                "semantic controller reports lexical fallback",
821            ),
822            CrashReplayCheckpoint::new(
823                20,
824                "lexical_throttle_snapshot",
825                "lexical rebuild controller reports pressure fallback",
826            ),
827        ];
828
829        let report = replay_named_checkpoints(
830            "policy-registry-recompute-restart",
831            "policy_registry",
832            checkpoints,
833            || {
834                Ok(PolicyReplayState {
835                    pipeline: policy_pipeline_fixture("steady", "pipeline settings active"),
836                    semantic_available: true,
837                    semantic_fallback_mode: None,
838                    snapshot_statuses: Vec::new(),
839                })
840            },
841            |state, checkpoint| {
842                match checkpoint.id.as_str() {
843                    "semantic_fallback_snapshot" => {
844                        state.semantic_available = false;
845                        state.semantic_fallback_mode = Some("lexical");
846                    }
847                    "lexical_throttle_snapshot" => {
848                        state.pipeline =
849                            policy_pipeline_fixture("throttled", "load pressure reduced workers");
850                    }
851                    _ => {
852                        return Err(CrashReplayError::new(
853                            "advance policy checkpoint",
854                            "unknown checkpoint",
855                        ));
856                    }
857                }
858                Ok(())
859            },
860            |state| {
861                let policy = SemanticPolicy::compiled_defaults();
862                let snapshot = policy_registry_snapshot(
863                    &policy,
864                    state.semantic_available,
865                    state.semantic_fallback_mode,
866                    &state.pipeline,
867                );
868                state.snapshot_statuses = snapshot
869                    .controllers
870                    .into_iter()
871                    .map(|controller| {
872                        (
873                            controller.controller_id,
874                            controller.status,
875                            controller.fallback_state,
876                        )
877                    })
878                    .collect();
879                Ok(())
880            },
881            |state, checkpoint| {
882                let ids: Vec<_> = state
883                    .snapshot_statuses
884                    .iter()
885                    .map(|(id, _, _)| id.as_str())
886                    .collect();
887                let mut invariants =
888                    vec![if ids == ["lexical_rebuild_pipeline", "semantic_search"] {
889                        CrashReplayInvariant::passed(
890                            checkpoint,
891                            "controller_ids_sorted",
892                            "controller ids are deterministic and sorted",
893                        )
894                    } else {
895                        CrashReplayInvariant::failed(
896                            checkpoint,
897                            "controller_ids_sorted",
898                            format!("unexpected controller ids: {ids:?}"),
899                        )
900                    }];
901
902                let expected_controller = match checkpoint.id.as_str() {
903                    "semantic_fallback_snapshot" => "semantic_search",
904                    "lexical_throttle_snapshot" => "lexical_rebuild_pipeline",
905                    _ => "unknown",
906                };
907                let controller = state
908                    .snapshot_statuses
909                    .iter()
910                    .find(|(id, _, _)| id == expected_controller);
911                invariants.push(match controller {
912                    Some((
913                        _id,
914                        PolicyControllerStatus::Fallback,
915                        PolicyFallbackState::Conservative,
916                    )) => CrashReplayInvariant::passed(
917                        checkpoint,
918                        "conservative_fallback_reported",
919                        "checkpoint recompute reports conservative fallback",
920                    ),
921                    other => CrashReplayInvariant::failed(
922                        checkpoint,
923                        "conservative_fallback_reported",
924                        format!("unexpected controller status: {other:?}"),
925                    ),
926                });
927                invariants
928            },
929        );
930
931        assert_eq!(report.verdict, CrashReplayVerdict::Clean);
932        assert!(
933            report.validate().is_ok(),
934            "policy replay report should validate: {report:?}"
935        );
936    }
937
938    #[derive(Debug)]
939    struct LexicalPublishFixtureState {
940        temp_dir: TempDir,
941        live_path: PathBuf,
942        staged_path: PathBuf,
943        backup_path: PathBuf,
944    }
945
946    impl LexicalPublishFixtureState {
947        fn new() -> Result<Self, CrashReplayError> {
948            let temp_dir = tempfile::tempdir()
949                .map_err(|err| CrashReplayError::from_error("create tempdir", err))?;
950            let live_path = temp_dir.path().join("live-generation.txt");
951            let staged_path = temp_dir.path().join("staged-generation.txt");
952            let backup_path = temp_dir.path().join("live-generation.bak");
953            fs::write(&live_path, "old-generation")
954                .map_err(|err| CrashReplayError::from_error("seed live generation", err))?;
955            Ok(Self {
956                temp_dir,
957                live_path,
958                staged_path,
959                backup_path,
960            })
961        }
962
963        fn write_staged(&self) -> Result<(), CrashReplayError> {
964            fs::write(&self.staged_path, "new-generation")
965                .map_err(|err| CrashReplayError::from_error("write staged generation", err))
966        }
967
968        fn park_live(&self) -> Result<(), CrashReplayError> {
969            fs::rename(&self.live_path, &self.backup_path)
970                .map_err(|err| CrashReplayError::from_error("park live generation", err))
971        }
972
973        fn publish_staged(&self) -> Result<(), CrashReplayError> {
974            fs::rename(&self.staged_path, &self.live_path)
975                .map_err(|err| CrashReplayError::from_error("publish staged generation", err))
976        }
977    }
978
979    #[test]
980    fn lexical_publish_fixture_replays_park_and_swap_crash_windows() {
981        let checkpoints = vec![
982            CrashReplayCheckpoint::new(
983                10,
984                "staged_written",
985                "staged generation exists before live path is touched",
986            ),
987            CrashReplayCheckpoint::new(
988                20,
989                "live_parked",
990                "live generation has been parked but staged is not yet live",
991            ),
992            CrashReplayCheckpoint::new(
993                30,
994                "staged_published",
995                "staged generation has been promoted to live",
996            ),
997        ];
998
999        let report = replay_named_checkpoints(
1000            "lexical-publish-fixture-restart",
1001            "lexical_publish",
1002            checkpoints,
1003            LexicalPublishFixtureState::new,
1004            |state, checkpoint| {
1005                state.write_staged()?;
1006                match checkpoint.id.as_str() {
1007                    "staged_written" => {}
1008                    "live_parked" => {
1009                        state.park_live()?;
1010                    }
1011                    "staged_published" => {
1012                        state.park_live()?;
1013                        state.publish_staged()?;
1014                    }
1015                    _ => {
1016                        return Err(CrashReplayError::new(
1017                            "advance lexical publish checkpoint",
1018                            "unknown checkpoint",
1019                        ));
1020                    }
1021                }
1022                Ok(())
1023            },
1024            |state| {
1025                if !state.live_path.exists() && state.backup_path.exists() {
1026                    fs::rename(&state.backup_path, &state.live_path)
1027                        .map_err(|err| CrashReplayError::from_error("restore parked live", err))?;
1028                }
1029                Ok(())
1030            },
1031            |state, checkpoint| {
1032                let live = fs::read_to_string(&state.live_path).ok();
1033                let expected = match checkpoint.id.as_str() {
1034                    "staged_written" | "live_parked" => "old-generation",
1035                    "staged_published" => "new-generation",
1036                    _ => "unknown",
1037                };
1038
1039                vec![
1040                    if state.temp_dir.path().exists() {
1041                        CrashReplayInvariant::passed(
1042                            checkpoint,
1043                            "fixture_root_retained",
1044                            "fixture root remains available for artifact inspection",
1045                        )
1046                    } else {
1047                        CrashReplayInvariant::failed(
1048                            checkpoint,
1049                            "fixture_root_retained",
1050                            "fixture root disappeared before invariant checks",
1051                        )
1052                    },
1053                    if live.as_deref() == Some(expected) {
1054                        CrashReplayInvariant::passed(
1055                            checkpoint,
1056                            "live_generation_is_old_or_new",
1057                            format!("live generation recovered as {expected}"),
1058                        )
1059                    } else {
1060                        CrashReplayInvariant::failed(
1061                            checkpoint,
1062                            "live_generation_is_old_or_new",
1063                            format!("expected {expected}, got {live:?}"),
1064                        )
1065                    },
1066                ]
1067            },
1068        );
1069
1070        assert_eq!(report.verdict, CrashReplayVerdict::Clean);
1071        assert!(
1072            report.validate().is_ok(),
1073            "lexical publish replay report should validate: {report:?}"
1074        );
1075    }
1076
1077    #[derive(Debug)]
1078    struct BackupRecoveryFixtureState {
1079        temp_dir: TempDir,
1080        canonical_db: PathBuf,
1081        backup_dir: PathBuf,
1082        manifest: Option<Value>,
1083    }
1084
1085    impl BackupRecoveryFixtureState {
1086        fn new() -> Result<Self, CrashReplayError> {
1087            let temp_dir = tempfile::tempdir()
1088                .map_err(|err| CrashReplayError::from_error("create tempdir", err))?;
1089            let canonical_db = temp_dir.path().join("cass.db");
1090            let backup_dir = temp_dir.path().join("backup");
1091            fs::write(&canonical_db, "canonical-main")
1092                .map_err(|err| CrashReplayError::from_error("seed canonical db", err))?;
1093            fs::write(temp_dir.path().join("cass.db-wal"), "canonical-wal")
1094                .map_err(|err| CrashReplayError::from_error("seed canonical wal", err))?;
1095            fs::create_dir_all(&backup_dir)
1096                .map_err(|err| CrashReplayError::from_error("create backup dir", err))?;
1097            Ok(Self {
1098                temp_dir,
1099                canonical_db,
1100                backup_dir,
1101                manifest: None,
1102            })
1103        }
1104
1105        fn copy_main(&self) -> Result<(), CrashReplayError> {
1106            fs::copy(&self.canonical_db, self.backup_dir.join("cass.db"))
1107                .map(|_| ())
1108                .map_err(|err| CrashReplayError::from_error("copy backup main", err))
1109        }
1110
1111        fn copy_wal_and_manifest(&self) -> Result<(), CrashReplayError> {
1112            fs::copy(
1113                self.temp_dir.path().join("cass.db-wal"),
1114                self.backup_dir.join("cass.db-wal"),
1115            )
1116            .map_err(|err| CrashReplayError::from_error("copy backup wal", err))?;
1117            let manifest = json!({
1118                "schema_version": 1,
1119                "complete": true,
1120                "files": ["cass.db", "cass.db-wal"],
1121            });
1122            let bytes = serde_json::to_vec_pretty(&manifest)
1123                .map_err(|err| CrashReplayError::from_error("encode backup manifest", err))?;
1124            fs::write(self.backup_dir.join("manifest.json"), bytes)
1125                .map_err(|err| CrashReplayError::from_error("write backup manifest", err))
1126        }
1127    }
1128
1129    #[test]
1130    fn backup_recovery_fixture_replays_incomplete_and_complete_bundle_crashes() {
1131        let checkpoints = vec![
1132            CrashReplayCheckpoint::new(
1133                10,
1134                "backup_main_copied",
1135                "backup main file copied before bundle manifest exists",
1136            ),
1137            CrashReplayCheckpoint::new(
1138                20,
1139                "backup_manifest_written",
1140                "backup sidecars and manifest mark the bundle complete",
1141            ),
1142        ];
1143
1144        let report = replay_named_checkpoints(
1145            "backup-recovery-fixture-restart",
1146            "backup_recovery",
1147            checkpoints,
1148            BackupRecoveryFixtureState::new,
1149            |state, checkpoint| {
1150                state.copy_main()?;
1151                match checkpoint.id.as_str() {
1152                    "backup_main_copied" => {}
1153                    "backup_manifest_written" => {
1154                        state.copy_wal_and_manifest()?;
1155                    }
1156                    _ => {
1157                        return Err(CrashReplayError::new(
1158                            "advance backup recovery checkpoint",
1159                            "unknown checkpoint",
1160                        ));
1161                    }
1162                }
1163                Ok(())
1164            },
1165            |state| {
1166                let manifest_path = state.backup_dir.join("manifest.json");
1167                state.manifest = if manifest_path.exists() {
1168                    let bytes = fs::read(&manifest_path)
1169                        .map_err(|err| CrashReplayError::from_error("read backup manifest", err))?;
1170                    Some(serde_json::from_slice(&bytes).map_err(|err| {
1171                        CrashReplayError::from_error("parse backup manifest", err)
1172                    })?)
1173                } else {
1174                    None
1175                };
1176                Ok(())
1177            },
1178            |state, checkpoint| {
1179                let canonical = fs::read_to_string(&state.canonical_db).ok();
1180                let mut invariants = vec![if canonical.as_deref() == Some("canonical-main") {
1181                    CrashReplayInvariant::passed(
1182                        checkpoint,
1183                        "canonical_db_preserved",
1184                        "restart did not replace the canonical DB from an incomplete backup",
1185                    )
1186                } else {
1187                    CrashReplayInvariant::failed(
1188                        checkpoint,
1189                        "canonical_db_preserved",
1190                        format!("unexpected canonical DB content: {canonical:?}"),
1191                    )
1192                }];
1193
1194                match checkpoint.id.as_str() {
1195                    "backup_main_copied" => {
1196                        invariants.push(if state.manifest.is_none() {
1197                            CrashReplayInvariant::passed(
1198                                checkpoint,
1199                                "partial_backup_not_marked_complete",
1200                                "main-only backup has no manifest and is not advertised recoverable",
1201                            )
1202                        } else {
1203                            CrashReplayInvariant::failed(
1204                                checkpoint,
1205                                "partial_backup_not_marked_complete",
1206                                format!("unexpected manifest: {:?}", state.manifest),
1207                            )
1208                        });
1209                    }
1210                    "backup_manifest_written" => {
1211                        let complete = state
1212                            .manifest
1213                            .as_ref()
1214                            .and_then(|manifest| manifest.get("complete"))
1215                            .and_then(Value::as_bool)
1216                            == Some(true);
1217                        let files_match = state
1218                            .manifest
1219                            .as_ref()
1220                            .and_then(|manifest| manifest.get("files"))
1221                            .and_then(Value::as_array)
1222                            .map(|files| {
1223                                let mut names = files.iter().filter_map(Value::as_str);
1224                                matches!(
1225                                    (names.next(), names.next(), names.next()),
1226                                    (Some("cass.db"), Some("cass.db-wal"), None)
1227                                )
1228                            })
1229                            == Some(true);
1230                        let wal_exists = state.backup_dir.join("cass.db-wal").exists();
1231                        invariants.push(if complete && files_match && wal_exists {
1232                            CrashReplayInvariant::passed(
1233                                checkpoint,
1234                                "complete_backup_manifest_matches_sidecars",
1235                                "complete manifest is present only with expected sidecars",
1236                            )
1237                        } else {
1238                            CrashReplayInvariant::failed(
1239                                checkpoint,
1240                                "complete_backup_manifest_matches_sidecars",
1241                                format!(
1242                                    "complete={complete} files_match={files_match} wal_exists={wal_exists}"
1243                                ),
1244                            )
1245                        });
1246                    }
1247                    _ => invariants.push(CrashReplayInvariant::failed(
1248                        checkpoint,
1249                        "known_backup_checkpoint",
1250                        "unexpected backup checkpoint",
1251                    )),
1252                }
1253                invariants
1254            },
1255        );
1256
1257        assert_eq!(report.verdict, CrashReplayVerdict::Clean);
1258        assert!(
1259            report.validate().is_ok(),
1260            "backup recovery replay report should validate: {report:?}"
1261        );
1262    }
1263
1264    #[test]
1265    fn crash_replay_report_round_trips_as_artifact_manifest()
1266    -> Result<(), Box<dyn std::error::Error>> {
1267        let temp_dir = tempfile::tempdir()?;
1268        let path = temp_dir
1269            .path()
1270            .join("artifacts/crash-replay/crash-replay-report.json");
1271        let checkpoints = vec![CrashReplayCheckpoint::new(
1272            1,
1273            "only_checkpoint",
1274            "single checkpoint for artifact round-trip",
1275        )];
1276        let report = replay_named_checkpoints(
1277            "artifact-round-trip",
1278            "harness",
1279            checkpoints,
1280            || Ok(()),
1281            |_state, _checkpoint| Ok(()),
1282            |_state| Ok(()),
1283            |_state, checkpoint| {
1284                vec![CrashReplayInvariant::passed(
1285                    checkpoint,
1286                    "round_trip_invariant",
1287                    "round-trip invariant passed",
1288                )]
1289            },
1290        );
1291
1292        report.save_json(&path)?;
1293        let loaded = CrashReplayReport::load_json(&path)?;
1294
1295        assert_eq!(loaded, report);
1296        Ok(())
1297    }
1298
1299    #[test]
1300    fn crash_replay_validation_rejects_untrustworthy_clean_reports() {
1301        let checkpoint = CrashReplayCheckpoint::new(1, "checkpoint", "validation checkpoint");
1302        let report = CrashReplayReport {
1303            schema_version: CRASH_REPLAY_SCHEMA_VERSION.to_string(),
1304            scenario_id: "bad-clean-report".to_string(),
1305            state_machine: "harness".to_string(),
1306            verdict: CrashReplayVerdict::Clean,
1307            checkpoints: vec![checkpoint.clone()],
1308            events: vec![CrashReplayEvent {
1309                checkpoint_id: checkpoint.id.clone(),
1310                phase: CrashReplayPhase::CheckInvariants,
1311                ok: true,
1312                detail: "checked".to_string(),
1313            }],
1314            invariants: vec![CrashReplayInvariant::failed(
1315                &checkpoint,
1316                "must_not_fail",
1317                "intentional validation failure",
1318            )],
1319        };
1320
1321        assert!(matches!(
1322            report.validate(),
1323            Err(CrashReplayValidationError::CleanReportContainsFailure)
1324        ));
1325
1326        let duplicate_checkpoint = CrashReplayCheckpoint {
1327            ordinal: 2,
1328            ..checkpoint.clone()
1329        };
1330        let duplicate_report = CrashReplayReport {
1331            checkpoints: vec![checkpoint.clone(), duplicate_checkpoint],
1332            ..report.clone()
1333        };
1334        assert!(matches!(
1335            duplicate_report.validate(),
1336            Err(CrashReplayValidationError::DuplicateCheckpointId { .. })
1337        ));
1338
1339        let missing_check_event_report = CrashReplayReport {
1340            events: vec![CrashReplayEvent {
1341                checkpoint_id: checkpoint.id.clone(),
1342                phase: CrashReplayPhase::AdvanceToCheckpoint,
1343                ok: true,
1344                detail: "advanced".to_string(),
1345            }],
1346            invariants: vec![CrashReplayInvariant::passed(
1347                &checkpoint,
1348                "passing_but_unchecked",
1349                "invariant exists but no check event proves it ran",
1350            )],
1351            ..report
1352        };
1353        assert!(matches!(
1354            missing_check_event_report.validate(),
1355            Err(CrashReplayValidationError::CleanReportMissingCheckpointEvent { .. })
1356        ));
1357    }
1358}