use super::recorder::{DEFAULT_MAX_FILE_SIZE, LimitAction, LimitKind, LimitReached};
use super::replay::{REPLAY_SCHEMA_VERSION, ReplayEvent, TraceMetadata};
use crate::tracing_compat::{error, warn};
use std::fs::File;
use std::io::{self, BufReader, BufWriter, Read, Seek, SeekFrom, Write};
use std::path::Path;
/// Magic bytes identifying a trace file.
pub const TRACE_MAGIC: &[u8; 11] = b"ASUPERTRACE";
/// Current on-disk format version; readers accept any version <= this.
pub const TRACE_FILE_VERSION: u16 = 2;
/// Header flag bit set when the event stream is stored as compressed chunks.
pub const FLAG_COMPRESSED: u16 = 0x0001;
/// Fixed header size: magic (11) + version (2) + flags (2) + compression byte (1) + meta_len (4).
pub const HEADER_SIZE: usize = 11 + 2 + 2 + 1 + 4;
/// Default size of the in-memory event buffer before a compressed chunk is flushed.
pub const DEFAULT_COMPRESSION_CHUNK_SIZE: usize = 64 * 1024;
// NOTE(review): not referenced in this file — presumably the size at which
// `CompressionMode::Auto` switches compression on; confirm at the call site.
pub const AUTO_COMPRESSION_THRESHOLD: usize = 1024 * 1024;
/// Sanity bound on the metadata length prefix when reading (1 MiB).
pub const MAX_META_LEN: usize = 1024 * 1024;
/// Cap on the `Vec` preallocation in `load_all`, guarding against hostile event counts.
pub const MAX_EVENT_PREALLOC: usize = 10_000_000;
/// Sanity bound on a single event's length prefix (16 MiB).
pub const MAX_EVENT_LEN: usize = 16 * 1024 * 1024;
/// Sanity bound on a compressed (and declared decompressed) chunk length (64 MiB).
pub const MAX_COMPRESSED_CHUNK_LEN: usize = 64 * 1024 * 1024;
/// OS error code for "no space left on device" (ENOSPC).
#[cfg(unix)]
const DISK_FULL_OS_ERROR: i32 = 28;
/// OS error code for ERROR_DISK_FULL.
#[cfg(windows)]
const DISK_FULL_OS_ERROR: i32 = 112;
/// Returns true when `code` is this platform's disk-full OS error code.
///
/// On targets that are neither Unix nor Windows there is no known code, so the
/// answer is always `false`. The Unix and Windows branches were previously
/// duplicated verbatim; they are unified under one `cfg(any(...))` here.
fn is_disk_full_os_error(code: Option<i32>) -> bool {
    #[cfg(any(unix, windows))]
    {
        code == Some(DISK_FULL_OS_ERROR)
    }
    #[cfg(not(any(unix, windows)))]
    {
        let _ = code;
        false
    }
}
/// Rejects an event length prefix larger than [`MAX_EVENT_LEN`].
fn validate_event_len(len: usize) -> TraceFileResult<()> {
    if len <= MAX_EVENT_LEN {
        Ok(())
    } else {
        Err(TraceFileError::OversizedField {
            field: "event_len",
            actual: len as u64,
            max: MAX_EVENT_LEN as u64,
        })
    }
}
/// Maps an unexpected-EOF I/O error to [`TraceFileError::Truncated`]; every
/// other I/O error passes through as [`TraceFileError::Io`].
fn truncated_or_io(err: io::Error) -> TraceFileError {
    match err.kind() {
        io::ErrorKind::UnexpectedEof => TraceFileError::Truncated,
        _ => TraceFileError::Io(err),
    }
}
/// Compression applied to a trace file's event stream.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum CompressionMode {
    /// Events are stored uncompressed (the default).
    #[default]
    None,
    /// Events are stored as LZ4 chunks using the given level.
    #[cfg(feature = "trace-compression")]
    Lz4 {
        level: i32,
    },
    /// Compression is selected automatically.
    #[cfg(feature = "trace-compression")]
    Auto,
}
impl CompressionMode {
    /// True when this mode produces a compressed event stream.
    #[must_use]
    pub fn is_compressed(&self) -> bool {
        match self {
            Self::None => false,
            #[cfg(feature = "trace-compression")]
            Self::Lz4 { .. } | Self::Auto => true,
        }
    }
    /// Encodes the mode as the single-byte on-disk representation
    /// (0 = uncompressed, 1 = compressed).
    fn to_byte(self) -> u8 {
        match self {
            Self::None => 0,
            #[cfg(feature = "trace-compression")]
            Self::Lz4 { .. } | Self::Auto => 1,
        }
    }
    /// Decodes the on-disk byte. Returns `None` for unknown bytes — and for
    /// byte 1 when the `trace-compression` feature is disabled, since the
    /// `Lz4` variant does not exist in that build.
    fn from_byte(byte: u8) -> Option<Self> {
        match byte {
            0 => Some(Self::None),
            // The level is not stored in the file; default to 1 when decoding.
            #[cfg(feature = "trace-compression")]
            1 => Some(Self::Lz4 { level: 1 }),
            // Without the feature, byte 1 falls through to the catch-all below,
            // so the previous explicit `1 => None` arm was redundant.
            _ => None,
        }
    }
}
/// Options controlling how a `TraceWriter` encodes and limits a trace file.
#[derive(Debug, Clone)]
pub struct TraceFileConfig {
    /// Compression applied to the event stream.
    pub compression: CompressionMode,
    /// Buffered-bytes threshold at which a compressed chunk is flushed.
    pub chunk_size: usize,
    /// Optional cap on the number of events written; `None` means unlimited.
    pub max_events: Option<u64>,
    /// Cap on total bytes written; a value of 0 disables the file-size check.
    pub max_file_size: u64,
    /// Action taken when an event or size limit is reached.
    pub on_limit: LimitAction,
}
impl Default for TraceFileConfig {
fn default() -> Self {
Self {
compression: CompressionMode::None,
chunk_size: DEFAULT_COMPRESSION_CHUNK_SIZE,
max_events: None,
max_file_size: DEFAULT_MAX_FILE_SIZE,
on_limit: LimitAction::StopRecording,
}
}
}
impl TraceFileConfig {
    /// Creates a config with the defaults from [`Default`].
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }
    /// Sets the compression mode (builder style).
    ///
    /// Now `const` for consistency with `with_max_events`/`with_max_file_size`;
    /// `CompressionMode` is `Copy`, so the overwrite needs no drop.
    #[must_use]
    pub const fn with_compression(mut self, mode: CompressionMode) -> Self {
        self.compression = mode;
        self
    }
    /// Sets the compressed-chunk flush threshold in bytes (builder style).
    #[must_use]
    pub const fn with_chunk_size(mut self, size: usize) -> Self {
        self.chunk_size = size;
        self
    }
    /// Sets an optional cap on the number of events written (builder style).
    #[must_use]
    pub const fn with_max_events(mut self, max_events: Option<u64>) -> Self {
        self.max_events = max_events;
        self
    }
    /// Sets the file-size cap in bytes; 0 disables the check (builder style).
    #[must_use]
    pub const fn with_max_file_size(mut self, max_file_size: u64) -> Self {
        self.max_file_size = max_file_size;
        self
    }
    /// Sets the action taken when a limit is reached (builder style).
    ///
    /// Not `const`: replacing `on_limit` drops the previous `LimitAction`,
    /// which may own a callback.
    #[must_use]
    pub fn on_limit(mut self, action: LimitAction) -> Self {
        self.on_limit = action;
        self
    }
}
/// Errors produced while writing or reading trace files.
#[derive(Debug, thiserror::Error)]
pub enum TraceFileError {
    /// Underlying I/O failure.
    #[error("I/O error: {0}")]
    Io(#[from] io::Error),
    /// The file does not start with [`TRACE_MAGIC`].
    #[error("invalid magic bytes: not a trace file")]
    InvalidMagic,
    /// The file was written by a newer format version than this reader supports.
    #[error("unsupported file version: expected <= {expected}, found {found}")]
    UnsupportedVersion {
        expected: u16,
        found: u16,
    },
    /// The header flags request something this build cannot honor.
    #[error("unsupported flags: {0:#06x}")]
    UnsupportedFlags(u16),
    /// The header's compression byte is unknown to this build.
    #[error("unsupported compression format: {0}")]
    UnsupportedCompression(u8),
    /// The file is compressed but the `trace-compression` feature is disabled.
    #[error("file is compressed but trace-compression feature is not enabled")]
    CompressionNotAvailable,
    /// Compression failed (stringified; not constructed in this file).
    #[error("compression error: {0}")]
    Compression(String),
    /// Decompression of a chunk failed.
    #[error("decompression error: {0}")]
    Decompression(String),
    /// MessagePack serialization failed.
    #[error("serialization error: {0}")]
    Serialize(String),
    /// MessagePack deserialization failed.
    #[error("deserialization error: {0}")]
    Deserialize(String),
    /// The trace's replay schema version does not match this build's.
    #[error("schema version mismatch: expected {expected}, found {found}")]
    SchemaMismatch {
        expected: u32,
        found: u32,
    },
    /// `finish` was already called on the writer.
    #[error("writer already finished")]
    AlreadyFinished,
    /// An event write or `finish` was attempted before `write_metadata`.
    #[error("trace metadata must be written before events or finish")]
    MetadataNotWritten,
    /// `write_metadata` was called more than once.
    #[error("trace metadata can only be written once")]
    MetadataAlreadyWritten,
    /// A metadata write started but did not complete; the file is unusable.
    #[error("trace metadata write did not complete; discard and recreate the writer")]
    MetadataCorrupt,
    /// The stream ended before a declared event or chunk was fully read.
    #[error("file truncated or corrupt")]
    Truncated,
    /// A length prefix exceeded its sanity bound.
    #[error("length prefix too large: {field} is {actual} bytes, max is {max}")]
    OversizedField {
        field: &'static str,
        actual: u64,
        max: u64,
    },
}
// rmp_serde errors are stringified rather than stored so TraceFileError stays
// independent of the serializer's error types.
impl From<rmp_serde::encode::Error> for TraceFileError {
    fn from(e: rmp_serde::encode::Error) -> Self {
        Self::Serialize(e.to_string())
    }
}
impl From<rmp_serde::decode::Error> for TraceFileError {
    fn from(e: rmp_serde::decode::Error) -> Self {
        Self::Deserialize(e.to_string())
    }
}
/// Convenience alias for results of trace file operations.
pub type TraceFileResult<T> = Result<T, TraceFileError>;
/// Tracks whether the writer's header/metadata section has been written.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TraceWriterMetadataState {
    /// `write_metadata` has not been called yet.
    Pending,
    /// Header and metadata were fully written; events may follow.
    Written,
    /// A metadata write started but did not complete; the file is unusable.
    Corrupt,
}
/// Streaming writer for trace files.
///
/// Usage: create, call `write_metadata` exactly once, then `write_event`
/// repeatedly, and `finish` to backpatch the event count. Dropping without
/// `finish` performs a best-effort flush and backpatch.
pub struct TraceWriter {
    writer: BufWriter<File>,
    /// Events accepted so far (including compressed events still buffered).
    event_count: u64,
    /// Byte offset of the 8-byte event-count placeholder, backpatched on finish.
    event_count_pos: u64,
    /// Set by `finish`; further calls error with `AlreadyFinished`.
    finished: bool,
    metadata_state: TraceWriterMetadataState,
    config: TraceFileConfig,
    /// Bytes actually handed to the underlying writer.
    bytes_written: u64,
    /// Bytes of framed events waiting in `event_buffer` for the next chunk.
    buffered_bytes: u64,
    /// Set when a configured limit stopped recording; writes become no-ops.
    stopped: bool,
    /// Set when the disk filled up; writes become no-ops.
    halted: bool,
    #[cfg(feature = "trace-compression")]
    event_buffer: Vec<u8>,
}
impl TraceWriter {
    /// Creates a trace file at `path` with the default configuration.
    pub fn create(path: impl AsRef<Path>) -> TraceFileResult<Self> {
        Self::create_with_config(path, TraceFileConfig::default())
    }
    /// Creates a trace file at `path` using `config`.
    ///
    /// No bytes are written here; callers must invoke [`Self::write_metadata`]
    /// before writing events or finishing.
    pub fn create_with_config(
        path: impl AsRef<Path>,
        config: TraceFileConfig,
    ) -> TraceFileResult<Self> {
        let file = File::create(path)?;
        let writer = BufWriter::new(file);
        Ok(Self {
            writer,
            event_count: 0,
            event_count_pos: 0,
            finished: false,
            metadata_state: TraceWriterMetadataState::Pending,
            config,
            bytes_written: 0,
            buffered_bytes: 0,
            stopped: false,
            halted: false,
            #[cfg(feature = "trace-compression")]
            event_buffer: Vec::new(),
        })
    }
    /// Whether events are still accepted (no limit stop, no disk-full halt).
    fn should_write(&self) -> bool {
        !self.stopped && !self.halted
    }
    /// Resolves the configured limit action, invoking the callback if one is set.
    fn resolve_limit_action(&self, info: &LimitReached) -> LimitAction {
        match &self.config.on_limit {
            LimitAction::Callback(cb) => (cb)(info.clone()),
            other => other.clone(),
        }
    }
    /// Applies the limit action for `info`.
    ///
    /// Returns `Ok(false)` when the triggering write should be dropped, or an
    /// error for `LimitAction::Fail`. (No current arm returns `Ok(true)`.)
    fn handle_limit(&mut self, info: &LimitReached) -> TraceFileResult<bool> {
        let mut action = self.resolve_limit_action(info);
        // A callback that returns another callback cannot be invoked again;
        // coerce it to the safe default.
        if matches!(action, LimitAction::Callback(_)) {
            action = LimitAction::StopRecording;
        }
        match action {
            LimitAction::StopRecording => {
                warn!(
                    kind = ?info.kind,
                    current_events = info.current_events,
                    max_events = ?info.max_events,
                    current_bytes = info.current_bytes,
                    max_bytes = info.max_bytes,
                    "trace write stopped: limit reached"
                );
                self.stopped = true;
                Ok(false)
            }
            LimitAction::DropOldest => {
                // A file writer cannot rewrite earlier events, so drop-oldest
                // degrades to stop-recording.
                warn!(
                    kind = ?info.kind,
                    "trace write stopped: drop-oldest not supported for file writer"
                );
                self.stopped = true;
                Ok(false)
            }
            LimitAction::Fail => {
                error!(
                    kind = ?info.kind,
                    current_events = info.current_events,
                    max_events = ?info.max_events,
                    current_bytes = info.current_bytes,
                    max_bytes = info.max_bytes,
                    "trace write failed: limit exceeded"
                );
                self.stopped = true;
                Err(TraceFileError::Io(io::Error::other(
                    "trace write limit exceeded",
                )))
            }
            LimitAction::Callback(_) => {
                // Unreachable after the coercion above; stop defensively.
                self.stopped = true;
                Ok(false)
            }
        }
    }
    /// True when `err` is the platform's disk-full error.
    fn is_disk_full(err: &io::Error) -> bool {
        is_disk_full_os_error(err.raw_os_error())
    }
    /// Records a disk-full halt and passes the error through.
    fn handle_disk_full(&mut self, err: io::Error) -> TraceFileError {
        warn!("trace write halted: disk full (ENOSPC). Free space and retry recording.");
        self.halted = true;
        TraceFileError::Io(err)
    }
    /// Writes raw bytes, tracking the running total. A halted writer silently
    /// discards writes; a disk-full error halts the writer.
    fn write_bytes(&mut self, bytes: &[u8]) -> TraceFileResult<()> {
        if self.halted {
            return Ok(());
        }
        match self.writer.write_all(bytes) {
            Ok(()) => {
                self.bytes_written = self.bytes_written.saturating_add(bytes.len() as u64);
                Ok(())
            }
            Err(err) if Self::is_disk_full(&err) => Err(self.handle_disk_full(err)),
            Err(err) => Err(TraceFileError::Io(err)),
        }
    }
    /// Seeks back to the placeholder and backpatches the final event count.
    /// (`BufWriter` flushes its buffer before seeking.)
    fn update_event_count(&mut self) -> TraceFileResult<()> {
        self.writer.seek(SeekFrom::Start(self.event_count_pos))?;
        self.writer.write_all(&self.event_count.to_le_bytes())?;
        self.writer.flush()?;
        Ok(())
    }
    /// Backpatches the event count, downgrading any failure to a warning.
    fn update_event_count_best_effort(&mut self) {
        if let Err(err) = self.update_event_count() {
            if matches!(
                &err,
                TraceFileError::Io(io_err) if Self::is_disk_full(io_err)
            ) {
                warn!("trace event count update skipped: disk full");
            } else {
                // Fix: previously this generic warning was also emitted on the
                // disk-full path, logging the same failure twice.
                warn!("trace event count update skipped: {err}");
            }
        }
    }
    /// Errors unless the header/metadata section has been fully written.
    fn ensure_metadata_written(&self) -> TraceFileResult<()> {
        match self.metadata_state {
            TraceWriterMetadataState::Pending => Err(TraceFileError::MetadataNotWritten),
            TraceWriterMetadataState::Written => Ok(()),
            TraceWriterMetadataState::Corrupt => Err(TraceFileError::MetadataCorrupt),
        }
    }
    /// Writes the file header, serialized metadata, and the 8-byte event-count
    /// placeholder. Must be called exactly once, before any events.
    pub fn write_metadata(&mut self, metadata: &TraceMetadata) -> TraceFileResult<()> {
        if self.finished {
            return Err(TraceFileError::AlreadyFinished);
        }
        match self.metadata_state {
            TraceWriterMetadataState::Pending => {}
            TraceWriterMetadataState::Written => {
                return Err(TraceFileError::MetadataAlreadyWritten);
            }
            TraceWriterMetadataState::Corrupt => {
                return Err(TraceFileError::MetadataCorrupt);
            }
        }
        let meta_bytes = rmp_serde::to_vec(metadata)?;
        let flags = if self.config.compression.is_compressed() {
            FLAG_COMPRESSED
        } else {
            0
        };
        // Mark corrupt until every header byte lands, so a partial write cannot
        // be mistaken for a usable writer.
        self.metadata_state = TraceWriterMetadataState::Corrupt;
        self.write_bytes(TRACE_MAGIC)?;
        self.write_bytes(&TRACE_FILE_VERSION.to_le_bytes())?;
        self.write_bytes(&flags.to_le_bytes())?;
        self.write_bytes(&[self.config.compression.to_byte()])?;
        let meta_len = u32::try_from(meta_bytes.len()).map_err(|_| {
            TraceFileError::Io(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                format!(
                    "metadata too large for trace format: {} bytes exceeds u32::MAX",
                    meta_bytes.len()
                ),
            ))
        })?;
        self.write_bytes(&meta_len.to_le_bytes())?;
        self.write_bytes(&meta_bytes)?;
        // The event count sits right after header + metadata; remember its
        // offset so `finish` can backpatch it.
        self.event_count_pos = HEADER_SIZE as u64 + u64::from(meta_len);
        self.write_bytes(&0u64.to_le_bytes())?;
        self.metadata_state = TraceWriterMetadataState::Written;
        Ok(())
    }
    /// Appends one event, enforcing the configured event-count and file-size
    /// limits. With compression enabled, events are buffered and flushed in
    /// chunks of `config.chunk_size` framed bytes.
    pub fn write_event(&mut self, event: &ReplayEvent) -> TraceFileResult<()> {
        if self.finished {
            return Err(TraceFileError::AlreadyFinished);
        }
        self.ensure_metadata_written()?;
        if !self.should_write() {
            // Recording already stopped or halted: silently drop the event.
            return Ok(());
        }
        if let Some(max_events) = self.config.max_events {
            if self.event_count.saturating_add(1) > max_events {
                let info = LimitReached {
                    kind: LimitKind::MaxEvents,
                    current_events: self.event_count,
                    max_events: Some(max_events),
                    current_bytes: self.bytes_written,
                    max_bytes: self.config.max_file_size,
                    needed_bytes: 0,
                };
                if !self.handle_limit(&info)? {
                    return Ok(());
                }
            }
        }
        let event_bytes = rmp_serde::to_vec(event)?;
        let len = u32::try_from(event_bytes.len()).map_err(|_| {
            TraceFileError::Io(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                format!(
                    "serialized event too large for trace format: {} bytes exceeds u32::MAX",
                    event_bytes.len()
                ),
            ))
        })?;
        // 4-byte length prefix plus payload. For compressed output this is the
        // pre-compression size, making the file-size check conservative.
        let estimated_bytes = 4u64 + event_bytes.len() as u64;
        let pending_bytes = self.bytes_written.saturating_add(self.buffered_bytes);
        // A max_file_size of 0 disables the size check.
        if self.config.max_file_size > 0
            && pending_bytes.saturating_add(estimated_bytes) > self.config.max_file_size
        {
            let info = LimitReached {
                kind: LimitKind::MaxFileSize,
                current_events: self.event_count,
                max_events: self.config.max_events,
                current_bytes: pending_bytes,
                max_bytes: self.config.max_file_size,
                needed_bytes: estimated_bytes,
            };
            if !self.handle_limit(&info)? {
                return Ok(());
            }
        }
        #[cfg(feature = "trace-compression")]
        if self.config.compression.is_compressed() {
            self.event_buffer.extend_from_slice(&len.to_le_bytes());
            self.event_buffer.extend_from_slice(&event_bytes);
            self.buffered_bytes = self.buffered_bytes.saturating_add(estimated_bytes);
            self.event_count += 1;
            if self.event_buffer.len() >= self.config.chunk_size {
                self.flush_compressed_chunk()?;
            }
            return Ok(());
        }
        self.write_bytes(&len.to_le_bytes())?;
        self.write_bytes(&event_bytes)?;
        self.event_count += 1;
        Ok(())
    }
    /// Compresses and writes the buffered events as one length-prefixed chunk.
    #[cfg(feature = "trace-compression")]
    fn flush_compressed_chunk(&mut self) -> TraceFileResult<()> {
        if self.event_buffer.is_empty() {
            return Ok(());
        }
        // lz4_flex prepends the uncompressed size, which the reader relies on.
        let compressed = lz4_flex::compress_prepend_size(&self.event_buffer);
        let chunk_len = u32::try_from(compressed.len()).map_err(|_| {
            TraceFileError::Io(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                format!(
                    "compressed chunk too large for trace format: {} bytes exceeds u32::MAX",
                    compressed.len()
                ),
            ))
        })?;
        self.write_bytes(&chunk_len.to_le_bytes())?;
        self.write_bytes(&compressed)?;
        self.event_buffer.clear();
        self.buffered_bytes = 0;
        Ok(())
    }
    /// Flushes any buffered chunk, then backpatches the event count. After a
    /// disk-full halt, flushing and backpatching are best-effort and `finish`
    /// still returns `Ok` so the partial trace remains readable.
    pub fn finish(mut self) -> TraceFileResult<()> {
        self.ensure_metadata_written()?;
        self.finished = true;
        #[cfg(feature = "trace-compression")]
        if self.config.compression.is_compressed() {
            self.flush_compressed_chunk()?;
        }
        if self.halted {
            let _ = self.writer.flush();
            self.update_event_count_best_effort();
            return Ok(());
        }
        self.writer.flush()?;
        self.update_event_count()?;
        Ok(())
    }
    /// Number of events accepted so far.
    #[must_use]
    pub fn event_count(&self) -> u64 {
        self.event_count
    }
}
impl Drop for TraceWriter {
    /// Best-effort cleanup for writers dropped without `finish()`: flush any
    /// buffered compressed chunk, flush the underlying writer, then backpatch
    /// the event count — but only if the header was fully written. All errors
    /// are deliberately ignored; drop must not panic.
    fn drop(&mut self) {
        if !self.finished {
            #[cfg(feature = "trace-compression")]
            if self.config.compression.is_compressed() {
                let _ = self.flush_compressed_chunk();
            }
            let _ = self.writer.flush();
            if self.metadata_state == TraceWriterMetadataState::Written {
                self.update_event_count_best_effort();
            }
        }
    }
}
/// Streaming reader for trace files.
///
/// `open` validates the header and metadata; events are then consumed via
/// `read_event`, the `events()` iterator, or `load_all`.
pub struct TraceReader {
    reader: BufReader<File>,
    metadata: TraceMetadata,
    /// Event count declared in the file header (backpatched by the writer).
    event_count: u64,
    /// Events consumed so far via `read_event`.
    events_read: u64,
    /// Byte offset of the first event, used by `rewind`.
    events_start_pos: u64,
    compression: CompressionMode,
    /// Decompressed bytes of the current chunk.
    #[cfg(feature = "trace-compression")]
    decompressed_buffer: Vec<u8>,
    /// Read cursor into `decompressed_buffer`.
    #[cfg(feature = "trace-compression")]
    buffer_pos: usize,
}
impl TraceReader {
    /// Opens a trace file, validating magic, version, flags, compression mode,
    /// metadata, and schema version.
    pub fn open(path: impl AsRef<Path>) -> TraceFileResult<Self> {
        let file = File::open(path)?;
        let mut reader = BufReader::new(file);
        let mut magic = [0u8; 11];
        reader.read_exact(&mut magic)?;
        if &magic != TRACE_MAGIC {
            return Err(TraceFileError::InvalidMagic);
        }
        let mut version_bytes = [0u8; 2];
        reader.read_exact(&mut version_bytes)?;
        let version = u16::from_le_bytes(version_bytes);
        if version > TRACE_FILE_VERSION {
            return Err(TraceFileError::UnsupportedVersion {
                expected: TRACE_FILE_VERSION,
                found: version,
            });
        }
        let mut flags_bytes = [0u8; 2];
        reader.read_exact(&mut flags_bytes)?;
        let flags = u16::from_le_bytes(flags_bytes);
        let is_compressed = flags & FLAG_COMPRESSED != 0;
        // Version 2 added an explicit compression byte; v1 files have none and
        // must be uncompressed.
        let compression = if version >= 2 {
            let mut comp_byte = [0u8; 1];
            reader.read_exact(&mut comp_byte)?;
            match CompressionMode::from_byte(comp_byte[0]) {
                Some(mode) => mode,
                None if is_compressed => {
                    return Err(TraceFileError::UnsupportedCompression(comp_byte[0]));
                }
                None => CompressionMode::None,
            }
        } else {
            if is_compressed {
                return Err(TraceFileError::UnsupportedFlags(flags));
            }
            CompressionMode::None
        };
        // Defense-in-depth: without the feature, from_byte already rejects
        // compressed modes above, so this is normally unreachable.
        #[cfg(not(feature = "trace-compression"))]
        if compression.is_compressed() {
            return Err(TraceFileError::CompressionNotAvailable);
        }
        let mut meta_len_bytes = [0u8; 4];
        reader.read_exact(&mut meta_len_bytes)?;
        let meta_len = u32::from_le_bytes(meta_len_bytes) as usize;
        if meta_len > MAX_META_LEN {
            return Err(TraceFileError::OversizedField {
                field: "meta_len",
                actual: meta_len as u64,
                max: MAX_META_LEN as u64,
            });
        }
        let mut meta_bytes = vec![0u8; meta_len];
        reader.read_exact(&mut meta_bytes)?;
        let metadata: TraceMetadata = rmp_serde::from_slice(&meta_bytes)?;
        if metadata.version != REPLAY_SCHEMA_VERSION {
            return Err(TraceFileError::SchemaMismatch {
                expected: REPLAY_SCHEMA_VERSION,
                found: metadata.version,
            });
        }
        let mut event_count_bytes = [0u8; 8];
        reader.read_exact(&mut event_count_bytes)?;
        let event_count = u64::from_le_bytes(event_count_bytes);
        // v1 headers lack the compression byte, so they are one byte shorter.
        let header_size = if version >= 2 {
            HEADER_SIZE
        } else {
            HEADER_SIZE - 1
        };
        let events_start_pos = header_size as u64 + meta_len as u64 + 8;
        Ok(Self {
            reader,
            metadata,
            event_count,
            events_read: 0,
            events_start_pos,
            compression,
            #[cfg(feature = "trace-compression")]
            decompressed_buffer: Vec::new(),
            #[cfg(feature = "trace-compression")]
            buffer_pos: 0,
        })
    }
    /// True when the event stream is compressed.
    #[must_use]
    pub fn is_compressed(&self) -> bool {
        self.compression.is_compressed()
    }
    /// The compression mode decoded from the header.
    #[must_use]
    pub fn compression(&self) -> CompressionMode {
        self.compression
    }
    /// The trace metadata read from the header.
    #[must_use]
    pub fn metadata(&self) -> &TraceMetadata {
        &self.metadata
    }
    /// Event count declared in the file header.
    #[must_use]
    pub fn event_count(&self) -> u64 {
        self.event_count
    }
    /// Events consumed so far via [`Self::read_event`].
    #[must_use]
    pub fn events_read(&self) -> u64 {
        self.events_read
    }
    /// Converts this reader into an iterator over the remaining events.
    #[must_use]
    pub fn events(self) -> TraceEventIterator {
        TraceEventIterator {
            reader: self.reader,
            // Fix: account for events already consumed via `read_event`.
            // Previously this was the full `event_count`, so after partial
            // reads the iterator expected more events than the stream held and
            // ended with a spurious `Truncated` error.
            remaining: self.event_count.saturating_sub(self.events_read),
            compression: self.compression,
            #[cfg(feature = "trace-compression")]
            decompressed_buffer: self.decompressed_buffer,
            #[cfg(feature = "trace-compression")]
            buffer_pos: self.buffer_pos,
        }
    }
    /// Reads the next event, or `None` once the declared count is consumed.
    pub fn read_event(&mut self) -> TraceFileResult<Option<ReplayEvent>> {
        if self.events_read >= self.event_count {
            return Ok(None);
        }
        #[cfg(feature = "trace-compression")]
        if self.compression.is_compressed() {
            return self.read_compressed_event();
        }
        self.read_uncompressed_event()
    }
    /// Reads one length-prefixed event directly from the file.
    fn read_uncompressed_event(&mut self) -> TraceFileResult<Option<ReplayEvent>> {
        let mut len_bytes = [0u8; 4];
        self.reader
            .read_exact(&mut len_bytes)
            .map_err(truncated_or_io)?;
        let len = u32::from_le_bytes(len_bytes) as usize;
        validate_event_len(len)?;
        let mut event_bytes = vec![0u8; len];
        self.reader
            .read_exact(&mut event_bytes)
            .map_err(truncated_or_io)?;
        let event: ReplayEvent = rmp_serde::from_slice(&event_bytes)?;
        self.events_read += 1;
        Ok(Some(event))
    }
    /// Reads one event from the decompressed chunk buffer, refilling it from
    /// the next on-disk chunk when exhausted.
    #[cfg(feature = "trace-compression")]
    fn read_compressed_event(&mut self) -> TraceFileResult<Option<ReplayEvent>> {
        if self.buffer_pos >= self.decompressed_buffer.len() {
            self.refill_decompressed_buffer()?;
        }
        if self.buffer_pos + 4 > self.decompressed_buffer.len() {
            return Err(TraceFileError::Truncated);
        }
        let len_bytes: [u8; 4] = self.decompressed_buffer[self.buffer_pos..self.buffer_pos + 4]
            .try_into()
            .map_err(|_| TraceFileError::Truncated)?;
        let len = u32::from_le_bytes(len_bytes) as usize;
        validate_event_len(len)?;
        self.buffer_pos += 4;
        if self.buffer_pos + len > self.decompressed_buffer.len() {
            return Err(TraceFileError::Truncated);
        }
        let event_bytes = &self.decompressed_buffer[self.buffer_pos..self.buffer_pos + len];
        let event: ReplayEvent = rmp_serde::from_slice(event_bytes)?;
        self.buffer_pos += len;
        self.events_read += 1;
        Ok(Some(event))
    }
    /// Reads and decompresses the next chunk into `decompressed_buffer`.
    #[cfg(feature = "trace-compression")]
    fn refill_decompressed_buffer(&mut self) -> TraceFileResult<()> {
        let mut chunk_len_bytes = [0u8; 4];
        self.reader
            .read_exact(&mut chunk_len_bytes)
            .map_err(truncated_or_io)?;
        let chunk_len = u32::from_le_bytes(chunk_len_bytes) as usize;
        if chunk_len == 0 {
            return Err(TraceFileError::Truncated);
        }
        if chunk_len > MAX_COMPRESSED_CHUNK_LEN {
            return Err(TraceFileError::OversizedField {
                field: "compressed_chunk_len",
                actual: chunk_len as u64,
                max: MAX_COMPRESSED_CHUNK_LEN as u64,
            });
        }
        let mut compressed = vec![0u8; chunk_len];
        self.reader
            .read_exact(&mut compressed)
            .map_err(truncated_or_io)?;
        // The writer used compress_prepend_size, so the first 4 bytes declare
        // the decompressed size; bound it before decompressing to avoid a
        // hostile file forcing a huge allocation.
        if compressed.len() >= 4 {
            let mut len_bytes = [0u8; 4];
            len_bytes.copy_from_slice(&compressed[0..4]);
            let uncompressed_len = u32::from_le_bytes(len_bytes) as usize;
            if uncompressed_len > MAX_COMPRESSED_CHUNK_LEN {
                return Err(TraceFileError::OversizedField {
                    field: "decompressed_chunk_len",
                    actual: uncompressed_len as u64,
                    max: MAX_COMPRESSED_CHUNK_LEN as u64,
                });
            }
        }
        self.decompressed_buffer = lz4_flex::decompress_size_prepended(&compressed).map_err(
            |e: lz4_flex::block::DecompressError| TraceFileError::Decompression(e.to_string()),
        )?;
        self.buffer_pos = 0;
        Ok(())
    }
    /// Seeks back to the first event and resets all read state.
    pub fn rewind(&mut self) -> TraceFileResult<()> {
        self.reader.seek(SeekFrom::Start(self.events_start_pos))?;
        self.events_read = 0;
        #[cfg(feature = "trace-compression")]
        {
            self.decompressed_buffer.clear();
            self.buffer_pos = 0;
        }
        Ok(())
    }
    /// Reads all remaining events into a `Vec`, preallocating up to
    /// [`MAX_EVENT_PREALLOC`] entries based on the declared count.
    pub fn load_all(mut self) -> TraceFileResult<Vec<ReplayEvent>> {
        let prealloc = usize::try_from(self.event_count)
            .unwrap_or(usize::MAX)
            .min(MAX_EVENT_PREALLOC);
        let mut events = Vec::with_capacity(prealloc);
        while let Some(event) = self.read_event()? {
            events.push(event);
        }
        Ok(events)
    }
}
/// Owning iterator over a trace file's events, created by `TraceReader::events`.
pub struct TraceEventIterator {
    reader: BufReader<File>,
    /// Declared events still to be yielded.
    remaining: u64,
    #[cfg_attr(not(feature = "trace-compression"), allow(dead_code))]
    compression: CompressionMode,
    /// Decompressed bytes of the current chunk.
    #[cfg(feature = "trace-compression")]
    decompressed_buffer: Vec<u8>,
    /// Read cursor into `decompressed_buffer`.
    #[cfg(feature = "trace-compression")]
    buffer_pos: usize,
}
impl Iterator for TraceEventIterator {
    type Item = TraceFileResult<ReplayEvent>;
    /// Yields the next event (or a read/decode error); returns `None` once the
    /// declared event count has been consumed.
    fn next(&mut self) -> Option<Self::Item> {
        if self.remaining == 0 {
            return None;
        }
        #[cfg(feature = "trace-compression")]
        if self.compression.is_compressed() {
            return Some(self.next_compressed());
        }
        Some(self.next_uncompressed())
    }
    /// Exact bounds derived from the remaining declared event count.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = usize::try_from(self.remaining).unwrap_or(usize::MAX);
        (remaining, Some(remaining))
    }
}
impl TraceEventIterator {
fn next_uncompressed(&mut self) -> TraceFileResult<ReplayEvent> {
let mut len_bytes = [0u8; 4];
if let Err(e) = self.reader.read_exact(&mut len_bytes) {
return Err(truncated_or_io(e));
}
let len = u32::from_le_bytes(len_bytes) as usize;
validate_event_len(len)?;
let mut event_bytes = vec![0u8; len];
if let Err(e) = self.reader.read_exact(&mut event_bytes) {
return Err(truncated_or_io(e));
}
match rmp_serde::from_slice(&event_bytes) {
Ok(event) => {
self.remaining -= 1;
Ok(event)
}
Err(e) => Err(TraceFileError::from(e)),
}
}
#[cfg(feature = "trace-compression")]
fn next_compressed(&mut self) -> TraceFileResult<ReplayEvent> {
if self.buffer_pos >= self.decompressed_buffer.len() {
self.refill_buffer()?;
}
if self.buffer_pos + 4 > self.decompressed_buffer.len() {
return Err(TraceFileError::Truncated);
}
let len_bytes: [u8; 4] =
match self.decompressed_buffer[self.buffer_pos..self.buffer_pos + 4].try_into() {
Ok(b) => b,
Err(_) => return Err(TraceFileError::Truncated),
};
let len = u32::from_le_bytes(len_bytes) as usize;
validate_event_len(len)?;
self.buffer_pos += 4;
if self.buffer_pos + len > self.decompressed_buffer.len() {
return Err(TraceFileError::Truncated);
}
let event_bytes = &self.decompressed_buffer[self.buffer_pos..self.buffer_pos + len];
match rmp_serde::from_slice(event_bytes) {
Ok(event) => {
self.buffer_pos += len;
self.remaining -= 1;
Ok(event)
}
Err(e) => Err(TraceFileError::from(e)),
}
}
#[cfg(feature = "trace-compression")]
fn refill_buffer(&mut self) -> TraceFileResult<()> {
let mut chunk_len_bytes = [0u8; 4];
self.reader
.read_exact(&mut chunk_len_bytes)
.map_err(truncated_or_io)?;
let chunk_len = u32::from_le_bytes(chunk_len_bytes) as usize;
if chunk_len == 0 {
return Err(TraceFileError::Truncated);
}
if chunk_len > MAX_COMPRESSED_CHUNK_LEN {
return Err(TraceFileError::OversizedField {
field: "compressed_chunk_len",
actual: chunk_len as u64,
max: MAX_COMPRESSED_CHUNK_LEN as u64,
});
}
let mut compressed = vec![0u8; chunk_len];
self.reader
.read_exact(&mut compressed)
.map_err(truncated_or_io)?;
if compressed.len() >= 4 {
let mut len_bytes = [0u8; 4];
len_bytes.copy_from_slice(&compressed[0..4]);
let uncompressed_len = u32::from_le_bytes(len_bytes) as usize;
if uncompressed_len > MAX_COMPRESSED_CHUNK_LEN {
return Err(TraceFileError::OversizedField {
field: "decompressed_chunk_len",
actual: uncompressed_len as u64,
max: MAX_COMPRESSED_CHUNK_LEN as u64,
});
}
}
self.decompressed_buffer = lz4_flex::decompress_size_prepended(&compressed).map_err(
|e: lz4_flex::block::DecompressError| TraceFileError::Decompression(e.to_string()),
)?;
self.buffer_pos = 0;
Ok(())
}
}
// `size_hint` reports the remaining declared count exactly, satisfying the
// ExactSizeIterator contract for well-formed files.
impl ExactSizeIterator for TraceEventIterator {}
/// Writes `metadata` followed by all `events` to a new trace file at `path`,
/// using the default writer configuration.
pub fn write_trace(
    path: impl AsRef<Path>,
    metadata: &TraceMetadata,
    events: &[ReplayEvent],
) -> TraceFileResult<()> {
    let mut writer = TraceWriter::create(path)?;
    writer.write_metadata(metadata)?;
    events
        .iter()
        .try_for_each(|event| writer.write_event(event))?;
    writer.finish()
}
/// Reads a whole trace file, returning its metadata and all events.
pub fn read_trace(path: impl AsRef<Path>) -> TraceFileResult<(TraceMetadata, Vec<ReplayEvent>)> {
    let reader = TraceReader::open(path)?;
    let metadata = reader.metadata().clone();
    Ok((metadata, reader.load_all()?))
}
#[cfg(test)]
mod tests {
use super::*;
use crate::trace::replay::CompactTaskId;
use serde_json::json;
use std::io::Write;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use tempfile::NamedTempFile;
fn sample_events() -> Vec<ReplayEvent> {
vec![
ReplayEvent::RngSeed { seed: 42 },
ReplayEvent::TaskScheduled {
task: CompactTaskId(1),
at_tick: 0,
},
ReplayEvent::TimeAdvanced {
from_nanos: 0,
to_nanos: 1_000_000,
},
ReplayEvent::TaskYielded {
task: CompactTaskId(1),
},
ReplayEvent::TaskScheduled {
task: CompactTaskId(1),
at_tick: 1,
},
ReplayEvent::TaskCompleted {
task: CompactTaskId(1),
outcome: 0,
},
]
}
fn write_header_with_metadata(file: &mut std::fs::File, compression: CompressionMode) {
let metadata = TraceMetadata::new(42);
let meta_bytes = rmp_serde::to_vec(&metadata).expect("serialize metadata");
let flags = if compression.is_compressed() {
FLAG_COMPRESSED
} else {
0
};
file.write_all(TRACE_MAGIC).expect("write magic");
file.write_all(&TRACE_FILE_VERSION.to_le_bytes())
.expect("write version");
file.write_all(&flags.to_le_bytes()).expect("write flags");
file.write_all(&[compression.to_byte()])
.expect("write compression");
file.write_all(&(meta_bytes.len() as u32).to_le_bytes())
.expect("write metadata length");
file.write_all(&meta_bytes).expect("write metadata");
}
fn trace_file_layout_summary(path: &std::path::Path) -> serde_json::Value {
let bytes = std::fs::read(path).expect("read trace bytes");
let version = u16::from_le_bytes(
bytes[TRACE_MAGIC.len()..TRACE_MAGIC.len() + 2]
.try_into()
.expect("version bytes"),
);
let flags = u16::from_le_bytes(
bytes[TRACE_MAGIC.len() + 2..TRACE_MAGIC.len() + 4]
.try_into()
.expect("flag bytes"),
);
let compression_byte = bytes[TRACE_MAGIC.len() + 4];
let meta_len = u32::from_le_bytes(
bytes[TRACE_MAGIC.len() + 5..HEADER_SIZE]
.try_into()
.expect("metadata length bytes"),
);
let event_count_offset = HEADER_SIZE + meta_len as usize;
let event_count = u64::from_le_bytes(
bytes[event_count_offset..event_count_offset + 8]
.try_into()
.expect("event count bytes"),
);
let mut metadata =
serde_json::to_value(TraceReader::open(path).expect("open reader").metadata())
.expect("serialize metadata");
if let Some(obj) = metadata.as_object_mut() {
if let Some(recorded_at) = obj.get_mut("recorded_at") {
*recorded_at = json!("[recorded_at]");
}
}
json!({
"magic": std::str::from_utf8(TRACE_MAGIC).expect("trace magic is valid utf8"),
"version": version,
"flags_hex": format!("{flags:#06x}"),
"compression_byte": compression_byte,
"meta_len": meta_len,
"event_count": event_count,
"metadata": metadata,
"events": sample_events(),
})
}
#[test]
fn compression_mode_debug_clone_copy_eq_default() {
let def = CompressionMode::default();
assert_eq!(def, CompressionMode::None);
let copied = def;
let cloned = def;
assert_eq!(copied, cloned);
assert!(!def.is_compressed());
let dbg = format!("{def:?}");
assert!(dbg.contains("None"));
}
#[test]
fn trace_file_config_debug_clone_default() {
let def = TraceFileConfig::default();
assert_eq!(def.compression, CompressionMode::None);
assert_eq!(def.chunk_size, DEFAULT_COMPRESSION_CHUNK_SIZE);
assert!(def.max_events.is_none());
let cloned = def.clone();
assert_eq!(cloned.compression, CompressionMode::None);
let dbg = format!("{def:?}");
assert!(dbg.contains("TraceFileConfig"));
}
#[test]
fn trace_file_error_debug_display() {
let err = TraceFileError::InvalidMagic;
let dbg = format!("{err:?}");
assert!(dbg.contains("InvalidMagic"));
let display = format!("{err}");
assert!(display.contains("magic"));
let version_err = TraceFileError::UnsupportedVersion {
expected: 2,
found: 99,
};
let display2 = format!("{version_err}");
assert!(display2.contains("99"));
}
#[test]
fn trace_file_layout_snapshot_scrubs_recorded_at() {
let temp = NamedTempFile::new().expect("create temp file");
let path = temp.path();
let metadata = TraceMetadata {
version: REPLAY_SCHEMA_VERSION,
seed: 42,
recorded_at: 1_726_133_456_789_000_000,
config_hash: 0xfeed_beef_cafe_babe,
description: Some("trace file layout snapshot".to_string()),
};
let events = sample_events();
write_trace(path, &metadata, &events).expect("write trace");
insta::assert_json_snapshot!(
"trace_file_layout_scrubbed_recorded_at",
trace_file_layout_summary(path)
);
}
#[test]
fn write_and_read_roundtrip() {
let temp = NamedTempFile::new().expect("create temp file");
let path = temp.path();
let metadata = TraceMetadata::new(42).with_description("test trace");
let events = sample_events();
write_trace(path, &metadata, &events).expect("write trace");
let (read_meta, read_events) = read_trace(path).expect("read trace");
assert_eq!(read_meta.seed, metadata.seed);
assert_eq!(read_meta.description, metadata.description);
assert_eq!(read_events.len(), events.len());
for (orig, read) in events.iter().zip(read_events.iter()) {
assert_eq!(orig, read);
}
}
#[test]
fn streaming_write_and_read() {
let temp = NamedTempFile::new().expect("create temp file");
let path = temp.path();
let metadata = TraceMetadata::new(123);
let events = sample_events();
{
let mut writer = TraceWriter::create(path).expect("create writer");
writer.write_metadata(&metadata).expect("write metadata");
for event in &events {
writer.write_event(event).expect("write event");
}
assert_eq!(writer.event_count(), events.len() as u64);
writer.finish().expect("finish");
}
{
let reader = TraceReader::open(path).expect("open reader");
assert_eq!(reader.metadata().seed, 123);
assert_eq!(reader.event_count(), events.len() as u64);
let mut count = 0;
for result in reader.events() {
let event = result.expect("read event");
assert_eq!(event, events[count]);
count += 1;
}
assert_eq!(count, events.len());
}
}
#[test]
fn reader_rewind() {
let temp = NamedTempFile::new().expect("create temp file");
let path = temp.path();
let metadata = TraceMetadata::new(42);
let events = sample_events();
write_trace(path, &metadata, &events).expect("write trace");
let mut reader = TraceReader::open(path).expect("open reader");
let e1 = reader.read_event().expect("read").expect("event");
let e2 = reader.read_event().expect("read").expect("event");
assert_eq!(reader.events_read(), 2);
reader.rewind().expect("rewind");
assert_eq!(reader.events_read(), 0);
let e1_again = reader.read_event().expect("read").expect("event");
let e2_again = reader.read_event().expect("read").expect("event");
assert_eq!(e1, e1_again);
assert_eq!(e2, e2_again);
}
#[test]
fn empty_trace() {
let temp = NamedTempFile::new().expect("create temp file");
let path = temp.path();
let metadata = TraceMetadata::new(0);
write_trace(path, &metadata, &[]).expect("write empty trace");
let (read_meta, read_events) = read_trace(path).expect("read empty trace");
assert_eq!(read_meta.seed, 0);
assert!(read_events.is_empty());
}
#[test]
fn large_trace() {
let temp = NamedTempFile::new().expect("create temp file");
let path = temp.path();
let metadata = TraceMetadata::new(42);
let event_count = 10_000;
let events: Vec<_> = (0..event_count)
.map(|i| ReplayEvent::TaskScheduled {
task: CompactTaskId(i),
at_tick: i,
})
.collect();
write_trace(path, &metadata, &events).expect("write large trace");
let reader = TraceReader::open(path).expect("open reader");
assert_eq!(reader.event_count(), event_count);
let mut count = 0u64;
for result in reader.events() {
let event = result.expect("read event");
if let ReplayEvent::TaskScheduled { task, at_tick } = event {
assert_eq!(task.0, count);
assert_eq!(at_tick, count);
} else {
unreachable!("unexpected event type");
}
count += 1;
}
assert_eq!(count, event_count);
}
#[test]
fn invalid_magic() {
let temp = NamedTempFile::new().expect("create temp file");
let path = temp.path();
std::fs::write(path, b"NOT A TRACE FILE").expect("write garbage");
let result = TraceReader::open(path);
assert!(matches!(result, Err(TraceFileError::InvalidMagic)));
}
#[test]
fn reader_read_event_errors_on_truncated_stream() {
    // Craft a file whose header declares one event but carries no payload;
    // read_event must surface Truncated rather than a bare EOF.
    let temp = NamedTempFile::new().expect("create temp file");
    let path = temp.path();
    {
        let mut file = std::fs::File::create(path).expect("create file");
        write_header_with_metadata(&mut file, CompressionMode::None);
        file.write_all(&1u64.to_le_bytes())
            .expect("write event count");
        file.flush().expect("flush");
    }
    let mut reader = TraceReader::open(path).expect("open reader");
    let err = reader
        .read_event()
        .expect_err("missing declared event must error");
    assert!(matches!(err, TraceFileError::Truncated), "got: {err:?}");
}
#[test]
fn event_iterator_errors_on_truncated_stream() {
    // Same truncated layout as the read_event test, but driven through the
    // iterator: the first item must be Err(Truncated), not iterator end.
    let temp = NamedTempFile::new().expect("create temp file");
    let path = temp.path();
    {
        let mut file = std::fs::File::create(path).expect("create file");
        write_header_with_metadata(&mut file, CompressionMode::None);
        file.write_all(&1u64.to_le_bytes())
            .expect("write event count");
        file.flush().expect("flush");
    }
    let mut iter = TraceReader::open(path).expect("open reader").events();
    let first = iter
        .next()
        .expect("iterator should emit an error for the missing event");
    assert!(
        matches!(first, Err(TraceFileError::Truncated)),
        "got: {first:?}"
    );
}
#[test]
fn file_size_reasonable() {
    // 1000 tiny events should encode compactly; guard against encoding
    // bloat by capping the average at 40 bytes per event.
    let temp = NamedTempFile::new().expect("create temp file");
    let path = temp.path();
    let events: Vec<_> = (0..1000)
        .map(|i| ReplayEvent::TaskScheduled {
            task: CompactTaskId(i),
            at_tick: i,
        })
        .collect();
    write_trace(path, &TraceMetadata::new(42), &events).expect("write trace");
    let size_bytes = std::fs::metadata(path).expect("metadata").len();
    // Go through u32 so the f64 conversion is lossless.
    let size_bytes = u32::try_from(size_bytes).expect("trace file size fits u32 for test");
    let per_event = f64::from(size_bytes) / 1000.0;
    assert!(
        per_event < 40.0,
        "File size too large: {per_event:.1} bytes/event"
    );
}
#[test]
// NOTE(review): despite its name, this test never exercises an "already
// finished" error path -- `finish()` appears to consume the writer by value
// (see `finish_requires_metadata_first`, which calls it on a non-mut binding),
// so a second `finish()` or post-finish write cannot even compile. As written
// this only checks that a metadata-only (zero-event) trace finishes cleanly.
// TODO: confirm whether the name is vestigial and either rename the test or
// assert the intended error variant.
fn writer_already_finished_error() {
let temp = NamedTempFile::new().expect("create temp file");
let path = temp.path();
let mut writer = TraceWriter::create(path).expect("create writer");
writer
.write_metadata(&TraceMetadata::new(42))
.expect("write metadata");
// Finishing right after the header must succeed (empty event stream).
writer.finish().expect("finish");
}
#[test]
fn write_event_requires_metadata_first() {
    // Emitting an event before the header must fail with
    // MetadataNotWritten and must leave the file completely untouched.
    let temp = NamedTempFile::new().expect("create temp file");
    let path = temp.path();
    let mut writer = TraceWriter::create(path).expect("create writer");
    let err = writer
        .write_event(&ReplayEvent::RngSeed { seed: 42 })
        .expect_err("events before metadata must be rejected");
    assert!(matches!(err, TraceFileError::MetadataNotWritten));
    drop(writer);
    assert_eq!(
        std::fs::metadata(path).expect("metadata").len(),
        0,
        "rejecting pre-header events must not scribble an event count at offset zero"
    );
}
#[test]
fn finish_requires_metadata_first() {
    // finish() before write_metadata() is a contract violation; the new
    // file must stay empty so nothing half-written is left behind.
    let temp = NamedTempFile::new().expect("create temp file");
    let path = temp.path();
    let writer = TraceWriter::create(path).expect("create writer");
    let err = writer
        .finish()
        .expect_err("finish without metadata must be rejected");
    assert!(matches!(err, TraceFileError::MetadataNotWritten));
    assert_eq!(
        std::fs::metadata(path).expect("metadata").len(),
        0,
        "failed finish without metadata must leave the new file empty"
    );
}
#[test]
fn write_metadata_rejects_duplicate_headers_without_corrupting_file() {
    // A second header attempt must be rejected, and the file on disk must
    // remain a valid, readable, zero-event trace afterwards.
    let temp = NamedTempFile::new().expect("create temp file");
    let path = temp.path();
    let metadata = TraceMetadata::new(42);
    let mut writer = TraceWriter::create(path).expect("create writer");
    writer.write_metadata(&metadata).expect("write metadata");
    let err = writer
        .write_metadata(&metadata)
        .expect_err("duplicate metadata must be rejected");
    assert!(matches!(err, TraceFileError::MetadataAlreadyWritten));
    writer.finish().expect("finish");
    let reader = TraceReader::open(path).expect("open reader");
    assert_eq!(reader.metadata().seed, metadata.seed);
    assert_eq!(reader.event_count(), 0);
}
#[test]
fn write_stops_at_max_events() {
    // With max_events = 2, writes beyond the cap succeed at the API level
    // but are dropped: only the first two events land in the file.
    let temp = NamedTempFile::new().expect("create temp file");
    let path = temp.path();
    let config = TraceFileConfig::new().with_max_events(Some(2));
    let mut writer = TraceWriter::create_with_config(path, config).expect("create writer");
    writer
        .write_metadata(&TraceMetadata::new(42))
        .expect("write metadata");
    for event in &sample_events() {
        writer.write_event(event).expect("write event");
    }
    writer.finish().expect("finish");
    let reader = TraceReader::open(path).expect("open reader");
    assert_eq!(reader.event_count(), 2);
}
#[test]
fn write_stops_at_max_file_size() {
    // Size the limit to exactly fit the header, serialized metadata, and
    // the 8-byte event-count word -- leaving no room for any event payload.
    let temp = NamedTempFile::new().expect("create temp file");
    let path = temp.path();
    let metadata = TraceMetadata::new(42);
    let meta_len = rmp_serde::to_vec(&metadata)
        .expect("serialize metadata")
        .len() as u64;
    let budget = HEADER_SIZE as u64 + meta_len + 8;
    let config = TraceFileConfig::new().with_max_file_size(budget);
    let mut writer = TraceWriter::create_with_config(path, config).expect("create writer");
    writer.write_metadata(&metadata).expect("write metadata");
    writer
        .write_event(&ReplayEvent::RngSeed { seed: 42 })
        .expect("write event");
    writer.finish().expect("finish");
    // The over-budget event must have been dropped, not written.
    let reader = TraceReader::open(path).expect("open reader");
    assert_eq!(reader.event_count(), 0);
}
#[test]
fn write_limit_callback_invoked() {
    // The limit callback fires exactly once when max_events is exceeded,
    // and its StopRecording return silences further writes.
    let temp = NamedTempFile::new().expect("create temp file");
    let path = temp.path();
    let hits = Arc::new(AtomicUsize::new(0));
    let counter = Arc::clone(&hits);
    let on_limit = LimitAction::Callback(Arc::new(move |_info| {
        counter.fetch_add(1, Ordering::SeqCst);
        LimitAction::StopRecording
    }));
    let config = TraceFileConfig::new()
        .with_max_events(Some(1))
        .on_limit(on_limit);
    let mut writer = TraceWriter::create_with_config(path, config).expect("create writer");
    writer
        .write_metadata(&TraceMetadata::new(42))
        .expect("write metadata");
    // First event fits under the cap; the second trips the callback.
    for seed in [1, 2] {
        writer
            .write_event(&ReplayEvent::RngSeed { seed })
            .expect("write event");
    }
    writer.finish().expect("finish");
    assert_eq!(hits.load(Ordering::SeqCst), 1);
}
#[test]
#[cfg(target_family = "unix")]
fn disk_full_is_handled() {
    // /dev/full reports ENOSPC on every write; skip quietly on systems
    // where it does not exist (or cannot be opened).
    let path = std::path::Path::new("/dev/full");
    if !path.exists() {
        return;
    }
    let Ok(mut writer) = TraceWriter::create(path) else {
        return;
    };
    // The metadata write itself may already hit ENOSPC inside the buffered
    // writer; only the final flush in finish() is asserted on.
    let _ = writer.write_metadata(&TraceMetadata::new(42));
    assert!(matches!(
        writer.finish(),
        Err(TraceFileError::Io(err)) if is_disk_full_os_error(err.raw_os_error())
    ));
}
#[cfg(feature = "trace-compression")]
mod compression_tests {
use super::*;
#[test]
fn compressed_write_and_read_roundtrip() {
    // Metadata and events written through the LZ4 path must come back
    // intact via load_all().
    let temp = NamedTempFile::new().expect("create temp file");
    let path = temp.path();
    let metadata = TraceMetadata::new(42).with_description("compressed trace");
    let events = sample_events();
    let config = TraceFileConfig::new().with_compression(CompressionMode::Lz4 { level: 1 });
    let mut writer = TraceWriter::create_with_config(path, config).expect("create writer");
    writer.write_metadata(&metadata).expect("write metadata");
    for event in &events {
        writer.write_event(event).expect("write event");
    }
    writer.finish().expect("finish");
    let reader = TraceReader::open(path).expect("open reader");
    assert!(reader.is_compressed());
    assert_eq!(reader.metadata().seed, metadata.seed);
    assert_eq!(reader.event_count(), events.len() as u64);
    let read_events = reader.load_all().expect("load all");
    assert_eq!(read_events, events);
}
#[test]
fn compressed_streaming_read() {
    // The streaming iterator must decode the same events the writer
    // compressed, in order, without buffering the whole file.
    let temp = NamedTempFile::new().expect("create temp file");
    let path = temp.path();
    let events = sample_events();
    let config = TraceFileConfig::new().with_compression(CompressionMode::Lz4 { level: 1 });
    let mut writer = TraceWriter::create_with_config(path, config).expect("create writer");
    writer
        .write_metadata(&TraceMetadata::new(123))
        .expect("write metadata");
    for event in &events {
        writer.write_event(event).expect("write event");
    }
    writer.finish().expect("finish");
    let reader = TraceReader::open(path).expect("open reader");
    assert!(reader.is_compressed());
    let mut index = 0;
    for result in reader.events() {
        assert_eq!(result.expect("read event"), events[index]);
        index += 1;
    }
    assert_eq!(index, events.len());
}
#[test]
fn large_compressed_trace() {
    // Small 8 KiB chunks force many compressed frames across 10k events,
    // exercising chunk-boundary handling in the decoder.
    let temp = NamedTempFile::new().expect("create temp file");
    let path = temp.path();
    let total = 10_000u64;
    let events: Vec<_> = (0..total)
        .map(|i| ReplayEvent::TaskScheduled {
            task: CompactTaskId(i),
            at_tick: i,
        })
        .collect();
    let config = TraceFileConfig::new()
        .with_compression(CompressionMode::Lz4 { level: 1 })
        .with_chunk_size(8 * 1024);
    let mut writer = TraceWriter::create_with_config(path, config).expect("create writer");
    writer
        .write_metadata(&TraceMetadata::new(42))
        .expect("write metadata");
    for event in &events {
        writer.write_event(event).expect("write event");
    }
    writer.finish().expect("finish");
    let reader = TraceReader::open(path).expect("open reader");
    assert!(reader.is_compressed());
    assert_eq!(reader.event_count(), total);
    let mut seen = 0u64;
    for result in reader.events() {
        match result.expect("read event") {
            ReplayEvent::TaskScheduled { task, at_tick } => {
                assert_eq!(task.0, seen);
                assert_eq!(at_tick, seen);
            }
            _ => unreachable!("unexpected event type"),
        }
        seen += 1;
    }
    assert_eq!(seen, total);
}
#[test]
fn compression_ratio() {
    // Highly repetitive events (task ids cycle mod 100) should compress at
    // least 2x versus the uncompressed encoding of the same trace.
    let temp_plain = NamedTempFile::new().expect("create temp file");
    let temp_lz4 = NamedTempFile::new().expect("create temp file");
    let metadata = TraceMetadata::new(42);
    let events: Vec<_> = (0..5000u64)
        .map(|i| ReplayEvent::TaskScheduled {
            task: CompactTaskId(i % 100),
            at_tick: i,
        })
        .collect();
    // Uncompressed baseline.
    {
        let mut writer = TraceWriter::create(temp_plain.path()).expect("create writer");
        writer.write_metadata(&metadata).expect("write metadata");
        for event in &events {
            writer.write_event(event).expect("write event");
        }
        writer.finish().expect("finish");
    }
    // LZ4-compressed copy of the identical event stream.
    {
        let config =
            TraceFileConfig::new().with_compression(CompressionMode::Lz4 { level: 1 });
        let mut writer =
            TraceWriter::create_with_config(temp_lz4.path(), config).expect("create writer");
        writer.write_metadata(&metadata).expect("write metadata");
        for event in &events {
            writer.write_event(event).expect("write event");
        }
        writer.finish().expect("finish");
    }
    let plain_len = std::fs::metadata(temp_plain.path()).expect("metadata").len();
    let lz4_len = std::fs::metadata(temp_lz4.path()).expect("metadata").len();
    #[allow(clippy::cast_precision_loss)]
    let ratio = plain_len as f64 / lz4_len as f64;
    assert!(
        ratio > 2.0,
        "Compression ratio {ratio:.2}x is below expected 2x minimum"
    );
}
#[test]
fn compressed_rewind() {
    // rewind() must reset both the stream position and the events_read
    // counter, even when reads go through the decompression path.
    let temp = NamedTempFile::new().expect("create temp file");
    let path = temp.path();
    let events = sample_events();
    let config = TraceFileConfig::new().with_compression(CompressionMode::Lz4 { level: 1 });
    let mut writer = TraceWriter::create_with_config(path, config).expect("create writer");
    writer
        .write_metadata(&TraceMetadata::new(42))
        .expect("write metadata");
    for event in &events {
        writer.write_event(event).expect("write event");
    }
    writer.finish().expect("finish");
    let mut reader = TraceReader::open(path).expect("open reader");
    assert!(reader.is_compressed());
    let first = reader.read_event().expect("read").expect("event");
    let second = reader.read_event().expect("read").expect("event");
    assert_eq!(reader.events_read(), 2);
    reader.rewind().expect("rewind");
    assert_eq!(reader.events_read(), 0);
    // The same two events must decode again after rewinding.
    assert_eq!(reader.read_event().expect("read").expect("event"), first);
    assert_eq!(reader.read_event().expect("read").expect("event"), second);
}
#[test]
fn uncompressed_still_readable() {
    // A plain trace written with the convenience helper must still load
    // when the crate is built with compression support enabled.
    let temp = NamedTempFile::new().expect("create temp file");
    let events = sample_events();
    write_trace(temp.path(), &TraceMetadata::new(42), &events).expect("write trace");
    let reader = TraceReader::open(temp.path()).expect("open reader");
    assert!(!reader.is_compressed());
    assert_eq!(reader.event_count(), events.len() as u64);
    assert_eq!(reader.load_all().expect("load all"), events);
}
#[test]
fn reader_read_event_errors_on_truncated_compressed_stream() {
    // Compressed header declares one event but no chunk follows;
    // read_event must report Truncated, not a bare EOF.
    let temp = NamedTempFile::new().expect("create temp file");
    let path = temp.path();
    {
        let mut file = std::fs::File::create(path).expect("create file");
        write_header_with_metadata(&mut file, CompressionMode::Lz4 { level: 1 });
        file.write_all(&1u64.to_le_bytes())
            .expect("write event count");
        file.flush().expect("flush");
    }
    let mut reader = TraceReader::open(path).expect("open reader");
    let err = reader
        .read_event()
        .expect_err("missing compressed chunk must error");
    assert!(matches!(err, TraceFileError::Truncated), "got: {err:?}");
}
#[test]
fn event_iterator_errors_on_truncated_compressed_stream() {
    // Same truncated compressed layout, driven through the iterator: its
    // first item must be Err(Truncated) rather than iterator end.
    let temp = NamedTempFile::new().expect("create temp file");
    let path = temp.path();
    {
        let mut file = std::fs::File::create(path).expect("create file");
        write_header_with_metadata(&mut file, CompressionMode::Lz4 { level: 1 });
        file.write_all(&1u64.to_le_bytes())
            .expect("write event count");
        file.flush().expect("flush");
    }
    let mut iter = TraceReader::open(path).expect("open reader").events();
    let first = iter
        .next()
        .expect("iterator should emit an error for the missing chunk");
    assert!(
        matches!(first, Err(TraceFileError::Truncated)),
        "got: {first:?}"
    );
}
#[test]
fn reader_read_event_rejects_oversized_event_len_in_compressed_stream() {
    // Hand-craft a valid compressed chunk whose decoded payload declares an
    // event length one past MAX_EVENT_LEN; the reader must refuse it with
    // OversizedField instead of attempting a huge allocation.
    let temp = NamedTempFile::new().expect("create temp file");
    let path = temp.path();
    {
        let mut file = std::fs::File::create(path).expect("create file");
        write_header_with_metadata(&mut file, CompressionMode::Lz4 { level: 1 });
        file.write_all(&1u64.to_le_bytes())
            .expect("write event count");
        let bad_len = u32::try_from(MAX_EVENT_LEN + 1).expect("event limit fits in u32");
        let chunk = lz4_flex::compress_prepend_size(&bad_len.to_le_bytes());
        let chunk_len = u32::try_from(chunk.len()).expect("compressed chunk fits in u32");
        file.write_all(&chunk_len.to_le_bytes())
            .expect("write chunk len");
        file.write_all(&chunk).expect("write chunk");
        file.flush().expect("flush");
    }
    let mut reader = TraceReader::open(path).expect("open reader");
    let err = reader
        .read_event()
        .expect_err("oversized event len must error");
    assert!(
        matches!(
            err,
            TraceFileError::OversizedField {
                field: "event_len",
                actual,
                max,
            } if actual == (MAX_EVENT_LEN as u64) + 1 && max == MAX_EVENT_LEN as u64
        ),
        "got: {err:?}"
    );
}
#[test]
fn event_iterator_rejects_oversized_event_len_in_compressed_stream() {
    // Same hand-crafted oversized-length chunk as the read_event variant,
    // but driven through the iterator API.
    let temp = NamedTempFile::new().expect("create temp file");
    let path = temp.path();
    {
        let mut file = std::fs::File::create(path).expect("create file");
        write_header_with_metadata(&mut file, CompressionMode::Lz4 { level: 1 });
        file.write_all(&1u64.to_le_bytes())
            .expect("write event count");
        let bad_len = u32::try_from(MAX_EVENT_LEN + 1).expect("event limit fits in u32");
        let chunk = lz4_flex::compress_prepend_size(&bad_len.to_le_bytes());
        let chunk_len = u32::try_from(chunk.len()).expect("compressed chunk fits in u32");
        file.write_all(&chunk_len.to_le_bytes())
            .expect("write chunk len");
        file.write_all(&chunk).expect("write chunk");
        file.flush().expect("flush");
    }
    let mut iter = TraceReader::open(path).expect("open reader").events();
    let first = iter
        .next()
        .expect("iterator should emit an error for oversized event len");
    assert!(
        matches!(
            first,
            Err(TraceFileError::OversizedField {
                field: "event_len",
                actual,
                max,
            }) if actual == (MAX_EVENT_LEN as u64) + 1 && max == MAX_EVENT_LEN as u64
        ),
        "got: {first:?}"
    );
}
}
}