Skip to main content

asupersync/trace/
streaming.rs

1//! Streaming replay for large traces.
2//!
3//! This module provides streaming support for processing traces that are too large
4//! to fit in memory. The key types are:
5//!
6//! - [`StreamingReplayer`]: Replays traces directly from file with O(1) memory
7//! - [`ReplayCheckpoint`]: Saves replay state for resumption
8//! - [`ReplayProgress`]: Progress tracking during replay
9//!
10//! # Memory Guarantees
11//!
12//! - [`StreamingReplayer`]: O(1) memory - only buffers current event
13//! - Reading: Uses [`TraceReader`] with streaming reads
14//! - Writing: Uses `TraceWriter` with streaming writes
15//!
16//! # Example
17//!
18//! ```ignore
//! use asupersync::trace::streaming::{ReplayCheckpoint, ReplayProgress, StreamingReplayer};
20//! use std::path::Path;
21//!
22//! // Open a large trace file for streaming replay
23//! let mut replayer = StreamingReplayer::open("large_trace.bin")?;
24//!
25//! // Process events one at a time
26//! while let Some(event) = replayer.next_event()? {
27//!     println!("Event: {:?}", event);
28//!
29//!     // Check progress
30//!     if replayer.progress().percent() > 50.0 {
31//!         println!("Halfway done!");
32//!     }
33//! }
34//!
35//! // For very long replays, checkpoint and resume later
//! let checkpoint = replayer.checkpoint();
37//! std::fs::write("checkpoint.bin", checkpoint.to_bytes()?)?;
38//!
39//! // Later: resume from checkpoint
40//! let checkpoint = ReplayCheckpoint::from_bytes(&std::fs::read("checkpoint.bin")?)?;
41//! let mut resumed = StreamingReplayer::resume("large_trace.bin", checkpoint)?;
42//! ```
43
44use super::file::{TraceFileError, TraceReader};
45use super::replay::{ReplayEvent, TraceMetadata};
46use super::replayer::{Breakpoint, DivergenceError, EventSource, ReplayMode};
47use serde::{Deserialize, Serialize};
48use std::io;
49use std::path::Path;
50
51// =============================================================================
52// Errors
53// =============================================================================
54
55/// Errors specific to streaming replay operations.
56#[derive(Debug, thiserror::Error)]
57pub enum StreamingReplayError {
58    /// File operation error.
59    #[error("file error: {0}")]
60    File(#[from] TraceFileError),
61
62    /// I/O error.
63    #[error("I/O error: {0}")]
64    Io(#[from] io::Error),
65
66    /// Checkpoint is invalid or corrupt.
67    #[error("invalid checkpoint: {0}")]
68    InvalidCheckpoint(String),
69
70    /// Checkpoint doesn't match trace file.
71    #[error("checkpoint mismatch: {0}")]
72    CheckpointMismatch(String),
73
74    /// Divergence detected during replay.
75    #[error("{0}")]
76    Divergence(#[from] DivergenceError),
77
78    /// Serialization error.
79    #[error("serialization error: {0}")]
80    Serialize(String),
81}
82
83/// Result type for streaming replay operations.
84pub type StreamingReplayResult<T> = Result<T, StreamingReplayError>;
85
86// =============================================================================
87// Progress Tracking
88// =============================================================================
89
/// Snapshot of how far a streaming replay has advanced.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ReplayProgress {
    /// Count of events already processed.
    pub events_processed: u64,
    /// Count of events the trace contains in total.
    pub total_events: u64,
}

impl ReplayProgress {
    /// Builds a progress snapshot from the two counters.
    #[must_use]
    pub const fn new(events_processed: u64, total_events: u64) -> Self {
        Self {
            events_processed,
            total_events,
        }
    }

    /// Progress expressed in percent.
    ///
    /// An empty trace (`total_events == 0`) reports `100.0`. The value is not
    /// clamped, so it exceeds `100.0` when `events_processed > total_events`.
    #[must_use]
    pub fn percent(&self) -> f64 {
        self.fraction() * 100.0
    }

    /// Progress expressed as a ratio of processed to total events.
    ///
    /// An empty trace (`total_events == 0`) reports `1.0`.
    #[must_use]
    #[allow(clippy::cast_precision_loss)] // Precision loss is acceptable for progress display
    pub fn fraction(&self) -> f64 {
        match self.total_events {
            0 => 1.0,
            total => self.events_processed as f64 / total as f64,
        }
    }

    /// Whether every event has been processed.
    #[must_use]
    pub fn is_complete(&self) -> bool {
        self.remaining() == 0
    }

    /// How many events are still outstanding (never underflows).
    #[must_use]
    pub fn remaining(&self) -> u64 {
        let Self {
            events_processed,
            total_events,
        } = *self;
        total_events.saturating_sub(events_processed)
    }
}

impl std::fmt::Display for ReplayProgress {
    /// Renders as `processed/total (percent%)` with one decimal place.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            events_processed,
            total_events,
        } = *self;
        let percent = self.percent();
        write!(f, "{events_processed}/{total_events} ({percent:.1}%)")
    }
}
155
156// =============================================================================
157// Checkpoint
158// =============================================================================
159
160/// A checkpoint for resuming long replays.
161///
162/// Checkpoints capture the current position in the trace, allowing replay
163/// to be suspended and resumed later without re-processing all events.
164///
165/// # Safety
166///
167/// Checkpoints are only valid for the specific trace file they were created from.
168/// Attempting to resume with a checkpoint from a different trace will result in
169/// an error.
170#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
171pub struct ReplayCheckpoint {
172    /// Number of events that have been processed.
173    pub events_processed: u64,
174
175    /// Total number of events in the trace this checkpoint came from.
176    pub total_events: u64,
177
178    /// The seed from the trace metadata (for validation).
179    pub seed: u64,
180
181    /// Hash of the trace metadata (for validation).
182    pub metadata_hash: u64,
183
184    /// Deterministic checkpoint timestamp derived from trace metadata and position.
185    pub created_at: u64,
186}
187
188impl ReplayCheckpoint {
189    /// Creates a new checkpoint.
190    fn new(events_processed: u64, total_events: u64, metadata: &TraceMetadata) -> Self {
191        Self {
192            events_processed,
193            total_events,
194            seed: metadata.seed,
195            metadata_hash: Self::hash_metadata(metadata),
196            // Keep checkpoint artifacts stable for identical replay state instead of
197            // reintroducing ambient wall-clock time into the replay toolchain.
198            created_at: metadata.recorded_at.saturating_add(events_processed),
199        }
200    }
201
202    /// Validates that this checkpoint matches the given trace metadata.
203    fn validate(&self, metadata: &TraceMetadata, total_events: u64) -> StreamingReplayResult<()> {
204        if self.seed != metadata.seed {
205            return Err(StreamingReplayError::CheckpointMismatch(format!(
206                "seed mismatch: checkpoint has {}, trace has {}",
207                self.seed, metadata.seed
208            )));
209        }
210
211        let expected_hash = Self::hash_metadata(metadata);
212        if self.metadata_hash != expected_hash {
213            return Err(StreamingReplayError::CheckpointMismatch(
214                "metadata hash mismatch".to_string(),
215            ));
216        }
217
218        if self.total_events != total_events {
219            return Err(StreamingReplayError::CheckpointMismatch(format!(
220                "event count mismatch: checkpoint has {}, trace has {}",
221                self.total_events, total_events
222            )));
223        }
224
225        if self.events_processed > total_events {
226            return Err(StreamingReplayError::CheckpointMismatch(format!(
227                "checkpoint position {} exceeds trace length {}",
228                self.events_processed, total_events
229            )));
230        }
231
232        Ok(())
233    }
234
235    /// Computes a hash of the trace metadata for validation.
236    fn hash_metadata(metadata: &TraceMetadata) -> u64 {
237        use std::hash::{Hash, Hasher};
238
239        struct SimpleHasher(u64);
240
241        impl Hasher for SimpleHasher {
242            fn finish(&self) -> u64 {
243                self.0
244            }
245
246            fn write(&mut self, bytes: &[u8]) {
247                for byte in bytes {
248                    self.0 = self.0.wrapping_mul(31).wrapping_add(u64::from(*byte));
249                }
250            }
251        }
252
253        let mut hasher = SimpleHasher(0);
254        metadata.seed.hash(&mut hasher);
255        metadata.version.hash(&mut hasher);
256        metadata.recorded_at.hash(&mut hasher);
257        metadata.config_hash.hash(&mut hasher);
258        metadata.description.hash(&mut hasher);
259        hasher.finish()
260    }
261
262    /// Serializes the checkpoint to bytes.
263    ///
264    /// # Errors
265    ///
266    /// Returns an error if serialization fails.
267    pub fn to_bytes(&self) -> StreamingReplayResult<Vec<u8>> {
268        rmp_serde::to_vec(self)
269            .map_err(|e: rmp_serde::encode::Error| StreamingReplayError::Serialize(e.to_string()))
270    }
271
272    /// Deserializes a checkpoint from bytes.
273    ///
274    /// # Errors
275    ///
276    /// Returns an error if deserialization fails.
277    pub fn from_bytes(bytes: &[u8]) -> StreamingReplayResult<Self> {
278        rmp_serde::from_slice(bytes).map_err(|e: rmp_serde::decode::Error| {
279            StreamingReplayError::InvalidCheckpoint(e.to_string())
280        })
281    }
282}
283
284// =============================================================================
285// Streaming Replayer
286// =============================================================================
287
288/// A streaming replayer that processes traces with O(1) memory.
289///
290/// Unlike [`TraceReplayer`][super::replayer::TraceReplayer] which loads all events
291/// into memory, `StreamingReplayer` reads events one at a time from disk. This
292/// enables replay of traces with millions of events that wouldn't fit in memory.
293///
294/// # Memory Usage
295///
296/// - File reader buffer: ~64 KB
297/// - Current event: ~64 bytes
298/// - Peeked event: ~64 bytes (optional)
299/// - Total: O(1) regardless of trace size
300///
301/// # Example
302///
303/// ```ignore
304/// let mut replayer = StreamingReplayer::open("trace.bin")?;
305///
306/// while let Some(event) = replayer.next_event()? {
307///     process_event(&event);
308/// }
309/// ```
310pub struct StreamingReplayer {
311    /// The underlying file reader.
312    reader: TraceReader,
313
314    /// Cached metadata.
315    metadata: TraceMetadata,
316
317    /// Total number of events (from file header).
318    total_events: u64,
319
320    /// Number of events that have been consumed.
321    events_consumed: u64,
322
323    /// Peeked event (if any).
324    peeked: Option<ReplayEvent>,
325
326    /// Current replay mode.
327    mode: ReplayMode,
328
329    /// Whether we're at a breakpoint.
330    at_breakpoint: bool,
331    /// Last error observed via the [`EventSource`] adapter path.
332    ///
333    /// This preserves diagnosability for consumers that use the fallible-free
334    /// trait surface.
335    event_source_error: Option<StreamingReplayError>,
336}
337
338impl StreamingReplayer {
339    /// Opens a trace file for streaming replay.
340    ///
341    /// # Errors
342    ///
343    /// Returns an error if the file cannot be opened or has an invalid format.
344    pub fn open(path: impl AsRef<Path>) -> StreamingReplayResult<Self> {
345        let reader = TraceReader::open(path)?;
346        let metadata = reader.metadata().clone();
347        let total_events = reader.event_count();
348
349        Ok(Self {
350            reader,
351            metadata,
352            total_events,
353            events_consumed: 0,
354            peeked: None,
355            mode: ReplayMode::Run,
356            at_breakpoint: false,
357            event_source_error: None,
358        })
359    }
360
361    /// Resumes replay from a checkpoint.
362    ///
363    /// This skips forward to the checkpoint position without processing
364    /// intermediate events.
365    ///
366    /// # Errors
367    ///
368    /// Returns an error if:
369    /// - The file cannot be opened
370    /// - The checkpoint is invalid
371    /// - The checkpoint doesn't match the trace file
372    pub fn resume(
373        path: impl AsRef<Path>,
374        checkpoint: ReplayCheckpoint,
375    ) -> StreamingReplayResult<Self> {
376        let mut reader = TraceReader::open(path)?;
377        let metadata = reader.metadata().clone();
378        let total_events = reader.event_count();
379
380        // Validate checkpoint matches this trace
381        checkpoint.validate(&metadata, total_events)?;
382
383        // Skip to checkpoint position
384        for _ in 0..checkpoint.events_processed {
385            if reader.read_event()?.is_none() {
386                return Err(StreamingReplayError::CheckpointMismatch(
387                    "trace ended before checkpoint position".to_string(),
388                ));
389            }
390        }
391
392        Ok(Self {
393            reader,
394            metadata,
395            total_events,
396            events_consumed: checkpoint.events_processed,
397            peeked: None,
398            mode: ReplayMode::Run,
399            at_breakpoint: false,
400            event_source_error: None,
401        })
402    }
403
404    /// Returns the trace metadata.
405    #[must_use]
406    pub fn metadata(&self) -> &TraceMetadata {
407        &self.metadata
408    }
409
410    /// Returns the total number of events in the trace.
411    #[must_use]
412    pub fn total_events(&self) -> u64 {
413        self.total_events
414    }
415
416    /// Returns the number of events consumed so far.
417    #[must_use]
418    pub fn events_consumed(&self) -> u64 {
419        self.events_consumed
420    }
421
422    /// Returns the current replay progress.
423    #[must_use]
424    pub fn progress(&self) -> ReplayProgress {
425        ReplayProgress::new(self.events_consumed, self.total_events)
426    }
427
428    /// Returns true if all events have been consumed.
429    #[must_use]
430    pub fn is_complete(&self) -> bool {
431        self.events_consumed >= self.total_events && self.peeked.is_none()
432    }
433
434    /// Returns true if we're at a breakpoint.
435    #[must_use]
436    pub fn at_breakpoint(&self) -> bool {
437        self.at_breakpoint
438    }
439
440    /// Returns the most recent [`EventSource`] adapter error, if any.
441    #[must_use]
442    pub fn last_event_source_error(&self) -> Option<&StreamingReplayError> {
443        self.event_source_error.as_ref()
444    }
445
446    /// Takes and clears the most recent [`EventSource`] adapter error.
447    pub fn take_event_source_error(&mut self) -> Option<StreamingReplayError> {
448        self.event_source_error.take()
449    }
450
451    /// Sets the replay mode.
452    pub fn set_mode(&mut self, mode: ReplayMode) {
453        self.mode = mode;
454        self.at_breakpoint = false;
455    }
456
457    /// Returns the current replay mode.
458    #[must_use]
459    pub fn mode(&self) -> &ReplayMode {
460        &self.mode
461    }
462
463    /// Peeks at the next event without consuming it.
464    ///
465    /// # Errors
466    ///
467    /// Returns an error if reading fails.
468    pub fn peek(&mut self) -> StreamingReplayResult<Option<&ReplayEvent>> {
469        if self.peeked.is_none() {
470            self.peeked = self.reader.read_event()?;
471        }
472        Ok(self.peeked.as_ref())
473    }
474
475    /// Reads and consumes the next event.
476    ///
477    /// # Errors
478    ///
479    /// Returns an error if reading fails.
480    pub fn next_event(&mut self) -> StreamingReplayResult<Option<ReplayEvent>> {
481        let event = if let Some(peeked) = self.peeked.take() {
482            Some(peeked)
483        } else {
484            self.reader.read_event()?
485        };
486
487        if event.is_some() {
488            self.events_consumed += 1;
489
490            // Check for breakpoint
491            if let Some(ref e) = event {
492                self.at_breakpoint = self.check_breakpoint(e);
493            }
494        }
495
496        Ok(event)
497    }
498
499    /// Verifies that an actual event matches the next expected event.
500    ///
501    /// Does not consume the event - use `verify_and_advance` for that.
502    ///
503    /// # Errors
504    ///
505    /// Returns an error with divergence details if they don't match.
506    pub fn verify(&mut self, actual: &ReplayEvent) -> StreamingReplayResult<()> {
507        // Store position before borrowing self through peek()
508        let current_position = self.events_consumed;
509
510        let expected = self.peek()?;
511
512        let Some(expected) = expected else {
513            return Err(StreamingReplayError::Divergence(DivergenceError {
514                index: current_position as usize,
515                expected: None,
516                actual: actual.clone(),
517                context: "Trace ended but execution continued".to_string(),
518            }));
519        };
520
521        if expected != actual {
522            // Clone the expected event before the borrow ends
523            let expected_clone = expected.clone();
524            return Err(StreamingReplayError::Divergence(DivergenceError {
525                index: current_position as usize,
526                expected: Some(expected_clone),
527                actual: actual.clone(),
528                context: format!("Event mismatch at position {current_position}"),
529            }));
530        }
531
532        Ok(())
533    }
534
535    /// Verifies and consumes the next event.
536    ///
537    /// # Errors
538    ///
539    /// Returns an error if verification fails or reading fails.
540    pub fn verify_and_advance(
541        &mut self,
542        actual: &ReplayEvent,
543    ) -> StreamingReplayResult<ReplayEvent> {
544        self.verify(actual)?;
545        self.next_event()
546            .transpose()
547            .expect("event was peeked so must exist")
548    }
549
550    /// Creates a checkpoint at the current position.
551    ///
552    /// The checkpoint can be used later with [`resume`][Self::resume] to
553    /// continue replay from this point.
554    #[must_use]
555    pub fn checkpoint(&self) -> ReplayCheckpoint {
556        ReplayCheckpoint::new(self.events_consumed, self.total_events, &self.metadata)
557    }
558
559    /// Steps forward according to the current mode.
560    ///
561    /// In Step mode, advances one event and stops.
562    /// In Run mode, advances all events until completion.
563    /// In RunTo mode, advances until the breakpoint is reached.
564    ///
565    /// # Errors
566    ///
567    /// Returns an error if reading fails.
568    pub fn step(&mut self) -> StreamingReplayResult<Option<ReplayEvent>> {
569        self.at_breakpoint = false;
570        self.next_event()
571    }
572
573    /// Runs until completion or breakpoint.
574    ///
575    /// Returns the number of events processed.
576    ///
577    /// # Errors
578    ///
579    /// Returns an error if reading fails.
580    pub fn run(&mut self) -> StreamingReplayResult<u64> {
581        let mut count = 0u64;
582
583        while !self.is_complete() && !self.at_breakpoint {
584            if self.next_event()?.is_some() {
585                count += 1;
586            }
587        }
588
589        Ok(count)
590    }
591
592    /// Runs with a callback for each event.
593    ///
594    /// This is useful for processing events as they're read without
595    /// accumulating them in memory.
596    ///
597    /// # Errors
598    ///
599    /// Returns an error if reading fails or the callback returns an error.
600    pub fn run_with<F, E>(&mut self, mut callback: F) -> Result<u64, E>
601    where
602        F: FnMut(ReplayEvent, ReplayProgress) -> Result<(), E>,
603        E: From<StreamingReplayError>,
604    {
605        let mut count = 0u64;
606
607        while !self.is_complete() && !self.at_breakpoint {
608            if let Some(event) = self.next_event()? {
609                let progress = self.progress();
610                callback(event, progress)?;
611                count += 1;
612            }
613        }
614
615        Ok(count)
616    }
617
618    /// Checks if the current event triggers a breakpoint.
619    fn check_breakpoint(&self, event: &ReplayEvent) -> bool {
620        match &self.mode {
621            ReplayMode::Step => true,
622            ReplayMode::Run => false,
623            ReplayMode::RunTo(breakpoint) => match breakpoint {
624                Breakpoint::EventIndex(idx) => self.events_consumed as usize == *idx + 1,
625                Breakpoint::Tick(tick) => {
626                    if let ReplayEvent::TaskScheduled { at_tick, .. } = event {
627                        *at_tick >= *tick
628                    } else {
629                        false
630                    }
631                }
632                Breakpoint::Task(task_id) => {
633                    if let ReplayEvent::TaskScheduled { task, .. } = event {
634                        task == task_id
635                    } else {
636                        false
637                    }
638                }
639            },
640        }
641    }
642}
643
644impl EventSource for StreamingReplayer {
645    fn next_event(&mut self) -> Option<ReplayEvent> {
646        match Self::next_event(self) {
647            Ok(event) => {
648                self.event_source_error = None;
649                event
650            }
651            Err(err) => {
652                self.event_source_error = Some(err);
653                None
654            }
655        }
656    }
657
658    fn metadata(&self) -> &TraceMetadata {
659        &self.metadata
660    }
661}
662
663// =============================================================================
664// Tests
665// =============================================================================
666
667#[cfg(test)]
668mod tests {
669    use super::*;
670    use crate::trace::file::{HEADER_SIZE, TraceWriter, write_trace};
671    use crate::trace::replay::CompactTaskId;
672    use std::fs::OpenOptions;
673    use std::io::{Seek, SeekFrom, Write};
674    use tempfile::NamedTempFile;
675
676    fn sample_events(count: u64) -> Vec<ReplayEvent> {
677        (0..count)
678            .map(|i| ReplayEvent::TaskScheduled {
679                task: CompactTaskId(i),
680                at_tick: i,
681            })
682            .collect()
683    }
684
685    #[test]
686    fn basic_streaming_replay() {
687        let temp = NamedTempFile::new().unwrap();
688        let path = temp.path();
689
690        // Write a trace
691        let metadata = TraceMetadata::new(42);
692        let events = sample_events(100);
693        write_trace(path, &metadata, &events).unwrap();
694
695        // Stream replay
696        let mut replayer = StreamingReplayer::open(path).unwrap();
697
698        assert_eq!(replayer.total_events(), 100);
699        assert_eq!(replayer.events_consumed(), 0);
700        assert!(!replayer.is_complete());
701
702        // Read all events
703        let mut count = 0u64;
704        while let Some(event) = replayer.next_event().unwrap() {
705            if let ReplayEvent::TaskScheduled { task, at_tick } = event {
706                assert_eq!(task.0, count);
707                assert_eq!(at_tick, count);
708            } else {
709                panic!("unexpected event type");
710            }
711            count += 1;
712        }
713
714        assert_eq!(count, 100);
715        assert!(replayer.is_complete());
716    }
717
718    #[test]
719    fn progress_tracking() {
720        let temp = NamedTempFile::new().unwrap();
721        let path = temp.path();
722
723        let metadata = TraceMetadata::new(42);
724        let events = sample_events(100);
725        write_trace(path, &metadata, &events).unwrap();
726
727        let mut replayer = StreamingReplayer::open(path).unwrap();
728
729        // Check initial progress
730        let progress = replayer.progress();
731        assert_eq!(progress.events_processed, 0);
732        assert_eq!(progress.total_events, 100);
733        assert!((progress.percent() - 0.0).abs() < 0.01);
734
735        // Read 50 events
736        for _ in 0..50 {
737            replayer.next_event().unwrap();
738        }
739
740        // Check midpoint progress
741        let progress = replayer.progress();
742        assert_eq!(progress.events_processed, 50);
743        assert!((progress.percent() - 50.0).abs() < 0.01);
744        assert_eq!(progress.remaining(), 50);
745
746        // Read rest
747        while replayer.next_event().unwrap().is_some() {}
748
749        // Check final progress
750        let progress = replayer.progress();
751        assert!(progress.is_complete());
752        assert!((progress.percent() - 100.0).abs() < 0.01);
753    }
754
755    #[test]
756    fn peek_without_consuming() {
757        let temp = NamedTempFile::new().unwrap();
758        let path = temp.path();
759
760        let metadata = TraceMetadata::new(42);
761        let events = sample_events(10);
762        write_trace(path, &metadata, &events).unwrap();
763
764        let mut replayer = StreamingReplayer::open(path).unwrap();
765
766        // Peek multiple times - should return same event
767        let peeked1 = replayer.peek().unwrap().cloned();
768        let peeked2 = replayer.peek().unwrap().cloned();
769        assert_eq!(peeked1, peeked2);
770        assert_eq!(replayer.events_consumed(), 0);
771
772        // Now consume
773        let consumed = replayer.next_event().unwrap();
774        assert_eq!(consumed, peeked1);
775        assert_eq!(replayer.events_consumed(), 1);
776
777        // Next peek should be different
778        let peeked3 = replayer.peek().unwrap().cloned();
779        assert_ne!(peeked3, peeked1);
780    }
781
782    #[test]
783    fn checkpoint_and_resume() {
784        let temp = NamedTempFile::new().unwrap();
785        let path = temp.path();
786
787        let metadata = TraceMetadata::new(42);
788        let events = sample_events(100);
789        write_trace(path, &metadata, &events).unwrap();
790
791        // Replay partway and checkpoint
792        let mut replayer = StreamingReplayer::open(path).unwrap();
793        for _ in 0..50 {
794            replayer.next_event().unwrap();
795        }
796
797        let checkpoint = replayer.checkpoint();
798        assert_eq!(checkpoint.events_processed, 50);
799        assert_eq!(checkpoint.total_events, 100);
800
801        // Serialize and deserialize checkpoint
802        let checkpoint_bytes = checkpoint.to_bytes().unwrap();
803        let restored_checkpoint = ReplayCheckpoint::from_bytes(&checkpoint_bytes).unwrap();
804
805        // Resume from checkpoint
806        let mut resumed = StreamingReplayer::resume(path, restored_checkpoint).unwrap();
807        assert_eq!(resumed.events_consumed(), 50);
808
809        // Continue reading
810        let mut count = 50u64;
811        while let Some(event) = resumed.next_event().unwrap() {
812            if let ReplayEvent::TaskScheduled { task, .. } = event {
813                assert_eq!(task.0, count);
814            }
815            count += 1;
816        }
817
818        assert_eq!(count, 100);
819    }
820
821    #[test]
822    fn checkpoint_validation() {
823        let temp1 = NamedTempFile::new().unwrap();
824        let temp2 = NamedTempFile::new().unwrap();
825
826        // Write two different traces
827        let metadata1 = TraceMetadata::new(42);
828        let metadata2 = TraceMetadata::new(99);
829        write_trace(temp1.path(), &metadata1, &sample_events(100)).unwrap();
830        write_trace(temp2.path(), &metadata2, &sample_events(100)).unwrap();
831
832        // Checkpoint from first trace
833        let mut replayer = StreamingReplayer::open(temp1.path()).unwrap();
834        for _ in 0..50 {
835            replayer.next_event().unwrap();
836        }
837        let checkpoint = replayer.checkpoint();
838
839        // Try to resume with second trace - should fail
840        let result = StreamingReplayer::resume(temp2.path(), checkpoint);
841        assert!(matches!(
842            result,
843            Err(StreamingReplayError::CheckpointMismatch(_))
844        ));
845    }
846
847    #[test]
848    fn checkpoint_validation_rejects_same_seed_metadata_drift() {
849        let temp1 = NamedTempFile::new().unwrap();
850        let temp2 = NamedTempFile::new().unwrap();
851
852        let metadata1 = TraceMetadata {
853            version: super::super::replay::REPLAY_SCHEMA_VERSION,
854            seed: 42,
855            recorded_at: 100,
856            config_hash: 0xCAFE,
857            description: Some("trace-a".into()),
858        };
859        let metadata2 = TraceMetadata {
860            version: super::super::replay::REPLAY_SCHEMA_VERSION,
861            seed: 42,
862            recorded_at: 200,
863            config_hash: 0xCAFE,
864            description: Some("trace-b".into()),
865        };
866        write_trace(temp1.path(), &metadata1, &sample_events(4)).unwrap();
867        write_trace(temp2.path(), &metadata2, &sample_events(4)).unwrap();
868
869        let mut replayer = StreamingReplayer::open(temp1.path()).unwrap();
870        for _ in 0..2 {
871            replayer.next_event().unwrap();
872        }
873        let checkpoint = replayer.checkpoint();
874
875        let result = StreamingReplayer::resume(temp2.path(), checkpoint);
876        assert!(matches!(
877            result,
878            Err(StreamingReplayError::CheckpointMismatch(_))
879        ));
880    }
881
882    #[test]
883    fn checkpoint_validation_rejects_event_count_drift() {
884        let temp1 = NamedTempFile::new().unwrap();
885        let temp2 = NamedTempFile::new().unwrap();
886
887        let metadata = TraceMetadata {
888            version: super::super::replay::REPLAY_SCHEMA_VERSION,
889            seed: 7,
890            recorded_at: 500,
891            config_hash: 0xBEEF,
892            description: Some("same-metadata".into()),
893        };
894        write_trace(temp1.path(), &metadata, &sample_events(4)).unwrap();
895        write_trace(temp2.path(), &metadata, &sample_events(6)).unwrap();
896
897        let mut replayer = StreamingReplayer::open(temp1.path()).unwrap();
898        for _ in 0..2 {
899            replayer.next_event().unwrap();
900        }
901        let checkpoint = replayer.checkpoint();
902
903        let result = StreamingReplayer::resume(temp2.path(), checkpoint);
904        assert!(matches!(
905            result,
906            Err(StreamingReplayError::CheckpointMismatch(_))
907        ));
908    }
909
910    #[test]
911    fn checkpoint_bytes_are_stable_for_same_position() {
912        let temp = NamedTempFile::new().unwrap();
913        let path = temp.path();
914
915        let metadata = TraceMetadata {
916            version: super::super::replay::REPLAY_SCHEMA_VERSION,
917            seed: 42,
918            recorded_at: 1_000,
919            config_hash: 0xCAFE,
920            description: Some("stable checkpoint".into()),
921        };
922        write_trace(path, &metadata, &sample_events(5)).unwrap();
923
924        let mut replayer = StreamingReplayer::open(path).unwrap();
925        for _ in 0..3 {
926            replayer.next_event().unwrap();
927        }
928
929        let checkpoint_a = replayer.checkpoint();
930        let checkpoint_b = replayer.checkpoint();
931
932        assert_eq!(checkpoint_a.events_processed, 3);
933        assert_eq!(checkpoint_a.total_events, 5);
934        assert_eq!(checkpoint_a.created_at, 1_003);
935        assert_eq!(checkpoint_a.created_at, checkpoint_b.created_at);
936        assert_eq!(
937            checkpoint_a.to_bytes().unwrap(),
938            checkpoint_b.to_bytes().unwrap()
939        );
940    }
941
942    #[test]
943    fn checkpoint_created_at_advances_with_position() {
944        let temp = NamedTempFile::new().unwrap();
945        let path = temp.path();
946
947        let metadata = TraceMetadata {
948            version: super::super::replay::REPLAY_SCHEMA_VERSION,
949            seed: 7,
950            recorded_at: 500,
951            config_hash: 0xBEEF,
952            description: None,
953        };
954        write_trace(path, &metadata, &sample_events(4)).unwrap();
955
956        let mut replayer = StreamingReplayer::open(path).unwrap();
957
958        let first = replayer.checkpoint();
959        assert_eq!(first.created_at, 500);
960
961        replayer.next_event().unwrap();
962        let second = replayer.checkpoint();
963        assert_eq!(second.created_at, 501);
964        assert_eq!(second.created_at, first.created_at + 1);
965
966        replayer.next_event().unwrap();
967        let third = replayer.checkpoint();
968        assert_eq!(third.created_at, 502);
969    }
970
971    #[test]
972    fn run_with_callback() {
973        let temp = NamedTempFile::new().unwrap();
974        let path = temp.path();
975
976        let metadata = TraceMetadata::new(42);
977        let events = sample_events(50);
978        write_trace(path, &metadata, &events).unwrap();
979
980        let mut replayer = StreamingReplayer::open(path).unwrap();
981
982        let mut event_ids = Vec::new();
983        let count = replayer
984            .run_with(|event, progress| {
985                if let ReplayEvent::TaskScheduled { task, .. } = event {
986                    event_ids.push(task.0);
987                }
988                // Check progress is accurate
989                assert!(!progress.is_complete() || progress.events_processed == 50);
990                Ok::<_, StreamingReplayError>(())
991            })
992            .unwrap();
993
994        assert_eq!(count, 50);
995        assert_eq!(event_ids.len(), 50);
996        for (i, id) in event_ids.iter().enumerate() {
997            assert_eq!(*id, i as u64);
998        }
999    }
1000
1001    #[test]
1002    fn large_trace_streaming() {
1003        let temp = NamedTempFile::new().unwrap();
1004        let path = temp.path();
1005
1006        let metadata = TraceMetadata::new(42);
1007        let event_count = 10_000u64;
1008
1009        // Write large trace using streaming writer
1010        {
1011            let mut writer = TraceWriter::create(path).unwrap();
1012            writer.write_metadata(&metadata).unwrap();
1013            for i in 0..event_count {
1014                writer
1015                    .write_event(&ReplayEvent::TaskScheduled {
1016                        task: CompactTaskId(i),
1017                        at_tick: i,
1018                    })
1019                    .unwrap();
1020            }
1021            writer.finish().unwrap();
1022        }
1023
1024        // Stream replay - should use constant memory
1025        let mut replayer = StreamingReplayer::open(path).unwrap();
1026        assert_eq!(replayer.total_events(), event_count);
1027
1028        let mut count = 0u64;
1029        while replayer.next_event().unwrap().is_some() {
1030            count += 1;
1031        }
1032
1033        assert_eq!(count, event_count);
1034    }
1035
1036    #[test]
1037    fn step_mode_streaming() {
1038        let temp = NamedTempFile::new().unwrap();
1039        let path = temp.path();
1040
1041        let metadata = TraceMetadata::new(42);
1042        let events = sample_events(5);
1043        write_trace(path, &metadata, &events).unwrap();
1044
1045        let mut replayer = StreamingReplayer::open(path).unwrap();
1046        replayer.set_mode(ReplayMode::Step);
1047
1048        // Each step should set breakpoint
1049        for _ in 0..5 {
1050            replayer.step().unwrap();
1051            assert!(replayer.at_breakpoint());
1052        }
1053
1054        // Final step returns None
1055        let event = replayer.step().unwrap();
1056        assert!(event.is_none());
1057    }
1058
1059    #[test]
1060    fn breakpoint_at_tick() {
1061        let temp = NamedTempFile::new().unwrap();
1062        let path = temp.path();
1063
1064        let metadata = TraceMetadata::new(42);
1065        let events: Vec<_> = (0..10)
1066            .map(|i| ReplayEvent::TaskScheduled {
1067                task: CompactTaskId(i),
1068                at_tick: i * 10, // Ticks: 0, 10, 20, 30, ...
1069            })
1070            .collect();
1071        write_trace(path, &metadata, &events).unwrap();
1072
1073        let mut replayer = StreamingReplayer::open(path).unwrap();
1074        replayer.set_mode(ReplayMode::RunTo(Breakpoint::Tick(50)));
1075
1076        let count = replayer.run().unwrap();
1077        // Should stop at tick >= 50 (which is at_tick=50, event index 5)
1078        assert!(replayer.at_breakpoint());
1079        assert_eq!(count, 6); // Events 0-5 (ticks 0, 10, 20, 30, 40, 50)
1080    }
1081
1082    #[test]
1083    fn empty_trace() {
1084        let temp = NamedTempFile::new().unwrap();
1085        let path = temp.path();
1086
1087        let metadata = TraceMetadata::new(42);
1088        write_trace(path, &metadata, &[]).unwrap();
1089
1090        let mut replayer = StreamingReplayer::open(path).unwrap();
1091        assert_eq!(replayer.total_events(), 0);
1092        assert!(replayer.progress().is_complete());
1093
1094        let event = replayer.next_event().unwrap();
1095        assert!(event.is_none());
1096    }
1097
1098    #[test]
1099    fn verify_past_end_of_trace_reports_trace_exhausted() {
1100        let temp = NamedTempFile::new().unwrap();
1101        let path = temp.path();
1102
1103        let metadata = TraceMetadata::new(42);
1104        let events = vec![ReplayEvent::RngSeed { seed: 42 }];
1105        write_trace(path, &metadata, &events).unwrap();
1106
1107        let mut replayer = StreamingReplayer::open(path).unwrap();
1108        assert!(replayer.next_event().unwrap().is_some());
1109        assert!(replayer.is_complete());
1110
1111        let actual = ReplayEvent::RngSeed { seed: 99 };
1112        let err = replayer.verify(&actual).unwrap_err();
1113        match err {
1114            StreamingReplayError::Divergence(divergence) => {
1115                assert!(divergence.expected.is_none());
1116                assert_eq!(divergence.index, 1);
1117                assert!(divergence.context.contains("Trace ended"));
1118                assert!(format!("{divergence}").contains("<trace_exhausted>"));
1119            }
1120            other => panic!("expected divergence error, got {other:?}"),
1121        }
1122    }
1123
1124    #[test]
1125    fn verify_mismatch_preserves_expected_event() {
1126        let temp = NamedTempFile::new().unwrap();
1127        let path = temp.path();
1128
1129        let metadata = TraceMetadata::new(42);
1130        let events = vec![ReplayEvent::TaskScheduled {
1131            task: CompactTaskId(1),
1132            at_tick: 10,
1133        }];
1134        write_trace(path, &metadata, &events).unwrap();
1135
1136        let mut replayer = StreamingReplayer::open(path).unwrap();
1137        let actual = ReplayEvent::TaskScheduled {
1138            task: CompactTaskId(2),
1139            at_tick: 10,
1140        };
1141        let err = replayer.verify(&actual).unwrap_err();
1142        match err {
1143            StreamingReplayError::Divergence(divergence) => {
1144                assert_eq!(
1145                    divergence.expected,
1146                    Some(ReplayEvent::TaskScheduled {
1147                        task: CompactTaskId(1),
1148                        at_tick: 10,
1149                    })
1150                );
1151                assert_eq!(divergence.actual, actual);
1152                assert_eq!(divergence.index, 0);
1153            }
1154            other => panic!("expected divergence error, got {other:?}"),
1155        }
1156    }
1157
1158    #[test]
1159    fn progress_display() {
1160        let progress = ReplayProgress::new(250, 1000);
1161        let display = format!("{progress}");
1162        assert!(display.contains("250/1000"));
1163        assert!(display.contains("25.0%"));
1164    }
1165
1166    #[test]
1167    fn run_with_respects_runto_breakpoint() {
1168        let temp = NamedTempFile::new().unwrap();
1169        let path = temp.path();
1170
1171        let metadata = TraceMetadata::new(42);
1172        let events: Vec<_> = (0..10)
1173            .map(|i| ReplayEvent::TaskScheduled {
1174                task: CompactTaskId(i),
1175                at_tick: i * 10,
1176            })
1177            .collect();
1178        write_trace(path, &metadata, &events).unwrap();
1179
1180        let mut replayer = StreamingReplayer::open(path).unwrap();
1181        replayer.set_mode(ReplayMode::RunTo(Breakpoint::Tick(50)));
1182
1183        let count = replayer
1184            .run_with(|_, _| Ok::<_, StreamingReplayError>(()))
1185            .unwrap();
1186        assert_eq!(count, 6);
1187        assert!(replayer.at_breakpoint());
1188    }
1189
1190    #[test]
1191    fn run_with_respects_step_mode() {
1192        let temp = NamedTempFile::new().unwrap();
1193        let path = temp.path();
1194
1195        let metadata = TraceMetadata::new(7);
1196        let events = sample_events(5);
1197        write_trace(path, &metadata, &events).unwrap();
1198
1199        let mut replayer = StreamingReplayer::open(path).unwrap();
1200        replayer.set_mode(ReplayMode::Step);
1201
1202        let count = replayer
1203            .run_with(|_, _| Ok::<_, StreamingReplayError>(()))
1204            .unwrap();
1205        assert_eq!(count, 1);
1206        assert!(replayer.at_breakpoint());
1207    }
1208
1209    #[test]
1210    fn event_source_adapter_captures_stream_error() {
1211        let temp = NamedTempFile::new().unwrap();
1212        let path = temp.path();
1213
1214        let metadata = TraceMetadata::new(42);
1215        let events = vec![ReplayEvent::RngSeed { seed: 42 }];
1216        write_trace(path, &metadata, &events).unwrap();
1217
1218        // Corrupt the first event payload byte while preserving file structure.
1219        let meta_len = rmp_serde::to_vec(&metadata).unwrap().len() as u64;
1220        let first_event_payload = HEADER_SIZE as u64 + meta_len + 8 + 4;
1221        let mut file = OpenOptions::new().write(true).open(path).unwrap();
1222        file.seek(SeekFrom::Start(first_event_payload)).unwrap();
1223        file.write_all(&[0xC1]).unwrap(); // MessagePack never-used marker => decode error.
1224        file.flush().unwrap();
1225
1226        let mut replayer = StreamingReplayer::open(path).unwrap();
1227        let event = <StreamingReplayer as EventSource>::next_event(&mut replayer);
1228        assert!(event.is_none());
1229
1230        let err = replayer
1231            .take_event_source_error()
1232            .expect("expected captured event-source error");
1233        assert!(matches!(err, StreamingReplayError::File(_)));
1234        assert!(replayer.last_event_source_error().is_none());
1235    }
1236}