1use serde::{Deserialize, Serialize};
9use std::collections::BTreeSet;
10use std::error::Error;
11use std::fmt;
12use std::fs;
13use std::io;
14use std::path::Path;
15
/// Schema version written into reports built by this module.
///
/// `CrashReplayReport::validate` rejects any report whose `schema_version`
/// string differs from this value.
pub const CRASH_REPLAY_SCHEMA_VERSION: &str = "1";
17
18#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
19pub struct CrashReplayCheckpoint {
20 pub id: String,
21 pub ordinal: u32,
22 pub description: String,
23}
24
25impl CrashReplayCheckpoint {
26 pub fn new(ordinal: u32, id: impl Into<String>, description: impl Into<String>) -> Self {
27 Self {
28 id: id.into(),
29 ordinal,
30 description: description.into(),
31 }
32 }
33}
34
/// Step of the per-checkpoint replay cycle that an event was recorded for.
///
/// Serialized in `snake_case` (e.g. `advance_to_checkpoint`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum CrashReplayPhase {
    /// Driving a fresh state up to the named checkpoint.
    AdvanceToCheckpoint,
    /// The simulated crash at the checkpoint.
    InjectCrash,
    /// Running the restart action after the simulated crash.
    Restart,
    /// Evaluating invariants on the restarted state.
    CheckInvariants,
}
43
44#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
45pub struct CrashReplayEvent {
46 pub checkpoint_id: String,
47 pub phase: CrashReplayPhase,
48 pub ok: bool,
49 pub detail: String,
50}
51
52impl CrashReplayEvent {
53 fn ok(
54 checkpoint: &CrashReplayCheckpoint,
55 phase: CrashReplayPhase,
56 detail: impl Into<String>,
57 ) -> Self {
58 Self {
59 checkpoint_id: checkpoint.id.clone(),
60 phase,
61 ok: true,
62 detail: detail.into(),
63 }
64 }
65
66 fn failed(
67 checkpoint: &CrashReplayCheckpoint,
68 phase: CrashReplayPhase,
69 detail: impl Into<String>,
70 ) -> Self {
71 Self {
72 checkpoint_id: checkpoint.id.clone(),
73 phase,
74 ok: false,
75 detail: detail.into(),
76 }
77 }
78}
79
80#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
81pub struct CrashReplayInvariant {
82 pub checkpoint_id: String,
83 pub name: String,
84 pub passed: bool,
85 pub detail: String,
86}
87
88impl CrashReplayInvariant {
89 pub fn passed(
90 checkpoint: &CrashReplayCheckpoint,
91 name: impl Into<String>,
92 detail: impl Into<String>,
93 ) -> Self {
94 Self {
95 checkpoint_id: checkpoint.id.clone(),
96 name: name.into(),
97 passed: true,
98 detail: detail.into(),
99 }
100 }
101
102 pub fn failed(
103 checkpoint: &CrashReplayCheckpoint,
104 name: impl Into<String>,
105 detail: impl Into<String>,
106 ) -> Self {
107 Self {
108 checkpoint_id: checkpoint.id.clone(),
109 name: name.into(),
110 passed: false,
111 detail: detail.into(),
112 }
113 }
114}
115
/// Overall outcome of a crash replay run.
///
/// Serialized in `snake_case` (`clean` / `failed`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum CrashReplayVerdict {
    /// No failed event or invariant was recorded.
    Clean,
    /// At least one step or invariant failed, or there was nothing to replay.
    Failed,
}
122
/// Serializable record of a crash replay run over a set of named checkpoints.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct CrashReplayReport {
    /// Schema version string; validation compares it to `CRASH_REPLAY_SCHEMA_VERSION`.
    pub schema_version: String,
    /// Identifier of the replayed scenario; must not be blank.
    pub scenario_id: String,
    /// Name of the state machine under test; must not be blank.
    pub state_machine: String,
    /// Overall outcome of the run.
    pub verdict: CrashReplayVerdict,
    /// Checkpoints that were replayed, in strictly increasing ordinal order.
    pub checkpoints: Vec<CrashReplayCheckpoint>,
    /// Per-phase events recorded while replaying.
    pub events: Vec<CrashReplayEvent>,
    /// Invariant results collected after each restart.
    pub invariants: Vec<CrashReplayInvariant>,
}
133
134impl CrashReplayReport {
    /// Checks the report for internal consistency and returns the first
    /// violation found.
    ///
    /// Checks run in this order: schema version, non-blank scenario id and
    /// state machine, presence of checkpoints, clean reports carrying
    /// invariants, per-checkpoint rules, per-event rules, per-invariant rules,
    /// and finally the extra rules that only apply to a `Clean` verdict.
    ///
    /// # Errors
    ///
    /// Returns the `CrashReplayValidationError` variant for the first failed check.
    pub fn validate(&self) -> Result<(), CrashReplayValidationError> {
        if self.schema_version != CRASH_REPLAY_SCHEMA_VERSION {
            return Err(CrashReplayValidationError::UnsupportedSchemaVersion {
                expected: CRASH_REPLAY_SCHEMA_VERSION,
                actual: self.schema_version.clone(),
            });
        }
        // Whitespace-only identifiers count as empty.
        if self.scenario_id.trim().is_empty() {
            return Err(CrashReplayValidationError::EmptyScenarioId);
        }
        if self.state_machine.trim().is_empty() {
            return Err(CrashReplayValidationError::EmptyStateMachine);
        }
        if self.checkpoints.is_empty() {
            return Err(CrashReplayValidationError::NoCheckpoints);
        }
        if self.verdict == CrashReplayVerdict::Clean && self.invariants.is_empty() {
            return Err(CrashReplayValidationError::CleanReportWithoutInvariants);
        }

        // Checkpoints: non-blank id and description, strictly increasing
        // ordinals, unique ids. The id set is reused below to resolve references.
        let mut checkpoint_ids = BTreeSet::new();
        let mut previous_ordinal = None;
        for (index, checkpoint) in self.checkpoints.iter().enumerate() {
            if checkpoint.id.trim().is_empty() {
                return Err(CrashReplayValidationError::EmptyCheckpointId { index });
            }
            if checkpoint.description.trim().is_empty() {
                return Err(CrashReplayValidationError::EmptyCheckpointDescription { index });
            }
            if let Some(previous) = previous_ordinal
                && checkpoint.ordinal <= previous
            {
                return Err(CrashReplayValidationError::NonMonotoneCheckpointOrdinal {
                    index,
                    previous,
                    current: checkpoint.ordinal,
                });
            }
            if !checkpoint_ids.insert(checkpoint.id.as_str()) {
                return Err(CrashReplayValidationError::DuplicateCheckpointId {
                    index,
                    checkpoint_id: checkpoint.id.clone(),
                });
            }
            previous_ordinal = Some(checkpoint.ordinal);
        }

        // Events: must reference a known checkpoint and carry a detail.
        // Successful invariant-check events are remembered per checkpoint.
        let mut checked_checkpoints = BTreeSet::new();
        for (index, event) in self.events.iter().enumerate() {
            if event.checkpoint_id.trim().is_empty() {
                return Err(CrashReplayValidationError::EmptyEventCheckpointId { index });
            }
            if !checkpoint_ids.contains(event.checkpoint_id.as_str()) {
                return Err(CrashReplayValidationError::UnknownEventCheckpoint {
                    index,
                    checkpoint_id: event.checkpoint_id.clone(),
                });
            }
            if event.detail.trim().is_empty() {
                return Err(CrashReplayValidationError::EmptyEventDetail { index });
            }
            if event.ok && event.phase == CrashReplayPhase::CheckInvariants {
                checked_checkpoints.insert(event.checkpoint_id.as_str());
            }
        }

        // Invariants: must reference a known checkpoint and carry a name and
        // detail. Checkpoints with at least one passing invariant are remembered.
        let mut invariant_checkpoints = BTreeSet::new();
        for (index, invariant) in self.invariants.iter().enumerate() {
            if invariant.checkpoint_id.trim().is_empty() {
                return Err(CrashReplayValidationError::EmptyInvariantCheckpointId { index });
            }
            if !checkpoint_ids.contains(invariant.checkpoint_id.as_str()) {
                return Err(CrashReplayValidationError::UnknownInvariantCheckpoint {
                    index,
                    checkpoint_id: invariant.checkpoint_id.clone(),
                });
            }
            if invariant.name.trim().is_empty() {
                return Err(CrashReplayValidationError::EmptyInvariantName { index });
            }
            if invariant.detail.trim().is_empty() {
                return Err(CrashReplayValidationError::EmptyInvariantDetail { index });
            }
            if invariant.passed {
                invariant_checkpoints.insert(invariant.checkpoint_id.as_str());
            }
        }
        // A clean verdict is incompatible with any failed event or invariant.
        if self.verdict == CrashReplayVerdict::Clean
            && (self.events.iter().any(|event| !event.ok)
                || self.invariants.iter().any(|invariant| !invariant.passed))
        {
            return Err(CrashReplayValidationError::CleanReportContainsFailure);
        }
        // A clean verdict also needs, for every checkpoint, a successful
        // invariant-check event and at least one passing invariant.
        if self.verdict == CrashReplayVerdict::Clean {
            if self.events.is_empty() {
                return Err(CrashReplayValidationError::CleanReportWithoutEvents);
            }
            for checkpoint in &self.checkpoints {
                if !checked_checkpoints.contains(checkpoint.id.as_str()) {
                    return Err(
                        CrashReplayValidationError::CleanReportMissingCheckpointEvent {
                            checkpoint_id: checkpoint.id.clone(),
                        },
                    );
                }
                if !invariant_checkpoints.contains(checkpoint.id.as_str()) {
                    return Err(
                        CrashReplayValidationError::CleanReportMissingCheckpointInvariant {
                            checkpoint_id: checkpoint.id.clone(),
                        },
                    );
                }
            }
        }

        Ok(())
    }
252
253 pub fn save_json(&self, path: &Path) -> Result<(), CrashReplayIoError> {
254 self.validate()?;
255 if let Some(parent) = path
256 .parent()
257 .filter(|parent| !parent.as_os_str().is_empty())
258 {
259 fs::create_dir_all(parent)?;
260 }
261 let json = serde_json::to_vec_pretty(self)?;
262 fs::write(path, json)?;
263 Ok(())
264 }
265
266 pub fn load_json(path: &Path) -> Result<Self, CrashReplayIoError> {
267 let bytes = fs::read(path)?;
268 let report: Self = serde_json::from_slice(&bytes)?;
269 report.validate()?;
270 Ok(report)
271 }
272}
273
/// Error reported by replay callbacks: the action that failed plus a detail.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CrashReplayError {
    /// Short description of what was being attempted.
    pub action: String,
    /// Explanation of why it failed.
    pub detail: String,
}

impl CrashReplayError {
    /// Creates an error from an action label and a detail message.
    pub fn new(action: impl Into<String>, detail: impl Into<String>) -> Self {
        let action = action.into();
        let detail = detail.into();
        Self { action, detail }
    }

    /// Creates an error whose detail is the `Display` rendering of `error`.
    pub fn from_error(action: impl Into<String>, error: impl fmt::Display) -> Self {
        let detail = error.to_string();
        Self::new(action, detail)
    }
}

impl fmt::Display for CrashReplayError {
    /// Renders the error as `<action>: <detail>`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self { action, detail } = self;
        write!(f, "{}: {}", action, detail)
    }
}

impl Error for CrashReplayError {}
300
/// Reason a `CrashReplayReport` failed validation.
///
/// Derives `Clone`, `PartialEq` and `Eq` in addition to `Debug` (matching
/// `CrashReplayError`) so callers and tests can compare against an exact
/// expected error. `index` fields are zero-based positions in the
/// corresponding report vector.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CrashReplayValidationError {
    /// The report's schema version differs from the supported one.
    UnsupportedSchemaVersion {
        /// Version this code supports.
        expected: &'static str,
        /// Version found in the report.
        actual: String,
    },
    /// `scenario_id` is empty or whitespace-only.
    EmptyScenarioId,
    /// `state_machine` is empty or whitespace-only.
    EmptyStateMachine,
    /// The report lists no checkpoints.
    NoCheckpoints,
    /// A checkpoint has an empty or whitespace-only id.
    EmptyCheckpointId {
        index: usize,
    },
    /// A checkpoint has an empty or whitespace-only description.
    EmptyCheckpointDescription {
        index: usize,
    },
    /// A checkpoint reuses the id of an earlier checkpoint.
    DuplicateCheckpointId {
        index: usize,
        checkpoint_id: String,
    },
    /// A checkpoint's ordinal is not greater than the previous checkpoint's.
    NonMonotoneCheckpointOrdinal {
        index: usize,
        previous: u32,
        current: u32,
    },
    /// A clean report has no invariants at all.
    CleanReportWithoutInvariants,
    /// A clean report has no events at all.
    CleanReportWithoutEvents,
    /// A clean report contains a failed event or invariant.
    CleanReportContainsFailure,
    /// A clean report lacks a successful invariant-check event for a checkpoint.
    CleanReportMissingCheckpointEvent {
        checkpoint_id: String,
    },
    /// A clean report lacks a passing invariant for a checkpoint.
    CleanReportMissingCheckpointInvariant {
        checkpoint_id: String,
    },
    /// An event has an empty or whitespace-only checkpoint id.
    EmptyEventCheckpointId {
        index: usize,
    },
    /// An event references a checkpoint id that is not in the report.
    UnknownEventCheckpoint {
        index: usize,
        checkpoint_id: String,
    },
    /// An event has an empty or whitespace-only detail.
    EmptyEventDetail {
        index: usize,
    },
    /// An invariant has an empty or whitespace-only checkpoint id.
    EmptyInvariantCheckpointId {
        index: usize,
    },
    /// An invariant references a checkpoint id that is not in the report.
    UnknownInvariantCheckpoint {
        index: usize,
        checkpoint_id: String,
    },
    /// An invariant has an empty or whitespace-only name.
    EmptyInvariantName {
        index: usize,
    },
    /// An invariant has an empty or whitespace-only detail.
    EmptyInvariantDetail {
        index: usize,
    },
}
358
359impl fmt::Display for CrashReplayValidationError {
360 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
361 match self {
362 Self::UnsupportedSchemaVersion { expected, actual } => {
363 write!(
364 f,
365 "unsupported crash replay schema version {actual}; expected {expected}"
366 )
367 }
368 Self::EmptyScenarioId => write!(f, "crash replay scenario_id cannot be empty"),
369 Self::EmptyStateMachine => write!(f, "crash replay state_machine cannot be empty"),
370 Self::NoCheckpoints => write!(f, "crash replay report must include checkpoints"),
371 Self::EmptyCheckpointId { index } => {
372 write!(f, "crash replay checkpoint #{index} has an empty id")
373 }
374 Self::EmptyCheckpointDescription { index } => write!(
375 f,
376 "crash replay checkpoint #{index} has an empty description"
377 ),
378 Self::DuplicateCheckpointId {
379 index,
380 checkpoint_id,
381 } => write!(
382 f,
383 "crash replay checkpoint #{index} duplicates checkpoint id {checkpoint_id}"
384 ),
385 Self::NonMonotoneCheckpointOrdinal {
386 index,
387 previous,
388 current,
389 } => write!(
390 f,
391 "crash replay checkpoint #{index} ordinal {current} must be greater than previous ordinal {previous}"
392 ),
393 Self::CleanReportWithoutInvariants => {
394 write!(f, "clean crash replay report must include invariants")
395 }
396 Self::CleanReportWithoutEvents => {
397 write!(f, "clean crash replay report must include events")
398 }
399 Self::CleanReportContainsFailure => {
400 write!(
401 f,
402 "clean crash replay report contains failed events or invariants"
403 )
404 }
405 Self::CleanReportMissingCheckpointEvent { checkpoint_id } => write!(
406 f,
407 "clean crash replay report has no successful invariant-check event for checkpoint {checkpoint_id}"
408 ),
409 Self::CleanReportMissingCheckpointInvariant { checkpoint_id } => write!(
410 f,
411 "clean crash replay report has no passing invariant for checkpoint {checkpoint_id}"
412 ),
413 Self::EmptyEventCheckpointId { index } => {
414 write!(f, "crash replay event #{index} has an empty checkpoint id")
415 }
416 Self::UnknownEventCheckpoint {
417 index,
418 checkpoint_id,
419 } => write!(
420 f,
421 "crash replay event #{index} references unknown checkpoint {checkpoint_id}"
422 ),
423 Self::EmptyEventDetail { index } => {
424 write!(f, "crash replay event #{index} has an empty detail")
425 }
426 Self::EmptyInvariantCheckpointId { index } => write!(
427 f,
428 "crash replay invariant #{index} has an empty checkpoint id"
429 ),
430 Self::UnknownInvariantCheckpoint {
431 index,
432 checkpoint_id,
433 } => write!(
434 f,
435 "crash replay invariant #{index} references unknown checkpoint {checkpoint_id}"
436 ),
437 Self::EmptyInvariantName { index } => {
438 write!(f, "crash replay invariant #{index} has an empty name")
439 }
440 Self::EmptyInvariantDetail { index } => {
441 write!(f, "crash replay invariant #{index} has an empty detail")
442 }
443 }
444 }
445}
446
447impl Error for CrashReplayValidationError {}
448
449#[derive(Debug)]
450pub enum CrashReplayIoError {
451 Io(io::Error),
452 Json(serde_json::Error),
453 Validation(CrashReplayValidationError),
454}
455
456impl fmt::Display for CrashReplayIoError {
457 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
458 match self {
459 Self::Io(err) => write!(f, "crash replay I/O error: {err}"),
460 Self::Json(err) => write!(f, "crash replay JSON error: {err}"),
461 Self::Validation(err) => write!(f, "crash replay validation error: {err}"),
462 }
463 }
464}
465
466impl Error for CrashReplayIoError {
467 fn source(&self) -> Option<&(dyn Error + 'static)> {
468 match self {
469 Self::Io(err) => Some(err),
470 Self::Json(err) => Some(err),
471 Self::Validation(err) => Some(err),
472 }
473 }
474}
475
476impl From<io::Error> for CrashReplayIoError {
477 fn from(err: io::Error) -> Self {
478 Self::Io(err)
479 }
480}
481
482impl From<serde_json::Error> for CrashReplayIoError {
483 fn from(err: serde_json::Error) -> Self {
484 Self::Json(err)
485 }
486}
487
488impl From<CrashReplayValidationError> for CrashReplayIoError {
489 fn from(err: CrashReplayValidationError) -> Self {
490 Self::Validation(err)
491 }
492}
493
494pub fn replay_named_checkpoints<S, MakeState, Advance, Restart, Check>(
495 scenario_id: impl Into<String>,
496 state_machine: impl Into<String>,
497 mut checkpoints: Vec<CrashReplayCheckpoint>,
498 mut make_state: MakeState,
499 mut advance_to_checkpoint: Advance,
500 mut restart: Restart,
501 mut check_invariants: Check,
502) -> CrashReplayReport
503where
504 MakeState: FnMut() -> Result<S, CrashReplayError>,
505 Advance: FnMut(&mut S, &CrashReplayCheckpoint) -> Result<(), CrashReplayError>,
506 Restart: FnMut(&mut S) -> Result<(), CrashReplayError>,
507 Check: FnMut(&S, &CrashReplayCheckpoint) -> Vec<CrashReplayInvariant>,
508{
509 checkpoints.sort_by_key(|checkpoint| checkpoint.ordinal);
510 let mut report = CrashReplayReport {
511 schema_version: CRASH_REPLAY_SCHEMA_VERSION.to_string(),
512 scenario_id: scenario_id.into(),
513 state_machine: state_machine.into(),
514 verdict: CrashReplayVerdict::Clean,
515 checkpoints: checkpoints.clone(),
516 events: Vec::new(),
517 invariants: Vec::new(),
518 };
519
520 if checkpoints.is_empty() {
521 report.verdict = CrashReplayVerdict::Failed;
522 return report;
523 }
524
525 for checkpoint in checkpoints {
526 let mut state = match make_state() {
527 Ok(state) => state,
528 Err(err) => {
529 report.verdict = CrashReplayVerdict::Failed;
530 report.events.push(CrashReplayEvent::failed(
531 &checkpoint,
532 CrashReplayPhase::AdvanceToCheckpoint,
533 format!("failed creating fresh state: {err}"),
534 ));
535 continue;
536 }
537 };
538
539 match advance_to_checkpoint(&mut state, &checkpoint) {
540 Ok(()) => report.events.push(CrashReplayEvent::ok(
541 &checkpoint,
542 CrashReplayPhase::AdvanceToCheckpoint,
543 "advanced to checkpoint",
544 )),
545 Err(err) => {
546 report.verdict = CrashReplayVerdict::Failed;
547 report.events.push(CrashReplayEvent::failed(
548 &checkpoint,
549 CrashReplayPhase::AdvanceToCheckpoint,
550 err.to_string(),
551 ));
552 continue;
553 }
554 }
555
556 report.events.push(CrashReplayEvent::ok(
557 &checkpoint,
558 CrashReplayPhase::InjectCrash,
559 "simulated process stop at named checkpoint",
560 ));
561
562 match restart(&mut state) {
563 Ok(()) => report.events.push(CrashReplayEvent::ok(
564 &checkpoint,
565 CrashReplayPhase::Restart,
566 "restart action completed",
567 )),
568 Err(err) => {
569 report.verdict = CrashReplayVerdict::Failed;
570 report.events.push(CrashReplayEvent::failed(
571 &checkpoint,
572 CrashReplayPhase::Restart,
573 err.to_string(),
574 ));
575 continue;
576 }
577 }
578
579 let invariants = check_invariants(&state, &checkpoint);
580 if invariants.is_empty() {
581 report.verdict = CrashReplayVerdict::Failed;
582 report.events.push(CrashReplayEvent::failed(
583 &checkpoint,
584 CrashReplayPhase::CheckInvariants,
585 "checkpoint produced no invariants",
586 ));
587 continue;
588 }
589
590 let failed = invariants.iter().any(|invariant| !invariant.passed);
591 if failed {
592 report.verdict = CrashReplayVerdict::Failed;
593 }
594 report.events.push(CrashReplayEvent {
595 checkpoint_id: checkpoint.id.clone(),
596 phase: CrashReplayPhase::CheckInvariants,
597 ok: !failed,
598 detail: format!("{} invariant(s) evaluated", invariants.len()),
599 });
600 report.invariants.extend(invariants);
601 }
602
603 report
604}
605
606#[cfg(test)]
607mod tests {
608 use super::*;
609 use crate::policy_registry::{
610 PolicyControllerStatus, PolicyFallbackState, policy_registry_snapshot,
611 };
612 use crate::search::policy::{
613 CHUNKING_STRATEGY_VERSION, SEMANTIC_SCHEMA_VERSION, SemanticPolicy,
614 };
615 use crate::search::semantic_manifest::{
616 ArtifactRecord, BuildCheckpoint, SemanticManifest, TierKind,
617 };
618 use serde_json::{Value, json};
619 use std::path::PathBuf;
620 use tempfile::TempDir;
621
    /// Per-checkpoint state for the semantic manifest replay test.
    #[derive(Debug)]
    struct SemanticReplayState {
        /// Temporary directory the manifest is saved to and loaded from.
        temp_dir: TempDir,
        /// Manifest as read back by the restart step; starts as `None`.
        loaded: Option<SemanticManifest>,
    }

    impl SemanticReplayState {
        /// Path of the temporary directory used as the manifest data dir.
        fn data_dir(&self) -> &Path {
            self.temp_dir.path()
        }
    }
633
634 fn semantic_checkpoint() -> BuildCheckpoint {
635 BuildCheckpoint {
636 tier: TierKind::Fast,
637 embedder_id: "fnv1a-384".to_string(),
638 last_offset: 8,
639 docs_embedded: 13,
640 conversations_processed: 2,
641 total_conversations: 5,
642 db_fingerprint: "semantic-fp".to_string(),
643 schema_version: SEMANTIC_SCHEMA_VERSION,
644 chunking_version: CHUNKING_STRATEGY_VERSION,
645 saved_at_ms: 1_700_000_000_000,
646 last_message_id: None,
647 }
648 }
649
650 fn semantic_artifact() -> ArtifactRecord {
651 ArtifactRecord {
652 tier: TierKind::Fast,
653 embedder_id: "fnv1a-384".to_string(),
654 model_revision: "hash".to_string(),
655 schema_version: SEMANTIC_SCHEMA_VERSION,
656 chunking_version: CHUNKING_STRATEGY_VERSION,
657 dimension: 384,
658 doc_count: 13,
659 conversation_count: 5,
660 db_fingerprint: "semantic-fp".to_string(),
661 index_path: "vector_index/fast.fsvi".to_string(),
662 size_bytes: 4096,
663 started_at_ms: 1_700_000_000_000,
664 completed_at_ms: 1_700_000_060_000,
665 ready: true,
666 }
667 }
668
    /// Replays a crash after saving a semantic checkpoint and after publishing
    /// an artifact, then checks what a reloaded manifest contains in each case.
    #[test]
    fn semantic_manifest_state_machine_replays_checkpoint_and_publish_crashes() {
        let checkpoints = vec![
            CrashReplayCheckpoint::new(
                10,
                "semantic_checkpoint_saved",
                "semantic checkpoint persisted before artifact publish",
            ),
            CrashReplayCheckpoint::new(
                20,
                "semantic_artifact_published",
                "semantic artifact published and checkpoint cleared",
            ),
        ];

        let report =
            replay_named_checkpoints(
                "semantic-manifest-save-restart",
                "semantic_manifest",
                checkpoints,
                // make_state: fresh temp dir, nothing loaded yet.
                || {
                    Ok(SemanticReplayState {
                        temp_dir: tempfile::tempdir()
                            .map_err(|err| CrashReplayError::from_error("create tempdir", err))?,
                        loaded: None,
                    })
                },
                // advance: always save a checkpoint; additionally publish the
                // artifact for the second checkpoint, then persist the manifest.
                |state, checkpoint| {
                    let mut manifest = SemanticManifest::default();
                    manifest.refresh_backlog(5, "semantic-fp");
                    manifest.save_checkpoint(semantic_checkpoint());
                    if checkpoint.id == "semantic_artifact_published" {
                        manifest.publish_artifact(semantic_artifact());
                    }
                    manifest
                        .save(state.data_dir())
                        .map_err(|err| CrashReplayError::from_error("save semantic manifest", err))
                },
                // restart: reload the manifest from the same directory.
                |state| {
                    state.loaded = SemanticManifest::load(state.data_dir()).map_err(|err| {
                        CrashReplayError::from_error("load semantic manifest", err)
                    })?;
                    Ok(())
                },
                // check: the manifest must have loaded, plus one
                // checkpoint-specific invariant.
                |state, checkpoint| {
                    let mut invariants = Vec::new();
                    let Some(manifest) = &state.loaded else {
                        return vec![CrashReplayInvariant::failed(
                            checkpoint,
                            "semantic_manifest_loaded",
                            "manifest did not load after restart",
                        )];
                    };

                    invariants.push(CrashReplayInvariant::passed(
                        checkpoint,
                        "semantic_manifest_loaded",
                        "manifest loaded after restart",
                    ));
                    match checkpoint.id.as_str() {
                        "semantic_checkpoint_saved" => {
                            // Expect a checkpoint and no fast-tier artifact.
                            invariants.push(if manifest.checkpoint.is_some()
                                && manifest.fast_tier.is_none()
                            {
                                CrashReplayInvariant::passed(
                                    checkpoint,
                                    "checkpoint_without_torn_artifact",
                                    "restart sees resumable checkpoint and no half-published artifact",
                                )
                            } else {
                                CrashReplayInvariant::failed(
                                    checkpoint,
                                    "checkpoint_without_torn_artifact",
                                    format!(
                                        "checkpoint={:?} fast_tier={:?}",
                                        manifest.checkpoint, manifest.fast_tier
                                    ),
                                )
                            });
                        }
                        "semantic_artifact_published" => {
                            // Expect a ready fast-tier artifact and no checkpoint.
                            invariants.push(if manifest.checkpoint.is_none()
                                && manifest.fast_tier.as_ref().is_some_and(|artifact| artifact.ready)
                            {
                                CrashReplayInvariant::passed(
                                    checkpoint,
                                    "published_artifact_clears_checkpoint",
                                    "restart sees ready artifact and no stale matching checkpoint",
                                )
                            } else {
                                CrashReplayInvariant::failed(
                                    checkpoint,
                                    "published_artifact_clears_checkpoint",
                                    format!(
                                        "checkpoint={:?} fast_tier={:?}",
                                        manifest.checkpoint, manifest.fast_tier
                                    ),
                                )
                            });
                        }
                        _ => invariants.push(CrashReplayInvariant::failed(
                            checkpoint,
                            "known_checkpoint",
                            "unexpected semantic checkpoint",
                        )),
                    }
                    invariants
                },
            );

        // Two checkpoints with two invariants each.
        assert_eq!(report.verdict, CrashReplayVerdict::Clean);
        assert_eq!(report.checkpoints.len(), 2);
        assert_eq!(report.invariants.len(), 4);
        assert!(
            report.validate().is_ok(),
            "semantic replay report should validate: {report:?}"
        );
    }
787
    /// Per-checkpoint state for the policy registry replay test.
    #[derive(Debug)]
    struct PolicyReplayState {
        /// Pipeline JSON passed to `policy_registry_snapshot`.
        pipeline: Value,
        /// Semantic availability flag passed to `policy_registry_snapshot`.
        semantic_available: bool,
        /// Semantic fallback mode passed to `policy_registry_snapshot`, if any.
        semantic_fallback_mode: Option<&'static str>,
        /// `(controller_id, status, fallback_state)` per controller, filled in by the restart step.
        snapshot_statuses: Vec<(String, PolicyControllerStatus, PolicyFallbackState)>,
    }
795
796 fn policy_pipeline_fixture(mode: &str, reason: &str) -> Value {
797 json!({
798 "pipeline_channel_size": 128,
799 "pipeline_max_message_bytes_in_flight": 1048576,
800 "page_prep_workers": 12,
801 "staged_merge_workers": 4,
802 "staged_shard_builders": 8,
803 "controller_mode": "auto",
804 "controller_restore_clear_samples": 3,
805 "controller_restore_hold_ms": 5000,
806 "controller_loadavg_high_watermark_1m": 1.75,
807 "controller_loadavg_low_watermark_1m": 0.75,
808 "runtime": {
809 "controller_mode": mode,
810 "controller_reason": reason
811 }
812 })
813 }
814
    /// Replays two policy-registry checkpoints and checks that recomputing
    /// the snapshot yields the expected controller ids and a conservative
    /// fallback for the controller each checkpoint targets.
    #[test]
    fn policy_registry_state_machine_replays_deterministic_controller_snapshots() {
        let checkpoints = vec![
            CrashReplayCheckpoint::new(
                10,
                "semantic_fallback_snapshot",
                "semantic controller reports lexical fallback",
            ),
            CrashReplayCheckpoint::new(
                20,
                "lexical_throttle_snapshot",
                "lexical rebuild controller reports pressure fallback",
            ),
        ];

        let report = replay_named_checkpoints(
            "policy-registry-recompute-restart",
            "policy_registry",
            checkpoints,
            // make_state: steady pipeline, semantic available, no snapshot yet.
            || {
                Ok(PolicyReplayState {
                    pipeline: policy_pipeline_fixture("steady", "pipeline settings active"),
                    semantic_available: true,
                    semantic_fallback_mode: None,
                    snapshot_statuses: Vec::new(),
                })
            },
            // advance: change the inputs that the checkpoint is about.
            |state, checkpoint| {
                match checkpoint.id.as_str() {
                    "semantic_fallback_snapshot" => {
                        state.semantic_available = false;
                        state.semantic_fallback_mode = Some("lexical");
                    }
                    "lexical_throttle_snapshot" => {
                        state.pipeline =
                            policy_pipeline_fixture("throttled", "load pressure reduced workers");
                    }
                    _ => {
                        return Err(CrashReplayError::new(
                            "advance policy checkpoint",
                            "unknown checkpoint",
                        ));
                    }
                }
                Ok(())
            },
            // restart: recompute the snapshot from the current inputs and keep
            // only id, status and fallback state per controller.
            |state| {
                let policy = SemanticPolicy::compiled_defaults();
                let snapshot = policy_registry_snapshot(
                    &policy,
                    state.semantic_available,
                    state.semantic_fallback_mode,
                    &state.pipeline,
                );
                state.snapshot_statuses = snapshot
                    .controllers
                    .into_iter()
                    .map(|controller| {
                        (
                            controller.controller_id,
                            controller.status,
                            controller.fallback_state,
                        )
                    })
                    .collect();
                Ok(())
            },
            // check: exact controller id list, then the targeted controller's status.
            |state, checkpoint| {
                let ids: Vec<_> = state
                    .snapshot_statuses
                    .iter()
                    .map(|(id, _, _)| id.as_str())
                    .collect();
                let mut invariants =
                    vec![if ids == ["lexical_rebuild_pipeline", "semantic_search"] {
                        CrashReplayInvariant::passed(
                            checkpoint,
                            "controller_ids_sorted",
                            "controller ids are deterministic and sorted",
                        )
                    } else {
                        CrashReplayInvariant::failed(
                            checkpoint,
                            "controller_ids_sorted",
                            format!("unexpected controller ids: {ids:?}"),
                        )
                    }];

                // An unrecognized checkpoint maps to "unknown", which matches
                // no controller and so yields a failed invariant below.
                let expected_controller = match checkpoint.id.as_str() {
                    "semantic_fallback_snapshot" => "semantic_search",
                    "lexical_throttle_snapshot" => "lexical_rebuild_pipeline",
                    _ => "unknown",
                };
                let controller = state
                    .snapshot_statuses
                    .iter()
                    .find(|(id, _, _)| id == expected_controller);
                invariants.push(match controller {
                    Some((
                        _id,
                        PolicyControllerStatus::Fallback,
                        PolicyFallbackState::Conservative,
                    )) => CrashReplayInvariant::passed(
                        checkpoint,
                        "conservative_fallback_reported",
                        "checkpoint recompute reports conservative fallback",
                    ),
                    other => CrashReplayInvariant::failed(
                        checkpoint,
                        "conservative_fallback_reported",
                        format!("unexpected controller status: {other:?}"),
                    ),
                });
                invariants
            },
        );

        assert_eq!(report.verdict, CrashReplayVerdict::Clean);
        assert!(
            report.validate().is_ok(),
            "policy replay report should validate: {report:?}"
        );
    }
938
939 #[derive(Debug)]
940 struct LexicalPublishFixtureState {
941 temp_dir: TempDir,
942 live_path: PathBuf,
943 staged_path: PathBuf,
944 backup_path: PathBuf,
945 }
946
947 impl LexicalPublishFixtureState {
948 fn new() -> Result<Self, CrashReplayError> {
949 let temp_dir = tempfile::tempdir()
950 .map_err(|err| CrashReplayError::from_error("create tempdir", err))?;
951 let live_path = temp_dir.path().join("live-generation.txt");
952 let staged_path = temp_dir.path().join("staged-generation.txt");
953 let backup_path = temp_dir.path().join("live-generation.bak");
954 fs::write(&live_path, "old-generation")
955 .map_err(|err| CrashReplayError::from_error("seed live generation", err))?;
956 Ok(Self {
957 temp_dir,
958 live_path,
959 staged_path,
960 backup_path,
961 })
962 }
963
964 fn write_staged(&self) -> Result<(), CrashReplayError> {
965 fs::write(&self.staged_path, "new-generation")
966 .map_err(|err| CrashReplayError::from_error("write staged generation", err))
967 }
968
969 fn park_live(&self) -> Result<(), CrashReplayError> {
970 fs::rename(&self.live_path, &self.backup_path)
971 .map_err(|err| CrashReplayError::from_error("park live generation", err))
972 }
973
974 fn publish_staged(&self) -> Result<(), CrashReplayError> {
975 fs::rename(&self.staged_path, &self.live_path)
976 .map_err(|err| CrashReplayError::from_error("publish staged generation", err))
977 }
978 }
979
    /// Replays a crash at each stage of the park-and-swap publish and checks
    /// that, after restart, the live path holds the expected generation.
    #[test]
    fn lexical_publish_fixture_replays_park_and_swap_crash_windows() {
        let checkpoints = vec![
            CrashReplayCheckpoint::new(
                10,
                "staged_written",
                "staged generation exists before live path is touched",
            ),
            CrashReplayCheckpoint::new(
                20,
                "live_parked",
                "live generation has been parked but staged is not yet live",
            ),
            CrashReplayCheckpoint::new(
                30,
                "staged_published",
                "staged generation has been promoted to live",
            ),
        ];

        let report = replay_named_checkpoints(
            "lexical-publish-fixture-restart",
            "lexical_publish",
            checkpoints,
            LexicalPublishFixtureState::new,
            // advance: always write the staged file, then perform the steps
            // that precede the named checkpoint.
            |state, checkpoint| {
                state.write_staged()?;
                match checkpoint.id.as_str() {
                    "staged_written" => {}
                    "live_parked" => {
                        state.park_live()?;
                    }
                    "staged_published" => {
                        state.park_live()?;
                        state.publish_staged()?;
                    }
                    _ => {
                        return Err(CrashReplayError::new(
                            "advance lexical publish checkpoint",
                            "unknown checkpoint",
                        ));
                    }
                }
                Ok(())
            },
            // restart: if the live file is missing but a backup exists, move
            // the backup back into place.
            |state| {
                if !state.live_path.exists() && state.backup_path.exists() {
                    fs::rename(&state.backup_path, &state.live_path)
                        .map_err(|err| CrashReplayError::from_error("restore parked live", err))?;
                }
                Ok(())
            },
            // check: the fixture directory still exists and the live content
            // matches what the checkpoint should recover to.
            |state, checkpoint| {
                let live = fs::read_to_string(&state.live_path).ok();
                let expected = match checkpoint.id.as_str() {
                    "staged_written" | "live_parked" => "old-generation",
                    "staged_published" => "new-generation",
                    _ => "unknown",
                };

                vec![
                    if state.temp_dir.path().exists() {
                        CrashReplayInvariant::passed(
                            checkpoint,
                            "fixture_root_retained",
                            "fixture root remains available for artifact inspection",
                        )
                    } else {
                        CrashReplayInvariant::failed(
                            checkpoint,
                            "fixture_root_retained",
                            "fixture root disappeared before invariant checks",
                        )
                    },
                    if live.as_deref() == Some(expected) {
                        CrashReplayInvariant::passed(
                            checkpoint,
                            "live_generation_is_old_or_new",
                            format!("live generation recovered as {expected}"),
                        )
                    } else {
                        CrashReplayInvariant::failed(
                            checkpoint,
                            "live_generation_is_old_or_new",
                            format!("expected {expected}, got {live:?}"),
                        )
                    },
                ]
            },
        );

        assert_eq!(report.verdict, CrashReplayVerdict::Clean);
        assert!(
            report.validate().is_ok(),
            "lexical publish replay report should validate: {report:?}"
        );
    }
1077
1078 #[derive(Debug)]
1079 struct BackupRecoveryFixtureState {
1080 temp_dir: TempDir,
1081 canonical_db: PathBuf,
1082 backup_dir: PathBuf,
1083 manifest: Option<Value>,
1084 }
1085
1086 impl BackupRecoveryFixtureState {
1087 fn new() -> Result<Self, CrashReplayError> {
1088 let temp_dir = tempfile::tempdir()
1089 .map_err(|err| CrashReplayError::from_error("create tempdir", err))?;
1090 let canonical_db = temp_dir.path().join("cass.db");
1091 let backup_dir = temp_dir.path().join("backup");
1092 fs::write(&canonical_db, "canonical-main")
1093 .map_err(|err| CrashReplayError::from_error("seed canonical db", err))?;
1094 fs::write(temp_dir.path().join("cass.db-wal"), "canonical-wal")
1095 .map_err(|err| CrashReplayError::from_error("seed canonical wal", err))?;
1096 fs::create_dir_all(&backup_dir)
1097 .map_err(|err| CrashReplayError::from_error("create backup dir", err))?;
1098 Ok(Self {
1099 temp_dir,
1100 canonical_db,
1101 backup_dir,
1102 manifest: None,
1103 })
1104 }
1105
1106 fn copy_main(&self) -> Result<(), CrashReplayError> {
1107 fs::copy(&self.canonical_db, self.backup_dir.join("cass.db"))
1108 .map(|_| ())
1109 .map_err(|err| CrashReplayError::from_error("copy backup main", err))
1110 }
1111
1112 fn copy_wal_and_manifest(&self) -> Result<(), CrashReplayError> {
1113 fs::copy(
1114 self.temp_dir.path().join("cass.db-wal"),
1115 self.backup_dir.join("cass.db-wal"),
1116 )
1117 .map_err(|err| CrashReplayError::from_error("copy backup wal", err))?;
1118 let manifest = json!({
1119 "schema_version": 1,
1120 "complete": true,
1121 "files": ["cass.db", "cass.db-wal"],
1122 });
1123 let bytes = serde_json::to_vec_pretty(&manifest)
1124 .map_err(|err| CrashReplayError::from_error("encode backup manifest", err))?;
1125 fs::write(self.backup_dir.join("manifest.json"), bytes)
1126 .map_err(|err| CrashReplayError::from_error("write backup manifest", err))
1127 }
1128 }
1129
/// Replays the backup fixture at two checkpoints: after only the main file
/// was copied, and after the WAL and manifest were written as well.
///
/// For each checkpoint the test expects the canonical DB content to be
/// untouched. The main-only checkpoint must have no manifest on disk; the
/// manifest checkpoint must have a complete manifest that lists exactly the
/// two expected files, with the WAL copy present in the backup directory.
#[test]
fn backup_recovery_fixture_replays_incomplete_and_complete_bundle_crashes() {
    let main_copied = CrashReplayCheckpoint::new(
        10,
        "backup_main_copied",
        "backup main file copied before bundle manifest exists",
    );
    let manifest_written = CrashReplayCheckpoint::new(
        20,
        "backup_manifest_written",
        "backup sidecars and manifest mark the bundle complete",
    );

    let report = replay_named_checkpoints(
        "backup-recovery-fixture-restart",
        "backup_recovery",
        vec![main_copied, manifest_written],
        BackupRecoveryFixtureState::new,
        // Advance: every checkpoint copies the main file first; only the
        // manifest checkpoint goes on to copy the WAL and write the manifest.
        |state, checkpoint| {
            state.copy_main()?;
            match checkpoint.id.as_str() {
                "backup_main_copied" => Ok(()),
                "backup_manifest_written" => state.copy_wal_and_manifest(),
                _ => Err(CrashReplayError::new(
                    "advance backup recovery checkpoint",
                    "unknown checkpoint",
                )),
            }
        },
        // Restart: reload whatever manifest is on disk; a missing file means
        // no manifest.
        |state| {
            let manifest_path = state.backup_dir.join("manifest.json");
            if !manifest_path.exists() {
                state.manifest = None;
                return Ok(());
            }
            let bytes = fs::read(&manifest_path)
                .map_err(|err| CrashReplayError::from_error("read backup manifest", err))?;
            let manifest: Value = serde_json::from_slice(&bytes)
                .map_err(|err| CrashReplayError::from_error("parse backup manifest", err))?;
            state.manifest = Some(manifest);
            Ok(())
        },
        // Invariants: one check shared by all checkpoints, followed by one
        // check specific to the checkpoint being replayed.
        |state, checkpoint| {
            let canonical = fs::read_to_string(&state.canonical_db).ok();
            let canonical_check = match canonical.as_deref() {
                Some("canonical-main") => CrashReplayInvariant::passed(
                    checkpoint,
                    "canonical_db_preserved",
                    "restart did not replace the canonical DB from an incomplete backup",
                ),
                _ => CrashReplayInvariant::failed(
                    checkpoint,
                    "canonical_db_preserved",
                    format!("unexpected canonical DB content: {canonical:?}"),
                ),
            };

            let checkpoint_check = match checkpoint.id.as_str() {
                "backup_main_copied" => {
                    if state.manifest.is_some() {
                        CrashReplayInvariant::failed(
                            checkpoint,
                            "partial_backup_not_marked_complete",
                            format!("unexpected manifest: {:?}", state.manifest),
                        )
                    } else {
                        CrashReplayInvariant::passed(
                            checkpoint,
                            "partial_backup_not_marked_complete",
                            "main-only backup has no manifest and is not advertised recoverable",
                        )
                    }
                }
                "backup_manifest_written" => {
                    let manifest = state.manifest.as_ref();
                    let complete = manifest
                        .and_then(|doc| doc.get("complete"))
                        .and_then(Value::as_bool)
                        .unwrap_or(false);
                    // Non-string entries are skipped; the remaining names
                    // must be exactly these two, in this order.
                    let files_match = match manifest
                        .and_then(|doc| doc.get("files"))
                        .and_then(Value::as_array)
                    {
                        Some(files) => files
                            .iter()
                            .filter_map(Value::as_str)
                            .eq(["cass.db", "cass.db-wal"]),
                        None => false,
                    };
                    let wal_exists = state.backup_dir.join("cass.db-wal").exists();

                    if complete && files_match && wal_exists {
                        CrashReplayInvariant::passed(
                            checkpoint,
                            "complete_backup_manifest_matches_sidecars",
                            "complete manifest is present only with expected sidecars",
                        )
                    } else {
                        CrashReplayInvariant::failed(
                            checkpoint,
                            "complete_backup_manifest_matches_sidecars",
                            format!(
                                "complete={complete} files_match={files_match} wal_exists={wal_exists}"
                            ),
                        )
                    }
                }
                _ => CrashReplayInvariant::failed(
                    checkpoint,
                    "known_backup_checkpoint",
                    "unexpected backup checkpoint",
                ),
            };

            vec![canonical_check, checkpoint_check]
        },
    );

    assert_eq!(report.verdict, CrashReplayVerdict::Clean);
    assert!(
        report.validate().is_ok(),
        "backup recovery replay report should validate: {report:?}"
    );
}
1264
/// Saves a single-checkpoint replay report as JSON under a nested artifact
/// path inside a temporary directory, loads it back, and expects the loaded
/// report to equal the one that was saved.
#[test]
fn crash_replay_report_round_trips_as_artifact_manifest()
-> Result<(), Box<dyn std::error::Error>> {
    let scratch = tempfile::tempdir()?;
    let artifact_path = scratch
        .path()
        .join("artifacts/crash-replay/crash-replay-report.json");

    let only_checkpoint = CrashReplayCheckpoint::new(
        1,
        "only_checkpoint",
        "single checkpoint for artifact round-trip",
    );
    // The state is `()` and every step is a no-op; the only content of
    // interest is the single passing invariant.
    let report = replay_named_checkpoints(
        "artifact-round-trip",
        "harness",
        vec![only_checkpoint],
        || Ok(()),
        |_state, _checkpoint| Ok(()),
        |_state| Ok(()),
        |_state, checkpoint| {
            vec![CrashReplayInvariant::passed(
                checkpoint,
                "round_trip_invariant",
                "round-trip invariant passed",
            )]
        },
    );

    report.save_json(&artifact_path)?;
    let reloaded = CrashReplayReport::load_json(&artifact_path)?;

    assert_eq!(reloaded, report);
    Ok(())
}
1299
/// Checks three hand-built reports that claim a `Clean` verdict and expects
/// `validate` to reject each with a specific error:
///
/// 1. a failed invariant inside a clean report,
/// 2. two checkpoints sharing the same id,
/// 3. a checkpoint whose only event is an advance, with no invariant-check
///    event recorded.
#[test]
fn crash_replay_validation_rejects_untrustworthy_clean_reports() {
    let checkpoint = CrashReplayCheckpoint::new(1, "checkpoint", "validation checkpoint");

    // Case 1: verdict is Clean, yet the only invariant failed.
    let report = CrashReplayReport {
        schema_version: CRASH_REPLAY_SCHEMA_VERSION.to_owned(),
        scenario_id: String::from("bad-clean-report"),
        state_machine: String::from("harness"),
        verdict: CrashReplayVerdict::Clean,
        checkpoints: vec![checkpoint.clone()],
        events: vec![CrashReplayEvent::ok(
            &checkpoint,
            CrashReplayPhase::CheckInvariants,
            "checked",
        )],
        invariants: vec![CrashReplayInvariant::failed(
            &checkpoint,
            "must_not_fail",
            "intentional validation failure",
        )],
    };
    let clean_with_failure = report.validate();
    assert!(matches!(
        clean_with_failure,
        Err(CrashReplayValidationError::CleanReportContainsFailure)
    ));

    // Case 2: same report plus a second checkpoint that reuses the first
    // checkpoint's id under a different ordinal.
    let mut duplicate_report = report.clone();
    duplicate_report.checkpoints.push(CrashReplayCheckpoint::new(
        2,
        checkpoint.id.clone(),
        checkpoint.description.clone(),
    ));
    let duplicate_outcome = duplicate_report.validate();
    assert!(matches!(
        duplicate_outcome,
        Err(CrashReplayValidationError::DuplicateCheckpointId { .. })
    ));

    // Case 3: the invariant passes, but the only recorded event is the
    // advance step.
    let mut missing_check_event_report = report;
    missing_check_event_report.events = vec![CrashReplayEvent::ok(
        &checkpoint,
        CrashReplayPhase::AdvanceToCheckpoint,
        "advanced",
    )];
    missing_check_event_report.invariants = vec![CrashReplayInvariant::passed(
        &checkpoint,
        "passing_but_unchecked",
        "invariant exists but no check event proves it ran",
    )];
    let missing_event_outcome = missing_check_event_report.validate();
    assert!(matches!(
        missing_event_outcome,
        Err(CrashReplayValidationError::CleanReportMissingCheckpointEvent { .. })
    ));
}
1359}