1use super::file::{TraceFileError, TraceReader};
45use super::replay::{ReplayEvent, TraceMetadata};
46use super::replayer::{Breakpoint, DivergenceError, EventSource, ReplayMode};
47use serde::{Deserialize, Serialize};
48use std::io;
49use std::path::Path;
50
51#[derive(Debug, thiserror::Error)]
57pub enum StreamingReplayError {
58 #[error("file error: {0}")]
60 File(#[from] TraceFileError),
61
62 #[error("I/O error: {0}")]
64 Io(#[from] io::Error),
65
66 #[error("invalid checkpoint: {0}")]
68 InvalidCheckpoint(String),
69
70 #[error("checkpoint mismatch: {0}")]
72 CheckpointMismatch(String),
73
74 #[error("{0}")]
76 Divergence(#[from] DivergenceError),
77
78 #[error("serialization error: {0}")]
80 Serialize(String),
81}
82
83pub type StreamingReplayResult<T> = Result<T, StreamingReplayError>;
85
/// Snapshot of how far a replay has advanced through its trace.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ReplayProgress {
    /// Number of events processed so far.
    pub events_processed: u64,
    /// Total number of events in the trace.
    pub total_events: u64,
}

impl ReplayProgress {
    /// Creates a progress snapshot from raw counters.
    #[must_use]
    pub const fn new(events_processed: u64, total_events: u64) -> Self {
        Self {
            events_processed,
            total_events,
        }
    }

    /// Progress as a percentage in `0.0..=100.0`.
    ///
    /// An empty trace, or a position at or past the end, reports `100.0`.
    #[must_use]
    pub fn percent(&self) -> f64 {
        self.fraction() * 100.0
    }

    /// Progress as a fraction in `0.0..=1.0`.
    ///
    /// The value is clamped to `1.0` once [`is_complete`](Self::is_complete)
    /// holds, so it stays consistent with `is_complete()` and the saturating
    /// [`remaining`](Self::remaining) even if `events_processed` exceeds
    /// `total_events`. This also covers the empty-trace case without dividing
    /// by zero.
    #[must_use]
    // Progress reporting only needs a few significant digits, so the u64 -> f64
    // rounding is acceptable.
    #[allow(clippy::cast_precision_loss)]
    pub fn fraction(&self) -> f64 {
        if self.is_complete() {
            1.0
        } else {
            self.events_processed as f64 / self.total_events as f64
        }
    }

    /// Returns `true` once at least `total_events` events have been processed.
    #[must_use]
    pub const fn is_complete(&self) -> bool {
        self.events_processed >= self.total_events
    }

    /// Number of events still to process (zero when at or past the end).
    #[must_use]
    pub const fn remaining(&self) -> u64 {
        self.total_events.saturating_sub(self.events_processed)
    }
}

impl std::fmt::Display for ReplayProgress {
    /// Formats as `processed/total (pp.p%)`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "{}/{} ({:.1}%)",
            self.events_processed,
            self.total_events,
            self.percent()
        )
    }
}
155
156#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
171pub struct ReplayCheckpoint {
172 pub events_processed: u64,
174
175 pub total_events: u64,
177
178 pub seed: u64,
180
181 pub metadata_hash: u64,
183
184 pub created_at: u64,
186}
187
188impl ReplayCheckpoint {
189 fn new(events_processed: u64, total_events: u64, metadata: &TraceMetadata) -> Self {
191 Self {
192 events_processed,
193 total_events,
194 seed: metadata.seed,
195 metadata_hash: Self::hash_metadata(metadata),
196 created_at: metadata.recorded_at.saturating_add(events_processed),
199 }
200 }
201
202 fn validate(&self, metadata: &TraceMetadata, total_events: u64) -> StreamingReplayResult<()> {
204 if self.seed != metadata.seed {
205 return Err(StreamingReplayError::CheckpointMismatch(format!(
206 "seed mismatch: checkpoint has {}, trace has {}",
207 self.seed, metadata.seed
208 )));
209 }
210
211 let expected_hash = Self::hash_metadata(metadata);
212 if self.metadata_hash != expected_hash {
213 return Err(StreamingReplayError::CheckpointMismatch(
214 "metadata hash mismatch".to_string(),
215 ));
216 }
217
218 if self.total_events != total_events {
219 return Err(StreamingReplayError::CheckpointMismatch(format!(
220 "event count mismatch: checkpoint has {}, trace has {}",
221 self.total_events, total_events
222 )));
223 }
224
225 if self.events_processed > total_events {
226 return Err(StreamingReplayError::CheckpointMismatch(format!(
227 "checkpoint position {} exceeds trace length {}",
228 self.events_processed, total_events
229 )));
230 }
231
232 Ok(())
233 }
234
235 fn hash_metadata(metadata: &TraceMetadata) -> u64 {
237 use std::hash::{Hash, Hasher};
238
239 struct SimpleHasher(u64);
240
241 impl Hasher for SimpleHasher {
242 fn finish(&self) -> u64 {
243 self.0
244 }
245
246 fn write(&mut self, bytes: &[u8]) {
247 for byte in bytes {
248 self.0 = self.0.wrapping_mul(31).wrapping_add(u64::from(*byte));
249 }
250 }
251 }
252
253 let mut hasher = SimpleHasher(0);
254 metadata.seed.hash(&mut hasher);
255 metadata.version.hash(&mut hasher);
256 metadata.recorded_at.hash(&mut hasher);
257 metadata.config_hash.hash(&mut hasher);
258 metadata.description.hash(&mut hasher);
259 hasher.finish()
260 }
261
262 pub fn to_bytes(&self) -> StreamingReplayResult<Vec<u8>> {
268 rmp_serde::to_vec(self)
269 .map_err(|e: rmp_serde::encode::Error| StreamingReplayError::Serialize(e.to_string()))
270 }
271
272 pub fn from_bytes(bytes: &[u8]) -> StreamingReplayResult<Self> {
278 rmp_serde::from_slice(bytes).map_err(|e: rmp_serde::decode::Error| {
279 StreamingReplayError::InvalidCheckpoint(e.to_string())
280 })
281 }
282}
283
284pub struct StreamingReplayer {
311 reader: TraceReader,
313
314 metadata: TraceMetadata,
316
317 total_events: u64,
319
320 events_consumed: u64,
322
323 peeked: Option<ReplayEvent>,
325
326 mode: ReplayMode,
328
329 at_breakpoint: bool,
331 event_source_error: Option<StreamingReplayError>,
336}
337
338impl StreamingReplayer {
339 pub fn open(path: impl AsRef<Path>) -> StreamingReplayResult<Self> {
345 let reader = TraceReader::open(path)?;
346 let metadata = reader.metadata().clone();
347 let total_events = reader.event_count();
348
349 Ok(Self {
350 reader,
351 metadata,
352 total_events,
353 events_consumed: 0,
354 peeked: None,
355 mode: ReplayMode::Run,
356 at_breakpoint: false,
357 event_source_error: None,
358 })
359 }
360
361 pub fn resume(
373 path: impl AsRef<Path>,
374 checkpoint: ReplayCheckpoint,
375 ) -> StreamingReplayResult<Self> {
376 let mut reader = TraceReader::open(path)?;
377 let metadata = reader.metadata().clone();
378 let total_events = reader.event_count();
379
380 checkpoint.validate(&metadata, total_events)?;
382
383 for _ in 0..checkpoint.events_processed {
385 if reader.read_event()?.is_none() {
386 return Err(StreamingReplayError::CheckpointMismatch(
387 "trace ended before checkpoint position".to_string(),
388 ));
389 }
390 }
391
392 Ok(Self {
393 reader,
394 metadata,
395 total_events,
396 events_consumed: checkpoint.events_processed,
397 peeked: None,
398 mode: ReplayMode::Run,
399 at_breakpoint: false,
400 event_source_error: None,
401 })
402 }
403
404 #[must_use]
406 pub fn metadata(&self) -> &TraceMetadata {
407 &self.metadata
408 }
409
410 #[must_use]
412 pub fn total_events(&self) -> u64 {
413 self.total_events
414 }
415
416 #[must_use]
418 pub fn events_consumed(&self) -> u64 {
419 self.events_consumed
420 }
421
422 #[must_use]
424 pub fn progress(&self) -> ReplayProgress {
425 ReplayProgress::new(self.events_consumed, self.total_events)
426 }
427
428 #[must_use]
430 pub fn is_complete(&self) -> bool {
431 self.events_consumed >= self.total_events && self.peeked.is_none()
432 }
433
434 #[must_use]
436 pub fn at_breakpoint(&self) -> bool {
437 self.at_breakpoint
438 }
439
440 #[must_use]
442 pub fn last_event_source_error(&self) -> Option<&StreamingReplayError> {
443 self.event_source_error.as_ref()
444 }
445
446 pub fn take_event_source_error(&mut self) -> Option<StreamingReplayError> {
448 self.event_source_error.take()
449 }
450
451 pub fn set_mode(&mut self, mode: ReplayMode) {
453 self.mode = mode;
454 self.at_breakpoint = false;
455 }
456
457 #[must_use]
459 pub fn mode(&self) -> &ReplayMode {
460 &self.mode
461 }
462
463 pub fn peek(&mut self) -> StreamingReplayResult<Option<&ReplayEvent>> {
469 if self.peeked.is_none() {
470 self.peeked = self.reader.read_event()?;
471 }
472 Ok(self.peeked.as_ref())
473 }
474
475 pub fn next_event(&mut self) -> StreamingReplayResult<Option<ReplayEvent>> {
481 let event = if let Some(peeked) = self.peeked.take() {
482 Some(peeked)
483 } else {
484 self.reader.read_event()?
485 };
486
487 if event.is_some() {
488 self.events_consumed += 1;
489
490 if let Some(ref e) = event {
492 self.at_breakpoint = self.check_breakpoint(e);
493 }
494 }
495
496 Ok(event)
497 }
498
499 pub fn verify(&mut self, actual: &ReplayEvent) -> StreamingReplayResult<()> {
507 let current_position = self.events_consumed;
509
510 let expected = self.peek()?;
511
512 let Some(expected) = expected else {
513 return Err(StreamingReplayError::Divergence(DivergenceError {
514 index: current_position as usize,
515 expected: None,
516 actual: actual.clone(),
517 context: "Trace ended but execution continued".to_string(),
518 }));
519 };
520
521 if expected != actual {
522 let expected_clone = expected.clone();
524 return Err(StreamingReplayError::Divergence(DivergenceError {
525 index: current_position as usize,
526 expected: Some(expected_clone),
527 actual: actual.clone(),
528 context: format!("Event mismatch at position {current_position}"),
529 }));
530 }
531
532 Ok(())
533 }
534
535 pub fn verify_and_advance(
541 &mut self,
542 actual: &ReplayEvent,
543 ) -> StreamingReplayResult<ReplayEvent> {
544 self.verify(actual)?;
545 self.next_event()
546 .transpose()
547 .expect("event was peeked so must exist")
548 }
549
550 #[must_use]
555 pub fn checkpoint(&self) -> ReplayCheckpoint {
556 ReplayCheckpoint::new(self.events_consumed, self.total_events, &self.metadata)
557 }
558
559 pub fn step(&mut self) -> StreamingReplayResult<Option<ReplayEvent>> {
569 self.at_breakpoint = false;
570 self.next_event()
571 }
572
573 pub fn run(&mut self) -> StreamingReplayResult<u64> {
581 let mut count = 0u64;
582
583 while !self.is_complete() && !self.at_breakpoint {
584 if self.next_event()?.is_some() {
585 count += 1;
586 }
587 }
588
589 Ok(count)
590 }
591
592 pub fn run_with<F, E>(&mut self, mut callback: F) -> Result<u64, E>
601 where
602 F: FnMut(ReplayEvent, ReplayProgress) -> Result<(), E>,
603 E: From<StreamingReplayError>,
604 {
605 let mut count = 0u64;
606
607 while !self.is_complete() && !self.at_breakpoint {
608 if let Some(event) = self.next_event()? {
609 let progress = self.progress();
610 callback(event, progress)?;
611 count += 1;
612 }
613 }
614
615 Ok(count)
616 }
617
618 fn check_breakpoint(&self, event: &ReplayEvent) -> bool {
620 match &self.mode {
621 ReplayMode::Step => true,
622 ReplayMode::Run => false,
623 ReplayMode::RunTo(breakpoint) => match breakpoint {
624 Breakpoint::EventIndex(idx) => self.events_consumed as usize == *idx + 1,
625 Breakpoint::Tick(tick) => {
626 if let ReplayEvent::TaskScheduled { at_tick, .. } = event {
627 *at_tick >= *tick
628 } else {
629 false
630 }
631 }
632 Breakpoint::Task(task_id) => {
633 if let ReplayEvent::TaskScheduled { task, .. } = event {
634 task == task_id
635 } else {
636 false
637 }
638 }
639 },
640 }
641 }
642}
643
644impl EventSource for StreamingReplayer {
645 fn next_event(&mut self) -> Option<ReplayEvent> {
646 match Self::next_event(self) {
647 Ok(event) => {
648 self.event_source_error = None;
649 event
650 }
651 Err(err) => {
652 self.event_source_error = Some(err);
653 None
654 }
655 }
656 }
657
658 fn metadata(&self) -> &TraceMetadata {
659 &self.metadata
660 }
661}
662
#[cfg(test)]
mod tests {
    use super::*;
    use crate::trace::file::{HEADER_SIZE, TraceWriter, write_trace};
    use crate::trace::replay::CompactTaskId;
    use std::fs::OpenOptions;
    use std::io::{Seek, SeekFrom, Write};
    use tempfile::NamedTempFile;

    /// Builds `count` `TaskScheduled` events whose task id and tick both equal
    /// their index.
    fn sample_events(count: u64) -> Vec<ReplayEvent> {
        (0..count)
            .map(|i| ReplayEvent::TaskScheduled {
                task: CompactTaskId(i),
                at_tick: i,
            })
            .collect()
    }

    /// Ten `TaskScheduled` events at ticks 0, 10, ..., 90.
    fn events_every_ten_ticks() -> Vec<ReplayEvent> {
        (0..10)
            .map(|i| ReplayEvent::TaskScheduled {
                task: CompactTaskId(i),
                at_tick: i * 10,
            })
            .collect()
    }

    /// Writes a trace with the given metadata and events into a fresh
    /// temporary file. The returned handle keeps the file alive.
    fn trace_file(metadata: &TraceMetadata, events: &[ReplayEvent]) -> NamedTempFile {
        let temp = NamedTempFile::new().unwrap();
        write_trace(temp.path(), metadata, events).unwrap();
        temp
    }

    /// Consumes `steps` events, panicking on any read error.
    fn advance(replayer: &mut StreamingReplayer, steps: usize) {
        for _ in 0..steps {
            replayer.next_event().unwrap();
        }
    }

    /// Takes a checkpoint after `consumed` events of `recorded` and asserts
    /// that resuming it against `other` is rejected as a mismatch.
    fn assert_resume_mismatch(recorded: &NamedTempFile, consumed: usize, other: &NamedTempFile) {
        let mut replayer = StreamingReplayer::open(recorded.path()).unwrap();
        advance(&mut replayer, consumed);
        let checkpoint = replayer.checkpoint();

        let result = StreamingReplayer::resume(other.path(), checkpoint);
        assert!(matches!(
            result,
            Err(StreamingReplayError::CheckpointMismatch(_))
        ));
    }

    #[test]
    fn basic_streaming_replay() {
        let temp = trace_file(&TraceMetadata::new(42), &sample_events(100));
        let mut replayer = StreamingReplayer::open(temp.path()).unwrap();

        assert_eq!(replayer.total_events(), 100);
        assert_eq!(replayer.events_consumed(), 0);
        assert!(!replayer.is_complete());

        // Every event must come back in order with matching id and tick.
        let mut seen = 0u64;
        while let Some(event) = replayer.next_event().unwrap() {
            match event {
                ReplayEvent::TaskScheduled { task, at_tick } => {
                    assert_eq!(task.0, seen);
                    assert_eq!(at_tick, seen);
                }
                _ => panic!("unexpected event type"),
            }
            seen += 1;
        }

        assert_eq!(seen, 100);
        assert!(replayer.is_complete());
    }

    #[test]
    fn progress_tracking() {
        let temp = trace_file(&TraceMetadata::new(42), &sample_events(100));
        let mut replayer = StreamingReplayer::open(temp.path()).unwrap();

        // Nothing consumed yet.
        let at_start = replayer.progress();
        assert_eq!(at_start.events_processed, 0);
        assert_eq!(at_start.total_events, 100);
        assert!((at_start.percent() - 0.0).abs() < 0.01);

        // Halfway through.
        advance(&mut replayer, 50);
        let halfway = replayer.progress();
        assert_eq!(halfway.events_processed, 50);
        assert!((halfway.percent() - 50.0).abs() < 0.01);
        assert_eq!(halfway.remaining(), 50);

        // Drain the rest.
        while replayer.next_event().unwrap().is_some() {}
        let at_end = replayer.progress();
        assert!(at_end.is_complete());
        assert!((at_end.percent() - 100.0).abs() < 0.01);
    }

    #[test]
    fn peek_without_consuming() {
        let temp = trace_file(&TraceMetadata::new(42), &sample_events(10));
        let mut replayer = StreamingReplayer::open(temp.path()).unwrap();

        // Peeking twice returns the same event and does not advance.
        let first_peek = replayer.peek().unwrap().cloned();
        let second_peek = replayer.peek().unwrap().cloned();
        assert_eq!(first_peek, second_peek);
        assert_eq!(replayer.events_consumed(), 0);

        // Consuming yields the peeked event.
        let consumed = replayer.next_event().unwrap();
        assert_eq!(consumed, first_peek);
        assert_eq!(replayer.events_consumed(), 1);

        // The next peek sees a different event.
        let third_peek = replayer.peek().unwrap().cloned();
        assert_ne!(third_peek, first_peek);
    }

    #[test]
    fn checkpoint_and_resume() {
        let temp = trace_file(&TraceMetadata::new(42), &sample_events(100));
        let path = temp.path();

        let mut replayer = StreamingReplayer::open(path).unwrap();
        advance(&mut replayer, 50);

        let checkpoint = replayer.checkpoint();
        assert_eq!(checkpoint.events_processed, 50);
        assert_eq!(checkpoint.total_events, 100);

        // Round-trip through bytes before resuming.
        let encoded = checkpoint.to_bytes().unwrap();
        let restored = ReplayCheckpoint::from_bytes(&encoded).unwrap();

        let mut resumed = StreamingReplayer::resume(path, restored).unwrap();
        assert_eq!(resumed.events_consumed(), 50);

        // The resumed replayer continues with event 50.
        let mut next_id = 50u64;
        while let Some(event) = resumed.next_event().unwrap() {
            if let ReplayEvent::TaskScheduled { task, .. } = event {
                assert_eq!(task.0, next_id);
            }
            next_id += 1;
        }

        assert_eq!(next_id, 100);
    }

    #[test]
    fn checkpoint_validation() {
        // Two traces that differ in seed.
        let recorded = trace_file(&TraceMetadata::new(42), &sample_events(100));
        let other = trace_file(&TraceMetadata::new(99), &sample_events(100));

        assert_resume_mismatch(&recorded, 50, &other);
    }

    #[test]
    fn checkpoint_validation_rejects_same_seed_metadata_drift() {
        let metadata1 = TraceMetadata {
            version: super::super::replay::REPLAY_SCHEMA_VERSION,
            seed: 42,
            recorded_at: 100,
            config_hash: 0xCAFE,
            description: Some("trace-a".into()),
        };
        let metadata2 = TraceMetadata {
            version: super::super::replay::REPLAY_SCHEMA_VERSION,
            seed: 42,
            recorded_at: 200,
            config_hash: 0xCAFE,
            description: Some("trace-b".into()),
        };
        let recorded = trace_file(&metadata1, &sample_events(4));
        let other = trace_file(&metadata2, &sample_events(4));

        assert_resume_mismatch(&recorded, 2, &other);
    }

    #[test]
    fn checkpoint_validation_rejects_event_count_drift() {
        let metadata = TraceMetadata {
            version: super::super::replay::REPLAY_SCHEMA_VERSION,
            seed: 7,
            recorded_at: 500,
            config_hash: 0xBEEF,
            description: Some("same-metadata".into()),
        };
        // Identical metadata, different number of events.
        let recorded = trace_file(&metadata, &sample_events(4));
        let other = trace_file(&metadata, &sample_events(6));

        assert_resume_mismatch(&recorded, 2, &other);
    }

    #[test]
    fn checkpoint_bytes_are_stable_for_same_position() {
        let metadata = TraceMetadata {
            version: super::super::replay::REPLAY_SCHEMA_VERSION,
            seed: 42,
            recorded_at: 1_000,
            config_hash: 0xCAFE,
            description: Some("stable checkpoint".into()),
        };
        let temp = trace_file(&metadata, &sample_events(5));

        let mut replayer = StreamingReplayer::open(temp.path()).unwrap();
        advance(&mut replayer, 3);

        let checkpoint_a = replayer.checkpoint();
        let checkpoint_b = replayer.checkpoint();

        assert_eq!(checkpoint_a.events_processed, 3);
        assert_eq!(checkpoint_a.total_events, 5);
        assert_eq!(checkpoint_a.created_at, 1_003);
        assert_eq!(checkpoint_a.created_at, checkpoint_b.created_at);
        assert_eq!(
            checkpoint_a.to_bytes().unwrap(),
            checkpoint_b.to_bytes().unwrap()
        );
    }

    #[test]
    fn checkpoint_created_at_advances_with_position() {
        let metadata = TraceMetadata {
            version: super::super::replay::REPLAY_SCHEMA_VERSION,
            seed: 7,
            recorded_at: 500,
            config_hash: 0xBEEF,
            description: None,
        };
        let temp = trace_file(&metadata, &sample_events(4));
        let mut replayer = StreamingReplayer::open(temp.path()).unwrap();

        let first = replayer.checkpoint();
        assert_eq!(first.created_at, 500);

        advance(&mut replayer, 1);
        let second = replayer.checkpoint();
        assert_eq!(second.created_at, 501);
        assert_eq!(second.created_at, first.created_at + 1);

        advance(&mut replayer, 1);
        let third = replayer.checkpoint();
        assert_eq!(third.created_at, 502);
    }

    #[test]
    fn run_with_callback() {
        let temp = trace_file(&TraceMetadata::new(42), &sample_events(50));
        let mut replayer = StreamingReplayer::open(temp.path()).unwrap();

        let mut event_ids: Vec<u64> = Vec::new();
        let count = replayer
            .run_with(|event, progress| {
                if let ReplayEvent::TaskScheduled { task, .. } = event {
                    event_ids.push(task.0);
                }
                // Progress may only report completion on the final event.
                assert!(!progress.is_complete() || progress.events_processed == 50);
                Ok::<_, StreamingReplayError>(())
            })
            .unwrap();

        assert_eq!(count, 50);
        assert_eq!(event_ids.len(), 50);
        assert_eq!(event_ids, (0..50).collect::<Vec<u64>>());
    }

    #[test]
    fn large_trace_streaming() {
        let temp = NamedTempFile::new().unwrap();
        let path = temp.path();

        let metadata = TraceMetadata::new(42);
        let event_count = 10_000u64;

        // Write through the streaming writer instead of building a Vec first.
        {
            let mut writer = TraceWriter::create(path).unwrap();
            writer.write_metadata(&metadata).unwrap();
            for i in 0..event_count {
                let event = ReplayEvent::TaskScheduled {
                    task: CompactTaskId(i),
                    at_tick: i,
                };
                writer.write_event(&event).unwrap();
            }
            writer.finish().unwrap();
        }

        let mut replayer = StreamingReplayer::open(path).unwrap();
        assert_eq!(replayer.total_events(), event_count);

        let mut streamed = 0u64;
        while replayer.next_event().unwrap().is_some() {
            streamed += 1;
        }

        assert_eq!(streamed, event_count);
    }

    #[test]
    fn step_mode_streaming() {
        let temp = trace_file(&TraceMetadata::new(42), &sample_events(5));
        let mut replayer = StreamingReplayer::open(temp.path()).unwrap();
        replayer.set_mode(ReplayMode::Step);

        // Every step in step mode lands on a breakpoint.
        for _ in 0..5 {
            replayer.step().unwrap();
            assert!(replayer.at_breakpoint());
        }

        // Stepping past the end yields nothing.
        let past_end = replayer.step().unwrap();
        assert!(past_end.is_none());
    }

    #[test]
    fn breakpoint_at_tick() {
        let temp = trace_file(&TraceMetadata::new(42), &events_every_ten_ticks());
        let mut replayer = StreamingReplayer::open(temp.path()).unwrap();
        replayer.set_mode(ReplayMode::RunTo(Breakpoint::Tick(50)));

        let count = replayer.run().unwrap();
        assert!(replayer.at_breakpoint());
        // Ticks 0 through 50 inclusive.
        assert_eq!(count, 6);
    }

    #[test]
    fn empty_trace() {
        let temp = trace_file(&TraceMetadata::new(42), &[]);
        let mut replayer = StreamingReplayer::open(temp.path()).unwrap();

        assert_eq!(replayer.total_events(), 0);
        assert!(replayer.progress().is_complete());

        let event = replayer.next_event().unwrap();
        assert!(event.is_none());
    }

    #[test]
    fn verify_past_end_of_trace_reports_trace_exhausted() {
        let temp = trace_file(
            &TraceMetadata::new(42),
            &[ReplayEvent::RngSeed { seed: 42 }],
        );
        let mut replayer = StreamingReplayer::open(temp.path()).unwrap();

        assert!(replayer.next_event().unwrap().is_some());
        assert!(replayer.is_complete());

        let actual = ReplayEvent::RngSeed { seed: 99 };
        match replayer.verify(&actual).unwrap_err() {
            StreamingReplayError::Divergence(divergence) => {
                assert!(divergence.expected.is_none());
                assert_eq!(divergence.index, 1);
                assert!(divergence.context.contains("Trace ended"));
                assert!(format!("{divergence}").contains("<trace_exhausted>"));
            }
            other => panic!("expected divergence error, got {other:?}"),
        }
    }

    #[test]
    fn verify_mismatch_preserves_expected_event() {
        let recorded = ReplayEvent::TaskScheduled {
            task: CompactTaskId(1),
            at_tick: 10,
        };
        let temp = trace_file(&TraceMetadata::new(42), &[recorded.clone()]);
        let mut replayer = StreamingReplayer::open(temp.path()).unwrap();

        let actual = ReplayEvent::TaskScheduled {
            task: CompactTaskId(2),
            at_tick: 10,
        };
        match replayer.verify(&actual).unwrap_err() {
            StreamingReplayError::Divergence(divergence) => {
                assert_eq!(divergence.expected, Some(recorded));
                assert_eq!(divergence.actual, actual);
                assert_eq!(divergence.index, 0);
            }
            other => panic!("expected divergence error, got {other:?}"),
        }
    }

    #[test]
    fn progress_display() {
        let rendered = ReplayProgress::new(250, 1000).to_string();
        assert!(rendered.contains("250/1000"));
        assert!(rendered.contains("25.0%"));
    }

    #[test]
    fn run_with_respects_runto_breakpoint() {
        let temp = trace_file(&TraceMetadata::new(42), &events_every_ten_ticks());
        let mut replayer = StreamingReplayer::open(temp.path()).unwrap();
        replayer.set_mode(ReplayMode::RunTo(Breakpoint::Tick(50)));

        let count = replayer
            .run_with(|_, _| Ok::<_, StreamingReplayError>(()))
            .unwrap();
        assert_eq!(count, 6);
        assert!(replayer.at_breakpoint());
    }

    #[test]
    fn run_with_respects_step_mode() {
        let temp = trace_file(&TraceMetadata::new(7), &sample_events(5));
        let mut replayer = StreamingReplayer::open(temp.path()).unwrap();
        replayer.set_mode(ReplayMode::Step);

        let count = replayer
            .run_with(|_, _| Ok::<_, StreamingReplayError>(()))
            .unwrap();
        assert_eq!(count, 1);
        assert!(replayer.at_breakpoint());
    }

    #[test]
    fn event_source_adapter_captures_stream_error() {
        let metadata = TraceMetadata::new(42);
        let temp = trace_file(&metadata, &[ReplayEvent::RngSeed { seed: 42 }]);
        let path = temp.path();

        // Corrupt the first byte of the first event's payload.
        // NOTE(review): the `+ 8 + 4` presumably skips an event-count field and
        // a per-event length prefix; confirm against the trace file layout.
        let meta_len = rmp_serde::to_vec(&metadata).unwrap().len() as u64;
        let first_event_payload = HEADER_SIZE as u64 + meta_len + 8 + 4;
        let mut file = OpenOptions::new().write(true).open(path).unwrap();
        file.seek(SeekFrom::Start(first_event_payload)).unwrap();
        file.write_all(&[0xC1]).unwrap();
        file.flush().unwrap();

        let mut replayer = StreamingReplayer::open(path).unwrap();
        let event = <StreamingReplayer as EventSource>::next_event(&mut replayer);
        assert!(event.is_none());

        let err = replayer
            .take_event_source_error()
            .expect("expected captured event-source error");
        assert!(matches!(err, StreamingReplayError::File(_)));
        // Taking the error clears it.
        assert!(replayer.last_event_source_error().is_none());
    }
}