// asupersync/trace/file.rs
1//! Trace file format for persisting and loading replay traces.
2//!
3//! This module provides a binary file format for saving traces to disk and
4//! loading them for replay. The format is designed for:
5//!
6//! - **Compactness**: Uses MessagePack for efficient binary encoding
7//! - **Versioning**: Format version in header for forward compatibility
8//! - **Streaming**: Events can be read incrementally without loading all into memory
9//! - **Compression**: Optional LZ4 compression for reduced storage (feature-gated)
10//!
11//! # File Format
12//!
13//! ```text
14//! +-------------------+
15//! | Magic (11 bytes)  |  "ASUPERTRACE"
16//! +-------------------+
17//! | Version (2 bytes) |  u16 little-endian
18//! +-------------------+
19//! | Flags (2 bytes)   |  u16 little-endian (bit 0 = compressed)
20//! +-------------------+
21//! | Compression (1 b) |  u8 (0=none, 1=lz4)
22//! +-------------------+
23//! | Meta len (4 bytes)|  u32 little-endian
24//! +-------------------+
25//! | Metadata (msgpack)|  TraceMetadata
26//! +-------------------+
27//! | Event count (8 b) |  u64 little-endian
28//! +-------------------+
29//! | Events (msgpack)  |  [ReplayEvent] length-prefixed (optionally compressed)
30//! +-------------------+
31//! ```
32//!
33//! # Compression
34//!
35//! When compression is enabled (via the `trace-compression` feature), events are
36//! compressed in chunks using LZ4 for efficient streaming compression/decompression.
37//! Compression is auto-detected on read based on the flags in the header.
38//!
39//! # Example
40//!
41//! ```ignore
//! use asupersync::trace::file::{TraceWriter, TraceReader, TraceFileConfig, CompressionMode};
43//! use asupersync::trace::replay::{ReplayEvent, TraceMetadata};
44//!
45//! // Writing a compressed trace
46//! let config = TraceFileConfig::default().with_compression(CompressionMode::Lz4 { level: 1 });
47//! let mut writer = TraceWriter::create_with_config("trace.bin", config)?;
48//! writer.write_metadata(&TraceMetadata::new(42))?;
49//! writer.write_event(&ReplayEvent::RngSeed { seed: 42 })?;
50//! writer.finish()?;
51//!
52//! // Reading auto-detects compression
53//! let reader = TraceReader::open("trace.bin")?;
54//! println!("Seed: {}", reader.metadata().seed);
55//! for event in reader.events() {
56//!     let event = event?;
57//!     println!("{:?}", event);
58//! }
59//! ```
60
61use super::recorder::{DEFAULT_MAX_FILE_SIZE, LimitAction, LimitKind, LimitReached};
62use super::replay::{REPLAY_SCHEMA_VERSION, ReplayEvent, TraceMetadata};
63use crate::tracing_compat::{error, warn};
64use libc::ENOSPC;
65use std::fs::File;
66use std::io::{self, BufReader, BufWriter, Read, Seek, SeekFrom, Write};
67use std::path::Path;
68
69// =============================================================================
70// Constants
71// =============================================================================
72
/// Magic bytes at the start of every trace file.
pub const TRACE_MAGIC: &[u8; 11] = b"ASUPERTRACE";

/// Current file format version.
/// Version 2 adds compression byte after flags.
pub const TRACE_FILE_VERSION: u16 = 2;

/// Flag: Events are LZ4 compressed.
pub const FLAG_COMPRESSED: u16 = 0x0001;

/// Header size (magic + version + flags + compression + meta_len).
/// 11 + 2 + 2 + 1 + 4 = 20 bytes. Version 1 headers lack the compression
/// byte and are one byte shorter (the reader accounts for this).
pub const HEADER_SIZE: usize = 11 + 2 + 2 + 1 + 4;

/// Default chunk size for streaming compression (64KB).
pub const DEFAULT_COMPRESSION_CHUNK_SIZE: usize = 64 * 1024;

/// Threshold for auto-compression (1MB).
/// NOTE(review): not referenced by the writer in this file, and
/// `CompressionMode::Auto` currently reports compressed unconditionally —
/// confirm where this threshold is actually applied.
pub const AUTO_COMPRESSION_THRESHOLD: usize = 1024 * 1024;
91
92// =============================================================================
93// Compression Types
94// =============================================================================
95
/// Compression mode for trace files.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum CompressionMode {
    /// No compression.
    #[default]
    None,

    /// LZ4 compression with configurable level.
    ///
    /// Level ranges from -1 (fast) to 16 (best compression).
    /// Default level is 1 which provides good balance.
    #[cfg(feature = "trace-compression")]
    Lz4 {
        /// Compression level (-1 to 16, default 1).
        level: i32,
    },

    /// Auto-select compression based on trace size.
    ///
    /// Compresses if estimated size exceeds 1MB.
    #[cfg(feature = "trace-compression")]
    Auto,
}

impl CompressionMode {
    /// Reports whether writing with this mode produces compressed output.
    ///
    /// Every mode other than [`CompressionMode::None`] compresses.
    #[must_use]
    pub fn is_compressed(&self) -> bool {
        !matches!(self, Self::None)
    }

    /// Encodes this mode as the single compression byte stored in the
    /// file header (0 = none, 1 = lz4).
    fn to_byte(self) -> u8 {
        if self.is_compressed() { 1 } else { 0 }
    }

    /// Decodes the header compression byte.
    ///
    /// Returns `None` for unknown values, and also for byte 1 when the
    /// `trace-compression` feature is disabled (compressed file, no codec).
    /// The stored level is not persisted in the file, so LZ4 decodes with
    /// the default level.
    fn from_byte(byte: u8) -> Option<Self> {
        if byte == 0 {
            return Some(Self::None);
        }
        #[cfg(feature = "trace-compression")]
        if byte == 1 {
            return Some(Self::Lz4 { level: 1 });
        }
        None
    }
}
152
/// Configuration for trace file operations.
#[derive(Debug, Clone)]
pub struct TraceFileConfig {
    /// Compression mode for writing.
    pub compression: CompressionMode,

    /// Chunk size for streaming compression (default: 64KB).
    /// Only consulted when `compression` is a compressed mode; the writer
    /// flushes a compressed chunk once buffered data reaches this size.
    pub chunk_size: usize,

    /// Maximum events to write before stopping.
    /// Default: None (unlimited).
    pub max_events: Option<u64>,

    /// Maximum file size for trace file.
    /// Default: 1GB. A value of 0 disables the size check.
    pub max_file_size: u64,

    /// Action when limit reached.
    pub on_limit: LimitAction,
}
173
174impl Default for TraceFileConfig {
175    fn default() -> Self {
176        Self {
177            compression: CompressionMode::None,
178            chunk_size: DEFAULT_COMPRESSION_CHUNK_SIZE,
179            max_events: None,
180            max_file_size: DEFAULT_MAX_FILE_SIZE,
181            on_limit: LimitAction::StopRecording,
182        }
183    }
184}
185
186impl TraceFileConfig {
187    /// Creates a new config with default settings.
188    #[must_use]
189    pub fn new() -> Self {
190        Self::default()
191    }
192
193    /// Sets the compression mode.
194    #[must_use]
195    pub fn with_compression(mut self, mode: CompressionMode) -> Self {
196        self.compression = mode;
197        self
198    }
199
200    /// Sets the chunk size for streaming compression.
201    #[must_use]
202    pub fn with_chunk_size(mut self, size: usize) -> Self {
203        self.chunk_size = size;
204        self
205    }
206
207    /// Sets a maximum number of events to write.
208    #[must_use]
209    pub const fn with_max_events(mut self, max_events: Option<u64>) -> Self {
210        self.max_events = max_events;
211        self
212    }
213
214    /// Sets a maximum file size for the trace file.
215    #[must_use]
216    pub const fn with_max_file_size(mut self, max_file_size: u64) -> Self {
217        self.max_file_size = max_file_size;
218        self
219    }
220
221    /// Sets the limit action policy.
222    #[must_use]
223    pub fn on_limit(mut self, action: LimitAction) -> Self {
224        self.on_limit = action;
225        self
226    }
227}
228
229// =============================================================================
230// Error Types
231// =============================================================================
232
/// Errors that can occur when working with trace files.
#[derive(Debug, thiserror::Error)]
pub enum TraceFileError {
    /// I/O error during file operations.
    /// Disk-full (ENOSPC) conditions from the writer are also surfaced here.
    #[error("I/O error: {0}")]
    Io(#[from] io::Error),

    /// Invalid magic bytes in file header.
    #[error("invalid magic bytes: not a trace file")]
    InvalidMagic,

    /// Unsupported file format version.
    #[error("unsupported file version: expected <= {expected}, found {found}")]
    UnsupportedVersion {
        /// Maximum supported version.
        expected: u16,
        /// Found version.
        found: u16,
    },

    /// Unsupported flags in header.
    /// Also returned for a version-1 file that claims to be compressed.
    #[error("unsupported flags: {0:#06x}")]
    UnsupportedFlags(u16),

    /// Unsupported compression format.
    #[error("unsupported compression format: {0}")]
    UnsupportedCompression(u8),

    /// Compression not available (feature not enabled).
    #[error("file is compressed but trace-compression feature is not enabled")]
    CompressionNotAvailable,

    /// Compression error.
    #[error("compression error: {0}")]
    Compression(String),

    /// Decompression error.
    #[error("decompression error: {0}")]
    Decompression(String),

    /// Error serializing data.
    #[error("serialization error: {0}")]
    Serialize(String),

    /// Error deserializing data.
    #[error("deserialization error: {0}")]
    Deserialize(String),

    /// Metadata mismatch (schema version).
    #[error("schema version mismatch: expected {expected}, found {found}")]
    SchemaMismatch {
        /// Expected schema version.
        expected: u32,
        /// Found schema version.
        found: u32,
    },

    /// Writer already finished.
    #[error("writer already finished")]
    AlreadyFinished,

    /// File is truncated or corrupt.
    /// Returned by the reader when a decompressed chunk ends mid-event.
    #[error("file truncated or corrupt")]
    Truncated,
}
298
299impl From<rmp_serde::encode::Error> for TraceFileError {
300    fn from(e: rmp_serde::encode::Error) -> Self {
301        Self::Serialize(e.to_string())
302    }
303}
304
305impl From<rmp_serde::decode::Error> for TraceFileError {
306    fn from(e: rmp_serde::decode::Error) -> Self {
307        Self::Deserialize(e.to_string())
308    }
309}
310
311/// Result type for trace file operations.
312pub type TraceFileResult<T> = Result<T, TraceFileError>;
313
314// =============================================================================
315// TraceWriter
316// =============================================================================
317
318/// Writer for streaming trace events to a file.
319///
320/// Events are written incrementally, allowing large traces to be written
321/// without holding all events in memory. When compression is enabled,
322/// events are buffered and compressed in chunks.
323///
324/// # Example
325///
326/// ```ignore
327/// let mut writer = TraceWriter::create("trace.bin")?;
328/// writer.write_metadata(&TraceMetadata::new(42))?;
329/// for event in events {
330///     writer.write_event(&event)?;
331/// }
332/// writer.finish()?;
333/// ```
pub struct TraceWriter {
    /// Buffered handle to the output file.
    writer: BufWriter<File>,
    /// Number of events accepted so far (written directly or buffered).
    event_count: u64,
    /// Byte offset of the event-count placeholder, patched in `finish()`.
    event_count_pos: u64,
    /// Set once `finish()` runs; subsequent writes return `AlreadyFinished`.
    finished: bool,
    /// Compression/limit configuration supplied at creation.
    config: TraceFileConfig,
    /// Bytes actually written to the file (excludes compression buffer).
    bytes_written: u64,
    /// Bytes currently held in the uncompressed event buffer.
    buffered_bytes: u64,
    /// Set when a limit policy stopped recording; writes become no-ops.
    stopped: bool,
    /// Set on disk-full (ENOSPC); writes become best-effort no-ops.
    halted: bool,
    /// Buffer for uncompressed event data (used in chunked compression).
    #[cfg(feature = "trace-compression")]
    event_buffer: Vec<u8>,
}
348
349impl TraceWriter {
350    /// Creates a new trace file for writing with default configuration.
351    ///
352    /// # Errors
353    ///
354    /// Returns an error if the file cannot be created.
355    pub fn create(path: impl AsRef<Path>) -> TraceFileResult<Self> {
356        Self::create_with_config(path, TraceFileConfig::default())
357    }
358
359    /// Creates a new trace file for writing with custom configuration.
360    ///
361    /// # Errors
362    ///
363    /// Returns an error if the file cannot be created.
364    pub fn create_with_config(
365        path: impl AsRef<Path>,
366        config: TraceFileConfig,
367    ) -> TraceFileResult<Self> {
368        let file = File::create(path)?;
369        let writer = BufWriter::new(file);
370
371        Ok(Self {
372            writer,
373            event_count: 0,
374            event_count_pos: 0,
375            finished: false,
376            config,
377            bytes_written: 0,
378            buffered_bytes: 0,
379            stopped: false,
380            halted: false,
381            #[cfg(feature = "trace-compression")]
382            event_buffer: Vec::new(),
383        })
384    }
385
386    fn should_write(&self) -> bool {
387        !self.stopped && !self.halted
388    }
389
390    fn resolve_limit_action(&self, info: &LimitReached) -> LimitAction {
391        match &self.config.on_limit {
392            LimitAction::Callback(cb) => (cb)(info.clone()),
393            other => other.clone(),
394        }
395    }
396
397    fn handle_limit(&mut self, info: &LimitReached) -> TraceFileResult<bool> {
398        let mut action = self.resolve_limit_action(info);
399        if matches!(action, LimitAction::Callback(_)) {
400            action = LimitAction::StopRecording;
401        }
402
403        match action {
404            LimitAction::StopRecording => {
405                warn!(
406                    kind = ?info.kind,
407                    current_events = info.current_events,
408                    max_events = ?info.max_events,
409                    current_bytes = info.current_bytes,
410                    max_bytes = info.max_bytes,
411                    "trace write stopped: limit reached"
412                );
413                self.stopped = true;
414                Ok(false)
415            }
416            LimitAction::DropOldest => {
417                warn!(
418                    kind = ?info.kind,
419                    "trace write stopped: drop-oldest not supported for file writer"
420                );
421                self.stopped = true;
422                Ok(false)
423            }
424            LimitAction::Fail => {
425                error!(
426                    kind = ?info.kind,
427                    current_events = info.current_events,
428                    max_events = ?info.max_events,
429                    current_bytes = info.current_bytes,
430                    max_bytes = info.max_bytes,
431                    "trace write failed: limit exceeded"
432                );
433                self.stopped = true;
434                Err(TraceFileError::Io(io::Error::other(
435                    "trace write limit exceeded",
436                )))
437            }
438            LimitAction::Callback(_) => {
439                self.stopped = true;
440                Ok(false)
441            }
442        }
443    }
444
445    fn is_disk_full(err: &io::Error) -> bool {
446        err.raw_os_error() == Some(ENOSPC)
447    }
448
449    fn handle_disk_full(&mut self, err: io::Error) -> TraceFileError {
450        warn!("trace write halted: disk full (ENOSPC). Free space and retry recording.");
451        self.halted = true;
452        TraceFileError::Io(err)
453    }
454
455    fn write_bytes(&mut self, bytes: &[u8]) -> TraceFileResult<()> {
456        if self.halted {
457            return Ok(());
458        }
459        match self.writer.write_all(bytes) {
460            Ok(()) => {
461                self.bytes_written = self.bytes_written.saturating_add(bytes.len() as u64);
462                Ok(())
463            }
464            Err(err) if Self::is_disk_full(&err) => Err(self.handle_disk_full(err)),
465            Err(err) => Err(TraceFileError::Io(err)),
466        }
467    }
468
469    fn update_event_count(&mut self) -> TraceFileResult<()> {
470        let file = self.writer.get_mut();
471        file.seek(SeekFrom::Start(self.event_count_pos))?;
472        file.write_all(&self.event_count.to_le_bytes())?;
473        file.flush()?;
474        Ok(())
475    }
476
477    fn update_event_count_best_effort(&mut self) {
478        if let Err(err) = self.update_event_count() {
479            if matches!(
480                &err,
481                TraceFileError::Io(io_err) if Self::is_disk_full(io_err)
482            ) {
483                warn!("trace event count update skipped: disk full");
484            }
485            warn!("trace event count update skipped: {err}");
486        }
487    }
488
489    /// Writes the trace metadata (must be called first).
490    ///
491    /// This writes the file header including magic bytes, version,
492    /// flags, compression mode, and the serialized metadata.
493    ///
494    /// # Errors
495    ///
496    /// Returns an error if writing fails or the writer was already finished.
497    pub fn write_metadata(&mut self, metadata: &TraceMetadata) -> TraceFileResult<()> {
498        if self.finished {
499            return Err(TraceFileError::AlreadyFinished);
500        }
501
502        // Serialize metadata to get its length
503        let meta_bytes = rmp_serde::to_vec(metadata)?;
504
505        // Determine flags
506        let flags = if self.config.compression.is_compressed() {
507            FLAG_COMPRESSED
508        } else {
509            0
510        };
511
512        // Write header
513        self.write_bytes(TRACE_MAGIC)?;
514        self.write_bytes(&TRACE_FILE_VERSION.to_le_bytes())?;
515        self.write_bytes(&flags.to_le_bytes())?;
516        self.write_bytes(&[self.config.compression.to_byte()])?; // compression byte
517
518        // Write metadata length and data
519        let meta_len = meta_bytes.len() as u32;
520        self.write_bytes(&meta_len.to_le_bytes())?;
521        self.write_bytes(&meta_bytes)?;
522
523        // Write placeholder for event count (we'll update this in finish())
524        self.event_count_pos = HEADER_SIZE as u64 + u64::from(meta_len);
525        self.write_bytes(&0u64.to_le_bytes())?;
526
527        Ok(())
528    }
529
530    /// Writes a single replay event.
531    ///
532    /// Events are length-prefixed for streaming reads. When compression is
533    /// enabled, events are buffered and written in compressed chunks.
534    ///
535    /// # Errors
536    ///
537    /// Returns an error if serialization or writing fails.
538    pub fn write_event(&mut self, event: &ReplayEvent) -> TraceFileResult<()> {
539        if self.finished {
540            return Err(TraceFileError::AlreadyFinished);
541        }
542        if !self.should_write() {
543            return Ok(());
544        }
545
546        if let Some(max_events) = self.config.max_events {
547            if self.event_count.saturating_add(1) > max_events {
548                let info = LimitReached {
549                    kind: LimitKind::MaxEvents,
550                    current_events: self.event_count,
551                    max_events: Some(max_events),
552                    current_bytes: self.bytes_written,
553                    max_bytes: self.config.max_file_size,
554                    needed_bytes: 0,
555                };
556                if !self.handle_limit(&info)? {
557                    return Ok(());
558                }
559            }
560        }
561
562        // Serialize event with length prefix
563        let event_bytes = rmp_serde::to_vec(event)?;
564        let len = event_bytes.len() as u32;
565        let estimated_bytes = 4u64 + event_bytes.len() as u64;
566        let pending_bytes = self.bytes_written.saturating_add(self.buffered_bytes);
567
568        if self.config.max_file_size > 0
569            && pending_bytes.saturating_add(estimated_bytes) > self.config.max_file_size
570        {
571            let info = LimitReached {
572                kind: LimitKind::MaxFileSize,
573                current_events: self.event_count,
574                max_events: self.config.max_events,
575                current_bytes: pending_bytes,
576                max_bytes: self.config.max_file_size,
577                needed_bytes: estimated_bytes,
578            };
579            if !self.handle_limit(&info)? {
580                return Ok(());
581            }
582        }
583
584        #[cfg(feature = "trace-compression")]
585        if self.config.compression.is_compressed() {
586            // Buffer the event for chunk compression
587            self.event_buffer.extend_from_slice(&len.to_le_bytes());
588            self.event_buffer.extend_from_slice(&event_bytes);
589            self.buffered_bytes = self.buffered_bytes.saturating_add(estimated_bytes);
590            self.event_count += 1;
591
592            // Flush chunk if buffer exceeds threshold
593            if self.event_buffer.len() >= self.config.chunk_size {
594                self.flush_compressed_chunk()?;
595            }
596            return Ok(());
597        }
598
599        // Uncompressed: write directly
600        self.write_bytes(&len.to_le_bytes())?;
601        self.write_bytes(&event_bytes)?;
602        self.event_count += 1;
603        Ok(())
604    }
605
606    /// Flushes a compressed chunk of events to the file.
607    #[cfg(feature = "trace-compression")]
608    fn flush_compressed_chunk(&mut self) -> TraceFileResult<()> {
609        if self.event_buffer.is_empty() {
610            return Ok(());
611        }
612
613        // Compress the buffer
614        let compressed = lz4_flex::compress_prepend_size(&self.event_buffer);
615
616        // Write chunk: compressed_len (u32) + compressed_data
617        let chunk_len = compressed.len() as u32;
618        self.write_bytes(&chunk_len.to_le_bytes())?;
619        self.write_bytes(&compressed)?;
620
621        self.event_buffer.clear();
622        self.buffered_bytes = 0;
623        Ok(())
624    }
625
626    /// Finishes writing the trace file.
627    ///
628    /// This flushes any remaining compressed data, updates the event count
629    /// in the header, and flushes all data. Must be called to complete the
630    /// file properly.
631    ///
632    /// # Errors
633    ///
634    /// Returns an error if flushing or seeking fails.
635    pub fn finish(mut self) -> TraceFileResult<()> {
636        self.finished = true;
637
638        // Flush any remaining compressed data
639        #[cfg(feature = "trace-compression")]
640        if self.config.compression.is_compressed() {
641            self.flush_compressed_chunk()?;
642        }
643
644        if self.halted {
645            let _ = self.writer.flush();
646            self.update_event_count_best_effort();
647            return Ok(());
648        }
649
650        // Flush buffered data
651        self.writer.flush()?;
652
653        // Seek back and update event count
654        self.update_event_count()?;
655
656        Ok(())
657    }
658
659    /// Returns the number of events written so far.
660    #[must_use]
661    pub fn event_count(&self) -> u64 {
662        self.event_count
663    }
664}
665
666impl Drop for TraceWriter {
667    fn drop(&mut self) {
668        if !self.finished {
669            // Best-effort: try to flush but don't panic
670            let _ = self.writer.flush();
671        }
672    }
673}
674
675// =============================================================================
676// TraceReader
677// =============================================================================
678
679/// Reader for loading trace files.
680///
681/// Supports streaming reads where events are loaded incrementally.
682/// Compression is auto-detected from the file header.
683///
684/// # Example
685///
686/// ```ignore
687/// let reader = TraceReader::open("trace.bin")?;
688/// println!("Seed: {}", reader.metadata().seed);
689/// println!("Events: {}", reader.event_count());
690/// println!("Compressed: {}", reader.is_compressed());
691///
692/// for event in reader.events() {
693///     let event = event?;
694///     println!("{:?}", event);
695/// }
696/// ```
pub struct TraceReader {
    /// Buffered file handle, positioned at the next unread byte.
    reader: BufReader<File>,
    /// Metadata decoded from the file header.
    metadata: TraceMetadata,
    /// Total event count recorded in the header.
    event_count: u64,
    /// Number of events consumed via `read_event` so far.
    events_read: u64,
    /// Byte offset where event data begins (header + metadata + count).
    /// NOTE(review): stored but not consulted by the methods visible here —
    /// presumably reserved for seek/rewind support; confirm before removing.
    events_start_pos: u64,
    /// Compression mode auto-detected from the header.
    compression: CompressionMode,
    /// Buffer for decompressed event data.
    #[cfg(feature = "trace-compression")]
    decompressed_buffer: Vec<u8>,
    /// Position in decompressed buffer.
    #[cfg(feature = "trace-compression")]
    buffer_pos: usize,
}
711
712impl TraceReader {
713    /// Opens a trace file for reading.
714    ///
715    /// Compression is auto-detected from the file header.
716    ///
717    /// # Errors
718    ///
719    /// Returns an error if:
720    /// - The file cannot be opened
721    /// - The file has invalid magic bytes
722    /// - The file version is unsupported
723    /// - The file is compressed but the `trace-compression` feature is not enabled
724    /// - The metadata is corrupt
725    pub fn open(path: impl AsRef<Path>) -> TraceFileResult<Self> {
726        let file = File::open(path)?;
727        let mut reader = BufReader::new(file);
728
729        // Read and validate magic
730        let mut magic = [0u8; 11];
731        reader.read_exact(&mut magic)?;
732        if &magic != TRACE_MAGIC {
733            return Err(TraceFileError::InvalidMagic);
734        }
735
736        // Read version
737        let mut version_bytes = [0u8; 2];
738        reader.read_exact(&mut version_bytes)?;
739        let version = u16::from_le_bytes(version_bytes);
740        if version > TRACE_FILE_VERSION {
741            return Err(TraceFileError::UnsupportedVersion {
742                expected: TRACE_FILE_VERSION,
743                found: version,
744            });
745        }
746
747        // Read flags
748        let mut flags_bytes = [0u8; 2];
749        reader.read_exact(&mut flags_bytes)?;
750        let flags = u16::from_le_bytes(flags_bytes);
751        let is_compressed = flags & FLAG_COMPRESSED != 0;
752
753        // Read compression byte (only in version 2+)
754        let compression = if version >= 2 {
755            let mut comp_byte = [0u8; 1];
756            reader.read_exact(&mut comp_byte)?;
757            match CompressionMode::from_byte(comp_byte[0]) {
758                Some(mode) => mode,
759                None if is_compressed => {
760                    return Err(TraceFileError::UnsupportedCompression(comp_byte[0]));
761                }
762                None => CompressionMode::None,
763            }
764        } else {
765            // Version 1 files don't have compression byte
766            if is_compressed {
767                return Err(TraceFileError::UnsupportedFlags(flags));
768            }
769            CompressionMode::None
770        };
771
772        // Check if we can handle compression
773        #[cfg(not(feature = "trace-compression"))]
774        if compression.is_compressed() {
775            return Err(TraceFileError::CompressionNotAvailable);
776        }
777
778        // Read metadata length
779        let mut meta_len_bytes = [0u8; 4];
780        reader.read_exact(&mut meta_len_bytes)?;
781        let meta_len = u32::from_le_bytes(meta_len_bytes) as usize;
782
783        // Read metadata
784        let mut meta_bytes = vec![0u8; meta_len];
785        reader.read_exact(&mut meta_bytes)?;
786        let metadata: TraceMetadata = rmp_serde::from_slice(&meta_bytes)?;
787
788        // Validate schema version
789        if metadata.version != REPLAY_SCHEMA_VERSION {
790            return Err(TraceFileError::SchemaMismatch {
791                expected: REPLAY_SCHEMA_VERSION,
792                found: metadata.version,
793            });
794        }
795
796        // Read event count
797        let mut event_count_bytes = [0u8; 8];
798        reader.read_exact(&mut event_count_bytes)?;
799        let event_count = u64::from_le_bytes(event_count_bytes);
800
801        // Calculate events start position (header size depends on version)
802        let header_size = if version >= 2 {
803            HEADER_SIZE
804        } else {
805            HEADER_SIZE - 1
806        };
807        let events_start_pos = header_size as u64 + meta_len as u64 + 8;
808
809        Ok(Self {
810            reader,
811            metadata,
812            event_count,
813            events_read: 0,
814            events_start_pos,
815            compression,
816            #[cfg(feature = "trace-compression")]
817            decompressed_buffer: Vec::new(),
818            #[cfg(feature = "trace-compression")]
819            buffer_pos: 0,
820        })
821    }
822
823    /// Returns true if the trace file is compressed.
824    #[must_use]
825    pub fn is_compressed(&self) -> bool {
826        self.compression.is_compressed()
827    }
828
829    /// Returns the compression mode of the trace file.
830    #[must_use]
831    pub fn compression(&self) -> CompressionMode {
832        self.compression
833    }
834
835    /// Returns the trace metadata.
836    #[must_use]
837    pub fn metadata(&self) -> &TraceMetadata {
838        &self.metadata
839    }
840
841    /// Returns the total number of events in the trace.
842    #[must_use]
843    pub fn event_count(&self) -> u64 {
844        self.event_count
845    }
846
847    /// Returns the number of events read so far.
848    #[must_use]
849    pub fn events_read(&self) -> u64 {
850        self.events_read
851    }
852
853    /// Returns an iterator over the events in the trace.
854    ///
855    /// Events are read incrementally from the file.
856    /// Automatically handles decompression for compressed files.
857    #[must_use]
858    pub fn events(self) -> TraceEventIterator {
859        TraceEventIterator {
860            reader: self.reader,
861            remaining: self.event_count,
862            compression: self.compression,
863            #[cfg(feature = "trace-compression")]
864            decompressed_buffer: self.decompressed_buffer,
865            #[cfg(feature = "trace-compression")]
866            buffer_pos: self.buffer_pos,
867        }
868    }
869
870    /// Reads the next event from the trace.
871    ///
872    /// Returns `None` when all events have been read.
873    /// Automatically handles decompression for compressed files.
874    ///
875    /// # Errors
876    ///
877    /// Returns an error if reading or deserialization fails.
878    pub fn read_event(&mut self) -> TraceFileResult<Option<ReplayEvent>> {
879        if self.events_read >= self.event_count {
880            return Ok(None);
881        }
882
883        #[cfg(feature = "trace-compression")]
884        if self.compression.is_compressed() {
885            return self.read_compressed_event();
886        }
887
888        // Uncompressed read
889        self.read_uncompressed_event()
890    }
891
892    /// Reads an event from uncompressed data.
893    fn read_uncompressed_event(&mut self) -> TraceFileResult<Option<ReplayEvent>> {
894        // Read event length
895        let mut len_bytes = [0u8; 4];
896        if self.reader.read_exact(&mut len_bytes).is_err() {
897            return Ok(None);
898        }
899        let len = u32::from_le_bytes(len_bytes) as usize;
900
901        // Read event data
902        let mut event_bytes = vec![0u8; len];
903        self.reader.read_exact(&mut event_bytes)?;
904
905        let event: ReplayEvent = rmp_serde::from_slice(&event_bytes)?;
906        self.events_read += 1;
907
908        Ok(Some(event))
909    }
910
911    /// Reads an event from compressed data.
912    #[cfg(feature = "trace-compression")]
913    fn read_compressed_event(&mut self) -> TraceFileResult<Option<ReplayEvent>> {
914        // Refill buffer if needed
915        if self.buffer_pos >= self.decompressed_buffer.len()
916            && !self.refill_decompressed_buffer()?
917        {
918            return Ok(None);
919        }
920
921        // Read event length from buffer
922        if self.buffer_pos + 4 > self.decompressed_buffer.len() {
923            return Err(TraceFileError::Truncated);
924        }
925        let len_bytes: [u8; 4] = self.decompressed_buffer[self.buffer_pos..self.buffer_pos + 4]
926            .try_into()
927            .map_err(|_| TraceFileError::Truncated)?;
928        let len = u32::from_le_bytes(len_bytes) as usize;
929        self.buffer_pos += 4;
930
931        // Read event data from buffer
932        if self.buffer_pos + len > self.decompressed_buffer.len() {
933            return Err(TraceFileError::Truncated);
934        }
935        let event_bytes = &self.decompressed_buffer[self.buffer_pos..self.buffer_pos + len];
936        let event: ReplayEvent = rmp_serde::from_slice(event_bytes)?;
937        self.buffer_pos += len;
938
939        self.events_read += 1;
940        Ok(Some(event))
941    }
942
943    /// Refills the decompressed buffer from the next compressed chunk.
944    #[cfg(feature = "trace-compression")]
945    fn refill_decompressed_buffer(&mut self) -> TraceFileResult<bool> {
946        // Read chunk length
947        let mut chunk_len_bytes = [0u8; 4];
948        match self.reader.read_exact(&mut chunk_len_bytes) {
949            Ok(()) => {}
950            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(false),
951            Err(e) => return Err(TraceFileError::Io(e)),
952        }
953        let chunk_len = u32::from_le_bytes(chunk_len_bytes) as usize;
954
955        if chunk_len == 0 {
956            return Ok(false);
957        }
958
959        // Read compressed chunk
960        let mut compressed = vec![0u8; chunk_len];
961        self.reader.read_exact(&mut compressed)?;
962
963        // Decompress
964        self.decompressed_buffer = lz4_flex::decompress_size_prepended(&compressed)
965            .map_err(|e| TraceFileError::Decompression(e.to_string()))?;
966        self.buffer_pos = 0;
967
968        Ok(true)
969    }
970
971    /// Resets the reader to the beginning of the events section.
972    ///
973    /// # Errors
974    ///
975    /// Returns an error if seeking fails.
976    pub fn rewind(&mut self) -> TraceFileResult<()> {
977        self.reader.seek(SeekFrom::Start(self.events_start_pos))?;
978        self.events_read = 0;
979
980        #[cfg(feature = "trace-compression")]
981        {
982            self.decompressed_buffer.clear();
983            self.buffer_pos = 0;
984        }
985
986        Ok(())
987    }
988
989    /// Loads all events into memory.
990    ///
991    /// This is convenient for small traces but may use significant memory
992    /// for large traces. Use [`events()`][Self::events] for streaming.
993    ///
994    /// # Errors
995    ///
996    /// Returns an error if reading fails.
997    pub fn load_all(mut self) -> TraceFileResult<Vec<ReplayEvent>> {
998        let mut events = Vec::with_capacity(self.event_count as usize);
999        while let Some(event) = self.read_event()? {
1000            events.push(event);
1001        }
1002        Ok(events)
1003    }
1004}
1005
1006// =============================================================================
1007// Iterator
1008// =============================================================================
1009
/// Iterator over trace events.
///
/// Yields each event as a `TraceFileResult<ReplayEvent>` so per-item read or
/// decode failures surface without aborting iteration. `size_hint` is exact
/// (see the [`Iterator`] impl below).
pub struct TraceEventIterator {
    /// Buffered file handle positioned at the start of the events section.
    reader: BufReader<File>,
    /// Number of events still expected from the header's event count.
    remaining: u64,
    /// Compression mode recorded in the file header; selects the decode path.
    compression: CompressionMode,
    /// Buffer for decompressed event data.
    #[cfg(feature = "trace-compression")]
    decompressed_buffer: Vec<u8>,
    /// Position in decompressed buffer.
    #[cfg(feature = "trace-compression")]
    buffer_pos: usize,
}
1022
1023impl Iterator for TraceEventIterator {
1024    type Item = TraceFileResult<ReplayEvent>;
1025
1026    fn next(&mut self) -> Option<Self::Item> {
1027        if self.remaining == 0 {
1028            return None;
1029        }
1030
1031        #[cfg(feature = "trace-compression")]
1032        if self.compression.is_compressed() {
1033            return self.next_compressed();
1034        }
1035
1036        self.next_uncompressed()
1037    }
1038
1039    fn size_hint(&self) -> (usize, Option<usize>) {
1040        let remaining = self.remaining as usize;
1041        (remaining, Some(remaining))
1042    }
1043}
1044
1045impl TraceEventIterator {
1046    /// Reads the next uncompressed event.
1047    fn next_uncompressed(&mut self) -> Option<TraceFileResult<ReplayEvent>> {
1048        // Read event length
1049        let mut len_bytes = [0u8; 4];
1050        if let Err(e) = self.reader.read_exact(&mut len_bytes) {
1051            if e.kind() == io::ErrorKind::UnexpectedEof {
1052                return None;
1053            }
1054            return Some(Err(TraceFileError::Io(e)));
1055        }
1056        let len = u32::from_le_bytes(len_bytes) as usize;
1057
1058        // Read event data
1059        let mut event_bytes = vec![0u8; len];
1060        if let Err(e) = self.reader.read_exact(&mut event_bytes) {
1061            return Some(Err(TraceFileError::Io(e)));
1062        }
1063
1064        match rmp_serde::from_slice(&event_bytes) {
1065            Ok(event) => {
1066                self.remaining -= 1;
1067                Some(Ok(event))
1068            }
1069            Err(e) => Some(Err(TraceFileError::from(e))),
1070        }
1071    }
1072
1073    /// Reads the next compressed event.
1074    #[cfg(feature = "trace-compression")]
1075    fn next_compressed(&mut self) -> Option<TraceFileResult<ReplayEvent>> {
1076        // Refill buffer if needed
1077        if self.buffer_pos >= self.decompressed_buffer.len() {
1078            match self.refill_buffer() {
1079                Ok(true) => {}
1080                Ok(false) => return None,
1081                Err(e) => return Some(Err(e)),
1082            }
1083        }
1084
1085        // Read event length from buffer
1086        if self.buffer_pos + 4 > self.decompressed_buffer.len() {
1087            return Some(Err(TraceFileError::Truncated));
1088        }
1089        let len_bytes: [u8; 4] =
1090            match self.decompressed_buffer[self.buffer_pos..self.buffer_pos + 4].try_into() {
1091                Ok(b) => b,
1092                Err(_) => return Some(Err(TraceFileError::Truncated)),
1093            };
1094        let len = u32::from_le_bytes(len_bytes) as usize;
1095        self.buffer_pos += 4;
1096
1097        // Read event data from buffer
1098        if self.buffer_pos + len > self.decompressed_buffer.len() {
1099            return Some(Err(TraceFileError::Truncated));
1100        }
1101        let event_bytes = &self.decompressed_buffer[self.buffer_pos..self.buffer_pos + len];
1102
1103        match rmp_serde::from_slice(event_bytes) {
1104            Ok(event) => {
1105                self.buffer_pos += len;
1106                self.remaining -= 1;
1107                Some(Ok(event))
1108            }
1109            Err(e) => Some(Err(TraceFileError::from(e))),
1110        }
1111    }
1112
1113    /// Refills the decompressed buffer from the next compressed chunk.
1114    #[cfg(feature = "trace-compression")]
1115    fn refill_buffer(&mut self) -> TraceFileResult<bool> {
1116        // Read chunk length
1117        let mut chunk_len_bytes = [0u8; 4];
1118        match self.reader.read_exact(&mut chunk_len_bytes) {
1119            Ok(()) => {}
1120            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(false),
1121            Err(e) => return Err(TraceFileError::Io(e)),
1122        }
1123        let chunk_len = u32::from_le_bytes(chunk_len_bytes) as usize;
1124
1125        if chunk_len == 0 {
1126            return Ok(false);
1127        }
1128
1129        // Read compressed chunk
1130        let mut compressed = vec![0u8; chunk_len];
1131        self.reader.read_exact(&mut compressed)?;
1132
1133        // Decompress
1134        self.decompressed_buffer = lz4_flex::decompress_size_prepended(&compressed)
1135            .map_err(|e| TraceFileError::Decompression(e.to_string()))?;
1136        self.buffer_pos = 0;
1137
1138        Ok(true)
1139    }
1140}
1141
// Sound: `size_hint` returns an exact `(remaining, Some(remaining))` pair, so
// the iterator satisfies the `ExactSizeIterator` contract.
impl ExactSizeIterator for TraceEventIterator {}
1143
1144// =============================================================================
1145// Convenience Functions
1146// =============================================================================
1147
1148/// Writes a complete trace to a file.
1149///
1150/// This is a convenience function for writing small traces.
1151/// For large traces, use [`TraceWriter`] for streaming writes.
1152///
1153/// # Errors
1154///
1155/// Returns an error if file creation or writing fails.
1156pub fn write_trace(
1157    path: impl AsRef<Path>,
1158    metadata: &TraceMetadata,
1159    events: &[ReplayEvent],
1160) -> TraceFileResult<()> {
1161    let mut writer = TraceWriter::create(path)?;
1162    writer.write_metadata(metadata)?;
1163    for event in events {
1164        writer.write_event(event)?;
1165    }
1166    writer.finish()
1167}
1168
1169/// Reads a complete trace from a file.
1170///
1171/// This is a convenience function for reading small traces.
1172/// For large traces, use [`TraceReader`] for streaming reads.
1173///
1174/// # Errors
1175///
1176/// Returns an error if file opening or reading fails.
1177pub fn read_trace(path: impl AsRef<Path>) -> TraceFileResult<(TraceMetadata, Vec<ReplayEvent>)> {
1178    let reader = TraceReader::open(path)?;
1179    let metadata = reader.metadata().clone();
1180    let events = reader.load_all()?;
1181    Ok((metadata, events))
1182}
1183
1184// =============================================================================
1185// Tests
1186// =============================================================================
1187
#[cfg(test)]
mod tests {
    use super::*;
    use crate::trace::replay::CompactTaskId;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use tempfile::NamedTempFile;

    /// Builds a small, representative mix of event variants reused by the
    /// round-trip tests below.
    fn sample_events() -> Vec<ReplayEvent> {
        vec![
            ReplayEvent::RngSeed { seed: 42 },
            ReplayEvent::TaskScheduled {
                task: CompactTaskId(1),
                at_tick: 0,
            },
            ReplayEvent::TimeAdvanced {
                from_nanos: 0,
                to_nanos: 1_000_000,
            },
            ReplayEvent::TaskYielded {
                task: CompactTaskId(1),
            },
            ReplayEvent::TaskScheduled {
                task: CompactTaskId(1),
                at_tick: 1,
            },
            ReplayEvent::TaskCompleted {
                task: CompactTaskId(1),
                outcome: 0,
            },
        ]
    }

    // =========================================================================
    // Pure data-type tests (wave 40 – CyanBarn)
    // =========================================================================

    // Exercises the derived traits (Debug/Clone/Copy/Eq/Default) on
    // CompressionMode without touching the filesystem.
    #[test]
    fn compression_mode_debug_clone_copy_eq_default() {
        let def = CompressionMode::default();
        assert_eq!(def, CompressionMode::None);
        let copied = def;
        let cloned = def;
        assert_eq!(copied, cloned);
        assert!(!def.is_compressed());
        let dbg = format!("{def:?}");
        assert!(dbg.contains("None"));
    }

    // Checks the documented defaults on TraceFileConfig.
    #[test]
    fn trace_file_config_debug_clone_default() {
        let def = TraceFileConfig::default();
        assert_eq!(def.compression, CompressionMode::None);
        assert_eq!(def.chunk_size, DEFAULT_COMPRESSION_CHUNK_SIZE);
        assert!(def.max_events.is_none());
        let cloned = def.clone();
        assert_eq!(cloned.compression, CompressionMode::None);
        let dbg = format!("{def:?}");
        assert!(dbg.contains("TraceFileConfig"));
    }

    // Debug and Display output of error variants must name the failure.
    #[test]
    fn trace_file_error_debug_display() {
        let err = TraceFileError::InvalidMagic;
        let dbg = format!("{err:?}");
        assert!(dbg.contains("InvalidMagic"));
        let display = format!("{err}");
        assert!(display.contains("magic"));

        let version_err = TraceFileError::UnsupportedVersion {
            expected: 2,
            found: 99,
        };
        let display2 = format!("{version_err}");
        assert!(display2.contains("99"));
    }

    // End-to-end: write_trace followed by read_trace must preserve metadata
    // and every event exactly.
    #[test]
    fn write_and_read_roundtrip() {
        let temp = NamedTempFile::new().expect("create temp file");
        let path = temp.path();

        let metadata = TraceMetadata::new(42).with_description("test trace");
        let events = sample_events();

        // Write
        write_trace(path, &metadata, &events).expect("write trace");

        // Read
        let (read_meta, read_events) = read_trace(path).expect("read trace");

        assert_eq!(read_meta.seed, metadata.seed);
        assert_eq!(read_meta.description, metadata.description);
        assert_eq!(read_events.len(), events.len());

        for (orig, read) in events.iter().zip(read_events.iter()) {
            assert_eq!(orig, read);
        }
    }

    // Same round trip via the streaming TraceWriter/TraceReader APIs.
    #[test]
    fn streaming_write_and_read() {
        let temp = NamedTempFile::new().expect("create temp file");
        let path = temp.path();

        let metadata = TraceMetadata::new(123);
        let events = sample_events();

        // Streaming write
        {
            let mut writer = TraceWriter::create(path).expect("create writer");
            writer.write_metadata(&metadata).expect("write metadata");
            for event in &events {
                writer.write_event(event).expect("write event");
            }
            assert_eq!(writer.event_count(), events.len() as u64);
            writer.finish().expect("finish");
        }

        // Streaming read
        {
            let reader = TraceReader::open(path).expect("open reader");
            assert_eq!(reader.metadata().seed, 123);
            assert_eq!(reader.event_count(), events.len() as u64);

            let mut count = 0;
            for result in reader.events() {
                let event = result.expect("read event");
                assert_eq!(event, events[count]);
                count += 1;
            }
            assert_eq!(count, events.len());
        }
    }

    // rewind() must reset events_read and replay the same events.
    #[test]
    fn reader_rewind() {
        let temp = NamedTempFile::new().expect("create temp file");
        let path = temp.path();

        let metadata = TraceMetadata::new(42);
        let events = sample_events();
        write_trace(path, &metadata, &events).expect("write trace");

        let mut reader = TraceReader::open(path).expect("open reader");

        // Read first two events
        let e1 = reader.read_event().expect("read").expect("event");
        let e2 = reader.read_event().expect("read").expect("event");
        assert_eq!(reader.events_read(), 2);

        // Rewind and verify we get the same events
        reader.rewind().expect("rewind");
        assert_eq!(reader.events_read(), 0);

        let e1_again = reader.read_event().expect("read").expect("event");
        let e2_again = reader.read_event().expect("read").expect("event");
        assert_eq!(e1, e1_again);
        assert_eq!(e2, e2_again);
    }

    // A trace with zero events must round-trip cleanly.
    #[test]
    fn empty_trace() {
        let temp = NamedTempFile::new().expect("create temp file");
        let path = temp.path();

        let metadata = TraceMetadata::new(0);
        write_trace(path, &metadata, &[]).expect("write empty trace");

        let (read_meta, read_events) = read_trace(path).expect("read empty trace");
        assert_eq!(read_meta.seed, 0);
        assert!(read_events.is_empty());
    }

    // Streaming read of a 10k-event trace preserves order and content.
    #[test]
    fn large_trace() {
        let temp = NamedTempFile::new().expect("create temp file");
        let path = temp.path();

        let metadata = TraceMetadata::new(42);
        let event_count = 10_000;

        // Generate large trace
        let events: Vec<_> = (0..event_count)
            .map(|i| ReplayEvent::TaskScheduled {
                task: CompactTaskId(i),
                at_tick: i,
            })
            .collect();

        write_trace(path, &metadata, &events).expect("write large trace");

        // Read with streaming
        let reader = TraceReader::open(path).expect("open reader");
        assert_eq!(reader.event_count(), event_count);

        let mut count = 0u64;
        for result in reader.events() {
            let event = result.expect("read event");
            if let ReplayEvent::TaskScheduled { task, at_tick } = event {
                assert_eq!(task.0, count);
                assert_eq!(at_tick, count);
            } else {
                unreachable!("unexpected event type");
            }
            count += 1;
        }
        assert_eq!(count, event_count);
    }

    // A file without the "ASUPERTRACE" magic must be rejected at open().
    #[test]
    fn invalid_magic() {
        let temp = NamedTempFile::new().expect("create temp file");
        let path = temp.path();

        // Write garbage
        std::fs::write(path, b"NOT A TRACE FILE").expect("write garbage");

        let result = TraceReader::open(path);
        assert!(matches!(result, Err(TraceFileError::InvalidMagic)));
    }

    // Guards against encoding regressions that bloat the on-disk format.
    #[test]
    fn file_size_reasonable() {
        let temp = NamedTempFile::new().expect("create temp file");
        let path = temp.path();

        let metadata = TraceMetadata::new(42);
        let events: Vec<_> = (0..1000)
            .map(|i| ReplayEvent::TaskScheduled {
                task: CompactTaskId(i),
                at_tick: i,
            })
            .collect();

        write_trace(path, &metadata, &events).expect("write trace");

        let file_size = std::fs::metadata(path).expect("metadata").len();
        let file_size = u32::try_from(file_size).expect("trace file size fits u32 for test");
        let bytes_per_event = f64::from(file_size) / 1000.0;

        // Events should encode compactly; assert under 40 bytes per event
        // (amortizing the fixed header over the 1000 events).
        assert!(
            bytes_per_event < 40.0,
            "File size too large: {bytes_per_event:.1} bytes/event"
        );
    }

    #[test]
    fn writer_already_finished_error() {
        let temp = NamedTempFile::new().expect("create temp file");
        let path = temp.path();

        let mut writer = TraceWriter::create(path).expect("create writer");
        writer
            .write_metadata(&TraceMetadata::new(42))
            .expect("write metadata");
        writer.finish().expect("finish");

        // Attempting to use a finished writer should not be possible
        // because finish() consumes self, so this is compile-time safety
    }

    // Writer silently drops events past the configured max_events limit.
    #[test]
    fn write_stops_at_max_events() {
        let temp = NamedTempFile::new().expect("create temp file");
        let path = temp.path();
        let metadata = TraceMetadata::new(42);
        let events = sample_events();

        let config = TraceFileConfig::new().with_max_events(Some(2));
        let mut writer = TraceWriter::create_with_config(path, config).expect("create writer");
        writer.write_metadata(&metadata).expect("write metadata");
        for event in &events {
            writer.write_event(event).expect("write event");
        }
        writer.finish().expect("finish");

        let reader = TraceReader::open(path).expect("open reader");
        assert_eq!(reader.event_count(), 2);
    }

    // With max_file_size set to exactly the header + metadata footprint,
    // no event should fit.
    #[test]
    fn write_stops_at_max_file_size() {
        let temp = NamedTempFile::new().expect("create temp file");
        let path = temp.path();

        let metadata = TraceMetadata::new(42);
        let meta_len = rmp_serde::to_vec(&metadata)
            .expect("serialize metadata")
            .len() as u64;
        let header_bytes = HEADER_SIZE as u64 + meta_len + 8;

        let config = TraceFileConfig::new().with_max_file_size(header_bytes);
        let mut writer = TraceWriter::create_with_config(path, config).expect("create writer");
        writer.write_metadata(&metadata).expect("write metadata");
        writer
            .write_event(&ReplayEvent::RngSeed { seed: 42 })
            .expect("write event");
        writer.finish().expect("finish");

        let reader = TraceReader::open(path).expect("open reader");
        assert_eq!(reader.event_count(), 0);
    }

    // The limit callback fires exactly once when max_events is exceeded.
    #[test]
    fn write_limit_callback_invoked() {
        let temp = NamedTempFile::new().expect("create temp file");
        let path = temp.path();

        let hits = Arc::new(AtomicUsize::new(0));
        let hit_ref = Arc::clone(&hits);
        let action = LimitAction::Callback(Arc::new(move |_info| {
            hit_ref.fetch_add(1, Ordering::SeqCst);
            LimitAction::StopRecording
        }));

        let config = TraceFileConfig::new()
            .with_max_events(Some(1))
            .on_limit(action);
        let mut writer = TraceWriter::create_with_config(path, config).expect("create writer");
        writer
            .write_metadata(&TraceMetadata::new(42))
            .expect("write metadata");
        writer
            .write_event(&ReplayEvent::RngSeed { seed: 1 })
            .expect("write event");
        writer
            .write_event(&ReplayEvent::RngSeed { seed: 2 })
            .expect("write event");
        writer.finish().expect("finish");

        assert_eq!(hits.load(Ordering::SeqCst), 1);
    }

    // Uses /dev/full (writes always fail with ENOSPC) to check that a full
    // disk surfaces as an I/O error rather than a panic. Skipped when the
    // device is unavailable.
    #[test]
    #[cfg(target_family = "unix")]
    fn disk_full_is_handled() {
        let path = std::path::Path::new("/dev/full");
        if !path.exists() {
            return;
        }

        let Ok(mut writer) = TraceWriter::create(path) else {
            return;
        };

        // write_metadata buffers to BufWriter, which may not immediately
        // write to disk. We need to finish() to flush and detect ENOSPC.
        let _ = writer.write_metadata(&TraceMetadata::new(42));
        let result = writer.finish();
        assert!(matches!(
            result,
            Err(TraceFileError::Io(err)) if err.raw_os_error() == Some(ENOSPC)
        ));
    }

    // =========================================================================
    // Compression Tests (feature-gated)
    // =========================================================================

    #[cfg(feature = "trace-compression")]
    mod compression_tests {
        use super::*;

        // Compressed write followed by load_all must preserve everything;
        // the reader auto-detects compression from the header flags.
        #[test]
        fn compressed_write_and_read_roundtrip() {
            let temp = NamedTempFile::new().expect("create temp file");
            let path = temp.path();

            let metadata = TraceMetadata::new(42).with_description("compressed trace");
            let events = sample_events();

            // Write with compression
            let config = TraceFileConfig::new().with_compression(CompressionMode::Lz4 { level: 1 });
            let mut writer = TraceWriter::create_with_config(path, config).expect("create writer");
            writer.write_metadata(&metadata).expect("write metadata");
            for event in &events {
                writer.write_event(event).expect("write event");
            }
            writer.finish().expect("finish");

            // Read (auto-detects compression)
            let reader = TraceReader::open(path).expect("open reader");
            assert!(reader.is_compressed());
            assert_eq!(reader.metadata().seed, metadata.seed);
            assert_eq!(reader.event_count(), events.len() as u64);

            let read_events = reader.load_all().expect("load all");
            assert_eq!(read_events.len(), events.len());
            for (orig, read) in events.iter().zip(read_events.iter()) {
                assert_eq!(orig, read);
            }
        }

        // Same round trip via the iterator (streaming) path.
        #[test]
        fn compressed_streaming_read() {
            let temp = NamedTempFile::new().expect("create temp file");
            let path = temp.path();

            let metadata = TraceMetadata::new(123);
            let events = sample_events();

            // Write with compression
            let config = TraceFileConfig::new().with_compression(CompressionMode::Lz4 { level: 1 });
            let mut writer = TraceWriter::create_with_config(path, config).expect("create writer");
            writer.write_metadata(&metadata).expect("write metadata");
            for event in &events {
                writer.write_event(event).expect("write event");
            }
            writer.finish().expect("finish");

            // Streaming read
            let reader = TraceReader::open(path).expect("open reader");
            assert!(reader.is_compressed());

            let mut count = 0;
            for result in reader.events() {
                let event = result.expect("read event");
                assert_eq!(event, events[count]);
                count += 1;
            }
            assert_eq!(count, events.len());
        }

        // Small chunk size forces many chunk-boundary refills during the
        // streaming read of a 10k-event trace.
        #[test]
        fn large_compressed_trace() {
            let temp = NamedTempFile::new().expect("create temp file");
            let path = temp.path();

            let metadata = TraceMetadata::new(42);
            let event_count = 10_000u64;

            // Generate large trace
            let events: Vec<_> = (0..event_count)
                .map(|i| ReplayEvent::TaskScheduled {
                    task: CompactTaskId(i),
                    at_tick: i,
                })
                .collect();

            // Write with compression
            let config = TraceFileConfig::new()
                .with_compression(CompressionMode::Lz4 { level: 1 })
                .with_chunk_size(8 * 1024); // 8KB chunks for more chunks in test
            let mut writer = TraceWriter::create_with_config(path, config).expect("create writer");
            writer.write_metadata(&metadata).expect("write metadata");
            for event in &events {
                writer.write_event(event).expect("write event");
            }
            writer.finish().expect("finish");

            // Read with streaming
            let reader = TraceReader::open(path).expect("open reader");
            assert!(reader.is_compressed());
            assert_eq!(reader.event_count(), event_count);

            let mut count = 0u64;
            for result in reader.events() {
                let event = result.expect("read event");
                if let ReplayEvent::TaskScheduled { task, at_tick } = event {
                    assert_eq!(task.0, count);
                    assert_eq!(at_tick, count);
                } else {
                    unreachable!("unexpected event type");
                }
                count += 1;
            }
            assert_eq!(count, event_count);
        }

        // Compares on-disk sizes of the same trace written with and without
        // compression; repetitive payloads should compress well.
        #[test]
        fn compression_ratio() {
            let temp_uncompressed = NamedTempFile::new().expect("create temp file");
            let temp_compressed = NamedTempFile::new().expect("create temp file");

            let metadata = TraceMetadata::new(42);
            let event_count = 5000u64;

            // Generate trace with repetitive data (good for compression)
            let events: Vec<_> = (0..event_count)
                .map(|i| ReplayEvent::TaskScheduled {
                    task: CompactTaskId(i % 100), // Repetitive task IDs
                    at_tick: i,
                })
                .collect();

            // Write uncompressed
            {
                let mut writer =
                    TraceWriter::create(temp_uncompressed.path()).expect("create writer");
                writer.write_metadata(&metadata).expect("write metadata");
                for event in &events {
                    writer.write_event(event).expect("write event");
                }
                writer.finish().expect("finish");
            }

            // Write compressed
            {
                let config =
                    TraceFileConfig::new().with_compression(CompressionMode::Lz4 { level: 1 });
                let mut writer = TraceWriter::create_with_config(temp_compressed.path(), config)
                    .expect("create writer");
                writer.write_metadata(&metadata).expect("write metadata");
                for event in &events {
                    writer.write_event(event).expect("write event");
                }
                writer.finish().expect("finish");
            }

            let uncompressed_size = std::fs::metadata(temp_uncompressed.path())
                .expect("metadata")
                .len();
            let compressed_size = std::fs::metadata(temp_compressed.path())
                .expect("metadata")
                .len();

            #[allow(clippy::cast_precision_loss)]
            let ratio = uncompressed_size as f64 / compressed_size as f64;

            // LZ4 should achieve at least 2x compression on this repetitive data
            assert!(
                ratio > 2.0,
                "Compression ratio {ratio:.2}x is below expected 2x minimum"
            );
        }

        // rewind() must also reset the decompression buffer state.
        #[test]
        fn compressed_rewind() {
            let temp = NamedTempFile::new().expect("create temp file");
            let path = temp.path();

            let metadata = TraceMetadata::new(42);
            let events = sample_events();

            // Write with compression
            let config = TraceFileConfig::new().with_compression(CompressionMode::Lz4 { level: 1 });
            let mut writer = TraceWriter::create_with_config(path, config).expect("create writer");
            writer.write_metadata(&metadata).expect("write metadata");
            for event in &events {
                writer.write_event(event).expect("write event");
            }
            writer.finish().expect("finish");

            let mut reader = TraceReader::open(path).expect("open reader");
            assert!(reader.is_compressed());

            // Read first two events
            let e1 = reader.read_event().expect("read").expect("event");
            let e2 = reader.read_event().expect("read").expect("event");
            assert_eq!(reader.events_read(), 2);

            // Rewind and verify we get the same events
            reader.rewind().expect("rewind");
            assert_eq!(reader.events_read(), 0);

            let e1_again = reader.read_event().expect("read").expect("event");
            let e2_again = reader.read_event().expect("read").expect("event");
            assert_eq!(e1, e1_again);
            assert_eq!(e2, e2_again);
        }

        // With the compression feature enabled, plain (uncompressed) files
        // must still open and read correctly.
        #[test]
        fn uncompressed_still_readable() {
            let temp = NamedTempFile::new().expect("create temp file");
            let path = temp.path();

            let metadata = TraceMetadata::new(42);
            let events = sample_events();

            // Write without compression
            write_trace(path, &metadata, &events).expect("write trace");

            // Should read successfully and report not compressed
            let reader = TraceReader::open(path).expect("open reader");
            assert!(!reader.is_compressed());
            assert_eq!(reader.event_count(), events.len() as u64);

            let read_events = reader.load_all().expect("load all");
            assert_eq!(read_events, events);
        }
    }
}