1use serde::{Deserialize, Serialize};
9use std::collections::BTreeSet;
10use std::error::Error;
11use std::fmt;
12use std::fs;
13use std::io;
14use std::path::Path;
15
/// Schema version written into every report produced by this module.
///
/// A report whose `schema_version` differs from this value is rejected by
/// `CrashReplayReport::validate`.
pub const CRASH_REPLAY_SCHEMA_VERSION: &str = "1";
17
18#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
19pub struct CrashReplayCheckpoint {
20 pub id: String,
21 pub ordinal: u32,
22 pub description: String,
23}
24
25impl CrashReplayCheckpoint {
26 pub fn new(ordinal: u32, id: impl Into<String>, description: impl Into<String>) -> Self {
27 Self {
28 id: id.into(),
29 ordinal,
30 description: description.into(),
31 }
32 }
33}
34
35#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
36#[serde(rename_all = "snake_case")]
37pub enum CrashReplayPhase {
38 AdvanceToCheckpoint,
39 InjectCrash,
40 Restart,
41 CheckInvariants,
42}
43
44#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
45pub struct CrashReplayEvent {
46 pub checkpoint_id: String,
47 pub phase: CrashReplayPhase,
48 pub ok: bool,
49 pub detail: String,
50}
51
52impl CrashReplayEvent {
53 fn ok(
54 checkpoint: &CrashReplayCheckpoint,
55 phase: CrashReplayPhase,
56 detail: impl Into<String>,
57 ) -> Self {
58 Self {
59 checkpoint_id: checkpoint.id.clone(),
60 phase,
61 ok: true,
62 detail: detail.into(),
63 }
64 }
65
66 fn failed(
67 checkpoint: &CrashReplayCheckpoint,
68 phase: CrashReplayPhase,
69 detail: impl Into<String>,
70 ) -> Self {
71 Self {
72 checkpoint_id: checkpoint.id.clone(),
73 phase,
74 ok: false,
75 detail: detail.into(),
76 }
77 }
78}
79
80#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
81pub struct CrashReplayInvariant {
82 pub checkpoint_id: String,
83 pub name: String,
84 pub passed: bool,
85 pub detail: String,
86}
87
88impl CrashReplayInvariant {
89 pub fn passed(
90 checkpoint: &CrashReplayCheckpoint,
91 name: impl Into<String>,
92 detail: impl Into<String>,
93 ) -> Self {
94 Self {
95 checkpoint_id: checkpoint.id.clone(),
96 name: name.into(),
97 passed: true,
98 detail: detail.into(),
99 }
100 }
101
102 pub fn failed(
103 checkpoint: &CrashReplayCheckpoint,
104 name: impl Into<String>,
105 detail: impl Into<String>,
106 ) -> Self {
107 Self {
108 checkpoint_id: checkpoint.id.clone(),
109 name: name.into(),
110 passed: false,
111 detail: detail.into(),
112 }
113 }
114}
115
116#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
117#[serde(rename_all = "snake_case")]
118pub enum CrashReplayVerdict {
119 Clean,
120 Failed,
121}
122
123#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
124pub struct CrashReplayReport {
125 pub schema_version: String,
126 pub scenario_id: String,
127 pub state_machine: String,
128 pub verdict: CrashReplayVerdict,
129 pub checkpoints: Vec<CrashReplayCheckpoint>,
130 pub events: Vec<CrashReplayEvent>,
131 pub invariants: Vec<CrashReplayInvariant>,
132}
133
134impl CrashReplayReport {
135 pub fn validate(&self) -> Result<(), CrashReplayValidationError> {
136 if self.schema_version != CRASH_REPLAY_SCHEMA_VERSION {
137 return Err(CrashReplayValidationError::UnsupportedSchemaVersion {
138 expected: CRASH_REPLAY_SCHEMA_VERSION,
139 actual: self.schema_version.clone(),
140 });
141 }
142 if self.scenario_id.trim().is_empty() {
143 return Err(CrashReplayValidationError::EmptyScenarioId);
144 }
145 if self.state_machine.trim().is_empty() {
146 return Err(CrashReplayValidationError::EmptyStateMachine);
147 }
148 if self.checkpoints.is_empty() {
149 return Err(CrashReplayValidationError::NoCheckpoints);
150 }
151 if self.verdict == CrashReplayVerdict::Clean && self.invariants.is_empty() {
152 return Err(CrashReplayValidationError::CleanReportWithoutInvariants);
153 }
154
155 let mut checkpoint_ids = BTreeSet::new();
156 let mut previous_ordinal = None;
157 for (index, checkpoint) in self.checkpoints.iter().enumerate() {
158 if checkpoint.id.trim().is_empty() {
159 return Err(CrashReplayValidationError::EmptyCheckpointId { index });
160 }
161 if checkpoint.description.trim().is_empty() {
162 return Err(CrashReplayValidationError::EmptyCheckpointDescription { index });
163 }
164 if let Some(previous) = previous_ordinal
165 && checkpoint.ordinal <= previous
166 {
167 return Err(CrashReplayValidationError::NonMonotoneCheckpointOrdinal {
168 index,
169 previous,
170 current: checkpoint.ordinal,
171 });
172 }
173 if !checkpoint_ids.insert(checkpoint.id.as_str()) {
174 return Err(CrashReplayValidationError::DuplicateCheckpointId {
175 index,
176 checkpoint_id: checkpoint.id.clone(),
177 });
178 }
179 previous_ordinal = Some(checkpoint.ordinal);
180 }
181
182 let mut checked_checkpoints = BTreeSet::new();
183 for (index, event) in self.events.iter().enumerate() {
184 if event.checkpoint_id.trim().is_empty() {
185 return Err(CrashReplayValidationError::EmptyEventCheckpointId { index });
186 }
187 if !checkpoint_ids.contains(event.checkpoint_id.as_str()) {
188 return Err(CrashReplayValidationError::UnknownEventCheckpoint {
189 index,
190 checkpoint_id: event.checkpoint_id.clone(),
191 });
192 }
193 if event.detail.trim().is_empty() {
194 return Err(CrashReplayValidationError::EmptyEventDetail { index });
195 }
196 if event.ok && event.phase == CrashReplayPhase::CheckInvariants {
197 checked_checkpoints.insert(event.checkpoint_id.as_str());
198 }
199 }
200
201 let mut invariant_checkpoints = BTreeSet::new();
202 for (index, invariant) in self.invariants.iter().enumerate() {
203 if invariant.checkpoint_id.trim().is_empty() {
204 return Err(CrashReplayValidationError::EmptyInvariantCheckpointId { index });
205 }
206 if !checkpoint_ids.contains(invariant.checkpoint_id.as_str()) {
207 return Err(CrashReplayValidationError::UnknownInvariantCheckpoint {
208 index,
209 checkpoint_id: invariant.checkpoint_id.clone(),
210 });
211 }
212 if invariant.name.trim().is_empty() {
213 return Err(CrashReplayValidationError::EmptyInvariantName { index });
214 }
215 if invariant.detail.trim().is_empty() {
216 return Err(CrashReplayValidationError::EmptyInvariantDetail { index });
217 }
218 if invariant.passed {
219 invariant_checkpoints.insert(invariant.checkpoint_id.as_str());
220 }
221 }
222 if self.verdict == CrashReplayVerdict::Clean
223 && (self.events.iter().any(|event| !event.ok)
224 || self.invariants.iter().any(|invariant| !invariant.passed))
225 {
226 return Err(CrashReplayValidationError::CleanReportContainsFailure);
227 }
228 if self.verdict == CrashReplayVerdict::Clean {
229 if self.events.is_empty() {
230 return Err(CrashReplayValidationError::CleanReportWithoutEvents);
231 }
232 for checkpoint in &self.checkpoints {
233 if !checked_checkpoints.contains(checkpoint.id.as_str()) {
234 return Err(
235 CrashReplayValidationError::CleanReportMissingCheckpointEvent {
236 checkpoint_id: checkpoint.id.clone(),
237 },
238 );
239 }
240 if !invariant_checkpoints.contains(checkpoint.id.as_str()) {
241 return Err(
242 CrashReplayValidationError::CleanReportMissingCheckpointInvariant {
243 checkpoint_id: checkpoint.id.clone(),
244 },
245 );
246 }
247 }
248 }
249
250 Ok(())
251 }
252
253 pub fn save_json(&self, path: &Path) -> Result<(), CrashReplayIoError> {
254 self.validate()?;
255 if let Some(parent) = path
256 .parent()
257 .filter(|parent| !parent.as_os_str().is_empty())
258 {
259 fs::create_dir_all(parent)?;
260 }
261 let json = serde_json::to_vec_pretty(self)?;
262 fs::write(path, json)?;
263 Ok(())
264 }
265
266 pub fn load_json(path: &Path) -> Result<Self, CrashReplayIoError> {
267 let bytes = fs::read(path)?;
268 let report: Self = serde_json::from_slice(&bytes)?;
269 report.validate()?;
270 Ok(report)
271 }
272}
273
/// Error reported by the callbacks that drive a crash replay.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CrashReplayError {
    /// What was being attempted when the error occurred.
    pub action: String,
    /// Description of what went wrong.
    pub detail: String,
}

impl CrashReplayError {
    /// Builds an error from an action label and a detail message.
    pub fn new(action: impl Into<String>, detail: impl Into<String>) -> Self {
        let action = action.into();
        let detail = detail.into();
        CrashReplayError { action, detail }
    }

    /// Builds an error whose detail is the `Display` rendering of `error`.
    pub fn from_error(action: impl Into<String>, error: impl fmt::Display) -> Self {
        let detail = error.to_string();
        CrashReplayError {
            action: action.into(),
            detail,
        }
    }
}

impl fmt::Display for CrashReplayError {
    /// Renders the error as `<action>: <detail>`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self { action, detail } = self;
        write!(f, "{action}: {detail}")
    }
}

impl Error for CrashReplayError {}
300
/// Reason a [`CrashReplayReport`] failed validation.
///
/// Variants carrying an `index` refer to the zero-based position of the
/// offending checkpoint, event, or invariant within the report.
///
/// The type is `Clone` and comparable so callers and tests can assert on the
/// exact validation failure.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CrashReplayValidationError {
    /// The report's schema version differs from the supported one.
    UnsupportedSchemaVersion {
        expected: &'static str,
        actual: String,
    },
    /// `scenario_id` is empty or whitespace only.
    EmptyScenarioId,
    /// `state_machine` is empty or whitespace only.
    EmptyStateMachine,
    /// The report lists no checkpoints.
    NoCheckpoints,
    /// A checkpoint has a blank id.
    EmptyCheckpointId {
        index: usize,
    },
    /// A checkpoint has a blank description.
    EmptyCheckpointDescription {
        index: usize,
    },
    /// A checkpoint reuses the id of an earlier checkpoint.
    DuplicateCheckpointId {
        index: usize,
        checkpoint_id: String,
    },
    /// A checkpoint's ordinal is not greater than its predecessor's.
    NonMonotoneCheckpointOrdinal {
        index: usize,
        previous: u32,
        current: u32,
    },
    /// A clean report carries no invariants.
    CleanReportWithoutInvariants,
    /// A clean report carries no events.
    CleanReportWithoutEvents,
    /// A clean report contains a failed event or invariant.
    CleanReportContainsFailure,
    /// A clean report lacks a successful invariant-check event for a checkpoint.
    CleanReportMissingCheckpointEvent {
        checkpoint_id: String,
    },
    /// A clean report lacks a passing invariant for a checkpoint.
    CleanReportMissingCheckpointInvariant {
        checkpoint_id: String,
    },
    /// An event has a blank checkpoint id.
    EmptyEventCheckpointId {
        index: usize,
    },
    /// An event refers to a checkpoint that is not in the report.
    UnknownEventCheckpoint {
        index: usize,
        checkpoint_id: String,
    },
    /// An event has a blank detail.
    EmptyEventDetail {
        index: usize,
    },
    /// An invariant has a blank checkpoint id.
    EmptyInvariantCheckpointId {
        index: usize,
    },
    /// An invariant refers to a checkpoint that is not in the report.
    UnknownInvariantCheckpoint {
        index: usize,
        checkpoint_id: String,
    },
    /// An invariant has a blank name.
    EmptyInvariantName {
        index: usize,
    },
    /// An invariant has a blank detail.
    EmptyInvariantDetail {
        index: usize,
    },
}

impl fmt::Display for CrashReplayValidationError {
    /// Writes a one-line, human-readable description of the violation.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::UnsupportedSchemaVersion { expected, actual } => write!(
                f,
                "unsupported crash replay schema version {actual}; expected {expected}"
            ),
            Self::EmptyScenarioId => f.write_str("crash replay scenario_id cannot be empty"),
            Self::EmptyStateMachine => f.write_str("crash replay state_machine cannot be empty"),
            Self::NoCheckpoints => f.write_str("crash replay report must include checkpoints"),
            Self::EmptyCheckpointId { index } => {
                write!(f, "crash replay checkpoint #{index} has an empty id")
            }
            Self::EmptyCheckpointDescription { index } => write!(
                f,
                "crash replay checkpoint #{index} has an empty description"
            ),
            Self::DuplicateCheckpointId {
                index,
                checkpoint_id,
            } => write!(
                f,
                "crash replay checkpoint #{index} duplicates checkpoint id {checkpoint_id}"
            ),
            Self::NonMonotoneCheckpointOrdinal {
                index,
                previous,
                current,
            } => write!(
                f,
                "crash replay checkpoint #{index} ordinal {current} must be greater than previous ordinal {previous}"
            ),
            Self::CleanReportWithoutInvariants => {
                f.write_str("clean crash replay report must include invariants")
            }
            Self::CleanReportWithoutEvents => {
                f.write_str("clean crash replay report must include events")
            }
            Self::CleanReportContainsFailure => {
                f.write_str("clean crash replay report contains failed events or invariants")
            }
            Self::CleanReportMissingCheckpointEvent { checkpoint_id } => write!(
                f,
                "clean crash replay report has no successful invariant-check event for checkpoint {checkpoint_id}"
            ),
            Self::CleanReportMissingCheckpointInvariant { checkpoint_id } => write!(
                f,
                "clean crash replay report has no passing invariant for checkpoint {checkpoint_id}"
            ),
            Self::EmptyEventCheckpointId { index } => {
                write!(f, "crash replay event #{index} has an empty checkpoint id")
            }
            Self::UnknownEventCheckpoint {
                index,
                checkpoint_id,
            } => write!(
                f,
                "crash replay event #{index} references unknown checkpoint {checkpoint_id}"
            ),
            Self::EmptyEventDetail { index } => {
                write!(f, "crash replay event #{index} has an empty detail")
            }
            Self::EmptyInvariantCheckpointId { index } => write!(
                f,
                "crash replay invariant #{index} has an empty checkpoint id"
            ),
            Self::UnknownInvariantCheckpoint {
                index,
                checkpoint_id,
            } => write!(
                f,
                "crash replay invariant #{index} references unknown checkpoint {checkpoint_id}"
            ),
            Self::EmptyInvariantName { index } => {
                write!(f, "crash replay invariant #{index} has an empty name")
            }
            Self::EmptyInvariantDetail { index } => {
                write!(f, "crash replay invariant #{index} has an empty detail")
            }
        }
    }
}

impl Error for CrashReplayValidationError {}
448
449#[derive(Debug)]
450pub enum CrashReplayIoError {
451 Io(io::Error),
452 Json(serde_json::Error),
453 Validation(CrashReplayValidationError),
454}
455
456impl fmt::Display for CrashReplayIoError {
457 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
458 match self {
459 Self::Io(err) => write!(f, "crash replay I/O error: {err}"),
460 Self::Json(err) => write!(f, "crash replay JSON error: {err}"),
461 Self::Validation(err) => write!(f, "crash replay validation error: {err}"),
462 }
463 }
464}
465
466impl Error for CrashReplayIoError {
467 fn source(&self) -> Option<&(dyn Error + 'static)> {
468 match self {
469 Self::Io(err) => Some(err),
470 Self::Json(err) => Some(err),
471 Self::Validation(err) => Some(err),
472 }
473 }
474}
475
476impl From<io::Error> for CrashReplayIoError {
477 fn from(err: io::Error) -> Self {
478 Self::Io(err)
479 }
480}
481
482impl From<serde_json::Error> for CrashReplayIoError {
483 fn from(err: serde_json::Error) -> Self {
484 Self::Json(err)
485 }
486}
487
488impl From<CrashReplayValidationError> for CrashReplayIoError {
489 fn from(err: CrashReplayValidationError) -> Self {
490 Self::Validation(err)
491 }
492}
493
494pub fn replay_named_checkpoints<S, MakeState, Advance, Restart, Check>(
495 scenario_id: impl Into<String>,
496 state_machine: impl Into<String>,
497 mut checkpoints: Vec<CrashReplayCheckpoint>,
498 mut make_state: MakeState,
499 mut advance_to_checkpoint: Advance,
500 mut restart: Restart,
501 mut check_invariants: Check,
502) -> CrashReplayReport
503where
504 MakeState: FnMut() -> Result<S, CrashReplayError>,
505 Advance: FnMut(&mut S, &CrashReplayCheckpoint) -> Result<(), CrashReplayError>,
506 Restart: FnMut(&mut S) -> Result<(), CrashReplayError>,
507 Check: FnMut(&S, &CrashReplayCheckpoint) -> Vec<CrashReplayInvariant>,
508{
509 checkpoints.sort_by_key(|checkpoint| checkpoint.ordinal);
510 let mut report = CrashReplayReport {
511 schema_version: CRASH_REPLAY_SCHEMA_VERSION.to_string(),
512 scenario_id: scenario_id.into(),
513 state_machine: state_machine.into(),
514 verdict: CrashReplayVerdict::Clean,
515 checkpoints: checkpoints.clone(),
516 events: Vec::new(),
517 invariants: Vec::new(),
518 };
519
520 if checkpoints.is_empty() {
521 report.verdict = CrashReplayVerdict::Failed;
522 return report;
523 }
524
525 for checkpoint in checkpoints {
526 let mut state = match make_state() {
527 Ok(state) => state,
528 Err(err) => {
529 report.verdict = CrashReplayVerdict::Failed;
530 report.events.push(CrashReplayEvent::failed(
531 &checkpoint,
532 CrashReplayPhase::AdvanceToCheckpoint,
533 format!("failed creating fresh state: {err}"),
534 ));
535 continue;
536 }
537 };
538
539 match advance_to_checkpoint(&mut state, &checkpoint) {
540 Ok(()) => report.events.push(CrashReplayEvent::ok(
541 &checkpoint,
542 CrashReplayPhase::AdvanceToCheckpoint,
543 "advanced to checkpoint",
544 )),
545 Err(err) => {
546 report.verdict = CrashReplayVerdict::Failed;
547 report.events.push(CrashReplayEvent::failed(
548 &checkpoint,
549 CrashReplayPhase::AdvanceToCheckpoint,
550 err.to_string(),
551 ));
552 continue;
553 }
554 }
555
556 report.events.push(CrashReplayEvent::ok(
557 &checkpoint,
558 CrashReplayPhase::InjectCrash,
559 "simulated process stop at named checkpoint",
560 ));
561
562 match restart(&mut state) {
563 Ok(()) => report.events.push(CrashReplayEvent::ok(
564 &checkpoint,
565 CrashReplayPhase::Restart,
566 "restart action completed",
567 )),
568 Err(err) => {
569 report.verdict = CrashReplayVerdict::Failed;
570 report.events.push(CrashReplayEvent::failed(
571 &checkpoint,
572 CrashReplayPhase::Restart,
573 err.to_string(),
574 ));
575 continue;
576 }
577 }
578
579 let invariants = check_invariants(&state, &checkpoint);
580 if invariants.is_empty() {
581 report.verdict = CrashReplayVerdict::Failed;
582 report.events.push(CrashReplayEvent::failed(
583 &checkpoint,
584 CrashReplayPhase::CheckInvariants,
585 "checkpoint produced no invariants",
586 ));
587 continue;
588 }
589
590 let failed = invariants.iter().any(|invariant| !invariant.passed);
591 if failed {
592 report.verdict = CrashReplayVerdict::Failed;
593 }
594 report.events.push(CrashReplayEvent {
595 checkpoint_id: checkpoint.id.clone(),
596 phase: CrashReplayPhase::CheckInvariants,
597 ok: !failed,
598 detail: format!("{} invariant(s) evaluated", invariants.len()),
599 });
600 report.invariants.extend(invariants);
601 }
602
603 report
604}
605
606#[cfg(test)]
607mod tests {
608 use super::*;
609 use crate::policy_registry::{
610 PolicyControllerStatus, PolicyFallbackState, policy_registry_snapshot,
611 };
612 use crate::search::policy::{
613 CHUNKING_STRATEGY_VERSION, SEMANTIC_SCHEMA_VERSION, SemanticPolicy,
614 };
615 use crate::search::semantic_manifest::{
616 ArtifactRecord, BuildCheckpoint, SemanticManifest, TierKind,
617 };
618 use serde_json::{Value, json};
619 use std::path::PathBuf;
620 use tempfile::TempDir;
621
622 #[derive(Debug)]
623 struct SemanticReplayState {
624 temp_dir: TempDir,
625 loaded: Option<SemanticManifest>,
626 }
627
628 impl SemanticReplayState {
629 fn data_dir(&self) -> &Path {
630 self.temp_dir.path()
631 }
632 }
633
634 fn semantic_checkpoint() -> BuildCheckpoint {
635 BuildCheckpoint {
636 tier: TierKind::Fast,
637 embedder_id: "fnv1a-384".to_string(),
638 last_offset: 8,
639 docs_embedded: 13,
640 conversations_processed: 2,
641 total_conversations: 5,
642 db_fingerprint: "semantic-fp".to_string(),
643 schema_version: SEMANTIC_SCHEMA_VERSION,
644 chunking_version: CHUNKING_STRATEGY_VERSION,
645 saved_at_ms: 1_700_000_000_000,
646 }
647 }
648
649 fn semantic_artifact() -> ArtifactRecord {
650 ArtifactRecord {
651 tier: TierKind::Fast,
652 embedder_id: "fnv1a-384".to_string(),
653 model_revision: "hash".to_string(),
654 schema_version: SEMANTIC_SCHEMA_VERSION,
655 chunking_version: CHUNKING_STRATEGY_VERSION,
656 dimension: 384,
657 doc_count: 13,
658 conversation_count: 5,
659 db_fingerprint: "semantic-fp".to_string(),
660 index_path: "vector_index/fast.fsvi".to_string(),
661 size_bytes: 4096,
662 started_at_ms: 1_700_000_000_000,
663 completed_at_ms: 1_700_000_060_000,
664 ready: true,
665 }
666 }
667
#[test]
fn semantic_manifest_state_machine_replays_checkpoint_and_publish_crashes() {
    // Fresh state: an empty scratch directory and nothing loaded yet.
    fn fresh_state() -> Result<SemanticReplayState, CrashReplayError> {
        let temp_dir = tempfile::tempdir()
            .map_err(|err| CrashReplayError::from_error("create tempdir", err))?;
        Ok(SemanticReplayState {
            temp_dir,
            loaded: None,
        })
    }

    // Builds the manifest as it would look at the checkpoint and saves it.
    fn advance(
        state: &mut SemanticReplayState,
        checkpoint: &CrashReplayCheckpoint,
    ) -> Result<(), CrashReplayError> {
        let mut manifest = SemanticManifest::default();
        manifest.refresh_backlog(5, "semantic-fp");
        manifest.save_checkpoint(semantic_checkpoint());
        let publishes_artifact = checkpoint.id == "semantic_artifact_published";
        if publishes_artifact {
            manifest.publish_artifact(semantic_artifact());
        }
        manifest
            .save(state.data_dir())
            .map_err(|err| CrashReplayError::from_error("save semantic manifest", err))
    }

    // Restart reloads whatever manifest was persisted before the crash.
    fn restart(state: &mut SemanticReplayState) -> Result<(), CrashReplayError> {
        let reloaded = SemanticManifest::load(state.data_dir())
            .map_err(|err| CrashReplayError::from_error("load semantic manifest", err))?;
        state.loaded = reloaded;
        Ok(())
    }

    // One "loaded" invariant plus one checkpoint-specific invariant.
    fn check(
        state: &SemanticReplayState,
        checkpoint: &CrashReplayCheckpoint,
    ) -> Vec<CrashReplayInvariant> {
        let Some(manifest) = state.loaded.as_ref() else {
            return vec![CrashReplayInvariant::failed(
                checkpoint,
                "semantic_manifest_loaded",
                "manifest did not load after restart",
            )];
        };

        let loaded = CrashReplayInvariant::passed(
            checkpoint,
            "semantic_manifest_loaded",
            "manifest loaded after restart",
        );
        // Shared failure detail for both checkpoint-specific invariants.
        let observed = || {
            format!(
                "checkpoint={:?} fast_tier={:?}",
                manifest.checkpoint, manifest.fast_tier
            )
        };

        let specific = match checkpoint.id.as_str() {
            "semantic_checkpoint_saved" => {
                let resumable_without_artifact =
                    manifest.checkpoint.is_some() && manifest.fast_tier.is_none();
                if resumable_without_artifact {
                    CrashReplayInvariant::passed(
                        checkpoint,
                        "checkpoint_without_torn_artifact",
                        "restart sees resumable checkpoint and no half-published artifact",
                    )
                } else {
                    CrashReplayInvariant::failed(
                        checkpoint,
                        "checkpoint_without_torn_artifact",
                        observed(),
                    )
                }
            }
            "semantic_artifact_published" => {
                let artifact_ready_without_checkpoint = manifest.checkpoint.is_none()
                    && manifest
                        .fast_tier
                        .as_ref()
                        .is_some_and(|artifact| artifact.ready);
                if artifact_ready_without_checkpoint {
                    CrashReplayInvariant::passed(
                        checkpoint,
                        "published_artifact_clears_checkpoint",
                        "restart sees ready artifact and no stale matching checkpoint",
                    )
                } else {
                    CrashReplayInvariant::failed(
                        checkpoint,
                        "published_artifact_clears_checkpoint",
                        observed(),
                    )
                }
            }
            _ => CrashReplayInvariant::failed(
                checkpoint,
                "known_checkpoint",
                "unexpected semantic checkpoint",
            ),
        };

        vec![loaded, specific]
    }

    let checkpoints: Vec<CrashReplayCheckpoint> = [
        (
            10,
            "semantic_checkpoint_saved",
            "semantic checkpoint persisted before artifact publish",
        ),
        (
            20,
            "semantic_artifact_published",
            "semantic artifact published and checkpoint cleared",
        ),
    ]
    .into_iter()
    .map(|(ordinal, id, description)| CrashReplayCheckpoint::new(ordinal, id, description))
    .collect();

    let report = replay_named_checkpoints(
        "semantic-manifest-save-restart",
        "semantic_manifest",
        checkpoints,
        fresh_state,
        advance,
        restart,
        check,
    );

    assert_eq!(report.verdict, CrashReplayVerdict::Clean);
    assert_eq!(report.checkpoints.len(), 2);
    assert_eq!(report.invariants.len(), 4);
    assert!(
        report.validate().is_ok(),
        "semantic replay report should validate: {report:?}"
    );
}
786
787 #[derive(Debug)]
788 struct PolicyReplayState {
789 pipeline: Value,
790 semantic_available: bool,
791 semantic_fallback_mode: Option<&'static str>,
792 snapshot_statuses: Vec<(String, PolicyControllerStatus, PolicyFallbackState)>,
793 }
794
795 fn policy_pipeline_fixture(mode: &str, reason: &str) -> Value {
796 json!({
797 "pipeline_channel_size": 128,
798 "pipeline_max_message_bytes_in_flight": 1048576,
799 "page_prep_workers": 12,
800 "staged_merge_workers": 4,
801 "staged_shard_builders": 8,
802 "controller_mode": "auto",
803 "controller_restore_clear_samples": 3,
804 "controller_restore_hold_ms": 5000,
805 "controller_loadavg_high_watermark_1m": 1.75,
806 "controller_loadavg_low_watermark_1m": 0.75,
807 "runtime": {
808 "controller_mode": mode,
809 "controller_reason": reason
810 }
811 })
812 }
813
#[test]
fn policy_registry_state_machine_replays_deterministic_controller_snapshots() {
    // Fresh state: steady pipeline, semantic search available, no snapshot yet.
    fn fresh_state() -> Result<PolicyReplayState, CrashReplayError> {
        Ok(PolicyReplayState {
            pipeline: policy_pipeline_fixture("steady", "pipeline settings active"),
            semantic_available: true,
            semantic_fallback_mode: None,
            snapshot_statuses: Vec::new(),
        })
    }

    // Applies the input change that each checkpoint stands for.
    fn advance(
        state: &mut PolicyReplayState,
        checkpoint: &CrashReplayCheckpoint,
    ) -> Result<(), CrashReplayError> {
        match checkpoint.id.as_str() {
            "semantic_fallback_snapshot" => {
                state.semantic_available = false;
                state.semantic_fallback_mode = Some("lexical");
                Ok(())
            }
            "lexical_throttle_snapshot" => {
                state.pipeline =
                    policy_pipeline_fixture("throttled", "load pressure reduced workers");
                Ok(())
            }
            _ => Err(CrashReplayError::new(
                "advance policy checkpoint",
                "unknown checkpoint",
            )),
        }
    }

    // Restart recomputes the registry snapshot from the current inputs.
    fn restart(state: &mut PolicyReplayState) -> Result<(), CrashReplayError> {
        let policy = SemanticPolicy::compiled_defaults();
        let snapshot = policy_registry_snapshot(
            &policy,
            state.semantic_available,
            state.semantic_fallback_mode,
            &state.pipeline,
        );
        let statuses = snapshot
            .controllers
            .into_iter()
            .map(|controller| {
                (
                    controller.controller_id,
                    controller.status,
                    controller.fallback_state,
                )
            })
            .collect();
        state.snapshot_statuses = statuses;
        Ok(())
    }

    // Controller ids must be the expected sorted pair, and the controller tied
    // to the checkpoint must report a conservative fallback.
    fn check(
        state: &PolicyReplayState,
        checkpoint: &CrashReplayCheckpoint,
    ) -> Vec<CrashReplayInvariant> {
        let ids: Vec<_> = state
            .snapshot_statuses
            .iter()
            .map(|(id, _, _)| id.as_str())
            .collect();
        let ids_sorted = if ids == ["lexical_rebuild_pipeline", "semantic_search"] {
            CrashReplayInvariant::passed(
                checkpoint,
                "controller_ids_sorted",
                "controller ids are deterministic and sorted",
            )
        } else {
            CrashReplayInvariant::failed(
                checkpoint,
                "controller_ids_sorted",
                format!("unexpected controller ids: {ids:?}"),
            )
        };

        let expected_controller = match checkpoint.id.as_str() {
            "semantic_fallback_snapshot" => "semantic_search",
            "lexical_throttle_snapshot" => "lexical_rebuild_pipeline",
            _ => "unknown",
        };
        let controller = state
            .snapshot_statuses
            .iter()
            .find(|(id, _, _)| id == expected_controller);
        let reports_fallback = matches!(
            controller,
            Some((
                _,
                PolicyControllerStatus::Fallback,
                PolicyFallbackState::Conservative,
            ))
        );
        let fallback_reported = if reports_fallback {
            CrashReplayInvariant::passed(
                checkpoint,
                "conservative_fallback_reported",
                "checkpoint recompute reports conservative fallback",
            )
        } else {
            CrashReplayInvariant::failed(
                checkpoint,
                "conservative_fallback_reported",
                format!("unexpected controller status: {controller:?}"),
            )
        };

        vec![ids_sorted, fallback_reported]
    }

    let checkpoints: Vec<CrashReplayCheckpoint> = [
        (
            10,
            "semantic_fallback_snapshot",
            "semantic controller reports lexical fallback",
        ),
        (
            20,
            "lexical_throttle_snapshot",
            "lexical rebuild controller reports pressure fallback",
        ),
    ]
    .into_iter()
    .map(|(ordinal, id, description)| CrashReplayCheckpoint::new(ordinal, id, description))
    .collect();

    let report = replay_named_checkpoints(
        "policy-registry-recompute-restart",
        "policy_registry",
        checkpoints,
        fresh_state,
        advance,
        restart,
        check,
    );

    assert_eq!(report.verdict, CrashReplayVerdict::Clean);
    assert!(
        report.validate().is_ok(),
        "policy replay report should validate: {report:?}"
    );
}
937
938 #[derive(Debug)]
939 struct LexicalPublishFixtureState {
940 temp_dir: TempDir,
941 live_path: PathBuf,
942 staged_path: PathBuf,
943 backup_path: PathBuf,
944 }
945
946 impl LexicalPublishFixtureState {
947 fn new() -> Result<Self, CrashReplayError> {
948 let temp_dir = tempfile::tempdir()
949 .map_err(|err| CrashReplayError::from_error("create tempdir", err))?;
950 let live_path = temp_dir.path().join("live-generation.txt");
951 let staged_path = temp_dir.path().join("staged-generation.txt");
952 let backup_path = temp_dir.path().join("live-generation.bak");
953 fs::write(&live_path, "old-generation")
954 .map_err(|err| CrashReplayError::from_error("seed live generation", err))?;
955 Ok(Self {
956 temp_dir,
957 live_path,
958 staged_path,
959 backup_path,
960 })
961 }
962
963 fn write_staged(&self) -> Result<(), CrashReplayError> {
964 fs::write(&self.staged_path, "new-generation")
965 .map_err(|err| CrashReplayError::from_error("write staged generation", err))
966 }
967
968 fn park_live(&self) -> Result<(), CrashReplayError> {
969 fs::rename(&self.live_path, &self.backup_path)
970 .map_err(|err| CrashReplayError::from_error("park live generation", err))
971 }
972
973 fn publish_staged(&self) -> Result<(), CrashReplayError> {
974 fs::rename(&self.staged_path, &self.live_path)
975 .map_err(|err| CrashReplayError::from_error("publish staged generation", err))
976 }
977 }
978
#[test]
fn lexical_publish_fixture_replays_park_and_swap_crash_windows() {
    // Runs the publish sequence up to (and including) the named checkpoint.
    fn advance(
        state: &mut LexicalPublishFixtureState,
        checkpoint: &CrashReplayCheckpoint,
    ) -> Result<(), CrashReplayError> {
        state.write_staged()?;
        match checkpoint.id.as_str() {
            "staged_written" => Ok(()),
            "live_parked" => state.park_live(),
            "staged_published" => {
                state.park_live()?;
                state.publish_staged()
            }
            _ => Err(CrashReplayError::new(
                "advance lexical publish checkpoint",
                "unknown checkpoint",
            )),
        }
    }

    // Restart restores the parked live file when no live file exists.
    fn restart(state: &mut LexicalPublishFixtureState) -> Result<(), CrashReplayError> {
        if state.live_path.exists() || !state.backup_path.exists() {
            return Ok(());
        }
        fs::rename(&state.backup_path, &state.live_path)
            .map_err(|err| CrashReplayError::from_error("restore parked live", err))
    }

    // The scratch directory must still exist and the live file must hold the
    // generation expected for the checkpoint.
    fn check(
        state: &LexicalPublishFixtureState,
        checkpoint: &CrashReplayCheckpoint,
    ) -> Vec<CrashReplayInvariant> {
        let live = fs::read_to_string(&state.live_path).ok();
        let expected = match checkpoint.id.as_str() {
            "staged_written" | "live_parked" => "old-generation",
            "staged_published" => "new-generation",
            _ => "unknown",
        };

        let root_retained = if state.temp_dir.path().exists() {
            CrashReplayInvariant::passed(
                checkpoint,
                "fixture_root_retained",
                "fixture root remains available for artifact inspection",
            )
        } else {
            CrashReplayInvariant::failed(
                checkpoint,
                "fixture_root_retained",
                "fixture root disappeared before invariant checks",
            )
        };

        let live_matches = live.as_deref() == Some(expected);
        let live_generation = if live_matches {
            CrashReplayInvariant::passed(
                checkpoint,
                "live_generation_is_old_or_new",
                format!("live generation recovered as {expected}"),
            )
        } else {
            CrashReplayInvariant::failed(
                checkpoint,
                "live_generation_is_old_or_new",
                format!("expected {expected}, got {live:?}"),
            )
        };

        vec![root_retained, live_generation]
    }

    let checkpoints: Vec<CrashReplayCheckpoint> = [
        (
            10,
            "staged_written",
            "staged generation exists before live path is touched",
        ),
        (
            20,
            "live_parked",
            "live generation has been parked but staged is not yet live",
        ),
        (
            30,
            "staged_published",
            "staged generation has been promoted to live",
        ),
    ]
    .into_iter()
    .map(|(ordinal, id, description)| CrashReplayCheckpoint::new(ordinal, id, description))
    .collect();

    let report = replay_named_checkpoints(
        "lexical-publish-fixture-restart",
        "lexical_publish",
        checkpoints,
        LexicalPublishFixtureState::new,
        advance,
        restart,
        check,
    );

    assert_eq!(report.verdict, CrashReplayVerdict::Clean);
    assert!(
        report.validate().is_ok(),
        "lexical publish replay report should validate: {report:?}"
    );
}
1076
1077 #[derive(Debug)]
1078 struct BackupRecoveryFixtureState {
1079 temp_dir: TempDir,
1080 canonical_db: PathBuf,
1081 backup_dir: PathBuf,
1082 manifest: Option<Value>,
1083 }
1084
1085 impl BackupRecoveryFixtureState {
1086 fn new() -> Result<Self, CrashReplayError> {
1087 let temp_dir = tempfile::tempdir()
1088 .map_err(|err| CrashReplayError::from_error("create tempdir", err))?;
1089 let canonical_db = temp_dir.path().join("cass.db");
1090 let backup_dir = temp_dir.path().join("backup");
1091 fs::write(&canonical_db, "canonical-main")
1092 .map_err(|err| CrashReplayError::from_error("seed canonical db", err))?;
1093 fs::write(temp_dir.path().join("cass.db-wal"), "canonical-wal")
1094 .map_err(|err| CrashReplayError::from_error("seed canonical wal", err))?;
1095 fs::create_dir_all(&backup_dir)
1096 .map_err(|err| CrashReplayError::from_error("create backup dir", err))?;
1097 Ok(Self {
1098 temp_dir,
1099 canonical_db,
1100 backup_dir,
1101 manifest: None,
1102 })
1103 }
1104
1105 fn copy_main(&self) -> Result<(), CrashReplayError> {
1106 fs::copy(&self.canonical_db, self.backup_dir.join("cass.db"))
1107 .map(|_| ())
1108 .map_err(|err| CrashReplayError::from_error("copy backup main", err))
1109 }
1110
1111 fn copy_wal_and_manifest(&self) -> Result<(), CrashReplayError> {
1112 fs::copy(
1113 self.temp_dir.path().join("cass.db-wal"),
1114 self.backup_dir.join("cass.db-wal"),
1115 )
1116 .map_err(|err| CrashReplayError::from_error("copy backup wal", err))?;
1117 let manifest = json!({
1118 "schema_version": 1,
1119 "complete": true,
1120 "files": ["cass.db", "cass.db-wal"],
1121 });
1122 let bytes = serde_json::to_vec_pretty(&manifest)
1123 .map_err(|err| CrashReplayError::from_error("encode backup manifest", err))?;
1124 fs::write(self.backup_dir.join("manifest.json"), bytes)
1125 .map_err(|err| CrashReplayError::from_error("write backup manifest", err))
1126 }
1127 }
1128
/// Replays a crash at two backup checkpoints and checks the restart invariants.
///
/// * `backup_main_copied`: only the main DB file has been copied; after
///   restart no manifest may be present.
/// * `backup_manifest_written`: the WAL sidecar and manifest have been
///   written; the manifest must say `complete: true`, list exactly
///   `cass.db` and `cass.db-wal`, and the WAL copy must exist.
///
/// In both cases the canonical DB must still contain its seeded content.
#[test]
fn backup_recovery_fixture_replays_incomplete_and_complete_bundle_crashes() {
    let checkpoints = vec![
        CrashReplayCheckpoint::new(
            10,
            "backup_main_copied",
            "backup main file copied before bundle manifest exists",
        ),
        CrashReplayCheckpoint::new(
            20,
            "backup_manifest_written",
            "backup sidecars and manifest mark the bundle complete",
        ),
    ];

    let report = replay_named_checkpoints(
        "backup-recovery-fixture-restart",
        "backup_recovery",
        checkpoints,
        BackupRecoveryFixtureState::new,
        // Advance: drive the fixture up to the named checkpoint.
        |state, checkpoint| {
            state.copy_main()?;
            match checkpoint.id.as_str() {
                "backup_main_copied" => {}
                "backup_manifest_written" => {
                    state.copy_wal_and_manifest()?;
                }
                _ => {
                    return Err(CrashReplayError::new(
                        "advance backup recovery checkpoint",
                        "unknown checkpoint",
                    ));
                }
            }
            Ok(())
        },
        // Restart: reload whatever manifest (if any) is in the backup directory.
        |state| {
            let manifest_path = state.backup_dir.join("manifest.json");
            state.manifest = if manifest_path.exists() {
                let bytes = fs::read(&manifest_path)
                    .map_err(|err| CrashReplayError::from_error("read backup manifest", err))?;
                Some(serde_json::from_slice(&bytes).map_err(|err| {
                    CrashReplayError::from_error("parse backup manifest", err)
                })?)
            } else {
                None
            };
            Ok(())
        },
        // Check: evaluate the invariants that must hold after restart.
        |state, checkpoint| {
            // A read failure becomes `None`, which is reported as a failed
            // invariant rather than aborting the replay.
            let canonical = fs::read_to_string(&state.canonical_db).ok();
            let mut invariants = vec![if canonical.as_deref() == Some("canonical-main") {
                CrashReplayInvariant::passed(
                    checkpoint,
                    "canonical_db_preserved",
                    "restart did not replace the canonical DB from an incomplete backup",
                )
            } else {
                CrashReplayInvariant::failed(
                    checkpoint,
                    "canonical_db_preserved",
                    format!("unexpected canonical DB content: {canonical:?}"),
                )
            }];

            match checkpoint.id.as_str() {
                "backup_main_copied" => {
                    invariants.push(if state.manifest.is_none() {
                        CrashReplayInvariant::passed(
                            checkpoint,
                            "partial_backup_not_marked_complete",
                            "main-only backup has no manifest and is not advertised recoverable",
                        )
                    } else {
                        CrashReplayInvariant::failed(
                            checkpoint,
                            "partial_backup_not_marked_complete",
                            format!("unexpected manifest: {:?}", state.manifest),
                        )
                    });
                }
                "backup_manifest_written" => {
                    let complete = state
                        .manifest
                        .as_ref()
                        .and_then(|manifest| manifest.get("complete"))
                        .and_then(Value::as_bool)
                        == Some(true);
                    // Compare every entry, not only the string ones: a
                    // non-string entry maps to `None` and makes the
                    // comparison fail instead of being silently skipped.
                    let files_match = state
                        .manifest
                        .as_ref()
                        .and_then(|manifest| manifest.get("files"))
                        .and_then(Value::as_array)
                        .map(|files| {
                            files
                                .iter()
                                .map(Value::as_str)
                                .eq([Some("cass.db"), Some("cass.db-wal")])
                        })
                        == Some(true);
                    let wal_exists = state.backup_dir.join("cass.db-wal").exists();
                    invariants.push(if complete && files_match && wal_exists {
                        CrashReplayInvariant::passed(
                            checkpoint,
                            "complete_backup_manifest_matches_sidecars",
                            "complete manifest is present only with expected sidecars",
                        )
                    } else {
                        CrashReplayInvariant::failed(
                            checkpoint,
                            "complete_backup_manifest_matches_sidecars",
                            format!(
                                "complete={complete} files_match={files_match} wal_exists={wal_exists}"
                            ),
                        )
                    });
                }
                _ => invariants.push(CrashReplayInvariant::failed(
                    checkpoint,
                    "known_backup_checkpoint",
                    "unexpected backup checkpoint",
                )),
            }
            invariants
        },
    );

    assert_eq!(report.verdict, CrashReplayVerdict::Clean);
    assert!(
        report.validate().is_ok(),
        "backup recovery replay report should validate: {report:?}"
    );
}
1263
/// Saves a single-checkpoint replay report as JSON beneath a temporary
/// directory and asserts that loading it back yields an equal report.
#[test]
fn crash_replay_report_round_trips_as_artifact_manifest()
-> Result<(), Box<dyn std::error::Error>> {
    let scratch = tempfile::tempdir()?;
    let artifact_path = scratch
        .path()
        .join("artifacts/crash-replay/crash-replay-report.json");

    // The harness state is `()`: every phase trivially succeeds and the only
    // invariant always passes.
    let original = replay_named_checkpoints(
        "artifact-round-trip",
        "harness",
        vec![CrashReplayCheckpoint::new(
            1,
            "only_checkpoint",
            "single checkpoint for artifact round-trip",
        )],
        || Ok(()),
        |_state, _checkpoint| Ok(()),
        |_state| Ok(()),
        |_state, checkpoint| {
            let invariant = CrashReplayInvariant::passed(
                checkpoint,
                "round_trip_invariant",
                "round-trip invariant passed",
            );
            vec![invariant]
        },
    );

    original.save_json(&artifact_path)?;
    let reloaded = CrashReplayReport::load_json(&artifact_path)?;

    assert_eq!(reloaded, original);
    Ok(())
}
1298
/// Checks that `validate` rejects three hand-built reports that claim a
/// `Clean` verdict without being trustworthy:
///
/// 1. a report that contains a failed invariant,
/// 2. a report that lists the same checkpoint id twice,
/// 3. a report whose only event is an advance, with no check-invariants event.
#[test]
fn crash_replay_validation_rejects_untrustworthy_clean_reports() {
    let checkpoint = CrashReplayCheckpoint::new(1, "checkpoint", "validation checkpoint");

    // Case 1: verdict says clean, yet an invariant failed.
    let failing_invariant = CrashReplayInvariant::failed(
        &checkpoint,
        "must_not_fail",
        "intentional validation failure",
    );
    let checked_event =
        CrashReplayEvent::ok(&checkpoint, CrashReplayPhase::CheckInvariants, "checked");
    let report = CrashReplayReport {
        schema_version: CRASH_REPLAY_SCHEMA_VERSION.to_string(),
        scenario_id: "bad-clean-report".to_string(),
        state_machine: "harness".to_string(),
        verdict: CrashReplayVerdict::Clean,
        checkpoints: vec![checkpoint.clone()],
        events: vec![checked_event],
        invariants: vec![failing_invariant],
    };
    let clean_with_failure = report.validate();
    assert!(matches!(
        clean_with_failure,
        Err(CrashReplayValidationError::CleanReportContainsFailure)
    ));

    // Case 2: same id under a different ordinal.
    let same_id_checkpoint = CrashReplayCheckpoint::new(
        2,
        checkpoint.id.clone(),
        checkpoint.description.clone(),
    );
    let duplicate_report = CrashReplayReport {
        checkpoints: vec![checkpoint.clone(), same_id_checkpoint],
        ..report.clone()
    };
    let duplicate_outcome = duplicate_report.validate();
    assert!(matches!(
        duplicate_outcome,
        Err(CrashReplayValidationError::DuplicateCheckpointId { .. })
    ));

    // Case 3: the invariant passes, but no check-invariants event was recorded.
    let advance_only_event = CrashReplayEvent::ok(
        &checkpoint,
        CrashReplayPhase::AdvanceToCheckpoint,
        "advanced",
    );
    let unchecked_invariant = CrashReplayInvariant::passed(
        &checkpoint,
        "passing_but_unchecked",
        "invariant exists but no check event proves it ran",
    );
    let missing_check_event_report = CrashReplayReport {
        events: vec![advance_only_event],
        invariants: vec![unchecked_invariant],
        ..report
    };
    let missing_event_outcome = missing_check_event_report.validate();
    assert!(matches!(
        missing_event_outcome,
        Err(CrashReplayValidationError::CleanReportMissingCheckpointEvent { .. })
    ));
}
1358}