1use super::recorder::{DEFAULT_MAX_FILE_SIZE, LimitAction, LimitKind, LimitReached};
62use super::replay::{REPLAY_SCHEMA_VERSION, ReplayEvent, TraceMetadata};
63use crate::tracing_compat::{error, warn};
64use libc::ENOSPC;
65use std::fs::File;
66use std::io::{self, BufReader, BufWriter, Read, Seek, SeekFrom, Write};
67use std::path::Path;
68
/// Magic bytes at the start of every trace file, used to reject non-trace input.
pub const TRACE_MAGIC: &[u8; 11] = b"ASUPERTRACE";

/// Current on-disk container format version (version 2 added the
/// compression-format byte to the header).
pub const TRACE_FILE_VERSION: u16 = 2;

/// Header flag bit: the event stream is stored as compressed chunks.
pub const FLAG_COMPRESSED: u16 = 0x0001;

/// Fixed header size: magic (11) + version (2) + flags (2) +
/// compression byte (1) + metadata length (4).
pub const HEADER_SIZE: usize = 11 + 2 + 2 + 1 + 4;

/// Default amount of uncompressed event data buffered before a
/// compressed chunk is flushed to disk.
pub const DEFAULT_COMPRESSION_CHUNK_SIZE: usize = 64 * 1024;

/// NOTE(review): not referenced anywhere in this file — presumably the
/// size above which `CompressionMode::Auto` switches compression on;
/// confirm against the call sites that consume it.
pub const AUTO_COMPRESSION_THRESHOLD: usize = 1024 * 1024;
91
/// How the event stream is encoded on disk.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum CompressionMode {
    /// Events are written uncompressed, one length-prefixed record each.
    #[default]
    None,

    /// LZ4 chunk compression (only with the `trace-compression` feature).
    #[cfg(feature = "trace-compression")]
    Lz4 {
        // Compression level. NOTE(review): the level is not persisted in
        // the file header — `from_byte` reconstructs `level: 1` on read;
        // confirm that losing the original level is intended.
        level: i32,
    },

    /// Writer-chosen compression (only with the `trace-compression`
    /// feature). NOTE(review): treated identically to `Lz4` by
    /// `is_compressed`/`to_byte`; the selection heuristic is not visible
    /// in this file.
    #[cfg(feature = "trace-compression")]
    Auto,
}
119
120impl CompressionMode {
121 #[must_use]
123 pub fn is_compressed(&self) -> bool {
124 match self {
125 Self::None => false,
126 #[cfg(feature = "trace-compression")]
127 Self::Lz4 { .. } | Self::Auto => true,
128 }
129 }
130
131 fn to_byte(self) -> u8 {
133 match self {
134 Self::None => 0,
135 #[cfg(feature = "trace-compression")]
136 Self::Lz4 { .. } | Self::Auto => 1,
137 }
138 }
139
140 fn from_byte(byte: u8) -> Option<Self> {
142 match byte {
143 0 => Some(Self::None),
144 #[cfg(feature = "trace-compression")]
145 1 => Some(Self::Lz4 { level: 1 }),
146 #[cfg(not(feature = "trace-compression"))]
147 1 => None, _ => None,
149 }
150 }
151}
152
/// Configuration for [`TraceFileConfig`]-driven trace writing.
#[derive(Debug, Clone)]
pub struct TraceFileConfig {
    /// How the event stream is encoded on disk.
    pub compression: CompressionMode,

    /// Uncompressed bytes buffered before a compressed chunk is flushed.
    pub chunk_size: usize,

    /// Optional cap on the number of events written; `None` = unlimited.
    pub max_events: Option<u64>,

    /// Cap on total file size in bytes; 0 disables the size check.
    pub max_file_size: u64,

    /// Policy applied when a limit is reached.
    pub on_limit: LimitAction,
}
173
174impl Default for TraceFileConfig {
175 fn default() -> Self {
176 Self {
177 compression: CompressionMode::None,
178 chunk_size: DEFAULT_COMPRESSION_CHUNK_SIZE,
179 max_events: None,
180 max_file_size: DEFAULT_MAX_FILE_SIZE,
181 on_limit: LimitAction::StopRecording,
182 }
183 }
184}
185
186impl TraceFileConfig {
187 #[must_use]
189 pub fn new() -> Self {
190 Self::default()
191 }
192
193 #[must_use]
195 pub fn with_compression(mut self, mode: CompressionMode) -> Self {
196 self.compression = mode;
197 self
198 }
199
200 #[must_use]
202 pub fn with_chunk_size(mut self, size: usize) -> Self {
203 self.chunk_size = size;
204 self
205 }
206
207 #[must_use]
209 pub const fn with_max_events(mut self, max_events: Option<u64>) -> Self {
210 self.max_events = max_events;
211 self
212 }
213
214 #[must_use]
216 pub const fn with_max_file_size(mut self, max_file_size: u64) -> Self {
217 self.max_file_size = max_file_size;
218 self
219 }
220
221 #[must_use]
223 pub fn on_limit(mut self, action: LimitAction) -> Self {
224 self.on_limit = action;
225 self
226 }
227}
228
/// Errors produced while writing or reading trace files.
#[derive(Debug, thiserror::Error)]
pub enum TraceFileError {
    /// Underlying filesystem or stream failure.
    #[error("I/O error: {0}")]
    Io(#[from] io::Error),

    /// The file does not start with [`TRACE_MAGIC`].
    #[error("invalid magic bytes: not a trace file")]
    InvalidMagic,

    /// The container version is newer than this build understands.
    #[error("unsupported file version: expected <= {expected}, found {found}")]
    UnsupportedVersion {
        /// Highest version this build can read.
        expected: u16,
        /// Version found in the file header.
        found: u16,
    },

    /// A version-1 file claims compression, which version 1 never supported.
    #[error("unsupported flags: {0:#06x}")]
    UnsupportedFlags(u16),

    /// The header compression byte names a format this build cannot decode.
    #[error("unsupported compression format: {0}")]
    UnsupportedCompression(u8),

    /// The file is compressed but this build lacks the feature to read it.
    #[error("file is compressed but trace-compression feature is not enabled")]
    CompressionNotAvailable,

    /// LZ4 compression failed while writing.
    #[error("compression error: {0}")]
    Compression(String),

    /// LZ4 decompression failed while reading.
    #[error("decompression error: {0}")]
    Decompression(String),

    /// MessagePack encoding of metadata or an event failed.
    #[error("serialization error: {0}")]
    Serialize(String),

    /// MessagePack decoding of metadata or an event failed.
    #[error("deserialization error: {0}")]
    Deserialize(String),

    /// The embedded metadata was written by an incompatible schema.
    #[error("schema version mismatch: expected {expected}, found {found}")]
    SchemaMismatch {
        /// Schema version this build requires.
        expected: u32,
        /// Schema version recorded in the file.
        found: u32,
    },

    /// A write was attempted after `finish` consumed the writer state.
    #[error("writer already finished")]
    AlreadyFinished,

    /// A record or chunk extends past the available data.
    #[error("file truncated or corrupt")]
    Truncated,
}
298
299impl From<rmp_serde::encode::Error> for TraceFileError {
300 fn from(e: rmp_serde::encode::Error) -> Self {
301 Self::Serialize(e.to_string())
302 }
303}
304
305impl From<rmp_serde::decode::Error> for TraceFileError {
306 fn from(e: rmp_serde::decode::Error) -> Self {
307 Self::Deserialize(e.to_string())
308 }
309}
310
/// Convenience alias for results carrying a [`TraceFileError`].
pub type TraceFileResult<T> = Result<T, TraceFileError>;
313
/// Streaming writer for trace files (header + metadata + length-prefixed
/// msgpack events, optionally LZ4-chunked).
pub struct TraceWriter {
    /// Buffered file handle; the inner `File` is seeked directly only
    /// after flushing, to patch the event count.
    writer: BufWriter<File>,
    /// Events accepted so far (including any still buffered for compression).
    event_count: u64,
    /// Byte offset of the 8-byte event-count slot; set by `write_metadata`.
    event_count_pos: u64,
    /// Set by `finish`; further writes return `AlreadyFinished`.
    finished: bool,
    config: TraceFileConfig,
    /// Bytes handed to the underlying writer (pre-buffer accounting).
    bytes_written: u64,
    /// Uncompressed bytes awaiting a compressed-chunk flush.
    buffered_bytes: u64,
    /// Set when a limit policy stopped recording; writes become no-ops.
    stopped: bool,
    /// Set on ENOSPC; writes become no-ops and `finish` is best-effort.
    halted: bool,
    #[cfg(feature = "trace-compression")]
    // Staging buffer of length-prefixed events for the next chunk.
    event_buffer: Vec<u8>,
}
348
349impl TraceWriter {
350 pub fn create(path: impl AsRef<Path>) -> TraceFileResult<Self> {
356 Self::create_with_config(path, TraceFileConfig::default())
357 }
358
359 pub fn create_with_config(
365 path: impl AsRef<Path>,
366 config: TraceFileConfig,
367 ) -> TraceFileResult<Self> {
368 let file = File::create(path)?;
369 let writer = BufWriter::new(file);
370
371 Ok(Self {
372 writer,
373 event_count: 0,
374 event_count_pos: 0,
375 finished: false,
376 config,
377 bytes_written: 0,
378 buffered_bytes: 0,
379 stopped: false,
380 halted: false,
381 #[cfg(feature = "trace-compression")]
382 event_buffer: Vec::new(),
383 })
384 }
385
386 fn should_write(&self) -> bool {
387 !self.stopped && !self.halted
388 }
389
390 fn resolve_limit_action(&self, info: &LimitReached) -> LimitAction {
391 match &self.config.on_limit {
392 LimitAction::Callback(cb) => (cb)(info.clone()),
393 other => other.clone(),
394 }
395 }
396
397 fn handle_limit(&mut self, info: &LimitReached) -> TraceFileResult<bool> {
398 let mut action = self.resolve_limit_action(info);
399 if matches!(action, LimitAction::Callback(_)) {
400 action = LimitAction::StopRecording;
401 }
402
403 match action {
404 LimitAction::StopRecording => {
405 warn!(
406 kind = ?info.kind,
407 current_events = info.current_events,
408 max_events = ?info.max_events,
409 current_bytes = info.current_bytes,
410 max_bytes = info.max_bytes,
411 "trace write stopped: limit reached"
412 );
413 self.stopped = true;
414 Ok(false)
415 }
416 LimitAction::DropOldest => {
417 warn!(
418 kind = ?info.kind,
419 "trace write stopped: drop-oldest not supported for file writer"
420 );
421 self.stopped = true;
422 Ok(false)
423 }
424 LimitAction::Fail => {
425 error!(
426 kind = ?info.kind,
427 current_events = info.current_events,
428 max_events = ?info.max_events,
429 current_bytes = info.current_bytes,
430 max_bytes = info.max_bytes,
431 "trace write failed: limit exceeded"
432 );
433 self.stopped = true;
434 Err(TraceFileError::Io(io::Error::other(
435 "trace write limit exceeded",
436 )))
437 }
438 LimitAction::Callback(_) => {
439 self.stopped = true;
440 Ok(false)
441 }
442 }
443 }
444
445 fn is_disk_full(err: &io::Error) -> bool {
446 err.raw_os_error() == Some(ENOSPC)
447 }
448
449 fn handle_disk_full(&mut self, err: io::Error) -> TraceFileError {
450 warn!("trace write halted: disk full (ENOSPC). Free space and retry recording.");
451 self.halted = true;
452 TraceFileError::Io(err)
453 }
454
455 fn write_bytes(&mut self, bytes: &[u8]) -> TraceFileResult<()> {
456 if self.halted {
457 return Ok(());
458 }
459 match self.writer.write_all(bytes) {
460 Ok(()) => {
461 self.bytes_written = self.bytes_written.saturating_add(bytes.len() as u64);
462 Ok(())
463 }
464 Err(err) if Self::is_disk_full(&err) => Err(self.handle_disk_full(err)),
465 Err(err) => Err(TraceFileError::Io(err)),
466 }
467 }
468
469 fn update_event_count(&mut self) -> TraceFileResult<()> {
470 let file = self.writer.get_mut();
471 file.seek(SeekFrom::Start(self.event_count_pos))?;
472 file.write_all(&self.event_count.to_le_bytes())?;
473 file.flush()?;
474 Ok(())
475 }
476
477 fn update_event_count_best_effort(&mut self) {
478 if let Err(err) = self.update_event_count() {
479 if matches!(
480 &err,
481 TraceFileError::Io(io_err) if Self::is_disk_full(io_err)
482 ) {
483 warn!("trace event count update skipped: disk full");
484 }
485 warn!("trace event count update skipped: {err}");
486 }
487 }
488
489 pub fn write_metadata(&mut self, metadata: &TraceMetadata) -> TraceFileResult<()> {
498 if self.finished {
499 return Err(TraceFileError::AlreadyFinished);
500 }
501
502 let meta_bytes = rmp_serde::to_vec(metadata)?;
504
505 let flags = if self.config.compression.is_compressed() {
507 FLAG_COMPRESSED
508 } else {
509 0
510 };
511
512 self.write_bytes(TRACE_MAGIC)?;
514 self.write_bytes(&TRACE_FILE_VERSION.to_le_bytes())?;
515 self.write_bytes(&flags.to_le_bytes())?;
516 self.write_bytes(&[self.config.compression.to_byte()])?; let meta_len = meta_bytes.len() as u32;
520 self.write_bytes(&meta_len.to_le_bytes())?;
521 self.write_bytes(&meta_bytes)?;
522
523 self.event_count_pos = HEADER_SIZE as u64 + u64::from(meta_len);
525 self.write_bytes(&0u64.to_le_bytes())?;
526
527 Ok(())
528 }
529
530 pub fn write_event(&mut self, event: &ReplayEvent) -> TraceFileResult<()> {
539 if self.finished {
540 return Err(TraceFileError::AlreadyFinished);
541 }
542 if !self.should_write() {
543 return Ok(());
544 }
545
546 if let Some(max_events) = self.config.max_events {
547 if self.event_count.saturating_add(1) > max_events {
548 let info = LimitReached {
549 kind: LimitKind::MaxEvents,
550 current_events: self.event_count,
551 max_events: Some(max_events),
552 current_bytes: self.bytes_written,
553 max_bytes: self.config.max_file_size,
554 needed_bytes: 0,
555 };
556 if !self.handle_limit(&info)? {
557 return Ok(());
558 }
559 }
560 }
561
562 let event_bytes = rmp_serde::to_vec(event)?;
564 let len = event_bytes.len() as u32;
565 let estimated_bytes = 4u64 + event_bytes.len() as u64;
566 let pending_bytes = self.bytes_written.saturating_add(self.buffered_bytes);
567
568 if self.config.max_file_size > 0
569 && pending_bytes.saturating_add(estimated_bytes) > self.config.max_file_size
570 {
571 let info = LimitReached {
572 kind: LimitKind::MaxFileSize,
573 current_events: self.event_count,
574 max_events: self.config.max_events,
575 current_bytes: pending_bytes,
576 max_bytes: self.config.max_file_size,
577 needed_bytes: estimated_bytes,
578 };
579 if !self.handle_limit(&info)? {
580 return Ok(());
581 }
582 }
583
584 #[cfg(feature = "trace-compression")]
585 if self.config.compression.is_compressed() {
586 self.event_buffer.extend_from_slice(&len.to_le_bytes());
588 self.event_buffer.extend_from_slice(&event_bytes);
589 self.buffered_bytes = self.buffered_bytes.saturating_add(estimated_bytes);
590 self.event_count += 1;
591
592 if self.event_buffer.len() >= self.config.chunk_size {
594 self.flush_compressed_chunk()?;
595 }
596 return Ok(());
597 }
598
599 self.write_bytes(&len.to_le_bytes())?;
601 self.write_bytes(&event_bytes)?;
602 self.event_count += 1;
603 Ok(())
604 }
605
606 #[cfg(feature = "trace-compression")]
608 fn flush_compressed_chunk(&mut self) -> TraceFileResult<()> {
609 if self.event_buffer.is_empty() {
610 return Ok(());
611 }
612
613 let compressed = lz4_flex::compress_prepend_size(&self.event_buffer);
615
616 let chunk_len = compressed.len() as u32;
618 self.write_bytes(&chunk_len.to_le_bytes())?;
619 self.write_bytes(&compressed)?;
620
621 self.event_buffer.clear();
622 self.buffered_bytes = 0;
623 Ok(())
624 }
625
626 pub fn finish(mut self) -> TraceFileResult<()> {
636 self.finished = true;
637
638 #[cfg(feature = "trace-compression")]
640 if self.config.compression.is_compressed() {
641 self.flush_compressed_chunk()?;
642 }
643
644 if self.halted {
645 let _ = self.writer.flush();
646 self.update_event_count_best_effort();
647 return Ok(());
648 }
649
650 self.writer.flush()?;
652
653 self.update_event_count()?;
655
656 Ok(())
657 }
658
659 #[must_use]
661 pub fn event_count(&self) -> u64 {
662 self.event_count
663 }
664}
665
666impl Drop for TraceWriter {
667 fn drop(&mut self) {
668 if !self.finished {
669 let _ = self.writer.flush();
671 }
672 }
673}
674
/// Streaming reader for trace files produced by [`TraceWriter`].
pub struct TraceReader {
    /// Buffered handle positioned within the event stream after `open`.
    reader: BufReader<File>,
    /// Metadata block decoded from the file header.
    metadata: TraceMetadata,
    /// Total event count declared in the header.
    event_count: u64,
    /// Events consumed via `read_event` so far.
    events_read: u64,
    /// Byte offset of the first event/chunk; used by `rewind`.
    events_start_pos: u64,
    /// Compression mode recorded in the header.
    compression: CompressionMode,
    #[cfg(feature = "trace-compression")]
    // Decompressed contents of the current chunk.
    decompressed_buffer: Vec<u8>,
    #[cfg(feature = "trace-compression")]
    // Read cursor within `decompressed_buffer`.
    buffer_pos: usize,
}
711
712impl TraceReader {
713 pub fn open(path: impl AsRef<Path>) -> TraceFileResult<Self> {
726 let file = File::open(path)?;
727 let mut reader = BufReader::new(file);
728
729 let mut magic = [0u8; 11];
731 reader.read_exact(&mut magic)?;
732 if &magic != TRACE_MAGIC {
733 return Err(TraceFileError::InvalidMagic);
734 }
735
736 let mut version_bytes = [0u8; 2];
738 reader.read_exact(&mut version_bytes)?;
739 let version = u16::from_le_bytes(version_bytes);
740 if version > TRACE_FILE_VERSION {
741 return Err(TraceFileError::UnsupportedVersion {
742 expected: TRACE_FILE_VERSION,
743 found: version,
744 });
745 }
746
747 let mut flags_bytes = [0u8; 2];
749 reader.read_exact(&mut flags_bytes)?;
750 let flags = u16::from_le_bytes(flags_bytes);
751 let is_compressed = flags & FLAG_COMPRESSED != 0;
752
753 let compression = if version >= 2 {
755 let mut comp_byte = [0u8; 1];
756 reader.read_exact(&mut comp_byte)?;
757 match CompressionMode::from_byte(comp_byte[0]) {
758 Some(mode) => mode,
759 None if is_compressed => {
760 return Err(TraceFileError::UnsupportedCompression(comp_byte[0]));
761 }
762 None => CompressionMode::None,
763 }
764 } else {
765 if is_compressed {
767 return Err(TraceFileError::UnsupportedFlags(flags));
768 }
769 CompressionMode::None
770 };
771
772 #[cfg(not(feature = "trace-compression"))]
774 if compression.is_compressed() {
775 return Err(TraceFileError::CompressionNotAvailable);
776 }
777
778 let mut meta_len_bytes = [0u8; 4];
780 reader.read_exact(&mut meta_len_bytes)?;
781 let meta_len = u32::from_le_bytes(meta_len_bytes) as usize;
782
783 let mut meta_bytes = vec![0u8; meta_len];
785 reader.read_exact(&mut meta_bytes)?;
786 let metadata: TraceMetadata = rmp_serde::from_slice(&meta_bytes)?;
787
788 if metadata.version != REPLAY_SCHEMA_VERSION {
790 return Err(TraceFileError::SchemaMismatch {
791 expected: REPLAY_SCHEMA_VERSION,
792 found: metadata.version,
793 });
794 }
795
796 let mut event_count_bytes = [0u8; 8];
798 reader.read_exact(&mut event_count_bytes)?;
799 let event_count = u64::from_le_bytes(event_count_bytes);
800
801 let header_size = if version >= 2 {
803 HEADER_SIZE
804 } else {
805 HEADER_SIZE - 1
806 };
807 let events_start_pos = header_size as u64 + meta_len as u64 + 8;
808
809 Ok(Self {
810 reader,
811 metadata,
812 event_count,
813 events_read: 0,
814 events_start_pos,
815 compression,
816 #[cfg(feature = "trace-compression")]
817 decompressed_buffer: Vec::new(),
818 #[cfg(feature = "trace-compression")]
819 buffer_pos: 0,
820 })
821 }
822
823 #[must_use]
825 pub fn is_compressed(&self) -> bool {
826 self.compression.is_compressed()
827 }
828
829 #[must_use]
831 pub fn compression(&self) -> CompressionMode {
832 self.compression
833 }
834
835 #[must_use]
837 pub fn metadata(&self) -> &TraceMetadata {
838 &self.metadata
839 }
840
841 #[must_use]
843 pub fn event_count(&self) -> u64 {
844 self.event_count
845 }
846
847 #[must_use]
849 pub fn events_read(&self) -> u64 {
850 self.events_read
851 }
852
853 #[must_use]
858 pub fn events(self) -> TraceEventIterator {
859 TraceEventIterator {
860 reader: self.reader,
861 remaining: self.event_count,
862 compression: self.compression,
863 #[cfg(feature = "trace-compression")]
864 decompressed_buffer: self.decompressed_buffer,
865 #[cfg(feature = "trace-compression")]
866 buffer_pos: self.buffer_pos,
867 }
868 }
869
870 pub fn read_event(&mut self) -> TraceFileResult<Option<ReplayEvent>> {
879 if self.events_read >= self.event_count {
880 return Ok(None);
881 }
882
883 #[cfg(feature = "trace-compression")]
884 if self.compression.is_compressed() {
885 return self.read_compressed_event();
886 }
887
888 self.read_uncompressed_event()
890 }
891
892 fn read_uncompressed_event(&mut self) -> TraceFileResult<Option<ReplayEvent>> {
894 let mut len_bytes = [0u8; 4];
896 if self.reader.read_exact(&mut len_bytes).is_err() {
897 return Ok(None);
898 }
899 let len = u32::from_le_bytes(len_bytes) as usize;
900
901 let mut event_bytes = vec![0u8; len];
903 self.reader.read_exact(&mut event_bytes)?;
904
905 let event: ReplayEvent = rmp_serde::from_slice(&event_bytes)?;
906 self.events_read += 1;
907
908 Ok(Some(event))
909 }
910
911 #[cfg(feature = "trace-compression")]
913 fn read_compressed_event(&mut self) -> TraceFileResult<Option<ReplayEvent>> {
914 if self.buffer_pos >= self.decompressed_buffer.len()
916 && !self.refill_decompressed_buffer()?
917 {
918 return Ok(None);
919 }
920
921 if self.buffer_pos + 4 > self.decompressed_buffer.len() {
923 return Err(TraceFileError::Truncated);
924 }
925 let len_bytes: [u8; 4] = self.decompressed_buffer[self.buffer_pos..self.buffer_pos + 4]
926 .try_into()
927 .map_err(|_| TraceFileError::Truncated)?;
928 let len = u32::from_le_bytes(len_bytes) as usize;
929 self.buffer_pos += 4;
930
931 if self.buffer_pos + len > self.decompressed_buffer.len() {
933 return Err(TraceFileError::Truncated);
934 }
935 let event_bytes = &self.decompressed_buffer[self.buffer_pos..self.buffer_pos + len];
936 let event: ReplayEvent = rmp_serde::from_slice(event_bytes)?;
937 self.buffer_pos += len;
938
939 self.events_read += 1;
940 Ok(Some(event))
941 }
942
943 #[cfg(feature = "trace-compression")]
945 fn refill_decompressed_buffer(&mut self) -> TraceFileResult<bool> {
946 let mut chunk_len_bytes = [0u8; 4];
948 match self.reader.read_exact(&mut chunk_len_bytes) {
949 Ok(()) => {}
950 Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(false),
951 Err(e) => return Err(TraceFileError::Io(e)),
952 }
953 let chunk_len = u32::from_le_bytes(chunk_len_bytes) as usize;
954
955 if chunk_len == 0 {
956 return Ok(false);
957 }
958
959 let mut compressed = vec![0u8; chunk_len];
961 self.reader.read_exact(&mut compressed)?;
962
963 self.decompressed_buffer = lz4_flex::decompress_size_prepended(&compressed)
965 .map_err(|e| TraceFileError::Decompression(e.to_string()))?;
966 self.buffer_pos = 0;
967
968 Ok(true)
969 }
970
971 pub fn rewind(&mut self) -> TraceFileResult<()> {
977 self.reader.seek(SeekFrom::Start(self.events_start_pos))?;
978 self.events_read = 0;
979
980 #[cfg(feature = "trace-compression")]
981 {
982 self.decompressed_buffer.clear();
983 self.buffer_pos = 0;
984 }
985
986 Ok(())
987 }
988
989 pub fn load_all(mut self) -> TraceFileResult<Vec<ReplayEvent>> {
998 let mut events = Vec::with_capacity(self.event_count as usize);
999 while let Some(event) = self.read_event()? {
1000 events.push(event);
1001 }
1002 Ok(events)
1003 }
1004}
1005
/// Streaming iterator over the events of an opened trace file,
/// produced by [`TraceReader::events`].
pub struct TraceEventIterator {
    /// Buffered handle positioned within the event stream.
    reader: BufReader<File>,
    /// Events left to yield, starting from the header's declared count.
    remaining: u64,
    /// Compression mode recorded in the file header.
    compression: CompressionMode,
    #[cfg(feature = "trace-compression")]
    // Decompressed contents of the current chunk.
    decompressed_buffer: Vec<u8>,
    #[cfg(feature = "trace-compression")]
    // Read cursor within `decompressed_buffer`.
    buffer_pos: usize,
}
1022
1023impl Iterator for TraceEventIterator {
1024 type Item = TraceFileResult<ReplayEvent>;
1025
1026 fn next(&mut self) -> Option<Self::Item> {
1027 if self.remaining == 0 {
1028 return None;
1029 }
1030
1031 #[cfg(feature = "trace-compression")]
1032 if self.compression.is_compressed() {
1033 return self.next_compressed();
1034 }
1035
1036 self.next_uncompressed()
1037 }
1038
1039 fn size_hint(&self) -> (usize, Option<usize>) {
1040 let remaining = self.remaining as usize;
1041 (remaining, Some(remaining))
1042 }
1043}
1044
impl TraceEventIterator {
    // Decodes one length-prefixed msgpack record straight from the file.
    fn next_uncompressed(&mut self) -> Option<TraceFileResult<ReplayEvent>> {
        let mut len_bytes = [0u8; 4];
        if let Err(e) = self.reader.read_exact(&mut len_bytes) {
            // Clean EOF at a record boundary ends iteration; any other
            // failure is surfaced to the caller.
            if e.kind() == io::ErrorKind::UnexpectedEof {
                return None;
            }
            return Some(Err(TraceFileError::Io(e)));
        }
        let len = u32::from_le_bytes(len_bytes) as usize;

        let mut event_bytes = vec![0u8; len];
        if let Err(e) = self.reader.read_exact(&mut event_bytes) {
            return Some(Err(TraceFileError::Io(e)));
        }

        match rmp_serde::from_slice(&event_bytes) {
            Ok(event) => {
                self.remaining -= 1;
                Some(Ok(event))
            }
            Err(e) => Some(Err(TraceFileError::from(e))),
        }
    }

    // Decodes one record from the decompressed chunk buffer, refilling
    // from the next on-disk chunk when the buffer is exhausted.
    #[cfg(feature = "trace-compression")]
    fn next_compressed(&mut self) -> Option<TraceFileResult<ReplayEvent>> {
        if self.buffer_pos >= self.decompressed_buffer.len() {
            match self.refill_buffer() {
                Ok(true) => {}
                Ok(false) => return None,
                Err(e) => return Some(Err(e)),
            }
        }

        // Records never span chunks; a partial record is corruption.
        if self.buffer_pos + 4 > self.decompressed_buffer.len() {
            return Some(Err(TraceFileError::Truncated));
        }
        let len_bytes: [u8; 4] =
            match self.decompressed_buffer[self.buffer_pos..self.buffer_pos + 4].try_into() {
                Ok(b) => b,
                Err(_) => return Some(Err(TraceFileError::Truncated)),
            };
        let len = u32::from_le_bytes(len_bytes) as usize;
        self.buffer_pos += 4;

        if self.buffer_pos + len > self.decompressed_buffer.len() {
            return Some(Err(TraceFileError::Truncated));
        }
        let event_bytes = &self.decompressed_buffer[self.buffer_pos..self.buffer_pos + len];

        match rmp_serde::from_slice(event_bytes) {
            Ok(event) => {
                self.buffer_pos += len;
                self.remaining -= 1;
                Some(Ok(event))
            }
            Err(e) => Some(Err(TraceFileError::from(e))),
        }
    }

    // Reads and decompresses the next chunk; Ok(false) means a clean EOF
    // or a zero-length chunk (end of stream).
    #[cfg(feature = "trace-compression")]
    fn refill_buffer(&mut self) -> TraceFileResult<bool> {
        let mut chunk_len_bytes = [0u8; 4];
        match self.reader.read_exact(&mut chunk_len_bytes) {
            Ok(()) => {}
            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(false),
            Err(e) => return Err(TraceFileError::Io(e)),
        }
        let chunk_len = u32::from_le_bytes(chunk_len_bytes) as usize;

        if chunk_len == 0 {
            return Ok(false);
        }

        let mut compressed = vec![0u8; chunk_len];
        self.reader.read_exact(&mut compressed)?;

        self.decompressed_buffer = lz4_flex::decompress_size_prepended(&compressed)
            .map_err(|e| TraceFileError::Decompression(e.to_string()))?;
        self.buffer_pos = 0;

        Ok(true)
    }
}
1141
// `size_hint` is exact: `remaining` starts at the header's declared event
// count and is decremented once per successfully yielded event.
impl ExactSizeIterator for TraceEventIterator {}
1143
1144pub fn write_trace(
1157 path: impl AsRef<Path>,
1158 metadata: &TraceMetadata,
1159 events: &[ReplayEvent],
1160) -> TraceFileResult<()> {
1161 let mut writer = TraceWriter::create(path)?;
1162 writer.write_metadata(metadata)?;
1163 for event in events {
1164 writer.write_event(event)?;
1165 }
1166 writer.finish()
1167}
1168
1169pub fn read_trace(path: impl AsRef<Path>) -> TraceFileResult<(TraceMetadata, Vec<ReplayEvent>)> {
1178 let reader = TraceReader::open(path)?;
1179 let metadata = reader.metadata().clone();
1180 let events = reader.load_all()?;
1181 Ok((metadata, events))
1182}
1183
1184#[cfg(test)]
1189mod tests {
1190 use super::*;
1191 use crate::trace::replay::CompactTaskId;
1192 use std::sync::Arc;
1193 use std::sync::atomic::{AtomicUsize, Ordering};
1194 use tempfile::NamedTempFile;
1195
1196 fn sample_events() -> Vec<ReplayEvent> {
1197 vec![
1198 ReplayEvent::RngSeed { seed: 42 },
1199 ReplayEvent::TaskScheduled {
1200 task: CompactTaskId(1),
1201 at_tick: 0,
1202 },
1203 ReplayEvent::TimeAdvanced {
1204 from_nanos: 0,
1205 to_nanos: 1_000_000,
1206 },
1207 ReplayEvent::TaskYielded {
1208 task: CompactTaskId(1),
1209 },
1210 ReplayEvent::TaskScheduled {
1211 task: CompactTaskId(1),
1212 at_tick: 1,
1213 },
1214 ReplayEvent::TaskCompleted {
1215 task: CompactTaskId(1),
1216 outcome: 0,
1217 },
1218 ]
1219 }
1220
1221 #[test]
1226 fn compression_mode_debug_clone_copy_eq_default() {
1227 let def = CompressionMode::default();
1228 assert_eq!(def, CompressionMode::None);
1229 let copied = def;
1230 let cloned = def;
1231 assert_eq!(copied, cloned);
1232 assert!(!def.is_compressed());
1233 let dbg = format!("{def:?}");
1234 assert!(dbg.contains("None"));
1235 }
1236
1237 #[test]
1238 fn trace_file_config_debug_clone_default() {
1239 let def = TraceFileConfig::default();
1240 assert_eq!(def.compression, CompressionMode::None);
1241 assert_eq!(def.chunk_size, DEFAULT_COMPRESSION_CHUNK_SIZE);
1242 assert!(def.max_events.is_none());
1243 let cloned = def.clone();
1244 assert_eq!(cloned.compression, CompressionMode::None);
1245 let dbg = format!("{def:?}");
1246 assert!(dbg.contains("TraceFileConfig"));
1247 }
1248
1249 #[test]
1250 fn trace_file_error_debug_display() {
1251 let err = TraceFileError::InvalidMagic;
1252 let dbg = format!("{err:?}");
1253 assert!(dbg.contains("InvalidMagic"));
1254 let display = format!("{err}");
1255 assert!(display.contains("magic"));
1256
1257 let version_err = TraceFileError::UnsupportedVersion {
1258 expected: 2,
1259 found: 99,
1260 };
1261 let display2 = format!("{version_err}");
1262 assert!(display2.contains("99"));
1263 }
1264
1265 #[test]
1266 fn write_and_read_roundtrip() {
1267 let temp = NamedTempFile::new().expect("create temp file");
1268 let path = temp.path();
1269
1270 let metadata = TraceMetadata::new(42).with_description("test trace");
1271 let events = sample_events();
1272
1273 write_trace(path, &metadata, &events).expect("write trace");
1275
1276 let (read_meta, read_events) = read_trace(path).expect("read trace");
1278
1279 assert_eq!(read_meta.seed, metadata.seed);
1280 assert_eq!(read_meta.description, metadata.description);
1281 assert_eq!(read_events.len(), events.len());
1282
1283 for (orig, read) in events.iter().zip(read_events.iter()) {
1284 assert_eq!(orig, read);
1285 }
1286 }
1287
1288 #[test]
1289 fn streaming_write_and_read() {
1290 let temp = NamedTempFile::new().expect("create temp file");
1291 let path = temp.path();
1292
1293 let metadata = TraceMetadata::new(123);
1294 let events = sample_events();
1295
1296 {
1298 let mut writer = TraceWriter::create(path).expect("create writer");
1299 writer.write_metadata(&metadata).expect("write metadata");
1300 for event in &events {
1301 writer.write_event(event).expect("write event");
1302 }
1303 assert_eq!(writer.event_count(), events.len() as u64);
1304 writer.finish().expect("finish");
1305 }
1306
1307 {
1309 let reader = TraceReader::open(path).expect("open reader");
1310 assert_eq!(reader.metadata().seed, 123);
1311 assert_eq!(reader.event_count(), events.len() as u64);
1312
1313 let mut count = 0;
1314 for result in reader.events() {
1315 let event = result.expect("read event");
1316 assert_eq!(event, events[count]);
1317 count += 1;
1318 }
1319 assert_eq!(count, events.len());
1320 }
1321 }
1322
1323 #[test]
1324 fn reader_rewind() {
1325 let temp = NamedTempFile::new().expect("create temp file");
1326 let path = temp.path();
1327
1328 let metadata = TraceMetadata::new(42);
1329 let events = sample_events();
1330 write_trace(path, &metadata, &events).expect("write trace");
1331
1332 let mut reader = TraceReader::open(path).expect("open reader");
1333
1334 let e1 = reader.read_event().expect("read").expect("event");
1336 let e2 = reader.read_event().expect("read").expect("event");
1337 assert_eq!(reader.events_read(), 2);
1338
1339 reader.rewind().expect("rewind");
1341 assert_eq!(reader.events_read(), 0);
1342
1343 let e1_again = reader.read_event().expect("read").expect("event");
1344 let e2_again = reader.read_event().expect("read").expect("event");
1345 assert_eq!(e1, e1_again);
1346 assert_eq!(e2, e2_again);
1347 }
1348
1349 #[test]
1350 fn empty_trace() {
1351 let temp = NamedTempFile::new().expect("create temp file");
1352 let path = temp.path();
1353
1354 let metadata = TraceMetadata::new(0);
1355 write_trace(path, &metadata, &[]).expect("write empty trace");
1356
1357 let (read_meta, read_events) = read_trace(path).expect("read empty trace");
1358 assert_eq!(read_meta.seed, 0);
1359 assert!(read_events.is_empty());
1360 }
1361
1362 #[test]
1363 fn large_trace() {
1364 let temp = NamedTempFile::new().expect("create temp file");
1365 let path = temp.path();
1366
1367 let metadata = TraceMetadata::new(42);
1368 let event_count = 10_000;
1369
1370 let events: Vec<_> = (0..event_count)
1372 .map(|i| ReplayEvent::TaskScheduled {
1373 task: CompactTaskId(i),
1374 at_tick: i,
1375 })
1376 .collect();
1377
1378 write_trace(path, &metadata, &events).expect("write large trace");
1379
1380 let reader = TraceReader::open(path).expect("open reader");
1382 assert_eq!(reader.event_count(), event_count);
1383
1384 let mut count = 0u64;
1385 for result in reader.events() {
1386 let event = result.expect("read event");
1387 if let ReplayEvent::TaskScheduled { task, at_tick } = event {
1388 assert_eq!(task.0, count);
1389 assert_eq!(at_tick, count);
1390 } else {
1391 unreachable!("unexpected event type");
1392 }
1393 count += 1;
1394 }
1395 assert_eq!(count, event_count);
1396 }
1397
1398 #[test]
1399 fn invalid_magic() {
1400 let temp = NamedTempFile::new().expect("create temp file");
1401 let path = temp.path();
1402
1403 std::fs::write(path, b"NOT A TRACE FILE").expect("write garbage");
1405
1406 let result = TraceReader::open(path);
1407 assert!(matches!(result, Err(TraceFileError::InvalidMagic)));
1408 }
1409
1410 #[test]
1411 fn file_size_reasonable() {
1412 let temp = NamedTempFile::new().expect("create temp file");
1413 let path = temp.path();
1414
1415 let metadata = TraceMetadata::new(42);
1416 let events: Vec<_> = (0..1000)
1417 .map(|i| ReplayEvent::TaskScheduled {
1418 task: CompactTaskId(i),
1419 at_tick: i,
1420 })
1421 .collect();
1422
1423 write_trace(path, &metadata, &events).expect("write trace");
1424
1425 let file_size = std::fs::metadata(path).expect("metadata").len();
1426 let file_size = u32::try_from(file_size).expect("trace file size fits u32 for test");
1427 let bytes_per_event = f64::from(file_size) / 1000.0;
1428
1429 assert!(
1431 bytes_per_event < 40.0,
1432 "File size too large: {bytes_per_event:.1} bytes/event"
1433 );
1434 }
1435
1436 #[test]
1437 fn writer_already_finished_error() {
1438 let temp = NamedTempFile::new().expect("create temp file");
1439 let path = temp.path();
1440
1441 let mut writer = TraceWriter::create(path).expect("create writer");
1442 writer
1443 .write_metadata(&TraceMetadata::new(42))
1444 .expect("write metadata");
1445 writer.finish().expect("finish");
1446
1447 }
1450
1451 #[test]
1452 fn write_stops_at_max_events() {
1453 let temp = NamedTempFile::new().expect("create temp file");
1454 let path = temp.path();
1455 let metadata = TraceMetadata::new(42);
1456 let events = sample_events();
1457
1458 let config = TraceFileConfig::new().with_max_events(Some(2));
1459 let mut writer = TraceWriter::create_with_config(path, config).expect("create writer");
1460 writer.write_metadata(&metadata).expect("write metadata");
1461 for event in &events {
1462 writer.write_event(event).expect("write event");
1463 }
1464 writer.finish().expect("finish");
1465
1466 let reader = TraceReader::open(path).expect("open reader");
1467 assert_eq!(reader.event_count(), 2);
1468 }
1469
1470 #[test]
1471 fn write_stops_at_max_file_size() {
1472 let temp = NamedTempFile::new().expect("create temp file");
1473 let path = temp.path();
1474
1475 let metadata = TraceMetadata::new(42);
1476 let meta_len = rmp_serde::to_vec(&metadata)
1477 .expect("serialize metadata")
1478 .len() as u64;
1479 let header_bytes = HEADER_SIZE as u64 + meta_len + 8;
1480
1481 let config = TraceFileConfig::new().with_max_file_size(header_bytes);
1482 let mut writer = TraceWriter::create_with_config(path, config).expect("create writer");
1483 writer.write_metadata(&metadata).expect("write metadata");
1484 writer
1485 .write_event(&ReplayEvent::RngSeed { seed: 42 })
1486 .expect("write event");
1487 writer.finish().expect("finish");
1488
1489 let reader = TraceReader::open(path).expect("open reader");
1490 assert_eq!(reader.event_count(), 0);
1491 }
1492
1493 #[test]
1494 fn write_limit_callback_invoked() {
1495 let temp = NamedTempFile::new().expect("create temp file");
1496 let path = temp.path();
1497
1498 let hits = Arc::new(AtomicUsize::new(0));
1499 let hit_ref = Arc::clone(&hits);
1500 let action = LimitAction::Callback(Arc::new(move |_info| {
1501 hit_ref.fetch_add(1, Ordering::SeqCst);
1502 LimitAction::StopRecording
1503 }));
1504
1505 let config = TraceFileConfig::new()
1506 .with_max_events(Some(1))
1507 .on_limit(action);
1508 let mut writer = TraceWriter::create_with_config(path, config).expect("create writer");
1509 writer
1510 .write_metadata(&TraceMetadata::new(42))
1511 .expect("write metadata");
1512 writer
1513 .write_event(&ReplayEvent::RngSeed { seed: 1 })
1514 .expect("write event");
1515 writer
1516 .write_event(&ReplayEvent::RngSeed { seed: 2 })
1517 .expect("write event");
1518 writer.finish().expect("finish");
1519
1520 assert_eq!(hits.load(Ordering::SeqCst), 1);
1521 }
1522
1523 #[test]
1524 #[cfg(target_family = "unix")]
1525 fn disk_full_is_handled() {
1526 let path = std::path::Path::new("/dev/full");
1527 if !path.exists() {
1528 return;
1529 }
1530
1531 let Ok(mut writer) = TraceWriter::create(path) else {
1532 return;
1533 };
1534
1535 let _ = writer.write_metadata(&TraceMetadata::new(42));
1538 let result = writer.finish();
1539 assert!(matches!(
1540 result,
1541 Err(TraceFileError::Io(err)) if err.raw_os_error() == Some(ENOSPC)
1542 ));
1543 }
1544
1545 #[cfg(feature = "trace-compression")]
1550 mod compression_tests {
1551 use super::*;
1552
1553 #[test]
1554 fn compressed_write_and_read_roundtrip() {
1555 let temp = NamedTempFile::new().expect("create temp file");
1556 let path = temp.path();
1557
1558 let metadata = TraceMetadata::new(42).with_description("compressed trace");
1559 let events = sample_events();
1560
1561 let config = TraceFileConfig::new().with_compression(CompressionMode::Lz4 { level: 1 });
1563 let mut writer = TraceWriter::create_with_config(path, config).expect("create writer");
1564 writer.write_metadata(&metadata).expect("write metadata");
1565 for event in &events {
1566 writer.write_event(event).expect("write event");
1567 }
1568 writer.finish().expect("finish");
1569
1570 let reader = TraceReader::open(path).expect("open reader");
1572 assert!(reader.is_compressed());
1573 assert_eq!(reader.metadata().seed, metadata.seed);
1574 assert_eq!(reader.event_count(), events.len() as u64);
1575
1576 let read_events = reader.load_all().expect("load all");
1577 assert_eq!(read_events.len(), events.len());
1578 for (orig, read) in events.iter().zip(read_events.iter()) {
1579 assert_eq!(orig, read);
1580 }
1581 }
1582
1583 #[test]
1584 fn compressed_streaming_read() {
1585 let temp = NamedTempFile::new().expect("create temp file");
1586 let path = temp.path();
1587
1588 let metadata = TraceMetadata::new(123);
1589 let events = sample_events();
1590
1591 let config = TraceFileConfig::new().with_compression(CompressionMode::Lz4 { level: 1 });
1593 let mut writer = TraceWriter::create_with_config(path, config).expect("create writer");
1594 writer.write_metadata(&metadata).expect("write metadata");
1595 for event in &events {
1596 writer.write_event(event).expect("write event");
1597 }
1598 writer.finish().expect("finish");
1599
1600 let reader = TraceReader::open(path).expect("open reader");
1602 assert!(reader.is_compressed());
1603
1604 let mut count = 0;
1605 for result in reader.events() {
1606 let event = result.expect("read event");
1607 assert_eq!(event, events[count]);
1608 count += 1;
1609 }
1610 assert_eq!(count, events.len());
1611 }
1612
1613 #[test]
1614 fn large_compressed_trace() {
1615 let temp = NamedTempFile::new().expect("create temp file");
1616 let path = temp.path();
1617
1618 let metadata = TraceMetadata::new(42);
1619 let event_count = 10_000u64;
1620
1621 let events: Vec<_> = (0..event_count)
1623 .map(|i| ReplayEvent::TaskScheduled {
1624 task: CompactTaskId(i),
1625 at_tick: i,
1626 })
1627 .collect();
1628
1629 let config = TraceFileConfig::new()
1631 .with_compression(CompressionMode::Lz4 { level: 1 })
1632 .with_chunk_size(8 * 1024); let mut writer = TraceWriter::create_with_config(path, config).expect("create writer");
1634 writer.write_metadata(&metadata).expect("write metadata");
1635 for event in &events {
1636 writer.write_event(event).expect("write event");
1637 }
1638 writer.finish().expect("finish");
1639
1640 let reader = TraceReader::open(path).expect("open reader");
1642 assert!(reader.is_compressed());
1643 assert_eq!(reader.event_count(), event_count);
1644
1645 let mut count = 0u64;
1646 for result in reader.events() {
1647 let event = result.expect("read event");
1648 if let ReplayEvent::TaskScheduled { task, at_tick } = event {
1649 assert_eq!(task.0, count);
1650 assert_eq!(at_tick, count);
1651 } else {
1652 unreachable!("unexpected event type");
1653 }
1654 count += 1;
1655 }
1656 assert_eq!(count, event_count);
1657 }
1658
1659 #[test]
1660 fn compression_ratio() {
1661 let temp_uncompressed = NamedTempFile::new().expect("create temp file");
1662 let temp_compressed = NamedTempFile::new().expect("create temp file");
1663
1664 let metadata = TraceMetadata::new(42);
1665 let event_count = 5000u64;
1666
1667 let events: Vec<_> = (0..event_count)
1669 .map(|i| ReplayEvent::TaskScheduled {
1670 task: CompactTaskId(i % 100), at_tick: i,
1672 })
1673 .collect();
1674
1675 {
1677 let mut writer =
1678 TraceWriter::create(temp_uncompressed.path()).expect("create writer");
1679 writer.write_metadata(&metadata).expect("write metadata");
1680 for event in &events {
1681 writer.write_event(event).expect("write event");
1682 }
1683 writer.finish().expect("finish");
1684 }
1685
1686 {
1688 let config =
1689 TraceFileConfig::new().with_compression(CompressionMode::Lz4 { level: 1 });
1690 let mut writer = TraceWriter::create_with_config(temp_compressed.path(), config)
1691 .expect("create writer");
1692 writer.write_metadata(&metadata).expect("write metadata");
1693 for event in &events {
1694 writer.write_event(event).expect("write event");
1695 }
1696 writer.finish().expect("finish");
1697 }
1698
1699 let uncompressed_size = std::fs::metadata(temp_uncompressed.path())
1700 .expect("metadata")
1701 .len();
1702 let compressed_size = std::fs::metadata(temp_compressed.path())
1703 .expect("metadata")
1704 .len();
1705
1706 #[allow(clippy::cast_precision_loss)]
1707 let ratio = uncompressed_size as f64 / compressed_size as f64;
1708
1709 assert!(
1711 ratio > 2.0,
1712 "Compression ratio {ratio:.2}x is below expected 2x minimum"
1713 );
1714 }
1715
1716 #[test]
1717 fn compressed_rewind() {
1718 let temp = NamedTempFile::new().expect("create temp file");
1719 let path = temp.path();
1720
1721 let metadata = TraceMetadata::new(42);
1722 let events = sample_events();
1723
1724 let config = TraceFileConfig::new().with_compression(CompressionMode::Lz4 { level: 1 });
1726 let mut writer = TraceWriter::create_with_config(path, config).expect("create writer");
1727 writer.write_metadata(&metadata).expect("write metadata");
1728 for event in &events {
1729 writer.write_event(event).expect("write event");
1730 }
1731 writer.finish().expect("finish");
1732
1733 let mut reader = TraceReader::open(path).expect("open reader");
1734 assert!(reader.is_compressed());
1735
1736 let e1 = reader.read_event().expect("read").expect("event");
1738 let e2 = reader.read_event().expect("read").expect("event");
1739 assert_eq!(reader.events_read(), 2);
1740
1741 reader.rewind().expect("rewind");
1743 assert_eq!(reader.events_read(), 0);
1744
1745 let e1_again = reader.read_event().expect("read").expect("event");
1746 let e2_again = reader.read_event().expect("read").expect("event");
1747 assert_eq!(e1, e1_again);
1748 assert_eq!(e2, e2_again);
1749 }
1750
1751 #[test]
1752 fn uncompressed_still_readable() {
1753 let temp = NamedTempFile::new().expect("create temp file");
1754 let path = temp.path();
1755
1756 let metadata = TraceMetadata::new(42);
1757 let events = sample_events();
1758
1759 write_trace(path, &metadata, &events).expect("write trace");
1761
1762 let reader = TraceReader::open(path).expect("open reader");
1764 assert!(!reader.is_compressed());
1765 assert_eq!(reader.event_count(), events.len() as u64);
1766
1767 let read_events = reader.load_all().expect("load all");
1768 assert_eq!(read_events, events);
1769 }
1770 }
1771}