use super::file::{TraceFileError, TraceReader};
use super::replay::{ReplayEvent, TraceMetadata};
use super::replayer::{Breakpoint, DivergenceError, EventSource, ReplayMode};
use serde::{Deserialize, Serialize};
use std::io;
use std::path::Path;
#[derive(Debug, thiserror::Error)]
pub enum StreamingReplayError {
#[error("file error: {0}")]
File(#[from] TraceFileError),
#[error("I/O error: {0}")]
Io(#[from] io::Error),
#[error("invalid checkpoint: {0}")]
InvalidCheckpoint(String),
#[error("checkpoint mismatch: {0}")]
CheckpointMismatch(String),
#[error("{0}")]
Divergence(#[from] DivergenceError),
#[error("serialization error: {0}")]
Serialize(String),
}
pub type StreamingReplayResult<T> = Result<T, StreamingReplayError>;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ReplayProgress {
pub events_processed: u64,
pub total_events: u64,
}
impl ReplayProgress {
#[must_use]
pub const fn new(events_processed: u64, total_events: u64) -> Self {
Self {
events_processed,
total_events,
}
}
#[must_use]
#[allow(clippy::cast_precision_loss)] pub fn percent(&self) -> f64 {
if self.total_events == 0 {
100.0
} else {
(self.events_processed as f64 / self.total_events as f64) * 100.0
}
}
#[must_use]
#[allow(clippy::cast_precision_loss)] pub fn fraction(&self) -> f64 {
if self.total_events == 0 {
1.0
} else {
self.events_processed as f64 / self.total_events as f64
}
}
#[must_use]
pub fn is_complete(&self) -> bool {
self.events_processed >= self.total_events
}
#[must_use]
pub fn remaining(&self) -> u64 {
self.total_events.saturating_sub(self.events_processed)
}
}
impl std::fmt::Display for ReplayProgress {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{}/{} ({:.1}%)",
self.events_processed,
self.total_events,
self.percent()
)
}
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct ReplayCheckpoint {
pub events_processed: u64,
pub total_events: u64,
pub seed: u64,
pub metadata_hash: u64,
pub created_at: u64,
}
impl ReplayCheckpoint {
fn new(events_processed: u64, total_events: u64, metadata: &TraceMetadata) -> Self {
Self {
events_processed,
total_events,
seed: metadata.seed,
metadata_hash: Self::hash_metadata(metadata),
created_at: metadata.recorded_at.saturating_add(events_processed),
}
}
fn validate(&self, metadata: &TraceMetadata, total_events: u64) -> StreamingReplayResult<()> {
if self.seed != metadata.seed {
return Err(StreamingReplayError::CheckpointMismatch(format!(
"seed mismatch: checkpoint has {}, trace has {}",
self.seed, metadata.seed
)));
}
let expected_hash = Self::hash_metadata(metadata);
if self.metadata_hash != expected_hash {
return Err(StreamingReplayError::CheckpointMismatch(
"metadata hash mismatch".to_string(),
));
}
if self.total_events != total_events {
return Err(StreamingReplayError::CheckpointMismatch(format!(
"event count mismatch: checkpoint has {}, trace has {}",
self.total_events, total_events
)));
}
if self.events_processed > total_events {
return Err(StreamingReplayError::CheckpointMismatch(format!(
"checkpoint position {} exceeds trace length {}",
self.events_processed, total_events
)));
}
Ok(())
}
fn hash_metadata(metadata: &TraceMetadata) -> u64 {
use std::hash::{Hash, Hasher};
struct SimpleHasher(u64);
impl Hasher for SimpleHasher {
fn finish(&self) -> u64 {
self.0
}
fn write(&mut self, bytes: &[u8]) {
for byte in bytes {
self.0 = self.0.wrapping_mul(31).wrapping_add(u64::from(*byte));
}
}
}
let mut hasher = SimpleHasher(0);
metadata.seed.hash(&mut hasher);
metadata.version.hash(&mut hasher);
metadata.recorded_at.hash(&mut hasher);
metadata.config_hash.hash(&mut hasher);
metadata.description.hash(&mut hasher);
hasher.finish()
}
pub fn to_bytes(&self) -> StreamingReplayResult<Vec<u8>> {
rmp_serde::to_vec(self)
.map_err(|e: rmp_serde::encode::Error| StreamingReplayError::Serialize(e.to_string()))
}
pub fn from_bytes(bytes: &[u8]) -> StreamingReplayResult<Self> {
rmp_serde::from_slice(bytes).map_err(|e: rmp_serde::decode::Error| {
StreamingReplayError::InvalidCheckpoint(e.to_string())
})
}
}
pub struct StreamingReplayer {
reader: TraceReader,
metadata: TraceMetadata,
total_events: u64,
events_consumed: u64,
peeked: Option<ReplayEvent>,
mode: ReplayMode,
at_breakpoint: bool,
event_source_error: Option<StreamingReplayError>,
}
impl StreamingReplayer {
pub fn open(path: impl AsRef<Path>) -> StreamingReplayResult<Self> {
let reader = TraceReader::open(path)?;
let metadata = reader.metadata().clone();
let total_events = reader.event_count();
Ok(Self {
reader,
metadata,
total_events,
events_consumed: 0,
peeked: None,
mode: ReplayMode::Run,
at_breakpoint: false,
event_source_error: None,
})
}
pub fn resume(
path: impl AsRef<Path>,
checkpoint: ReplayCheckpoint,
) -> StreamingReplayResult<Self> {
let mut reader = TraceReader::open(path)?;
let metadata = reader.metadata().clone();
let total_events = reader.event_count();
checkpoint.validate(&metadata, total_events)?;
for _ in 0..checkpoint.events_processed {
if reader.read_event()?.is_none() {
return Err(StreamingReplayError::CheckpointMismatch(
"trace ended before checkpoint position".to_string(),
));
}
}
Ok(Self {
reader,
metadata,
total_events,
events_consumed: checkpoint.events_processed,
peeked: None,
mode: ReplayMode::Run,
at_breakpoint: false,
event_source_error: None,
})
}
#[must_use]
pub fn metadata(&self) -> &TraceMetadata {
&self.metadata
}
#[must_use]
pub fn total_events(&self) -> u64 {
self.total_events
}
#[must_use]
pub fn events_consumed(&self) -> u64 {
self.events_consumed
}
#[must_use]
pub fn progress(&self) -> ReplayProgress {
ReplayProgress::new(self.events_consumed, self.total_events)
}
#[must_use]
pub fn is_complete(&self) -> bool {
self.events_consumed >= self.total_events && self.peeked.is_none()
}
#[must_use]
pub fn at_breakpoint(&self) -> bool {
self.at_breakpoint
}
#[must_use]
pub fn last_event_source_error(&self) -> Option<&StreamingReplayError> {
self.event_source_error.as_ref()
}
pub fn take_event_source_error(&mut self) -> Option<StreamingReplayError> {
self.event_source_error.take()
}
pub fn set_mode(&mut self, mode: ReplayMode) {
self.mode = mode;
self.at_breakpoint = false;
}
#[must_use]
pub fn mode(&self) -> &ReplayMode {
&self.mode
}
pub fn peek(&mut self) -> StreamingReplayResult<Option<&ReplayEvent>> {
if self.peeked.is_none() {
self.peeked = self.reader.read_event()?;
}
Ok(self.peeked.as_ref())
}
pub fn next_event(&mut self) -> StreamingReplayResult<Option<ReplayEvent>> {
let event = if let Some(peeked) = self.peeked.take() {
Some(peeked)
} else {
self.reader.read_event()?
};
if event.is_some() {
self.events_consumed += 1;
if let Some(ref e) = event {
self.at_breakpoint = self.check_breakpoint(e);
}
}
Ok(event)
}
pub fn verify(&mut self, actual: &ReplayEvent) -> StreamingReplayResult<()> {
let current_position = self.events_consumed;
let expected = self.peek()?;
let Some(expected) = expected else {
return Err(StreamingReplayError::Divergence(DivergenceError {
index: current_position as usize,
expected: None,
actual: actual.clone(),
context: "Trace ended but execution continued".to_string(),
}));
};
if expected != actual {
let expected_clone = expected.clone();
return Err(StreamingReplayError::Divergence(DivergenceError {
index: current_position as usize,
expected: Some(expected_clone),
actual: actual.clone(),
context: format!("Event mismatch at position {current_position}"),
}));
}
Ok(())
}
pub fn verify_and_advance(
&mut self,
actual: &ReplayEvent,
) -> StreamingReplayResult<ReplayEvent> {
self.verify(actual)?;
self.next_event()
.transpose()
.expect("event was peeked so must exist")
}
#[must_use]
pub fn checkpoint(&self) -> ReplayCheckpoint {
ReplayCheckpoint::new(self.events_consumed, self.total_events, &self.metadata)
}
pub fn step(&mut self) -> StreamingReplayResult<Option<ReplayEvent>> {
self.at_breakpoint = false;
self.next_event()
}
pub fn run(&mut self) -> StreamingReplayResult<u64> {
let mut count = 0u64;
while !self.is_complete() && !self.at_breakpoint {
if self.next_event()?.is_some() {
count += 1;
}
}
Ok(count)
}
pub fn run_with<F, E>(&mut self, mut callback: F) -> Result<u64, E>
where
F: FnMut(ReplayEvent, ReplayProgress) -> Result<(), E>,
E: From<StreamingReplayError>,
{
let mut count = 0u64;
while !self.is_complete() && !self.at_breakpoint {
if let Some(event) = self.next_event()? {
let progress = self.progress();
callback(event, progress)?;
count += 1;
}
}
Ok(count)
}
fn check_breakpoint(&self, event: &ReplayEvent) -> bool {
match &self.mode {
ReplayMode::Step => true,
ReplayMode::Run => false,
ReplayMode::RunTo(breakpoint) => match breakpoint {
Breakpoint::EventIndex(idx) => self.events_consumed as usize == *idx + 1,
Breakpoint::Tick(tick) => {
if let ReplayEvent::TaskScheduled { at_tick, .. } = event {
*at_tick >= *tick
} else {
false
}
}
Breakpoint::Task(task_id) => {
if let ReplayEvent::TaskScheduled { task, .. } = event {
task == task_id
} else {
false
}
}
},
}
}
}
impl EventSource for StreamingReplayer {
fn next_event(&mut self) -> Option<ReplayEvent> {
match Self::next_event(self) {
Ok(event) => {
self.event_source_error = None;
event
}
Err(err) => {
self.event_source_error = Some(err);
None
}
}
}
fn metadata(&self) -> &TraceMetadata {
&self.metadata
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::trace::file::{HEADER_SIZE, TraceWriter, write_trace};
use crate::trace::replay::CompactTaskId;
use std::fs::OpenOptions;
use std::io::{Seek, SeekFrom, Write};
use tempfile::NamedTempFile;
fn sample_events(count: u64) -> Vec<ReplayEvent> {
(0..count)
.map(|i| ReplayEvent::TaskScheduled {
task: CompactTaskId(i),
at_tick: i,
})
.collect()
}
#[test]
fn basic_streaming_replay() {
let temp = NamedTempFile::new().unwrap();
let path = temp.path();
let metadata = TraceMetadata::new(42);
let events = sample_events(100);
write_trace(path, &metadata, &events).unwrap();
let mut replayer = StreamingReplayer::open(path).unwrap();
assert_eq!(replayer.total_events(), 100);
assert_eq!(replayer.events_consumed(), 0);
assert!(!replayer.is_complete());
let mut count = 0u64;
while let Some(event) = replayer.next_event().unwrap() {
if let ReplayEvent::TaskScheduled { task, at_tick } = event {
assert_eq!(task.0, count);
assert_eq!(at_tick, count);
} else {
panic!("unexpected event type");
}
count += 1;
}
assert_eq!(count, 100);
assert!(replayer.is_complete());
}
#[test]
fn progress_tracking() {
let temp = NamedTempFile::new().unwrap();
let path = temp.path();
let metadata = TraceMetadata::new(42);
let events = sample_events(100);
write_trace(path, &metadata, &events).unwrap();
let mut replayer = StreamingReplayer::open(path).unwrap();
let progress = replayer.progress();
assert_eq!(progress.events_processed, 0);
assert_eq!(progress.total_events, 100);
assert!((progress.percent() - 0.0).abs() < 0.01);
for _ in 0..50 {
replayer.next_event().unwrap();
}
let progress = replayer.progress();
assert_eq!(progress.events_processed, 50);
assert!((progress.percent() - 50.0).abs() < 0.01);
assert_eq!(progress.remaining(), 50);
while replayer.next_event().unwrap().is_some() {}
let progress = replayer.progress();
assert!(progress.is_complete());
assert!((progress.percent() - 100.0).abs() < 0.01);
}
#[test]
fn peek_without_consuming() {
let temp = NamedTempFile::new().unwrap();
let path = temp.path();
let metadata = TraceMetadata::new(42);
let events = sample_events(10);
write_trace(path, &metadata, &events).unwrap();
let mut replayer = StreamingReplayer::open(path).unwrap();
let peeked1 = replayer.peek().unwrap().cloned();
let peeked2 = replayer.peek().unwrap().cloned();
assert_eq!(peeked1, peeked2);
assert_eq!(replayer.events_consumed(), 0);
let consumed = replayer.next_event().unwrap();
assert_eq!(consumed, peeked1);
assert_eq!(replayer.events_consumed(), 1);
let peeked3 = replayer.peek().unwrap().cloned();
assert_ne!(peeked3, peeked1);
}
#[test]
fn checkpoint_and_resume() {
let temp = NamedTempFile::new().unwrap();
let path = temp.path();
let metadata = TraceMetadata::new(42);
let events = sample_events(100);
write_trace(path, &metadata, &events).unwrap();
let mut replayer = StreamingReplayer::open(path).unwrap();
for _ in 0..50 {
replayer.next_event().unwrap();
}
let checkpoint = replayer.checkpoint();
assert_eq!(checkpoint.events_processed, 50);
assert_eq!(checkpoint.total_events, 100);
let checkpoint_bytes = checkpoint.to_bytes().unwrap();
let restored_checkpoint = ReplayCheckpoint::from_bytes(&checkpoint_bytes).unwrap();
let mut resumed = StreamingReplayer::resume(path, restored_checkpoint).unwrap();
assert_eq!(resumed.events_consumed(), 50);
let mut count = 50u64;
while let Some(event) = resumed.next_event().unwrap() {
if let ReplayEvent::TaskScheduled { task, .. } = event {
assert_eq!(task.0, count);
}
count += 1;
}
assert_eq!(count, 100);
}
#[test]
fn checkpoint_validation() {
let temp1 = NamedTempFile::new().unwrap();
let temp2 = NamedTempFile::new().unwrap();
let metadata1 = TraceMetadata::new(42);
let metadata2 = TraceMetadata::new(99);
write_trace(temp1.path(), &metadata1, &sample_events(100)).unwrap();
write_trace(temp2.path(), &metadata2, &sample_events(100)).unwrap();
let mut replayer = StreamingReplayer::open(temp1.path()).unwrap();
for _ in 0..50 {
replayer.next_event().unwrap();
}
let checkpoint = replayer.checkpoint();
let result = StreamingReplayer::resume(temp2.path(), checkpoint);
assert!(matches!(
result,
Err(StreamingReplayError::CheckpointMismatch(_))
));
}
#[test]
fn checkpoint_validation_rejects_same_seed_metadata_drift() {
let temp1 = NamedTempFile::new().unwrap();
let temp2 = NamedTempFile::new().unwrap();
let metadata1 = TraceMetadata {
version: super::super::replay::REPLAY_SCHEMA_VERSION,
seed: 42,
recorded_at: 100,
config_hash: 0xCAFE,
description: Some("trace-a".into()),
};
let metadata2 = TraceMetadata {
version: super::super::replay::REPLAY_SCHEMA_VERSION,
seed: 42,
recorded_at: 200,
config_hash: 0xCAFE,
description: Some("trace-b".into()),
};
write_trace(temp1.path(), &metadata1, &sample_events(4)).unwrap();
write_trace(temp2.path(), &metadata2, &sample_events(4)).unwrap();
let mut replayer = StreamingReplayer::open(temp1.path()).unwrap();
for _ in 0..2 {
replayer.next_event().unwrap();
}
let checkpoint = replayer.checkpoint();
let result = StreamingReplayer::resume(temp2.path(), checkpoint);
assert!(matches!(
result,
Err(StreamingReplayError::CheckpointMismatch(_))
));
}
#[test]
fn checkpoint_validation_rejects_event_count_drift() {
let temp1 = NamedTempFile::new().unwrap();
let temp2 = NamedTempFile::new().unwrap();
let metadata = TraceMetadata {
version: super::super::replay::REPLAY_SCHEMA_VERSION,
seed: 7,
recorded_at: 500,
config_hash: 0xBEEF,
description: Some("same-metadata".into()),
};
write_trace(temp1.path(), &metadata, &sample_events(4)).unwrap();
write_trace(temp2.path(), &metadata, &sample_events(6)).unwrap();
let mut replayer = StreamingReplayer::open(temp1.path()).unwrap();
for _ in 0..2 {
replayer.next_event().unwrap();
}
let checkpoint = replayer.checkpoint();
let result = StreamingReplayer::resume(temp2.path(), checkpoint);
assert!(matches!(
result,
Err(StreamingReplayError::CheckpointMismatch(_))
));
}
#[test]
fn checkpoint_bytes_are_stable_for_same_position() {
let temp = NamedTempFile::new().unwrap();
let path = temp.path();
let metadata = TraceMetadata {
version: super::super::replay::REPLAY_SCHEMA_VERSION,
seed: 42,
recorded_at: 1_000,
config_hash: 0xCAFE,
description: Some("stable checkpoint".into()),
};
write_trace(path, &metadata, &sample_events(5)).unwrap();
let mut replayer = StreamingReplayer::open(path).unwrap();
for _ in 0..3 {
replayer.next_event().unwrap();
}
let checkpoint_a = replayer.checkpoint();
let checkpoint_b = replayer.checkpoint();
assert_eq!(checkpoint_a.events_processed, 3);
assert_eq!(checkpoint_a.total_events, 5);
assert_eq!(checkpoint_a.created_at, 1_003);
assert_eq!(checkpoint_a.created_at, checkpoint_b.created_at);
assert_eq!(
checkpoint_a.to_bytes().unwrap(),
checkpoint_b.to_bytes().unwrap()
);
}
#[test]
fn checkpoint_created_at_advances_with_position() {
let temp = NamedTempFile::new().unwrap();
let path = temp.path();
let metadata = TraceMetadata {
version: super::super::replay::REPLAY_SCHEMA_VERSION,
seed: 7,
recorded_at: 500,
config_hash: 0xBEEF,
description: None,
};
write_trace(path, &metadata, &sample_events(4)).unwrap();
let mut replayer = StreamingReplayer::open(path).unwrap();
let first = replayer.checkpoint();
assert_eq!(first.created_at, 500);
replayer.next_event().unwrap();
let second = replayer.checkpoint();
assert_eq!(second.created_at, 501);
assert_eq!(second.created_at, first.created_at + 1);
replayer.next_event().unwrap();
let third = replayer.checkpoint();
assert_eq!(third.created_at, 502);
}
#[test]
fn run_with_callback() {
let temp = NamedTempFile::new().unwrap();
let path = temp.path();
let metadata = TraceMetadata::new(42);
let events = sample_events(50);
write_trace(path, &metadata, &events).unwrap();
let mut replayer = StreamingReplayer::open(path).unwrap();
let mut event_ids = Vec::new();
let count = replayer
.run_with(|event, progress| {
if let ReplayEvent::TaskScheduled { task, .. } = event {
event_ids.push(task.0);
}
assert!(!progress.is_complete() || progress.events_processed == 50);
Ok::<_, StreamingReplayError>(())
})
.unwrap();
assert_eq!(count, 50);
assert_eq!(event_ids.len(), 50);
for (i, id) in event_ids.iter().enumerate() {
assert_eq!(*id, i as u64);
}
}
#[test]
fn large_trace_streaming() {
let temp = NamedTempFile::new().unwrap();
let path = temp.path();
let metadata = TraceMetadata::new(42);
let event_count = 10_000u64;
{
let mut writer = TraceWriter::create(path).unwrap();
writer.write_metadata(&metadata).unwrap();
for i in 0..event_count {
writer
.write_event(&ReplayEvent::TaskScheduled {
task: CompactTaskId(i),
at_tick: i,
})
.unwrap();
}
writer.finish().unwrap();
}
let mut replayer = StreamingReplayer::open(path).unwrap();
assert_eq!(replayer.total_events(), event_count);
let mut count = 0u64;
while replayer.next_event().unwrap().is_some() {
count += 1;
}
assert_eq!(count, event_count);
}
#[test]
fn step_mode_streaming() {
let temp = NamedTempFile::new().unwrap();
let path = temp.path();
let metadata = TraceMetadata::new(42);
let events = sample_events(5);
write_trace(path, &metadata, &events).unwrap();
let mut replayer = StreamingReplayer::open(path).unwrap();
replayer.set_mode(ReplayMode::Step);
for _ in 0..5 {
replayer.step().unwrap();
assert!(replayer.at_breakpoint());
}
let event = replayer.step().unwrap();
assert!(event.is_none());
}
#[test]
fn breakpoint_at_tick() {
let temp = NamedTempFile::new().unwrap();
let path = temp.path();
let metadata = TraceMetadata::new(42);
let events: Vec<_> = (0..10)
.map(|i| ReplayEvent::TaskScheduled {
task: CompactTaskId(i),
at_tick: i * 10, })
.collect();
write_trace(path, &metadata, &events).unwrap();
let mut replayer = StreamingReplayer::open(path).unwrap();
replayer.set_mode(ReplayMode::RunTo(Breakpoint::Tick(50)));
let count = replayer.run().unwrap();
assert!(replayer.at_breakpoint());
assert_eq!(count, 6); }
#[test]
fn empty_trace() {
let temp = NamedTempFile::new().unwrap();
let path = temp.path();
let metadata = TraceMetadata::new(42);
write_trace(path, &metadata, &[]).unwrap();
let mut replayer = StreamingReplayer::open(path).unwrap();
assert_eq!(replayer.total_events(), 0);
assert!(replayer.progress().is_complete());
let event = replayer.next_event().unwrap();
assert!(event.is_none());
}
#[test]
fn verify_past_end_of_trace_reports_trace_exhausted() {
let temp = NamedTempFile::new().unwrap();
let path = temp.path();
let metadata = TraceMetadata::new(42);
let events = vec![ReplayEvent::RngSeed { seed: 42 }];
write_trace(path, &metadata, &events).unwrap();
let mut replayer = StreamingReplayer::open(path).unwrap();
assert!(replayer.next_event().unwrap().is_some());
assert!(replayer.is_complete());
let actual = ReplayEvent::RngSeed { seed: 99 };
let err = replayer.verify(&actual).unwrap_err();
match err {
StreamingReplayError::Divergence(divergence) => {
assert!(divergence.expected.is_none());
assert_eq!(divergence.index, 1);
assert!(divergence.context.contains("Trace ended"));
assert!(format!("{divergence}").contains("<trace_exhausted>"));
}
other => panic!("expected divergence error, got {other:?}"),
}
}
#[test]
fn verify_mismatch_preserves_expected_event() {
let temp = NamedTempFile::new().unwrap();
let path = temp.path();
let metadata = TraceMetadata::new(42);
let events = vec![ReplayEvent::TaskScheduled {
task: CompactTaskId(1),
at_tick: 10,
}];
write_trace(path, &metadata, &events).unwrap();
let mut replayer = StreamingReplayer::open(path).unwrap();
let actual = ReplayEvent::TaskScheduled {
task: CompactTaskId(2),
at_tick: 10,
};
let err = replayer.verify(&actual).unwrap_err();
match err {
StreamingReplayError::Divergence(divergence) => {
assert_eq!(
divergence.expected,
Some(ReplayEvent::TaskScheduled {
task: CompactTaskId(1),
at_tick: 10,
})
);
assert_eq!(divergence.actual, actual);
assert_eq!(divergence.index, 0);
}
other => panic!("expected divergence error, got {other:?}"),
}
}
#[test]
fn progress_display() {
let progress = ReplayProgress::new(250, 1000);
let display = format!("{progress}");
assert!(display.contains("250/1000"));
assert!(display.contains("25.0%"));
}
#[test]
fn run_with_respects_runto_breakpoint() {
let temp = NamedTempFile::new().unwrap();
let path = temp.path();
let metadata = TraceMetadata::new(42);
let events: Vec<_> = (0..10)
.map(|i| ReplayEvent::TaskScheduled {
task: CompactTaskId(i),
at_tick: i * 10,
})
.collect();
write_trace(path, &metadata, &events).unwrap();
let mut replayer = StreamingReplayer::open(path).unwrap();
replayer.set_mode(ReplayMode::RunTo(Breakpoint::Tick(50)));
let count = replayer
.run_with(|_, _| Ok::<_, StreamingReplayError>(()))
.unwrap();
assert_eq!(count, 6);
assert!(replayer.at_breakpoint());
}
#[test]
fn run_with_respects_step_mode() {
let temp = NamedTempFile::new().unwrap();
let path = temp.path();
let metadata = TraceMetadata::new(7);
let events = sample_events(5);
write_trace(path, &metadata, &events).unwrap();
let mut replayer = StreamingReplayer::open(path).unwrap();
replayer.set_mode(ReplayMode::Step);
let count = replayer
.run_with(|_, _| Ok::<_, StreamingReplayError>(()))
.unwrap();
assert_eq!(count, 1);
assert!(replayer.at_breakpoint());
}
#[test]
fn event_source_adapter_captures_stream_error() {
let temp = NamedTempFile::new().unwrap();
let path = temp.path();
let metadata = TraceMetadata::new(42);
let events = vec![ReplayEvent::RngSeed { seed: 42 }];
write_trace(path, &metadata, &events).unwrap();
let meta_len = rmp_serde::to_vec(&metadata).unwrap().len() as u64;
let first_event_payload = HEADER_SIZE as u64 + meta_len + 8 + 4;
let mut file = OpenOptions::new().write(true).open(path).unwrap();
file.seek(SeekFrom::Start(first_event_payload)).unwrap();
file.write_all(&[0xC1]).unwrap(); file.flush().unwrap();
let mut replayer = StreamingReplayer::open(path).unwrap();
let event = <StreamingReplayer as EventSource>::next_event(&mut replayer);
assert!(event.is_none());
let err = replayer
.take_event_source_error()
.expect("expected captured event-source error");
assert!(matches!(err, StreamingReplayError::File(_)));
assert!(replayer.last_event_source_error().is_none());
}
}