1use serde::{Deserialize, Serialize};
15use std::collections::BTreeMap;
16
17use crate::engine::rng::RngState;
18use crate::engine::{SimState, SimTime};
19use crate::error::{SimError, SimResult};
20
/// A restorable snapshot of the simulation.
///
/// Built by [`Checkpoint::create`], which stores the state as
/// bincode-serialized, zstd-compressed bytes together with a BLAKE3 hash of
/// those compressed bytes.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Checkpoint {
    /// Simulation time supplied when the checkpoint was created.
    pub time: SimTime,
    /// Step counter supplied when the checkpoint was created.
    pub step: u64,
    /// Compressed, serialized simulation state.
    pub data: Vec<u8>,
    /// BLAKE3 hash of `data`; verified by [`Checkpoint::restore`].
    pub hash: [u8; 32],
    /// RNG state captured alongside the simulation state.
    pub rng_state: RngState,
}
35
36impl Checkpoint {
37 pub fn create(
43 time: SimTime,
44 step: u64,
45 state: &SimState,
46 rng_state: RngState,
47 compression_level: i32,
48 ) -> SimResult<Self> {
49 let serialized =
51 bincode::serialize(state).map_err(|e| SimError::serialization(e.to_string()))?;
52
53 let compressed = zstd::encode_all(&serialized[..], compression_level)?;
55
56 let hash = blake3::hash(&compressed);
58
59 Ok(Self {
60 time,
61 step,
62 data: compressed,
63 hash: *hash.as_bytes(),
64 rng_state,
65 })
66 }
67
68 pub fn restore(&self) -> SimResult<SimState> {
74 let computed_hash = blake3::hash(&self.data);
76 if computed_hash.as_bytes() != &self.hash {
77 return Err(SimError::CheckpointIntegrity);
78 }
79
80 let decompressed = zstd::decode_all(&self.data[..])?;
82
83 bincode::deserialize(&decompressed).map_err(|e| SimError::serialization(e.to_string()))
85 }
86
87 #[must_use]
89 pub fn compressed_size(&self) -> usize {
90 self.data.len()
91 }
92}
93
/// In-memory store of [`Checkpoint`]s with a byte budget.
///
/// Checkpoints are keyed by simulation time; when adding one would exceed
/// `max_storage`, the oldest checkpoints are evicted first.
#[derive(Debug, Default)]
pub struct CheckpointManager {
    /// Stored checkpoints, ordered by simulation time.
    checkpoints: BTreeMap<SimTime, Checkpoint>,
    /// Step interval consulted by `should_checkpoint`.
    interval: u64,
    /// Budget, in bytes of compressed checkpoint data.
    max_storage: usize,
    /// Running total of the compressed sizes of the stored checkpoints.
    current_storage: usize,
    /// Compression level forwarded to [`Checkpoint::create`].
    compression_level: i32,
}
108
109impl CheckpointManager {
110 #[must_use]
112 pub fn new(interval: u64, max_storage: usize, compression_level: i32) -> Self {
113 Self {
114 checkpoints: BTreeMap::new(),
115 interval,
116 max_storage,
117 current_storage: 0,
118 compression_level,
119 }
120 }
121
122 #[must_use]
124 pub const fn should_checkpoint(&self, step: u64) -> bool {
125 step % self.interval == 0
126 }
127
128 pub fn checkpoint(
134 &mut self,
135 time: SimTime,
136 step: u64,
137 state: &SimState,
138 rng_state: RngState,
139 ) -> SimResult<()> {
140 let checkpoint = Checkpoint::create(time, step, state, rng_state, self.compression_level)?;
141
142 let size = checkpoint.compressed_size();
143
144 while self.current_storage + size > self.max_storage && !self.checkpoints.is_empty() {
146 self.remove_oldest();
147 }
148
149 self.current_storage += size;
150 self.checkpoints.insert(time, checkpoint);
151
152 Ok(())
153 }
154
155 #[must_use]
157 pub fn get_checkpoint_at(&self, time: SimTime) -> Option<&Checkpoint> {
158 self.checkpoints
159 .range(..=time)
160 .next_back()
161 .map(|(_, cp)| cp)
162 }
163
164 pub fn restore_at(&self, time: SimTime) -> SimResult<(SimState, SimTime)> {
170 let checkpoint = self
171 .get_checkpoint_at(time)
172 .ok_or(SimError::CheckpointNotFound(time))?;
173
174 let state = checkpoint.restore()?;
175 Ok((state, checkpoint.time))
176 }
177
178 fn remove_oldest(&mut self) {
180 if let Some((&time, _)) = self.checkpoints.iter().next() {
181 if let Some(cp) = self.checkpoints.remove(&time) {
182 self.current_storage = self.current_storage.saturating_sub(cp.compressed_size());
183 }
184 }
185 }
186
187 #[must_use]
189 pub fn num_checkpoints(&self) -> usize {
190 self.checkpoints.len()
191 }
192
193 #[must_use]
195 pub const fn storage_used(&self) -> usize {
196 self.current_storage
197 }
198
199 pub fn clear(&mut self) {
201 self.checkpoints.clear();
202 self.current_storage = 0;
203 }
204}
205
/// One recorded event in an [`EventJournal`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JournalEntry {
    /// Simulation time supplied when the event was appended.
    pub time: SimTime,
    /// Step counter supplied when the event was appended.
    pub step: u64,
    /// Value of the journal's sequence counter when the entry was appended.
    pub sequence: u64,
    /// bincode-serialized event payload.
    pub event_data: Vec<u8>,
    /// RNG state at the event; `None` unless the journal records RNG state
    /// and the caller supplied one.
    pub rng_state: Option<RngState>,
}
220
/// Append-only log of serialized simulation events.
#[derive(Debug, Default)]
pub struct EventJournal {
    /// Entries in append order.
    entries: Vec<JournalEntry>,
    /// Maps an entry time to an index into `entries`; used by `entries_from`
    /// to choose its starting position.
    time_index: BTreeMap<SimTime, usize>,
    /// Sequence number that the next appended entry will receive.
    sequence: u64,
    /// Whether `append` stores the RNG state passed to it.
    record_rng_state: bool,
}
233
234impl EventJournal {
235 #[must_use]
237 pub fn new(record_rng_state: bool) -> Self {
238 Self {
239 entries: Vec::new(),
240 time_index: BTreeMap::new(),
241 sequence: 0,
242 record_rng_state,
243 }
244 }
245
246 pub fn append<T: Serialize>(
252 &mut self,
253 time: SimTime,
254 step: u64,
255 event: &T,
256 rng_state: Option<&RngState>,
257 ) -> SimResult<()> {
258 let event_data =
259 bincode::serialize(event).map_err(|e| SimError::serialization(e.to_string()))?;
260
261 let rng_state = if self.record_rng_state {
262 rng_state.cloned()
263 } else {
264 None
265 };
266
267 let entry = JournalEntry {
268 time,
269 step,
270 sequence: self.sequence,
271 event_data,
272 rng_state,
273 };
274
275 let index = self.entries.len();
276 self.time_index.insert(time, index);
277 self.entries.push(entry);
278 self.sequence += 1;
279
280 Ok(())
281 }
282
283 pub fn entries_from(&self, time: SimTime) -> impl Iterator<Item = &JournalEntry> {
285 let start_idx = self
286 .time_index
287 .range(..=time)
288 .next_back()
289 .map_or(0, |(_, &idx)| idx);
290
291 self.entries[start_idx..].iter()
292 }
293
294 #[must_use]
296 pub fn entries(&self) -> &[JournalEntry] {
297 &self.entries
298 }
299
300 #[must_use]
302 pub fn len(&self) -> usize {
303 self.entries.len()
304 }
305
306 #[must_use]
308 pub fn is_empty(&self) -> bool {
309 self.entries.is_empty()
310 }
311
312 pub fn clear(&mut self) {
314 self.entries.clear();
315 self.time_index.clear();
316 self.sequence = 0;
317 }
318}
319
/// Combines a [`CheckpointManager`] and an [`EventJournal`] to move the
/// simulation to a requested time.
#[derive(Debug)]
pub struct TimeScrubber {
    /// Checkpoints that `seek_to` restores from.
    checkpoints: CheckpointManager,
    /// Journal walked by `seek_to` after restoring a checkpoint.
    journal: EventJournal,
    /// Time the scrubber is currently positioned at.
    current_time: SimTime,
    /// State most recently restored (the default state until the first seek).
    current_state: SimState,
}
332
333impl TimeScrubber {
334 #[must_use]
336 pub fn new(
337 checkpoint_interval: u64,
338 max_storage: usize,
339 compression_level: i32,
340 record_rng_state: bool,
341 ) -> Self {
342 Self {
343 checkpoints: CheckpointManager::new(
344 checkpoint_interval,
345 max_storage,
346 compression_level,
347 ),
348 journal: EventJournal::new(record_rng_state),
349 current_time: SimTime::ZERO,
350 current_state: SimState::default(),
351 }
352 }
353
354 pub fn seek_to(&mut self, target: SimTime) -> SimResult<&SimState> {
360 if target == self.current_time {
361 return Ok(&self.current_state);
362 }
363
364 let (state, checkpoint_time) = self.checkpoints.restore_at(target)?;
366 self.current_state = state;
367 self.current_time = checkpoint_time;
368
369 for entry in self.journal.entries_from(checkpoint_time) {
371 if entry.time > target {
372 break;
373 }
374 self.current_time = entry.time;
376 }
377
378 Ok(&self.current_state)
379 }
380
381 #[must_use]
383 pub const fn current_time(&self) -> SimTime {
384 self.current_time
385 }
386
387 #[must_use]
389 pub const fn current_state(&self) -> &SimState {
390 &self.current_state
391 }
392
393 #[must_use]
395 pub const fn checkpoints(&self) -> &CheckpointManager {
396 &self.checkpoints
397 }
398
399 #[must_use]
401 pub fn checkpoints_mut(&mut self) -> &mut CheckpointManager {
402 &mut self.checkpoints
403 }
404
405 #[must_use]
407 pub const fn journal(&self) -> &EventJournal {
408 &self.journal
409 }
410
411 #[must_use]
413 pub fn journal_mut(&mut self) -> &mut EventJournal {
414 &mut self.journal
415 }
416}
417
/// Writes checkpoints straight to zstd-compressed files under a directory.
#[derive(Debug)]
pub struct StreamingCheckpointManager {
    /// Directory that receives the checkpoint files.
    base_path: std::path::PathBuf,
    /// zstd compression level used for every checkpoint file.
    compression_level: i32,
    /// Step interval consulted by `should_checkpoint`.
    interval: u64,
    /// Number of checkpoints written successfully so far.
    checkpoint_count: usize,
    /// Sum of the on-disk sizes of the checkpoints written so far.
    total_bytes_written: usize,
}
439
440impl StreamingCheckpointManager {
441 pub fn new(
447 base_path: impl Into<std::path::PathBuf>,
448 interval: u64,
449 compression_level: i32,
450 ) -> SimResult<Self> {
451 let base_path = base_path.into();
452 std::fs::create_dir_all(&base_path)?;
453
454 Ok(Self {
455 base_path,
456 compression_level,
457 interval,
458 checkpoint_count: 0,
459 total_bytes_written: 0,
460 })
461 }
462
463 #[must_use]
465 pub const fn should_checkpoint(&self, step: u64) -> bool {
466 step % self.interval == 0
467 }
468
469 pub fn checkpoint_streaming<S: Serialize>(
475 &mut self,
476 time: SimTime,
477 step: u64,
478 state: &S,
479 rng_state: &RngState,
480 ) -> SimResult<std::path::PathBuf> {
481 let filename = format!("checkpoint_{step:012}.zst");
482 let path = self.base_path.join(&filename);
483
484 let file = std::fs::File::create(&path)?;
486 let mut encoder = zstd::stream::Encoder::new(file, self.compression_level)
487 .map_err(|e| SimError::serialization(format!("Zstd encoder init: {e}")))?;
488
489 let header = CheckpointHeader {
491 time,
492 step,
493 rng_state: rng_state.clone(),
494 version: (0, 1, 0),
495 };
496 bincode::serialize_into(&mut encoder, &header)
497 .map_err(|e| SimError::serialization(format!("Header serialize: {e}")))?;
498
499 bincode::serialize_into(&mut encoder, state)
501 .map_err(|e| SimError::serialization(format!("State serialize: {e}")))?;
502
503 let file = encoder
504 .finish()
505 .map_err(|e| SimError::serialization(format!("Zstd finish: {e}")))?;
506
507 let metadata = file.metadata()?;
509 self.total_bytes_written += metadata.len() as usize;
510 self.checkpoint_count += 1;
511
512 Ok(path)
513 }
514
515 pub fn restore_streaming<S: serde::de::DeserializeOwned>(
521 &self,
522 path: &std::path::Path,
523 ) -> SimResult<(CheckpointHeader, S)> {
524 let file = std::fs::File::open(path)?;
525 let mut decoder = zstd::stream::Decoder::new(file)
526 .map_err(|e| SimError::serialization(format!("Zstd decoder init: {e}")))?;
527
528 let header: CheckpointHeader = bincode::deserialize_from(&mut decoder)
529 .map_err(|e| SimError::serialization(format!("Header deserialize: {e}")))?;
530
531 let state: S = bincode::deserialize_from(&mut decoder)
532 .map_err(|e| SimError::serialization(format!("State deserialize: {e}")))?;
533
534 Ok((header, state))
535 }
536
537 #[must_use]
539 pub const fn checkpoint_count(&self) -> usize {
540 self.checkpoint_count
541 }
542
543 #[must_use]
545 pub const fn total_bytes_written(&self) -> usize {
546 self.total_bytes_written
547 }
548}
549
/// Metadata written at the start of a streamed checkpoint file, ahead of the
/// serialized state.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CheckpointHeader {
    /// Simulation time supplied when the checkpoint was written.
    pub time: SimTime,
    /// Step counter supplied when the checkpoint was written.
    pub step: u64,
    /// RNG state captured with the checkpoint.
    pub rng_state: RngState,
    /// Format version triple; presumably (major, minor, patch) - confirm.
    pub version: (u16, u16, u16),
}
562
/// Fixed-size descriptor of one event in a [`SplitEventJournal`].
///
/// The event payload itself lives in the journal's shared payload buffer and
/// is located through `payload_offset` and `payload_size`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct EventHeader {
    /// Simulation time supplied when the event was appended.
    pub time: SimTime,
    /// Caller-supplied event type tag.
    pub event_type: u32,
    /// Byte offset of the payload inside the journal's payload buffer.
    pub payload_offset: u64,
    /// Length of the payload in bytes.
    pub payload_size: u32,
    /// Value of the journal's sequence counter when the event was appended.
    pub sequence: u64,
}
583
/// Event journal that stores headers and payload bytes separately, so headers
/// can be scanned without touching the payloads.
#[derive(Debug, Default)]
pub struct SplitEventJournal {
    /// Headers in append order.
    headers: Vec<EventHeader>,
    /// Concatenated bincode-serialized payloads, addressed by the headers.
    payloads: Vec<u8>,
    /// Sequence number that the next appended event will receive.
    sequence: u64,
}
597
598impl SplitEventJournal {
599 #[must_use]
601 pub fn new() -> Self {
602 Self::default()
603 }
604
605 pub fn append<T: Serialize>(
611 &mut self,
612 time: SimTime,
613 event_type: u32,
614 event: &T,
615 ) -> SimResult<()> {
616 let payload =
617 bincode::serialize(event).map_err(|e| SimError::serialization(e.to_string()))?;
618
619 let header = EventHeader {
620 time,
621 event_type,
622 payload_offset: self.payloads.len() as u64,
623 payload_size: payload.len() as u32,
624 sequence: self.sequence,
625 };
626
627 self.headers.push(header);
628 self.payloads.extend(payload);
629 self.sequence += 1;
630
631 Ok(())
632 }
633
634 #[must_use]
636 pub fn seek_to_time(&self, target: SimTime) -> Option<usize> {
637 self.headers
638 .binary_search_by(|h| h.time.cmp(&target))
639 .ok()
640 .or_else(|| {
641 self.headers.iter().position(|h| h.time >= target)
643 })
644 }
645
646 pub fn load_payload<T: serde::de::DeserializeOwned>(
652 &self,
653 header: &EventHeader,
654 ) -> SimResult<T> {
655 let start = header.payload_offset as usize;
656 let end = start + header.payload_size as usize;
657
658 if end > self.payloads.len() {
659 return Err(SimError::journal("Payload offset out of bounds"));
660 }
661
662 bincode::deserialize(&self.payloads[start..end])
663 .map_err(|e| SimError::journal(format!("Payload deserialize: {e}")))
664 }
665
666 pub fn headers_in_range(
668 &self,
669 start: SimTime,
670 end: SimTime,
671 ) -> impl Iterator<Item = &EventHeader> {
672 self.headers
673 .iter()
674 .filter(move |h| h.time >= start && h.time <= end)
675 }
676
677 #[must_use]
679 pub fn headers(&self) -> &[EventHeader] {
680 &self.headers
681 }
682
683 #[must_use]
685 pub fn header_count(&self) -> usize {
686 self.headers.len()
687 }
688
689 #[must_use]
691 pub fn payload_bytes(&self) -> usize {
692 self.payloads.len()
693 }
694
695 pub fn clear(&mut self) {
697 self.headers.clear();
698 self.payloads.clear();
699 self.sequence = 0;
700 }
701}
702
/// A serialized payload tagged with the schema version it was written with.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VersionedEntry {
    /// Schema version triple of the payload; presumably (major, minor, patch)
    /// - confirm.
    pub version: (u16, u16, u16),
    /// Caller-supplied name of the payload's type.
    pub entry_type: String,
    /// bincode-serialized payload bytes.
    pub payload: Vec<u8>,
}
717
718impl VersionedEntry {
719 pub fn new<T: Serialize>(
725 version: (u16, u16, u16),
726 entry_type: impl Into<String>,
727 data: &T,
728 ) -> SimResult<Self> {
729 let payload =
730 bincode::serialize(data).map_err(|e| SimError::serialization(e.to_string()))?;
731
732 Ok(Self {
733 version,
734 entry_type: entry_type.into(),
735 payload,
736 })
737 }
738
739 pub fn deserialize<T: serde::de::DeserializeOwned>(&self) -> SimResult<T> {
745 bincode::deserialize(&self.payload).map_err(|e| SimError::serialization(e.to_string()))
746 }
747}
748
/// A boxed migration step: takes the payload bytes of one schema version and
/// returns the payload bytes for another, or an error.
type MigrationFn = Box<dyn Fn(&[u8]) -> SimResult<Vec<u8>> + Send + Sync>;

/// Schema version triple; presumably (major, minor, patch) - confirm.
type SchemaVersion = (u16, u16, u16);

/// Key of a registered migration: `(from_version, to_version)`.
type MigrationKey = (SchemaVersion, SchemaVersion);
757
/// Registry of payload migrations between schema versions.
pub struct SchemaMigrator {
    /// Version that entries are migrated to.
    current_version: SchemaVersion,
    /// Registered migration steps, keyed by `(from, to)` version pair.
    migrations: std::collections::HashMap<MigrationKey, MigrationFn>,
}
767
768impl std::fmt::Debug for SchemaMigrator {
769 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
770 f.debug_struct("SchemaMigrator")
771 .field("current_version", &self.current_version)
772 .field("migration_count", &self.migrations.len())
773 .finish()
774 }
775}
776
777impl SchemaMigrator {
778 #[must_use]
780 pub fn new(current_version: SchemaVersion) -> Self {
781 Self {
782 current_version,
783 migrations: std::collections::HashMap::new(),
784 }
785 }
786
787 pub fn register<F>(&mut self, from: SchemaVersion, to: SchemaVersion, migrate: F)
789 where
790 F: Fn(&[u8]) -> SimResult<Vec<u8>> + Send + Sync + 'static,
791 {
792 self.migrations.insert((from, to), Box::new(migrate));
793 }
794
795 #[must_use]
797 pub fn needs_migration(&self, entry: &VersionedEntry) -> bool {
798 entry.version != self.current_version
799 }
800
801 pub fn migrate_to_current(&self, entry: &VersionedEntry) -> SimResult<Vec<u8>> {
807 if entry.version == self.current_version {
808 return Ok(entry.payload.clone());
809 }
810
811 if let Some(migration) = self.migrations.get(&(entry.version, self.current_version)) {
813 return migration(&entry.payload);
814 }
815
816 for (&(from, intermediate), first_step) in &self.migrations {
819 if from == entry.version {
820 if let Some(second_step) =
821 self.migrations.get(&(intermediate, self.current_version))
822 {
823 let intermediate_payload = first_step(&entry.payload)?;
824 return second_step(&intermediate_payload);
825 }
826 }
827 }
828
829 Err(SimError::journal(format!(
830 "No migration path from {:?} to {:?}",
831 entry.version, self.current_version
832 )))
833 }
834
835 #[must_use]
837 pub const fn current_version(&self) -> SchemaVersion {
838 self.current_version
839 }
840
841 #[must_use]
843 pub fn migration_count(&self) -> usize {
844 self.migrations.len()
845 }
846}
847
/// Unit tests for checkpoints, journals, the time scrubber, streaming
/// checkpoints, and schema migration.
///
/// Setup calls assert their results instead of discarding them with `.ok()`,
/// so a failing setup step is reported where it happens.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::engine::rng::SimRng;
    use crate::engine::state::Vec3;

    #[test]
    fn test_checkpoint_create_restore() {
        let mut state = SimState::new();
        state.add_body(1.0, Vec3::new(1.0, 2.0, 3.0), Vec3::new(4.0, 5.0, 6.0));

        let rng = SimRng::new(42);
        let rng_state = rng.save_state();

        let checkpoint = Checkpoint::create(SimTime::from_secs(1.0), 100, &state, rng_state, 3)
            .expect("checkpoint creation should succeed");

        let restored = checkpoint.restore().expect("restore should succeed");

        assert_eq!(restored.num_bodies(), 1);
        assert!((restored.positions()[0].x - 1.0).abs() < f64::EPSILON);
    }

    #[test]
    fn test_checkpoint_integrity_check() {
        let state = SimState::new();
        let rng = SimRng::new(42);

        let mut checkpoint =
            Checkpoint::create(SimTime::from_secs(1.0), 100, &state, rng.save_state(), 3)
                .expect("checkpoint creation should succeed");

        // Corrupt the payload; without this the test would prove nothing.
        assert!(!checkpoint.data.is_empty());
        checkpoint.data[0] ^= 0xFF;

        let result = checkpoint.restore();
        assert!(matches!(result, Err(SimError::CheckpointIntegrity)));
    }

    #[test]
    fn test_checkpoint_manager() {
        let mut manager = CheckpointManager::new(10, 1024 * 1024, 3);

        let state = SimState::new();
        let rng = SimRng::new(42);

        for step in (0..100).step_by(10) {
            let time = SimTime::from_secs(step as f64 * 0.1);
            manager
                .checkpoint(time, step as u64, &state, rng.save_state())
                .expect("checkpoint should succeed");
        }

        assert_eq!(manager.num_checkpoints(), 10);

        let cp = manager.get_checkpoint_at(SimTime::from_secs(0.55));
        assert!(cp.is_some());
        assert!(cp.map(|c| c.time.as_secs_f64()).unwrap_or(0.0) <= 0.55);
    }

    #[test]
    fn test_checkpoint_manager_gc() {
        let mut manager = CheckpointManager::new(1, 100, 1);

        let state = SimState::new();
        let rng = SimRng::new(42);

        for step in 0..100 {
            let time = SimTime::from_secs(step as f64 * 0.01);
            manager
                .checkpoint(time, step, &state, rng.save_state())
                .expect("checkpoint should succeed");
        }

        assert!(manager.storage_used() <= 100);
    }

    #[test]
    fn test_event_journal() {
        let mut journal = EventJournal::new(true);

        let event1 = "event1";
        let event2 = "event2";

        journal
            .append(SimTime::from_secs(1.0), 100, &event1, None)
            .expect("append should succeed");
        journal
            .append(SimTime::from_secs(2.0), 200, &event2, None)
            .expect("append should succeed");

        assert_eq!(journal.len(), 2);
        assert!(!journal.is_empty());

        let entries: Vec<_> = journal.entries_from(SimTime::from_secs(1.5)).collect();
        assert!(!entries.is_empty());
    }

    #[test]
    fn test_time_scrubber() {
        let mut scrubber = TimeScrubber::new(10, 1024 * 1024, 3, false);

        assert_eq!(scrubber.current_time(), SimTime::ZERO);

        let state = SimState::new();
        let rng = SimRng::new(42);

        scrubber
            .checkpoints_mut()
            .checkpoint(SimTime::from_secs(1.0), 100, &state, rng.save_state())
            .expect("checkpoint should succeed");

        let result = scrubber.seek_to(SimTime::from_secs(1.0));
        assert!(result.is_ok());
    }

    #[test]
    fn test_split_journal_append() {
        let mut journal = SplitEventJournal::new();

        journal
            .append(SimTime::from_secs(1.0), 1, &"event1")
            .expect("append should succeed");
        journal
            .append(SimTime::from_secs(2.0), 2, &"event2")
            .expect("append should succeed");

        assert_eq!(journal.header_count(), 2);
        assert!(journal.payload_bytes() > 0);
    }

    #[test]
    fn test_split_journal_load_payload() {
        let mut journal = SplitEventJournal::new();

        let event = "test_event_data";
        journal
            .append(SimTime::from_secs(1.0), 1, &event)
            .expect("append should succeed");

        let header = &journal.headers()[0];
        let loaded: String = journal
            .load_payload(header)
            .expect("payload should load");

        assert_eq!(loaded, event);
    }

    #[test]
    fn test_split_journal_seek_to_time() {
        let mut journal = SplitEventJournal::new();

        for i in 0..10 {
            journal
                .append(SimTime::from_secs(i as f64), i as u32, &format!("event{i}"))
                .expect("append should succeed");
        }

        // Exact hit.
        let idx = journal.seek_to_time(SimTime::from_secs(5.0));
        assert_eq!(idx, Some(5));

        // Between two events: the next event at or after the target.
        let idx = journal.seek_to_time(SimTime::from_secs(5.5));
        assert_eq!(idx, Some(6));

        let idx = journal.seek_to_time(SimTime::ZERO);
        assert_eq!(idx, Some(0));
    }

    #[test]
    fn test_split_journal_headers_in_range() {
        let mut journal = SplitEventJournal::new();

        for i in 0..10 {
            journal
                .append(SimTime::from_secs(i as f64), i as u32, &i)
                .expect("append should succeed");
        }

        let headers: Vec<_> = journal
            .headers_in_range(SimTime::from_secs(3.0), SimTime::from_secs(7.0))
            .collect();

        // The range is inclusive on both ends: 3, 4, 5, 6, 7.
        assert_eq!(headers.len(), 5);
        assert_eq!(headers[0].event_type, 3);
        assert_eq!(headers[4].event_type, 7);
    }

    #[test]
    fn test_split_journal_clear() {
        let mut journal = SplitEventJournal::new();

        journal
            .append(SimTime::from_secs(1.0), 1, &"event")
            .expect("append should succeed");
        assert_eq!(journal.header_count(), 1);

        journal.clear();
        assert_eq!(journal.header_count(), 0);
        assert_eq!(journal.payload_bytes(), 0);
    }

    #[test]
    fn test_versioned_entry_create() {
        let data = "test_data";
        let entry = VersionedEntry::new((1, 0, 0), "test_type", &data)
            .expect("entry creation should succeed");

        assert_eq!(entry.version, (1, 0, 0));
        assert_eq!(entry.entry_type, "test_type");
    }

    #[test]
    fn test_versioned_entry_deserialize() {
        let data = vec![1u32, 2, 3, 4, 5];
        let entry = VersionedEntry::new((1, 0, 0), "vec_u32", &data)
            .expect("entry creation should succeed");

        let restored: Vec<u32> = entry.deserialize().expect("payload should decode");
        assert_eq!(restored, data);
    }

    #[test]
    fn test_schema_migrator_no_migration_needed() {
        let migrator = SchemaMigrator::new((1, 0, 0));
        let entry = VersionedEntry::new((1, 0, 0), "test", &"data")
            .expect("entry creation should succeed");

        assert!(!migrator.needs_migration(&entry));

        let migrated = migrator
            .migrate_to_current(&entry)
            .expect("no-op migration should succeed");
        assert_eq!(migrated, entry.payload);
    }

    #[test]
    fn test_schema_migrator_direct_migration() {
        let mut migrator = SchemaMigrator::new((1, 1, 0));

        migrator.register((1, 0, 0), (1, 1, 0), |payload| {
            let mut new_payload = vec![0xFF];
            new_payload.extend_from_slice(payload);
            Ok(new_payload)
        });

        let entry = VersionedEntry {
            version: (1, 0, 0),
            entry_type: "test".to_string(),
            payload: vec![1, 2, 3],
        };

        assert!(migrator.needs_migration(&entry));

        let migrated = migrator
            .migrate_to_current(&entry)
            .expect("direct migration should succeed");
        assert_eq!(migrated, vec![0xFF, 1, 2, 3]);
    }

    #[test]
    fn test_schema_migrator_no_path() {
        let migrator = SchemaMigrator::new((2, 0, 0));

        let entry = VersionedEntry {
            version: (1, 0, 0),
            entry_type: "test".to_string(),
            payload: vec![1, 2, 3],
        };

        let result = migrator.migrate_to_current(&entry);
        assert!(result.is_err());
    }

    #[test]
    fn test_schema_migrator_chained_migration() {
        let mut migrator = SchemaMigrator::new((1, 2, 0));

        migrator.register((1, 0, 0), (1, 1, 0), |payload| {
            let mut new = vec![0xAA];
            new.extend_from_slice(payload);
            Ok(new)
        });

        migrator.register((1, 1, 0), (1, 2, 0), |payload| {
            let mut new = vec![0xBB];
            new.extend_from_slice(payload);
            Ok(new)
        });

        let entry = VersionedEntry {
            version: (1, 0, 0),
            entry_type: "test".to_string(),
            payload: vec![1, 2, 3],
        };

        let migrated = migrator
            .migrate_to_current(&entry)
            .expect("chained migration should succeed");
        // Steps are applied in order, so the later step's marker comes first.
        assert_eq!(migrated, vec![0xBB, 0xAA, 1, 2, 3]);
    }

    #[test]
    fn test_streaming_checkpoint_roundtrip() {
        let temp_dir = tempfile::tempdir().expect("temp dir should be created");
        let mut manager = StreamingCheckpointManager::new(temp_dir.path(), 10, 3)
            .expect("manager creation should succeed");

        let mut state = SimState::new();
        state.add_body(1.0, Vec3::new(1.0, 2.0, 3.0), Vec3::new(4.0, 5.0, 6.0));

        let rng = SimRng::new(42);
        let rng_state = rng.save_state();

        let path = manager
            .checkpoint_streaming(SimTime::from_secs(1.0), 100, &state, &rng_state)
            .expect("streaming checkpoint should succeed");

        assert!(path.exists());
        assert_eq!(manager.checkpoint_count(), 1);

        let (header, restored): (CheckpointHeader, SimState) = manager
            .restore_streaming(&path)
            .expect("streaming restore should succeed");

        assert_eq!(header.step, 100);
        assert!((header.time.as_secs_f64() - 1.0).abs() < f64::EPSILON);
        assert_eq!(restored.num_bodies(), 1);
        assert!((restored.positions()[0].x - 1.0).abs() < f64::EPSILON);
    }

    #[test]
    fn test_streaming_checkpoint_should_checkpoint() {
        let temp_dir = tempfile::tempdir().expect("temp dir should be created");
        let manager = StreamingCheckpointManager::new(temp_dir.path(), 10, 3)
            .expect("manager creation should succeed");

        assert!(manager.should_checkpoint(0));
        assert!(!manager.should_checkpoint(5));
        assert!(manager.should_checkpoint(10));
        assert!(manager.should_checkpoint(100));
    }

    #[test]
    fn test_checkpoint_manager_clear() {
        let mut manager = CheckpointManager::new(10, 1024 * 1024, 3);
        let state = SimState::new();
        let rng = SimRng::new(42);

        manager
            .checkpoint(SimTime::from_secs(1.0), 10, &state, rng.save_state())
            .expect("checkpoint should succeed");

        assert_eq!(manager.num_checkpoints(), 1);
        assert!(manager.storage_used() > 0);

        manager.clear();
        assert_eq!(manager.num_checkpoints(), 0);
        assert_eq!(manager.storage_used(), 0);
    }

    #[test]
    fn test_checkpoint_manager_restore_not_found() {
        let manager = CheckpointManager::new(10, 1024 * 1024, 3);

        let result = manager.restore_at(SimTime::from_secs(5.0));
        assert!(result.is_err());
    }

    #[test]
    fn test_event_journal_entries() {
        let mut journal = EventJournal::new(false);

        journal
            .append(SimTime::from_secs(1.0), 100, &"event1", None)
            .expect("append should succeed");
        journal
            .append(SimTime::from_secs(2.0), 200, &"event2", None)
            .expect("append should succeed");

        let entries = journal.entries();
        assert_eq!(entries.len(), 2);
        assert_eq!(entries[0].step, 100);
        assert_eq!(entries[1].step, 200);
    }

    #[test]
    fn test_event_journal_clear() {
        let mut journal = EventJournal::new(false);

        journal
            .append(SimTime::from_secs(1.0), 100, &"event", None)
            .expect("append should succeed");
        assert!(!journal.is_empty());

        journal.clear();
        assert!(journal.is_empty());
        assert_eq!(journal.len(), 0);
    }

    #[test]
    fn test_event_journal_with_rng() {
        // RNG recording enabled, so the supplied state must be stored.
        let mut journal = EventJournal::new(true);
        let rng = SimRng::new(42);
        let rng_state = rng.save_state();

        journal
            .append(SimTime::from_secs(1.0), 100, &"event", Some(&rng_state))
            .expect("append should succeed");

        let entries = journal.entries();
        assert!(entries[0].rng_state.is_some());
    }

    #[test]
    fn test_time_scrubber_seek_same_time() {
        let mut scrubber = TimeScrubber::new(10, 1024 * 1024, 3, false);
        let state = SimState::new();
        let rng = SimRng::new(42);

        scrubber
            .checkpoints_mut()
            .checkpoint(SimTime::ZERO, 0, &state, rng.save_state())
            .expect("checkpoint should succeed");

        // Seeking to the time the scrubber is already at must succeed, twice.
        assert!(scrubber.seek_to(SimTime::ZERO).is_ok());
        let result = scrubber.seek_to(SimTime::ZERO);
        assert!(result.is_ok());
    }

    #[test]
    fn test_time_scrubber_current_state() {
        let scrubber = TimeScrubber::new(10, 1024 * 1024, 3, false);
        let state = scrubber.current_state();
        assert_eq!(state.num_bodies(), 0);
    }

    #[test]
    fn test_time_scrubber_journal_accessors() {
        let mut scrubber = TimeScrubber::new(10, 1024 * 1024, 3, false);

        assert!(scrubber.journal().is_empty());

        scrubber
            .journal_mut()
            .append(SimTime::from_secs(1.0), 100, &"event", None)
            .expect("append should succeed");

        assert!(!scrubber.journal().is_empty());
    }

    #[test]
    fn test_streaming_checkpoint_total_bytes() {
        let temp_dir = tempfile::tempdir().expect("temp dir should be created");
        let mut manager = StreamingCheckpointManager::new(temp_dir.path(), 10, 3)
            .expect("manager creation should succeed");

        assert_eq!(manager.total_bytes_written(), 0);

        let state = SimState::new();
        let rng = SimRng::new(42);

        manager
            .checkpoint_streaming(SimTime::from_secs(1.0), 10, &state, &rng.save_state())
            .expect("streaming checkpoint should succeed");

        assert!(manager.total_bytes_written() > 0);
    }

    #[test]
    fn test_checkpoint_compressed_size() {
        let state = SimState::new();
        let rng = SimRng::new(42);

        let checkpoint =
            Checkpoint::create(SimTime::from_secs(1.0), 100, &state, rng.save_state(), 3)
                .expect("checkpoint creation should succeed");

        assert!(checkpoint.compressed_size() > 0);
    }

    #[test]
    fn test_checkpoint_clone() {
        let state = SimState::new();
        let rng = SimRng::new(42);

        let checkpoint =
            Checkpoint::create(SimTime::from_secs(1.0), 100, &state, rng.save_state(), 3)
                .expect("checkpoint creation should succeed");

        let cloned = checkpoint.clone();
        assert_eq!(cloned.step, checkpoint.step);
        assert_eq!(cloned.hash, checkpoint.hash);
    }

    #[test]
    fn test_journal_entry_clone() {
        let mut journal = EventJournal::new(false);
        journal
            .append(SimTime::from_secs(1.0), 100, &"event", None)
            .expect("append should succeed");

        let entry = &journal.entries()[0];
        let cloned = entry.clone();
        assert_eq!(cloned.step, entry.step);
    }

    #[test]
    fn test_versioned_entry_debug() {
        let entry = VersionedEntry::new((1, 0, 0), "test", &"data")
            .expect("entry creation should succeed");
        let debug = format!("{:?}", entry);
        assert!(debug.contains("VersionedEntry"));
    }

    #[test]
    fn test_event_header_debug() {
        let header = EventHeader {
            time: SimTime::from_secs(1.0),
            event_type: 42,
            payload_offset: 0,
            payload_size: 100,
            sequence: 1,
        };
        let debug = format!("{:?}", header);
        assert!(debug.contains("EventHeader"));
    }

    #[test]
    fn test_checkpoint_header_debug() {
        let rng = SimRng::new(42);
        let header = CheckpointHeader {
            time: SimTime::from_secs(1.0),
            step: 100,
            rng_state: rng.save_state(),
            version: (1, 0, 0),
        };
        let debug = format!("{:?}", header);
        assert!(debug.contains("CheckpointHeader"));
    }

    #[test]
    fn test_checkpoint_manager_should_checkpoint() {
        let manager = CheckpointManager::new(10, 1024 * 1024, 3);
        assert!(manager.should_checkpoint(0));
        assert!(!manager.should_checkpoint(5));
        assert!(manager.should_checkpoint(10));
        assert!(manager.should_checkpoint(20));
    }

    #[test]
    fn test_split_journal_seek_before_start() {
        let mut journal = SplitEventJournal::new();

        for i in 1..10 {
            journal
                .append(SimTime::from_secs(i as f64), i as u32, &i)
                .expect("append should succeed");
        }

        // A target before every event resolves to the first event.
        let idx = journal.seek_to_time(SimTime::from_secs(0.5));
        assert_eq!(idx, Some(0));
    }

    #[test]
    fn test_split_journal_seek_after_end() {
        let mut journal = SplitEventJournal::new();

        for i in 0..5 {
            journal
                .append(SimTime::from_secs(i as f64), i as u32, &i)
                .expect("append should succeed");
        }

        // A target after every event has nothing to resolve to.
        let idx = journal.seek_to_time(SimTime::from_secs(100.0));
        assert_eq!(idx, None);
    }

    #[test]
    fn test_split_journal_empty_seek() {
        let journal = SplitEventJournal::new();
        let idx = journal.seek_to_time(SimTime::from_secs(1.0));
        assert_eq!(idx, None);
    }
}
1471
/// Property-based tests for checkpoint round-tripping.
#[cfg(test)]
mod proptests {
    use super::*;
    use crate::engine::rng::SimRng;
    use crate::engine::state::Vec3;
    use proptest::prelude::*;

    proptest! {
        /// A single body with arbitrary position and mass survives a
        /// create/restore cycle with its position intact.
        #[test]
        fn prop_checkpoint_roundtrip(
            x in -1000.0f64..1000.0,
            y in -1000.0f64..1000.0,
            z in -1000.0f64..1000.0,
            mass in 0.1f64..1000.0,
            seed in 0u64..u64::MAX,
        ) {
            let mut original = SimState::new();
            original.add_body(mass, Vec3::new(x, y, z), Vec3::zero());

            let rng_state = SimRng::new(seed).save_state();
            let created =
                Checkpoint::create(SimTime::from_secs(1.0), 100, &original, rng_state, 3);
            prop_assert!(created.is_ok());

            let round_tripped = created.ok().unwrap().restore();
            prop_assert!(round_tripped.is_ok());
            let round_tripped = round_tripped.ok().unwrap();

            prop_assert_eq!(round_tripped.num_bodies(), original.num_bodies());

            let position = &round_tripped.positions()[0];
            prop_assert!((position.x - x).abs() < 1e-10);
            prop_assert!((position.y - y).abs() < 1e-10);
            prop_assert!((position.z - z).abs() < 1e-10);
        }
    }
}