// simular/replay/mod.rs
1//! Replay and time-travel debugging system.
2//!
3//! Implements:
4//! - Incremental checkpointing with compression
5//! - Event journaling for perfect replay
6//! - Time-travel scrubbing to any simulation time
7//!
8//! # Advanced TPS Kaizen (Section 4.3)
9//!
10//! - **Zero-Copy Streaming Checkpoints** (4.3.3): Stream directly to mmap [50]
11//! - **Split Event Journal** (4.3.4): Header/payload separation for fast scrubbing [50][54]
12//! - **Schema Evolution** (4.3.7): Version headers with migration support [50]
13
14use serde::{Deserialize, Serialize};
15use std::collections::BTreeMap;
16
17use crate::engine::rng::RngState;
18use crate::engine::{SimState, SimTime};
19use crate::error::{SimError, SimResult};
20
/// Checkpoint with compressed state.
///
/// `data` holds the bincode-serialized state compressed with zstd, and
/// `hash` is the Blake3 digest of those compressed bytes, as produced by
/// [`Checkpoint::create`] and verified by [`Checkpoint::restore`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Checkpoint {
    /// Simulation time at checkpoint.
    pub time: SimTime,
    /// Step count at checkpoint.
    pub step: u64,
    /// Compressed state data (zstd-compressed bincode of `SimState`).
    pub data: Vec<u8>,
    /// Blake3 hash of the compressed `data`, for integrity verification.
    pub hash: [u8; 32],
    /// RNG state captured alongside the state, for deterministic replay.
    pub rng_state: RngState,
}
35
36impl Checkpoint {
37    /// Create a new checkpoint from state.
38    ///
39    /// # Errors
40    ///
41    /// Returns error if serialization or compression fails.
42    pub fn create(
43        time: SimTime,
44        step: u64,
45        state: &SimState,
46        rng_state: RngState,
47        compression_level: i32,
48    ) -> SimResult<Self> {
49        // Serialize state
50        let serialized =
51            bincode::serialize(state).map_err(|e| SimError::serialization(e.to_string()))?;
52
53        // Compress
54        let compressed = zstd::encode_all(&serialized[..], compression_level)?;
55
56        // Hash for integrity
57        let hash = blake3::hash(&compressed);
58
59        Ok(Self {
60            time,
61            step,
62            data: compressed,
63            hash: *hash.as_bytes(),
64            rng_state,
65        })
66    }
67
68    /// Restore state from checkpoint.
69    ///
70    /// # Errors
71    ///
72    /// Returns error if integrity check fails or deserialization fails.
73    pub fn restore(&self) -> SimResult<SimState> {
74        // Verify integrity
75        let computed_hash = blake3::hash(&self.data);
76        if computed_hash.as_bytes() != &self.hash {
77            return Err(SimError::CheckpointIntegrity);
78        }
79
80        // Decompress
81        let decompressed = zstd::decode_all(&self.data[..])?;
82
83        // Deserialize
84        bincode::deserialize(&decompressed).map_err(|e| SimError::serialization(e.to_string()))
85    }
86
87    /// Get compressed size in bytes.
88    #[must_use]
89    pub fn compressed_size(&self) -> usize {
90        self.data.len()
91    }
92}
93
/// Checkpoint manager for incremental checkpointing.
///
/// Keeps checkpoints in a time-ordered map and evicts the oldest ones when
/// `max_storage` is exceeded.
///
/// NOTE(review): `#[derive(Default)]` zeroes `interval`; verify that
/// `should_checkpoint` is never called on a default-constructed manager,
/// since it uses `interval` as a modulus.
#[derive(Debug, Default)]
pub struct CheckpointManager {
    /// Checkpoints indexed by time.
    checkpoints: BTreeMap<SimTime, Checkpoint>,
    /// Checkpoint interval in steps.
    interval: u64,
    /// Maximum storage in bytes.
    max_storage: usize,
    /// Current storage usage (sum of compressed checkpoint sizes).
    current_storage: usize,
    /// Compression level (1-22).
    compression_level: i32,
}
108
109impl CheckpointManager {
110    /// Create a new checkpoint manager.
111    #[must_use]
112    pub fn new(interval: u64, max_storage: usize, compression_level: i32) -> Self {
113        Self {
114            checkpoints: BTreeMap::new(),
115            interval,
116            max_storage,
117            current_storage: 0,
118            compression_level,
119        }
120    }
121
122    /// Check if a checkpoint should be created at this step.
123    #[must_use]
124    pub const fn should_checkpoint(&self, step: u64) -> bool {
125        step % self.interval == 0
126    }
127
128    /// Create and store a checkpoint.
129    ///
130    /// # Errors
131    ///
132    /// Returns error if checkpoint creation fails.
133    pub fn checkpoint(
134        &mut self,
135        time: SimTime,
136        step: u64,
137        state: &SimState,
138        rng_state: RngState,
139    ) -> SimResult<()> {
140        let checkpoint = Checkpoint::create(time, step, state, rng_state, self.compression_level)?;
141
142        let size = checkpoint.compressed_size();
143
144        // Garbage collect if needed
145        while self.current_storage + size > self.max_storage && !self.checkpoints.is_empty() {
146            self.remove_oldest();
147        }
148
149        self.current_storage += size;
150        self.checkpoints.insert(time, checkpoint);
151
152        Ok(())
153    }
154
155    /// Get checkpoint at or before given time.
156    #[must_use]
157    pub fn get_checkpoint_at(&self, time: SimTime) -> Option<&Checkpoint> {
158        self.checkpoints
159            .range(..=time)
160            .next_back()
161            .map(|(_, cp)| cp)
162    }
163
164    /// Restore state from nearest checkpoint.
165    ///
166    /// # Errors
167    ///
168    /// Returns error if no checkpoint found or restoration fails.
169    pub fn restore_at(&self, time: SimTime) -> SimResult<(SimState, SimTime)> {
170        let checkpoint = self
171            .get_checkpoint_at(time)
172            .ok_or(SimError::CheckpointNotFound(time))?;
173
174        let state = checkpoint.restore()?;
175        Ok((state, checkpoint.time))
176    }
177
178    /// Remove oldest checkpoint.
179    fn remove_oldest(&mut self) {
180        if let Some((&time, _)) = self.checkpoints.iter().next() {
181            if let Some(cp) = self.checkpoints.remove(&time) {
182                self.current_storage = self.current_storage.saturating_sub(cp.compressed_size());
183            }
184        }
185    }
186
187    /// Get number of stored checkpoints.
188    #[must_use]
189    pub fn num_checkpoints(&self) -> usize {
190        self.checkpoints.len()
191    }
192
193    /// Get current storage usage in bytes.
194    #[must_use]
195    pub const fn storage_used(&self) -> usize {
196        self.current_storage
197    }
198
199    /// Clear all checkpoints.
200    pub fn clear(&mut self) {
201        self.checkpoints.clear();
202        self.current_storage = 0;
203    }
204}
205
/// Journal entry for event replay.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JournalEntry {
    /// Simulation time.
    pub time: SimTime,
    /// Step number.
    pub step: u64,
    /// Monotonic sequence number assigned by the journal, for ordering.
    pub sequence: u64,
    /// Event data (bincode-serialized by `EventJournal::append`).
    pub event_data: Vec<u8>,
    /// RNG state snapshot; `Some` only when the journal was created with
    /// RNG recording enabled and a state was supplied at append time.
    pub rng_state: Option<RngState>,
}
220
/// Event journal for perfect replay.
///
/// Entries are stored in append order; `time_index` maps a simulation time
/// to an entry index so `entries_from` can start near a given time.
#[derive(Debug, Default)]
pub struct EventJournal {
    /// Journal entries in append order.
    entries: Vec<JournalEntry>,
    /// Index by time for fast lookup into `entries`.
    time_index: BTreeMap<SimTime, usize>,
    /// Current sequence number (next value to assign).
    sequence: u64,
    /// Whether to record RNG state snapshots in appended entries.
    record_rng_state: bool,
}
233
234impl EventJournal {
235    /// Create a new event journal.
236    #[must_use]
237    pub fn new(record_rng_state: bool) -> Self {
238        Self {
239            entries: Vec::new(),
240            time_index: BTreeMap::new(),
241            sequence: 0,
242            record_rng_state,
243        }
244    }
245
246    /// Append an entry to the journal.
247    ///
248    /// # Errors
249    ///
250    /// Returns error if serialization fails.
251    pub fn append<T: Serialize>(
252        &mut self,
253        time: SimTime,
254        step: u64,
255        event: &T,
256        rng_state: Option<&RngState>,
257    ) -> SimResult<()> {
258        let event_data =
259            bincode::serialize(event).map_err(|e| SimError::serialization(e.to_string()))?;
260
261        let rng_state = if self.record_rng_state {
262            rng_state.cloned()
263        } else {
264            None
265        };
266
267        let entry = JournalEntry {
268            time,
269            step,
270            sequence: self.sequence,
271            event_data,
272            rng_state,
273        };
274
275        let index = self.entries.len();
276        self.time_index.insert(time, index);
277        self.entries.push(entry);
278        self.sequence += 1;
279
280        Ok(())
281    }
282
283    /// Get entries from a given time.
284    pub fn entries_from(&self, time: SimTime) -> impl Iterator<Item = &JournalEntry> {
285        let start_idx = self
286            .time_index
287            .range(..=time)
288            .next_back()
289            .map_or(0, |(_, &idx)| idx);
290
291        self.entries[start_idx..].iter()
292    }
293
294    /// Get all entries.
295    #[must_use]
296    pub fn entries(&self) -> &[JournalEntry] {
297        &self.entries
298    }
299
300    /// Get number of entries.
301    #[must_use]
302    pub fn len(&self) -> usize {
303        self.entries.len()
304    }
305
306    /// Check if journal is empty.
307    #[must_use]
308    pub fn is_empty(&self) -> bool {
309        self.entries.is_empty()
310    }
311
312    /// Clear all entries.
313    pub fn clear(&mut self) {
314        self.entries.clear();
315        self.time_index.clear();
316        self.sequence = 0;
317    }
318}
319
/// Time-travel scrubber for interactive debugging.
///
/// Combines a [`CheckpointManager`] (coarse restore points) with an
/// [`EventJournal`] (fine-grained replay) to seek to arbitrary times.
#[derive(Debug)]
pub struct TimeScrubber {
    /// Checkpoint manager providing restore points.
    checkpoints: CheckpointManager,
    /// Event journal replayed forward from a restored checkpoint.
    journal: EventJournal,
    /// Current time (updated by `seek_to`).
    current_time: SimTime,
    /// Current state (updated by `seek_to`).
    current_state: SimState,
}
332
333impl TimeScrubber {
334    /// Create a new time scrubber.
335    #[must_use]
336    pub fn new(
337        checkpoint_interval: u64,
338        max_storage: usize,
339        compression_level: i32,
340        record_rng_state: bool,
341    ) -> Self {
342        Self {
343            checkpoints: CheckpointManager::new(
344                checkpoint_interval,
345                max_storage,
346                compression_level,
347            ),
348            journal: EventJournal::new(record_rng_state),
349            current_time: SimTime::ZERO,
350            current_state: SimState::default(),
351        }
352    }
353
354    /// Seek to a specific time.
355    ///
356    /// # Errors
357    ///
358    /// Returns error if seek fails.
359    pub fn seek_to(&mut self, target: SimTime) -> SimResult<&SimState> {
360        if target == self.current_time {
361            return Ok(&self.current_state);
362        }
363
364        // Restore from nearest checkpoint
365        let (state, checkpoint_time) = self.checkpoints.restore_at(target)?;
366        self.current_state = state;
367        self.current_time = checkpoint_time;
368
369        // Replay events from checkpoint to target
370        for entry in self.journal.entries_from(checkpoint_time) {
371            if entry.time > target {
372                break;
373            }
374            // Apply event (simplified - real implementation would deserialize and apply)
375            self.current_time = entry.time;
376        }
377
378        Ok(&self.current_state)
379    }
380
381    /// Get current time.
382    #[must_use]
383    pub const fn current_time(&self) -> SimTime {
384        self.current_time
385    }
386
387    /// Get current state.
388    #[must_use]
389    pub const fn current_state(&self) -> &SimState {
390        &self.current_state
391    }
392
393    /// Get checkpoint manager.
394    #[must_use]
395    pub const fn checkpoints(&self) -> &CheckpointManager {
396        &self.checkpoints
397    }
398
399    /// Get mutable checkpoint manager.
400    #[must_use]
401    pub fn checkpoints_mut(&mut self) -> &mut CheckpointManager {
402        &mut self.checkpoints
403    }
404
405    /// Get journal.
406    #[must_use]
407    pub const fn journal(&self) -> &EventJournal {
408        &self.journal
409    }
410
411    /// Get mutable journal.
412    #[must_use]
413    pub fn journal_mut(&mut self) -> &mut EventJournal {
414        &mut self.journal
415    }
416}
417
418// =============================================================================
419// Streaming Checkpoint Manager (Section 4.3.3)
420// =============================================================================
421
/// Zero-copy streaming checkpoint manager [50].
///
/// Eliminates Muda of Processing by streaming directly from serializer
/// to compressor to file without intermediate allocations. Each checkpoint
/// is written as `checkpoint_<step>.zst` under `base_path`.
#[derive(Debug)]
pub struct StreamingCheckpointManager {
    /// Base directory for checkpoint files (created on construction).
    base_path: std::path::PathBuf,
    /// Compression level (1-22).
    compression_level: i32,
    /// Checkpoint interval in steps.
    interval: u64,
    /// Number of checkpoints written so far.
    checkpoint_count: usize,
    /// Total compressed bytes written across all checkpoints.
    total_bytes_written: usize,
}
439
440impl StreamingCheckpointManager {
441    /// Create a new streaming checkpoint manager.
442    ///
443    /// # Errors
444    ///
445    /// Returns error if base path cannot be created.
446    pub fn new(
447        base_path: impl Into<std::path::PathBuf>,
448        interval: u64,
449        compression_level: i32,
450    ) -> SimResult<Self> {
451        let base_path = base_path.into();
452        std::fs::create_dir_all(&base_path)?;
453
454        Ok(Self {
455            base_path,
456            compression_level,
457            interval,
458            checkpoint_count: 0,
459            total_bytes_written: 0,
460        })
461    }
462
463    /// Check if checkpoint should be created at this step.
464    #[must_use]
465    pub const fn should_checkpoint(&self, step: u64) -> bool {
466        step % self.interval == 0
467    }
468
469    /// Create checkpoint with streaming serialization (zero intermediate allocation).
470    ///
471    /// # Errors
472    ///
473    /// Returns error if serialization or file I/O fails.
474    pub fn checkpoint_streaming<S: Serialize>(
475        &mut self,
476        time: SimTime,
477        step: u64,
478        state: &S,
479        rng_state: &RngState,
480    ) -> SimResult<std::path::PathBuf> {
481        let filename = format!("checkpoint_{step:012}.zst");
482        let path = self.base_path.join(&filename);
483
484        // Create file and wrap in streaming encoder
485        let file = std::fs::File::create(&path)?;
486        let mut encoder = zstd::stream::Encoder::new(file, self.compression_level)
487            .map_err(|e| SimError::serialization(format!("Zstd encoder init: {e}")))?;
488
489        // Stream checkpoint header
490        let header = CheckpointHeader {
491            time,
492            step,
493            rng_state: rng_state.clone(),
494            version: (0, 1, 0),
495        };
496        bincode::serialize_into(&mut encoder, &header)
497            .map_err(|e| SimError::serialization(format!("Header serialize: {e}")))?;
498
499        // Stream state directly into encoder (no intermediate Vec)
500        bincode::serialize_into(&mut encoder, state)
501            .map_err(|e| SimError::serialization(format!("State serialize: {e}")))?;
502
503        let file = encoder
504            .finish()
505            .map_err(|e| SimError::serialization(format!("Zstd finish: {e}")))?;
506
507        // Get compressed size
508        let metadata = file.metadata()?;
509        self.total_bytes_written += metadata.len() as usize;
510        self.checkpoint_count += 1;
511
512        Ok(path)
513    }
514
515    /// Restore state from streaming checkpoint.
516    ///
517    /// # Errors
518    ///
519    /// Returns error if deserialization fails.
520    pub fn restore_streaming<S: serde::de::DeserializeOwned>(
521        &self,
522        path: &std::path::Path,
523    ) -> SimResult<(CheckpointHeader, S)> {
524        let file = std::fs::File::open(path)?;
525        let mut decoder = zstd::stream::Decoder::new(file)
526            .map_err(|e| SimError::serialization(format!("Zstd decoder init: {e}")))?;
527
528        let header: CheckpointHeader = bincode::deserialize_from(&mut decoder)
529            .map_err(|e| SimError::serialization(format!("Header deserialize: {e}")))?;
530
531        let state: S = bincode::deserialize_from(&mut decoder)
532            .map_err(|e| SimError::serialization(format!("State deserialize: {e}")))?;
533
534        Ok((header, state))
535    }
536
537    /// Get checkpoint count.
538    #[must_use]
539    pub const fn checkpoint_count(&self) -> usize {
540        self.checkpoint_count
541    }
542
543    /// Get total bytes written.
544    #[must_use]
545    pub const fn total_bytes_written(&self) -> usize {
546        self.total_bytes_written
547    }
548}
549
/// Checkpoint header for streaming checkpoints.
///
/// Serialized before the state payload by
/// `StreamingCheckpointManager::checkpoint_streaming`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CheckpointHeader {
    /// Simulation time.
    pub time: SimTime,
    /// Step number.
    pub step: u64,
    /// RNG state for replay.
    pub rng_state: RngState,
    /// Schema version as (major, minor, patch).
    pub version: (u16, u16, u16),
}
562
563// =============================================================================
564// Split Event Journal (Section 4.3.4)
565// =============================================================================
566
/// Compact event header for fast time scrubbing [50][54].
///
/// Fixed size for cache-efficient scanning. Payloads are loaded on demand.
///
/// NOTE(review): the per-field byte sizes below are the in-memory intent;
/// the serialized size depends on the bincode configuration and on
/// `SimTime`'s actual layout — confirm before relying on them for on-disk
/// offsets.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct EventHeader {
    /// Simulation time (8 bytes).
    pub time: SimTime,
    /// Event type discriminant (4 bytes).
    pub event_type: u32,
    /// Offset into payload storage (8 bytes).
    pub payload_offset: u64,
    /// Payload size in bytes (4 bytes).
    pub payload_size: u32,
    /// Sequence number for ordering (8 bytes).
    pub sequence: u64,
}
583
/// Split event journal for efficient time scrubbing [50][54].
///
/// Separates compact headers from variable-size payloads to enable
/// fast time-based seeking without deserializing full events.
///
/// NOTE(review): `seek_to_time` searches `headers` by time, which assumes
/// events are appended in nondecreasing time order — confirm callers
/// guarantee this.
#[derive(Debug, Default)]
pub struct SplitEventJournal {
    /// Compact header index (sorted by time).
    headers: Vec<EventHeader>,
    /// Raw payload bytes, concatenated in append order.
    payloads: Vec<u8>,
    /// Current sequence counter (next value to assign).
    sequence: u64,
}
597
598impl SplitEventJournal {
599    /// Create a new split event journal.
600    #[must_use]
601    pub fn new() -> Self {
602        Self::default()
603    }
604
605    /// Append an event to the journal.
606    ///
607    /// # Errors
608    ///
609    /// Returns error if serialization fails.
610    pub fn append<T: Serialize>(
611        &mut self,
612        time: SimTime,
613        event_type: u32,
614        event: &T,
615    ) -> SimResult<()> {
616        let payload =
617            bincode::serialize(event).map_err(|e| SimError::serialization(e.to_string()))?;
618
619        let header = EventHeader {
620            time,
621            event_type,
622            payload_offset: self.payloads.len() as u64,
623            payload_size: payload.len() as u32,
624            sequence: self.sequence,
625        };
626
627        self.headers.push(header);
628        self.payloads.extend(payload);
629        self.sequence += 1;
630
631        Ok(())
632    }
633
634    /// Seek to time index without deserializing payloads (O(log n)).
635    #[must_use]
636    pub fn seek_to_time(&self, target: SimTime) -> Option<usize> {
637        self.headers
638            .binary_search_by(|h| h.time.cmp(&target))
639            .ok()
640            .or_else(|| {
641                // Return index of first event at or after target
642                self.headers.iter().position(|h| h.time >= target)
643            })
644    }
645
646    /// Load specific event payload on demand.
647    ///
648    /// # Errors
649    ///
650    /// Returns error if deserialization fails.
651    pub fn load_payload<T: serde::de::DeserializeOwned>(
652        &self,
653        header: &EventHeader,
654    ) -> SimResult<T> {
655        let start = header.payload_offset as usize;
656        let end = start + header.payload_size as usize;
657
658        if end > self.payloads.len() {
659            return Err(SimError::journal("Payload offset out of bounds"));
660        }
661
662        bincode::deserialize(&self.payloads[start..end])
663            .map_err(|e| SimError::journal(format!("Payload deserialize: {e}")))
664    }
665
666    /// Iterate headers in time range (fast, no payload deserialization).
667    pub fn headers_in_range(
668        &self,
669        start: SimTime,
670        end: SimTime,
671    ) -> impl Iterator<Item = &EventHeader> {
672        self.headers
673            .iter()
674            .filter(move |h| h.time >= start && h.time <= end)
675    }
676
677    /// Get all headers.
678    #[must_use]
679    pub fn headers(&self) -> &[EventHeader] {
680        &self.headers
681    }
682
683    /// Get header count.
684    #[must_use]
685    pub fn header_count(&self) -> usize {
686        self.headers.len()
687    }
688
689    /// Get total payload bytes.
690    #[must_use]
691    pub fn payload_bytes(&self) -> usize {
692        self.payloads.len()
693    }
694
695    /// Clear the journal.
696    pub fn clear(&mut self) {
697        self.headers.clear();
698        self.payloads.clear();
699        self.sequence = 0;
700    }
701}
702
703// =============================================================================
704// Schema Evolution (Section 4.3.7)
705// =============================================================================
706
/// Versioned journal entry for schema evolution [50].
///
/// The payload is opaque bytes tagged with the schema version that produced
/// it; [`SchemaMigrator`] can upgrade old payloads to the current version.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VersionedEntry {
    /// Schema version (SemVer-style: major, minor, patch).
    pub version: (u16, u16, u16),
    /// Entry type tag.
    pub entry_type: String,
    /// Payload (bincode-serialized, version-specific).
    pub payload: Vec<u8>,
}
717
718impl VersionedEntry {
719    /// Create a new versioned entry.
720    ///
721    /// # Errors
722    ///
723    /// Returns error if serialization fails.
724    pub fn new<T: Serialize>(
725        version: (u16, u16, u16),
726        entry_type: impl Into<String>,
727        data: &T,
728    ) -> SimResult<Self> {
729        let payload =
730            bincode::serialize(data).map_err(|e| SimError::serialization(e.to_string()))?;
731
732        Ok(Self {
733            version,
734            entry_type: entry_type.into(),
735            payload,
736        })
737    }
738
739    /// Deserialize payload as specific type.
740    ///
741    /// # Errors
742    ///
743    /// Returns error if deserialization fails.
744    pub fn deserialize<T: serde::de::DeserializeOwned>(&self) -> SimResult<T> {
745        bincode::deserialize(&self.payload).map_err(|e| SimError::serialization(e.to_string()))
746    }
747}
748
/// Migration function type: transforms a payload serialized under one
/// schema version into the bytes of another version.
type MigrationFn = Box<dyn Fn(&[u8]) -> SimResult<Vec<u8>> + Send + Sync>;

/// Version tuple for schema versioning (major, minor, patch).
type SchemaVersion = (u16, u16, u16);

/// Migration key: (`from_version`, `to_version`).
type MigrationKey = (SchemaVersion, SchemaVersion);
757
/// Schema migrator for backward compatibility [50].
///
/// Enables reading old journal entries by migrating them to the current
/// schema version via registered per-version-pair migration functions.
pub struct SchemaMigrator {
    /// Current (target) schema version.
    current_version: SchemaVersion,
    /// Registered migrations: (`from_version`, `to_version`) -> `migration_fn`.
    migrations: std::collections::HashMap<MigrationKey, MigrationFn>,
}
767
768impl std::fmt::Debug for SchemaMigrator {
769    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
770        f.debug_struct("SchemaMigrator")
771            .field("current_version", &self.current_version)
772            .field("migration_count", &self.migrations.len())
773            .finish()
774    }
775}
776
777impl SchemaMigrator {
778    /// Create a new schema migrator.
779    #[must_use]
780    pub fn new(current_version: SchemaVersion) -> Self {
781        Self {
782            current_version,
783            migrations: std::collections::HashMap::new(),
784        }
785    }
786
787    /// Register migration between versions.
788    pub fn register<F>(&mut self, from: SchemaVersion, to: SchemaVersion, migrate: F)
789    where
790        F: Fn(&[u8]) -> SimResult<Vec<u8>> + Send + Sync + 'static,
791    {
792        self.migrations.insert((from, to), Box::new(migrate));
793    }
794
795    /// Check if entry needs migration.
796    #[must_use]
797    pub fn needs_migration(&self, entry: &VersionedEntry) -> bool {
798        entry.version != self.current_version
799    }
800
801    /// Migrate entry to current version.
802    ///
803    /// # Errors
804    ///
805    /// Returns error if no migration path exists or migration fails.
806    pub fn migrate_to_current(&self, entry: &VersionedEntry) -> SimResult<Vec<u8>> {
807        if entry.version == self.current_version {
808            return Ok(entry.payload.clone());
809        }
810
811        // Try direct migration
812        if let Some(migration) = self.migrations.get(&(entry.version, self.current_version)) {
813            return migration(&entry.payload);
814        }
815
816        // Try to find a migration path (simple linear search for now)
817        // In production, you'd want Dijkstra or BFS for optimal path
818        for (&(from, intermediate), first_step) in &self.migrations {
819            if from == entry.version {
820                if let Some(second_step) =
821                    self.migrations.get(&(intermediate, self.current_version))
822                {
823                    let intermediate_payload = first_step(&entry.payload)?;
824                    return second_step(&intermediate_payload);
825                }
826            }
827        }
828
829        Err(SimError::journal(format!(
830            "No migration path from {:?} to {:?}",
831            entry.version, self.current_version
832        )))
833    }
834
835    /// Get current version.
836    #[must_use]
837    pub const fn current_version(&self) -> SchemaVersion {
838        self.current_version
839    }
840
841    /// Get number of registered migrations.
842    #[must_use]
843    pub fn migration_count(&self) -> usize {
844        self.migrations.len()
845    }
846}
847
848#[cfg(test)]
849mod tests {
850    use super::*;
851    use crate::engine::rng::SimRng;
852    use crate::engine::state::Vec3;
853
854    #[test]
855    fn test_checkpoint_create_restore() {
856        let mut state = SimState::new();
857        state.add_body(1.0, Vec3::new(1.0, 2.0, 3.0), Vec3::new(4.0, 5.0, 6.0));
858
859        let rng = SimRng::new(42);
860        let rng_state = rng.save_state();
861
862        let checkpoint = Checkpoint::create(SimTime::from_secs(1.0), 100, &state, rng_state, 3);
863
864        assert!(checkpoint.is_ok());
865        let checkpoint = checkpoint.ok().unwrap();
866
867        let restored = checkpoint.restore();
868        assert!(restored.is_ok());
869        let restored = restored.ok().unwrap();
870
871        assert_eq!(restored.num_bodies(), 1);
872        assert!((restored.positions()[0].x - 1.0).abs() < f64::EPSILON);
873    }
874
875    #[test]
876    fn test_checkpoint_integrity_check() {
877        let state = SimState::new();
878        let rng = SimRng::new(42);
879
880        let mut checkpoint =
881            Checkpoint::create(SimTime::from_secs(1.0), 100, &state, rng.save_state(), 3)
882                .ok()
883                .unwrap();
884
885        // Corrupt the data
886        if !checkpoint.data.is_empty() {
887            checkpoint.data[0] ^= 0xFF;
888        }
889
890        let result = checkpoint.restore();
891        assert!(result.is_err());
892    }
893
894    #[test]
895    fn test_checkpoint_manager() {
896        let mut manager = CheckpointManager::new(10, 1024 * 1024, 3);
897
898        let state = SimState::new();
899        let rng = SimRng::new(42);
900
901        // Create checkpoints
902        for step in (0..100).step_by(10) {
903            let time = SimTime::from_secs(step as f64 * 0.1);
904            manager
905                .checkpoint(time, step as u64, &state, rng.save_state())
906                .ok();
907        }
908
909        assert_eq!(manager.num_checkpoints(), 10);
910
911        // Get checkpoint at specific time
912        let cp = manager.get_checkpoint_at(SimTime::from_secs(0.55));
913        assert!(cp.is_some());
914        assert!(cp.map(|c| c.time.as_secs_f64()).unwrap_or(0.0) <= 0.55);
915    }
916
917    #[test]
918    fn test_checkpoint_manager_gc() {
919        // Very small max storage to trigger GC
920        let mut manager = CheckpointManager::new(1, 100, 1);
921
922        let state = SimState::new();
923        let rng = SimRng::new(42);
924
925        // Create many checkpoints
926        for step in 0..100 {
927            let time = SimTime::from_secs(step as f64 * 0.01);
928            manager
929                .checkpoint(time, step, &state, rng.save_state())
930                .ok();
931        }
932
933        // Should have garbage collected old ones
934        assert!(manager.storage_used() <= 100);
935    }
936
937    #[test]
938    fn test_event_journal() {
939        let mut journal = EventJournal::new(true);
940
941        let event1 = "event1";
942        let event2 = "event2";
943
944        journal
945            .append(SimTime::from_secs(1.0), 100, &event1, None)
946            .ok();
947        journal
948            .append(SimTime::from_secs(2.0), 200, &event2, None)
949            .ok();
950
951        assert_eq!(journal.len(), 2);
952        assert!(!journal.is_empty());
953
954        let entries: Vec<_> = journal.entries_from(SimTime::from_secs(1.5)).collect();
955        assert!(!entries.is_empty());
956    }
957
958    #[test]
959    fn test_time_scrubber() {
960        let mut scrubber = TimeScrubber::new(10, 1024 * 1024, 3, false);
961
962        assert_eq!(scrubber.current_time(), SimTime::ZERO);
963
964        // Add some checkpoints manually
965        let state = SimState::new();
966        let rng = SimRng::new(42);
967
968        scrubber
969            .checkpoints_mut()
970            .checkpoint(SimTime::from_secs(1.0), 100, &state, rng.save_state())
971            .ok();
972
973        // Seek to checkpoint time
974        let result = scrubber.seek_to(SimTime::from_secs(1.0));
975        assert!(result.is_ok());
976    }
977
978    // === Split Event Journal Tests (Section 4.3.4) ===
979
980    #[test]
981    fn test_split_journal_append() {
982        let mut journal = SplitEventJournal::new();
983
984        journal.append(SimTime::from_secs(1.0), 1, &"event1").ok();
985        journal.append(SimTime::from_secs(2.0), 2, &"event2").ok();
986
987        assert_eq!(journal.header_count(), 2);
988        assert!(journal.payload_bytes() > 0);
989    }
990
991    #[test]
992    fn test_split_journal_load_payload() {
993        let mut journal = SplitEventJournal::new();
994
995        let event = "test_event_data";
996        journal.append(SimTime::from_secs(1.0), 1, &event).ok();
997
998        let header = &journal.headers()[0];
999        let loaded: String = journal.load_payload(header).ok().unwrap();
1000
1001        assert_eq!(loaded, event);
1002    }
1003
1004    #[test]
1005    fn test_split_journal_seek_to_time() {
1006        let mut journal = SplitEventJournal::new();
1007
1008        for i in 0..10 {
1009            journal
1010                .append(SimTime::from_secs(i as f64), i as u32, &format!("event{i}"))
1011                .ok();
1012        }
1013
1014        // Exact match
1015        let idx = journal.seek_to_time(SimTime::from_secs(5.0));
1016        assert_eq!(idx, Some(5));
1017
1018        // Between times
1019        let idx = journal.seek_to_time(SimTime::from_secs(5.5));
1020        assert_eq!(idx, Some(6)); // First event at or after target
1021
1022        // At start
1023        let idx = journal.seek_to_time(SimTime::ZERO);
1024        assert_eq!(idx, Some(0));
1025    }
1026
1027    #[test]
1028    fn test_split_journal_headers_in_range() {
1029        let mut journal = SplitEventJournal::new();
1030
1031        for i in 0..10 {
1032            journal
1033                .append(SimTime::from_secs(i as f64), i as u32, &i)
1034                .ok();
1035        }
1036
1037        let headers: Vec<_> = journal
1038            .headers_in_range(SimTime::from_secs(3.0), SimTime::from_secs(7.0))
1039            .collect();
1040
1041        assert_eq!(headers.len(), 5); // Events at t=3,4,5,6,7
1042        assert_eq!(headers[0].event_type, 3);
1043        assert_eq!(headers[4].event_type, 7);
1044    }
1045
1046    #[test]
1047    fn test_split_journal_clear() {
1048        let mut journal = SplitEventJournal::new();
1049
1050        journal.append(SimTime::from_secs(1.0), 1, &"event").ok();
1051        assert_eq!(journal.header_count(), 1);
1052
1053        journal.clear();
1054        assert_eq!(journal.header_count(), 0);
1055        assert_eq!(journal.payload_bytes(), 0);
1056    }
1057
1058    // === Schema Evolution Tests (Section 4.3.7) ===
1059
1060    #[test]
1061    fn test_versioned_entry_create() {
1062        let data = "test_data";
1063        let entry = VersionedEntry::new((1, 0, 0), "test_type", &data);
1064
1065        assert!(entry.is_ok());
1066        let entry = entry.ok().unwrap();
1067
1068        assert_eq!(entry.version, (1, 0, 0));
1069        assert_eq!(entry.entry_type, "test_type");
1070    }
1071
1072    #[test]
1073    fn test_versioned_entry_deserialize() {
1074        let data = vec![1u32, 2, 3, 4, 5];
1075        let entry = VersionedEntry::new((1, 0, 0), "vec_u32", &data)
1076            .ok()
1077            .unwrap();
1078
1079        let restored: Vec<u32> = entry.deserialize().ok().unwrap();
1080        assert_eq!(restored, data);
1081    }
1082
1083    #[test]
1084    fn test_schema_migrator_no_migration_needed() {
1085        let migrator = SchemaMigrator::new((1, 0, 0));
1086        let entry = VersionedEntry::new((1, 0, 0), "test", &"data")
1087            .ok()
1088            .unwrap();
1089
1090        assert!(!migrator.needs_migration(&entry));
1091
1092        let result = migrator.migrate_to_current(&entry);
1093        assert!(result.is_ok());
1094        assert_eq!(result.ok().unwrap(), entry.payload);
1095    }
1096
1097    #[test]
1098    fn test_schema_migrator_direct_migration() {
1099        let mut migrator = SchemaMigrator::new((1, 1, 0));
1100
1101        // Register migration from 1.0.0 to 1.1.0
1102        migrator.register((1, 0, 0), (1, 1, 0), |payload| {
1103            // Simple migration: prepend a byte
1104            let mut new_payload = vec![0xFF];
1105            new_payload.extend_from_slice(payload);
1106            Ok(new_payload)
1107        });
1108
1109        let entry = VersionedEntry {
1110            version: (1, 0, 0),
1111            entry_type: "test".to_string(),
1112            payload: vec![1, 2, 3],
1113        };
1114
1115        assert!(migrator.needs_migration(&entry));
1116
1117        let result = migrator.migrate_to_current(&entry);
1118        assert!(result.is_ok());
1119
1120        let migrated = result.ok().unwrap();
1121        assert_eq!(migrated, vec![0xFF, 1, 2, 3]);
1122    }
1123
1124    #[test]
1125    fn test_schema_migrator_no_path() {
1126        let migrator = SchemaMigrator::new((2, 0, 0));
1127
1128        let entry = VersionedEntry {
1129            version: (1, 0, 0),
1130            entry_type: "test".to_string(),
1131            payload: vec![1, 2, 3],
1132        };
1133
1134        let result = migrator.migrate_to_current(&entry);
1135        assert!(result.is_err());
1136    }
1137
1138    #[test]
1139    fn test_schema_migrator_chained_migration() {
1140        let mut migrator = SchemaMigrator::new((1, 2, 0));
1141
1142        // Register 1.0.0 -> 1.1.0
1143        migrator.register((1, 0, 0), (1, 1, 0), |payload| {
1144            let mut new = vec![0xAA];
1145            new.extend_from_slice(payload);
1146            Ok(new)
1147        });
1148
1149        // Register 1.1.0 -> 1.2.0
1150        migrator.register((1, 1, 0), (1, 2, 0), |payload| {
1151            let mut new = vec![0xBB];
1152            new.extend_from_slice(payload);
1153            Ok(new)
1154        });
1155
1156        let entry = VersionedEntry {
1157            version: (1, 0, 0),
1158            entry_type: "test".to_string(),
1159            payload: vec![1, 2, 3],
1160        };
1161
1162        let result = migrator.migrate_to_current(&entry);
1163        assert!(result.is_ok());
1164
1165        let migrated = result.ok().unwrap();
1166        // Should be: 0xBB (from 1.1->1.2) + 0xAA (from 1.0->1.1) + [1,2,3]
1167        assert_eq!(migrated, vec![0xBB, 0xAA, 1, 2, 3]);
1168    }
1169
1170    // === Streaming Checkpoint Manager Tests (Section 4.3.3) ===
1171
1172    #[test]
1173    fn test_streaming_checkpoint_roundtrip() {
1174        let temp_dir = tempfile::tempdir().ok().unwrap();
1175        let mut manager = StreamingCheckpointManager::new(temp_dir.path(), 10, 3)
1176            .ok()
1177            .unwrap();
1178
1179        let mut state = SimState::new();
1180        state.add_body(1.0, Vec3::new(1.0, 2.0, 3.0), Vec3::new(4.0, 5.0, 6.0));
1181
1182        let rng = SimRng::new(42);
1183        let rng_state = rng.save_state();
1184
1185        // Create checkpoint
1186        let path = manager
1187            .checkpoint_streaming(SimTime::from_secs(1.0), 100, &state, &rng_state)
1188            .ok()
1189            .unwrap();
1190
1191        assert!(path.exists());
1192        assert_eq!(manager.checkpoint_count(), 1);
1193
1194        // Restore checkpoint
1195        let (header, restored): (CheckpointHeader, SimState) =
1196            manager.restore_streaming(&path).ok().unwrap();
1197
1198        assert_eq!(header.step, 100);
1199        assert!((header.time.as_secs_f64() - 1.0).abs() < f64::EPSILON);
1200        assert_eq!(restored.num_bodies(), 1);
1201        assert!((restored.positions()[0].x - 1.0).abs() < f64::EPSILON);
1202    }
1203
1204    #[test]
1205    fn test_streaming_checkpoint_should_checkpoint() {
1206        let temp_dir = tempfile::tempdir().ok().unwrap();
1207        let manager = StreamingCheckpointManager::new(temp_dir.path(), 10, 3)
1208            .ok()
1209            .unwrap();
1210
1211        assert!(manager.should_checkpoint(0));
1212        assert!(!manager.should_checkpoint(5));
1213        assert!(manager.should_checkpoint(10));
1214        assert!(manager.should_checkpoint(100));
1215    }
1216
1217    // === Additional Coverage Tests ===
1218
1219    #[test]
1220    fn test_checkpoint_manager_clear() {
1221        let mut manager = CheckpointManager::new(10, 1024 * 1024, 3);
1222        let state = SimState::new();
1223        let rng = SimRng::new(42);
1224
1225        manager
1226            .checkpoint(SimTime::from_secs(1.0), 10, &state, rng.save_state())
1227            .ok();
1228
1229        assert_eq!(manager.num_checkpoints(), 1);
1230        assert!(manager.storage_used() > 0);
1231
1232        manager.clear();
1233        assert_eq!(manager.num_checkpoints(), 0);
1234        assert_eq!(manager.storage_used(), 0);
1235    }
1236
1237    #[test]
1238    fn test_checkpoint_manager_restore_not_found() {
1239        let manager = CheckpointManager::new(10, 1024 * 1024, 3);
1240
1241        let result = manager.restore_at(SimTime::from_secs(5.0));
1242        assert!(result.is_err());
1243    }
1244
1245    #[test]
1246    fn test_event_journal_entries() {
1247        let mut journal = EventJournal::new(false);
1248
1249        journal
1250            .append(SimTime::from_secs(1.0), 100, &"event1", None)
1251            .ok();
1252        journal
1253            .append(SimTime::from_secs(2.0), 200, &"event2", None)
1254            .ok();
1255
1256        let entries = journal.entries();
1257        assert_eq!(entries.len(), 2);
1258        assert_eq!(entries[0].step, 100);
1259        assert_eq!(entries[1].step, 200);
1260    }
1261
1262    #[test]
1263    fn test_event_journal_clear() {
1264        let mut journal = EventJournal::new(false);
1265
1266        journal
1267            .append(SimTime::from_secs(1.0), 100, &"event", None)
1268            .ok();
1269        assert!(!journal.is_empty());
1270
1271        journal.clear();
1272        assert!(journal.is_empty());
1273        assert_eq!(journal.len(), 0);
1274    }
1275
1276    #[test]
1277    fn test_event_journal_with_rng() {
1278        let mut journal = EventJournal::new(true); // record RNG state
1279        let rng = SimRng::new(42);
1280        let rng_state = rng.save_state();
1281
1282        journal
1283            .append(SimTime::from_secs(1.0), 100, &"event", Some(&rng_state))
1284            .ok();
1285
1286        let entries = journal.entries();
1287        assert!(entries[0].rng_state.is_some());
1288    }
1289
1290    #[test]
1291    fn test_time_scrubber_seek_same_time() {
1292        let mut scrubber = TimeScrubber::new(10, 1024 * 1024, 3, false);
1293        let state = SimState::new();
1294        let rng = SimRng::new(42);
1295
1296        scrubber
1297            .checkpoints_mut()
1298            .checkpoint(SimTime::ZERO, 0, &state, rng.save_state())
1299            .ok();
1300
1301        // First seek
1302        let _ = scrubber.seek_to(SimTime::ZERO);
1303        // Seek to same time should return immediately
1304        let result = scrubber.seek_to(SimTime::ZERO);
1305        assert!(result.is_ok());
1306    }
1307
1308    #[test]
1309    fn test_time_scrubber_current_state() {
1310        let scrubber = TimeScrubber::new(10, 1024 * 1024, 3, false);
1311        let state = scrubber.current_state();
1312        assert_eq!(state.num_bodies(), 0);
1313    }
1314
1315    #[test]
1316    fn test_time_scrubber_journal_accessors() {
1317        let mut scrubber = TimeScrubber::new(10, 1024 * 1024, 3, false);
1318
1319        // Read-only journal
1320        assert!(scrubber.journal().is_empty());
1321
1322        // Mutable journal
1323        scrubber
1324            .journal_mut()
1325            .append(SimTime::from_secs(1.0), 100, &"event", None)
1326            .ok();
1327
1328        assert!(!scrubber.journal().is_empty());
1329    }
1330
1331    #[test]
1332    fn test_streaming_checkpoint_total_bytes() {
1333        let temp_dir = tempfile::tempdir().ok().unwrap();
1334        let mut manager = StreamingCheckpointManager::new(temp_dir.path(), 10, 3)
1335            .ok()
1336            .unwrap();
1337
1338        assert_eq!(manager.total_bytes_written(), 0);
1339
1340        let state = SimState::new();
1341        let rng = SimRng::new(42);
1342
1343        manager
1344            .checkpoint_streaming(SimTime::from_secs(1.0), 10, &state, &rng.save_state())
1345            .ok();
1346
1347        assert!(manager.total_bytes_written() > 0);
1348    }
1349
1350    #[test]
1351    fn test_checkpoint_compressed_size() {
1352        let state = SimState::new();
1353        let rng = SimRng::new(42);
1354
1355        let checkpoint =
1356            Checkpoint::create(SimTime::from_secs(1.0), 100, &state, rng.save_state(), 3)
1357                .ok()
1358                .unwrap();
1359
1360        assert!(checkpoint.compressed_size() > 0);
1361    }
1362
1363    #[test]
1364    fn test_checkpoint_clone() {
1365        let state = SimState::new();
1366        let rng = SimRng::new(42);
1367
1368        let checkpoint =
1369            Checkpoint::create(SimTime::from_secs(1.0), 100, &state, rng.save_state(), 3)
1370                .ok()
1371                .unwrap();
1372
1373        let cloned = checkpoint.clone();
1374        assert_eq!(cloned.step, checkpoint.step);
1375        assert_eq!(cloned.hash, checkpoint.hash);
1376    }
1377
1378    #[test]
1379    fn test_journal_entry_clone() {
1380        let mut journal = EventJournal::new(false);
1381        journal
1382            .append(SimTime::from_secs(1.0), 100, &"event", None)
1383            .ok();
1384
1385        let entry = &journal.entries()[0];
1386        let cloned = entry.clone();
1387        assert_eq!(cloned.step, entry.step);
1388    }
1389
1390    #[test]
1391    fn test_versioned_entry_debug() {
1392        let entry = VersionedEntry::new((1, 0, 0), "test", &"data")
1393            .ok()
1394            .unwrap();
1395        let debug = format!("{:?}", entry);
1396        assert!(debug.contains("VersionedEntry"));
1397    }
1398
1399    #[test]
1400    fn test_event_header_debug() {
1401        let header = EventHeader {
1402            time: SimTime::from_secs(1.0),
1403            event_type: 42,
1404            payload_offset: 0,
1405            payload_size: 100,
1406            sequence: 1,
1407        };
1408        let debug = format!("{:?}", header);
1409        assert!(debug.contains("EventHeader"));
1410    }
1411
1412    #[test]
1413    fn test_checkpoint_header_debug() {
1414        let rng = SimRng::new(42);
1415        let header = CheckpointHeader {
1416            time: SimTime::from_secs(1.0),
1417            step: 100,
1418            rng_state: rng.save_state(),
1419            version: (1, 0, 0),
1420        };
1421        let debug = format!("{:?}", header);
1422        assert!(debug.contains("CheckpointHeader"));
1423    }
1424
1425    #[test]
1426    fn test_checkpoint_manager_should_checkpoint() {
1427        let manager = CheckpointManager::new(10, 1024 * 1024, 3);
1428        assert!(manager.should_checkpoint(0));
1429        assert!(!manager.should_checkpoint(5));
1430        assert!(manager.should_checkpoint(10));
1431        assert!(manager.should_checkpoint(20));
1432    }
1433
1434    #[test]
1435    fn test_split_journal_seek_before_start() {
1436        let mut journal = SplitEventJournal::new();
1437
1438        for i in 1..10 {
1439            journal
1440                .append(SimTime::from_secs(i as f64), i as u32, &i)
1441                .ok();
1442        }
1443
1444        // Seek before any events
1445        let idx = journal.seek_to_time(SimTime::from_secs(0.5));
1446        assert_eq!(idx, Some(0));
1447    }
1448
1449    #[test]
1450    fn test_split_journal_seek_after_end() {
1451        let mut journal = SplitEventJournal::new();
1452
1453        for i in 0..5 {
1454            journal
1455                .append(SimTime::from_secs(i as f64), i as u32, &i)
1456                .ok();
1457        }
1458
1459        // Seek way after all events
1460        let idx = journal.seek_to_time(SimTime::from_secs(100.0));
1461        assert_eq!(idx, None);
1462    }
1463
1464    #[test]
1465    fn test_split_journal_empty_seek() {
1466        let journal = SplitEventJournal::new();
1467        let idx = journal.seek_to_time(SimTime::from_secs(1.0));
1468        assert_eq!(idx, None);
1469    }
1470}
1471
#[cfg(test)]
mod proptests {
    use super::*;
    use crate::engine::rng::SimRng;
    use crate::engine::state::Vec3;
    use proptest::prelude::*;

    proptest! {
        /// Falsification: checkpoint restore produces identical state.
        #[test]
        fn prop_checkpoint_roundtrip(
            x in -1000.0f64..1000.0,
            y in -1000.0f64..1000.0,
            z in -1000.0f64..1000.0,
            mass in 0.1f64..1000.0,
            seed in 0u64..u64::MAX,
        ) {
            let mut state = SimState::new();
            state.add_body(mass, Vec3::new(x, y, z), Vec3::zero());

            let rng = SimRng::new(seed);
            let checkpoint = Checkpoint::create(
                SimTime::from_secs(1.0),
                100,
                &state,
                rng.save_state(),
                3,
            );

            prop_assert!(checkpoint.is_ok());
            // Plain unwrap() (not .ok().unwrap()) keeps the error value in the
            // panic message should the prop_assert above ever be removed.
            let checkpoint = checkpoint.unwrap();

            let restored = checkpoint.restore();
            prop_assert!(restored.is_ok());
            let restored = restored.unwrap();

            prop_assert_eq!(restored.num_bodies(), state.num_bodies());
            // Positions must survive the serialize/compress/restore cycle exactly
            // (tolerance covers only float round-trip through the codec).
            prop_assert!((restored.positions()[0].x - x).abs() < 1e-10);
            prop_assert!((restored.positions()[0].y - y).abs() < 1e-10);
            prop_assert!((restored.positions()[0].z - z).abs() < 1e-10);
        }
    }
}