use super::{
dicts,
format::{
CARGO_METADATA_JSON_PATH, OutputDict, RECORD_OPTS_JSON_PATH, RUN_LOG_FILE_NAME,
STDERR_DICT_PATH, STDOUT_DICT_PATH, STORE_ZIP_FILE_NAME, TEST_LIST_JSON_PATH,
stored_file_options, zstd_file_options,
},
summary::{
OutputEventKind, OutputFileName, OutputKind, RecordOpts, TestEventKindSummary,
TestEventSummary, ZipStoreOutput, ZipStoreOutputDescription,
},
};
use crate::{
errors::{RunStoreError, StoreWriterError},
output_spec::{LiveSpec, RecordingSpec},
record::format::{RERUN_INFO_JSON_PATH, RerunInfo},
reporter::events::{
ChildExecutionOutputDescription, ChildOutputDescription, ExecuteStatus, ExecutionStatuses,
SetupScriptExecuteStatus,
},
test_output::ChildSingleOutput,
};
use camino::{Utf8Path, Utf8PathBuf};
use countio::Counter;
use debug_ignore::DebugIgnore;
use eazip::ArchiveWriter;
use nextest_metadata::TestListSummary;
use std::{
borrow::Cow,
collections::HashSet,
fs::File,
io::{self, Write},
};
/// Writer that zstd-compresses the run log while tracking byte counts.
///
/// The outer `Counter` counts bytes written *into* the encoder
/// (uncompressed); the inner `Counter<File>` counts bytes the encoder emits
/// to the file (compressed). See `finish`, which reads both.
struct LogEncoder {
    // `Option` so `finish` can move the encoder out by value while the
    // `Drop` impl can still tell whether the stream was already finished.
    inner: Option<Counter<zstd::stream::Encoder<'static, Counter<File>>>>,
}
impl std::fmt::Debug for LogEncoder {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The zstd encoder has no useful `Debug` representation, so only
        // the type name is printed, with `..` marking the elided field.
        let mut builder = f.debug_struct("LogEncoder");
        builder.finish_non_exhaustive()
    }
}
impl LogEncoder {
    /// Wraps a fresh zstd encoder in an uncompressed-byte counter.
    fn new(encoder: zstd::stream::Encoder<'static, Counter<File>>) -> Self {
        let counted = Counter::new(encoder);
        Self {
            inner: Some(counted),
        }
    }

    /// Finalizes the zstd frame and reports the log's component sizes.
    ///
    /// `entries` is the number of log records written; it is passed through
    /// into the returned [`ComponentSizes`].
    fn finish(mut self, entries: u64) -> io::Result<ComponentSizes> {
        // Take the encoder out so the `Drop` impl sees `None` and does not
        // attempt to finish the stream a second time.
        let counted = self.inner.take().expect("encoder already finished");
        let uncompressed = counted.writer_bytes() as u64;
        // Finishing the encoder flushes the final frame into the file-side
        // counter, which then holds the compressed size.
        let file_side = counted.into_inner().finish()?;
        Ok(ComponentSizes {
            compressed: file_side.writer_bytes() as u64,
            uncompressed,
            entries,
        })
    }
}
impl Write for LogEncoder {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        // Writing after `finish` is a programming error, hence the panic.
        let encoder = self.inner.as_mut().expect("encoder already finished");
        encoder.write(buf)
    }

    fn flush(&mut self) -> io::Result<()> {
        let encoder = self.inner.as_mut().expect("encoder already finished");
        encoder.flush()
    }
}
impl Drop for LogEncoder {
    fn drop(&mut self) {
        // If `finish` was never called (e.g. an early error path dropped the
        // recorder), still close out the zstd frame so the log file is not
        // left with a truncated final frame. Errors are deliberately
        // ignored: panicking in drop is worse, and the caller has already
        // given up on collecting sizes.
        if let Some(counter) = self.inner.take() {
            let _ = counter.into_inner().finish();
        }
    }
}
/// Records a test run to disk: a zip store of captured outputs and metadata,
/// plus a newline-delimited, zstd-compressed event log.
#[derive(Debug)]
pub struct RunRecorder {
    // Path to the zip store; kept for error reporting.
    store_path: Utf8PathBuf,
    store_writer: StoreWriter,
    // Path to the run log; kept for error reporting.
    log_path: Utf8PathBuf,
    log: DebugIgnore<LogEncoder>,
    // Number of events written to the log so far (reported by `finish`).
    log_entries: u64,
    // Captured outputs larger than this many bytes are truncated before
    // being written to the store.
    max_output_size: usize,
}
impl RunRecorder {
    /// Creates a recorder rooted at `run_dir`, creating the directory, the
    /// zip store, and the zstd-compressed run log file (truncating existing
    /// files).
    ///
    /// `max_output_size` bounds how much of each captured output is stored.
    pub(super) fn new(
        run_dir: Utf8PathBuf,
        max_output_size: bytesize::ByteSize,
    ) -> Result<Self, RunStoreError> {
        std::fs::create_dir_all(&run_dir).map_err(|error| RunStoreError::RunDirCreate {
            run_dir: run_dir.clone(),
            error,
        })?;
        let store_path = run_dir.join(STORE_ZIP_FILE_NAME);
        let store_writer =
            StoreWriter::new(&store_path).map_err(|error| RunStoreError::StoreWrite {
                store_path: store_path.clone(),
                error,
            })?;
        let log_path = run_dir.join(RUN_LOG_FILE_NAME);
        let file = std::fs::OpenOptions::new()
            .create(true)
            .truncate(true)
            .write(true)
            .open(&log_path)
            .map_err(|error| RunStoreError::RunLogCreate {
                path: log_path.clone(),
                error,
            })?;
        // Compression level 3, matching `compress_with_dict` below.
        let encoder = zstd::stream::Encoder::new(Counter::new(file), 3).map_err(|error| {
            RunStoreError::RunLogCreate {
                path: log_path.clone(),
                error,
            }
        })?;
        let log = LogEncoder::new(encoder);
        Ok(Self {
            store_path,
            store_writer,
            log_path,
            log: DebugIgnore(log),
            log_entries: 0,
            // Saturate to usize::MAX (i.e. effectively no truncation) if the
            // configured size does not fit in usize on this target.
            max_output_size: usize::try_from(max_output_size.as_u64()).unwrap_or(usize::MAX),
        })
    }

    /// Writes run-level metadata into the store: the test list, cargo
    /// metadata, record options, and the stdout/stderr compression
    /// dictionaries.
    pub(crate) fn write_meta(
        &mut self,
        cargo_metadata_json: &str,
        test_list: &TestListSummary,
        opts: &RecordOpts,
    ) -> Result<(), RunStoreError> {
        let test_list_json = serde_json::to_string(test_list)
            .map_err(|error| RunStoreError::TestListSerialize { error })?;
        let opts_json = serde_json::to_string(opts)
            .map_err(|error| RunStoreError::RecordOptionsSerialize { error })?;
        self.write_archive_file(TEST_LIST_JSON_PATH, test_list_json.as_bytes())?;
        self.write_archive_file(CARGO_METADATA_JSON_PATH, cargo_metadata_json.as_bytes())?;
        self.write_archive_file(RECORD_OPTS_JSON_PATH, opts_json.as_bytes())?;
        self.write_archive_file(STDOUT_DICT_PATH, dicts::STDOUT)?;
        self.write_archive_file(STDERR_DICT_PATH, dicts::STDERR)?;
        Ok(())
    }

    /// Serializes `rerun_info` and stores it in the archive.
    pub(crate) fn write_rerun_info(&mut self, rerun_info: &RerunInfo) -> Result<(), RunStoreError> {
        let rerun_info_json = serde_json::to_string(rerun_info)
            .map_err(|error| RunStoreError::RerunInfoSerialize { error })?;
        self.write_archive_file(RERUN_INFO_JSON_PATH, rerun_info_json.as_bytes())?;
        Ok(())
    }

    // Adds a file to the zip store, attaching the store path to any error.
    fn write_archive_file(&mut self, path: &str, bytes: &[u8]) -> Result<(), RunStoreError> {
        self.store_writer
            .add_file(Utf8PathBuf::from(path), bytes)
            .map_err(|error| RunStoreError::StoreWrite {
                store_path: self.store_path.clone(),
                error,
            })
    }

    /// Records one test event: captured outputs are moved into the zip
    /// store, and the converted event is appended to the log as one JSON
    /// line.
    pub(crate) fn write_event(
        &mut self,
        event: TestEventSummary<LiveSpec>,
    ) -> Result<(), RunStoreError> {
        let mut cx = SerializeTestEventContext {
            store_writer: &mut self.store_writer,
            max_output_size: self.max_output_size,
        };
        let event = cx
            .convert_event(event)
            .map_err(|error| RunStoreError::StoreWrite {
                store_path: self.store_path.clone(),
                error,
            })?;
        let json = serde_json::to_string(&event)
            .map_err(|error| RunStoreError::TestEventSerialize { error })?;
        self.write_log_impl(json.as_bytes())?;
        self.write_log_impl(b"\n")?;
        self.log_entries += 1;
        Ok(())
    }

    // Writes raw bytes to the compressed log, attaching the log path to
    // any error.
    fn write_log_impl(&mut self, bytes: &[u8]) -> Result<(), RunStoreError> {
        self.log
            .write_all(bytes)
            .map_err(|error| RunStoreError::RunLogWrite {
                path: self.log_path.clone(),
                error,
            })
    }

    /// Finalizes the recorder, flushing the log and the store, and returns
    /// the sizes of both components.
    pub(crate) fn finish(self) -> Result<StoreSizes, RunStoreError> {
        let log_sizes =
            self.log
                .0
                .finish(self.log_entries)
                .map_err(|error| RunStoreError::RunLogFlush {
                    path: self.log_path.clone(),
                    error,
                })?;
        let store_sizes =
            self.store_writer
                .finish()
                .map_err(|error| RunStoreError::StoreWrite {
                    store_path: self.store_path.clone(),
                    error,
                })?;
        Ok(StoreSizes {
            log: log_sizes,
            store: store_sizes,
        })
    }
}
/// Writer for the zip store, deduplicating by path and tracking sizes.
#[derive(Debug)]
pub(crate) struct StoreWriter {
    // Archive writer over a byte counter, so the compressed size can be
    // read back in `finish`.
    writer: DebugIgnore<ArchiveWriter<Counter<File>>>,
    // Paths already written; repeated adds for the same path are no-ops.
    added_files: HashSet<Utf8PathBuf>,
    // Sum of the (pre-compression) lengths of all stored files.
    uncompressed_size: u64,
}
impl StoreWriter {
    /// Creates (truncating if present) the zip store at `store_path`.
    fn new(store_path: &Utf8Path) -> Result<Self, StoreWriterError> {
        let zip_file = std::fs::OpenOptions::new()
            .create(true)
            .truncate(true)
            .write(true)
            .open(store_path)
            .map_err(|error| StoreWriterError::Create { error })?;
        let writer = ArchiveWriter::new(Counter::new(zip_file));
        Ok(Self {
            writer: DebugIgnore(writer),
            added_files: HashSet::new(),
            uncompressed_size: 0,
        })
    }

    /// Adds `contents` to the archive at `path`, deduplicating by path.
    ///
    /// Paths with a known output dictionary are pre-compressed with that
    /// dictionary and stored raw (so the dictionary framing is preserved);
    /// all other paths are zstd-compressed by the archive writer itself.
    fn add_file(&mut self, path: Utf8PathBuf, contents: &[u8]) -> Result<(), StoreWriterError> {
        // Paths can legitimately repeat (e.g. output file names are derived
        // from content in `write_single_output`); only the first copy is
        // written.
        if self.added_files.contains(&path) {
            return Ok(());
        }
        let dict = OutputDict::for_path(&path);
        match dict.dict_bytes() {
            Some(dict_bytes) => {
                let compressed = compress_with_dict(contents, dict_bytes)
                    .map_err(|error| StoreWriterError::Compress { error })?;
                let options = stored_file_options();
                self.writer
                    .add_file(path.as_str(), &compressed[..], &options)
                    .map_err(|error| StoreWriterError::Write {
                        path: path.clone(),
                        error,
                    })?;
            }
            None => {
                let options = zstd_file_options();
                self.writer
                    .add_file(path.as_str(), contents, &options)
                    .map_err(|error| StoreWriterError::Write {
                        path: path.clone(),
                        error,
                    })?;
            }
        }
        // Update bookkeeping only after a successful write: previously the
        // uncompressed size was bumped up front, so a failed write would
        // inflate the recorded size for a file that was never stored.
        self.uncompressed_size += contents.len() as u64;
        self.added_files.insert(path);
        Ok(())
    }

    /// Finalizes the archive, flushing it to disk and returning its sizes.
    fn finish(self) -> Result<ComponentSizes, StoreWriterError> {
        let entries = self.added_files.len() as u64;
        let mut counter = self
            .writer
            .0
            .finish()
            .map_err(|error| StoreWriterError::Finish { error })?;
        counter
            .flush()
            .map_err(|error| StoreWriterError::Flush { error })?;
        Ok(ComponentSizes {
            compressed: counter.writer_bytes() as u64,
            uncompressed: self.uncompressed_size,
            entries,
        })
    }
}
/// Compressed/uncompressed byte counts plus entry count for one component
/// of a recorded run (the zip store or the run log).
#[derive(Clone, Copy, Debug, Default)]
pub struct ComponentSizes {
    pub compressed: u64,
    pub uncompressed: u64,
    pub entries: u64,
}

/// Sizes for both components of a recorded run.
#[derive(Clone, Copy, Debug, Default)]
pub struct StoreSizes {
    pub log: ComponentSizes,
    pub store: ComponentSizes,
}

impl StoreSizes {
    /// Total compressed bytes across the run log and the zip store.
    pub fn total_compressed(&self) -> u64 {
        let Self { log, store } = self;
        log.compressed + store.compressed
    }

    /// Total uncompressed bytes across the run log and the zip store.
    pub fn total_uncompressed(&self) -> u64 {
        let Self { log, store } = self;
        log.uncompressed + store.uncompressed
    }
}
/// Compresses `data` with the given zstd dictionary.
///
/// The output must be decompressed with the matching decoder dictionary
/// (see the round-trip test below). Level 3 matches the run-log encoder.
fn compress_with_dict(data: &[u8], dict_bytes: &[u8]) -> io::Result<Vec<u8>> {
    let prepared = zstd::dict::EncoderDictionary::copy(dict_bytes, 3);
    let mut sink = zstd::stream::Encoder::with_prepared_dictionary(Vec::new(), &prepared)?;
    sink.write_all(data)?;
    sink.finish()
}
/// Context for converting live test events into their recordable form,
/// writing any captured outputs into the zip store as a side effect.
struct SerializeTestEventContext<'a> {
    store_writer: &'a mut StoreWriter,
    // Outputs larger than this many bytes are truncated before storage.
    max_output_size: usize,
}
impl SerializeTestEventContext<'_> {
    /// Converts a live event to its recorded form; captured outputs are
    /// written to the zip store and replaced by references to it.
    fn convert_event(
        &mut self,
        event: TestEventSummary<LiveSpec>,
    ) -> Result<TestEventSummary<RecordingSpec>, StoreWriterError> {
        Ok(TestEventSummary {
            timestamp: event.timestamp,
            elapsed: event.elapsed,
            kind: self.convert_event_kind(event.kind)?,
        })
    }

    // Core events carry no captured output and pass through unchanged;
    // output events have their outputs moved into the store.
    fn convert_event_kind(
        &mut self,
        kind: TestEventKindSummary<LiveSpec>,
    ) -> Result<TestEventKindSummary<RecordingSpec>, StoreWriterError> {
        match kind {
            TestEventKindSummary::Core(core) => Ok(TestEventKindSummary::Core(core)),
            TestEventKindSummary::Output(output) => Ok(TestEventKindSummary::Output(
                self.convert_output_event(output)?,
            )),
        }
    }

    // Rebuilds each output-bearing event variant field by field, converting
    // only the status fields that embed captured output.
    fn convert_output_event(
        &mut self,
        event: OutputEventKind<LiveSpec>,
    ) -> Result<OutputEventKind<RecordingSpec>, StoreWriterError> {
        match event {
            OutputEventKind::SetupScriptFinished {
                stress_index,
                index,
                total,
                script_id,
                program,
                args,
                no_capture,
                run_status,
            } => {
                let run_status = self.convert_setup_script_status(&run_status)?;
                Ok(OutputEventKind::SetupScriptFinished {
                    stress_index,
                    index,
                    total,
                    script_id,
                    program,
                    args,
                    no_capture,
                    run_status,
                })
            }
            OutputEventKind::TestAttemptFailedWillRetry {
                stress_index,
                test_instance,
                run_status,
                delay_before_next_attempt,
                failure_output,
                running,
            } => {
                let run_status = self.convert_execute_status(run_status)?;
                Ok(OutputEventKind::TestAttemptFailedWillRetry {
                    stress_index,
                    test_instance,
                    run_status,
                    delay_before_next_attempt,
                    failure_output,
                    running,
                })
            }
            OutputEventKind::TestFinished {
                stress_index,
                test_instance,
                success_output,
                failure_output,
                junit_store_success_output,
                junit_store_failure_output,
                junit_flaky_fail_status,
                run_statuses,
                current_stats,
                running,
            } => {
                let run_statuses = self.convert_execution_statuses(run_statuses)?;
                Ok(OutputEventKind::TestFinished {
                    stress_index,
                    test_instance,
                    success_output,
                    failure_output,
                    junit_store_success_output,
                    junit_store_failure_output,
                    junit_flaky_fail_status,
                    run_statuses,
                    current_stats,
                    running,
                })
            }
        }
    }

    // Converts a setup script status, storing its captured output; other
    // fields are cloned through unchanged.
    fn convert_setup_script_status(
        &mut self,
        status: &SetupScriptExecuteStatus<LiveSpec>,
    ) -> Result<SetupScriptExecuteStatus<RecordingSpec>, StoreWriterError> {
        Ok(SetupScriptExecuteStatus {
            output: self.convert_child_execution_output(&status.output)?,
            result: status.result.clone(),
            start_time: status.start_time,
            time_taken: status.time_taken,
            is_slow: status.is_slow,
            env_map: status.env_map.clone(),
            error_summary: status.error_summary.clone(),
        })
    }

    // Converts each attempt's status, preserving the flaky-result marker
    // computed from the original statuses.
    fn convert_execution_statuses(
        &mut self,
        statuses: ExecutionStatuses<LiveSpec>,
    ) -> Result<ExecutionStatuses<RecordingSpec>, StoreWriterError> {
        let flaky_result = statuses.flaky_result();
        let statuses = statuses
            .into_iter()
            .map(|status| self.convert_execute_status(status))
            .collect::<Result<Vec<_>, _>>()?;
        Ok(ExecutionStatuses::new(statuses, flaky_result))
    }

    // Converts a single execution status, storing its captured output;
    // remaining fields are moved through unchanged.
    fn convert_execute_status(
        &mut self,
        status: ExecuteStatus<LiveSpec>,
    ) -> Result<ExecuteStatus<RecordingSpec>, StoreWriterError> {
        let output = self.convert_child_execution_output(&status.output)?;
        Ok(ExecuteStatus {
            retry_data: status.retry_data,
            output,
            result: status.result,
            start_time: status.start_time,
            time_taken: status.time_taken,
            is_slow: status.is_slow,
            delay_before_start: status.delay_before_start,
            error_summary: status.error_summary,
            output_error_slice: status.output_error_slice,
        })
    }

    // Start errors have no output to store; actual output is converted via
    // `convert_child_output`.
    fn convert_child_execution_output(
        &mut self,
        output: &ChildExecutionOutputDescription<LiveSpec>,
    ) -> Result<ChildExecutionOutputDescription<RecordingSpec>, StoreWriterError> {
        match output {
            ChildExecutionOutputDescription::Output {
                result,
                output,
                errors,
            } => {
                let output = self.convert_child_output(output)?;
                Ok(ChildExecutionOutputDescription::Output {
                    result: result.clone(),
                    output,
                    errors: errors.clone(),
                })
            }
            ChildExecutionOutputDescription::StartError(err) => {
                Ok(ChildExecutionOutputDescription::StartError(err.clone()))
            }
        }
    }

    // Writes each captured stream (split stdout/stderr, or a combined
    // stream) into the store and returns references to the stored files.
    fn convert_child_output(
        &mut self,
        output: &ChildOutputDescription,
    ) -> Result<ZipStoreOutputDescription, StoreWriterError> {
        match output {
            ChildOutputDescription::Split { stdout, stderr } => {
                Ok(ZipStoreOutputDescription::Split {
                    stdout: stdout
                        .as_ref()
                        .map(|o| self.write_single_output(Some(o), OutputKind::Stdout))
                        .transpose()?,
                    stderr: stderr
                        .as_ref()
                        .map(|o| self.write_single_output(Some(o), OutputKind::Stderr))
                        .transpose()?,
                })
            }
            ChildOutputDescription::Combined { output } => {
                Ok(ZipStoreOutputDescription::Combined {
                    output: self.write_single_output(Some(output), OutputKind::Combined)?,
                })
            }
            ChildOutputDescription::NotLoaded => {
                unreachable!(
                    "NotLoaded output should never be present during recording \
                     (NotLoaded is only produced during replay conversion)"
                );
            }
        }
    }

    /// Writes one captured output stream to the store under a
    /// content-derived name, truncating it to `max_output_size` first.
    ///
    /// Returns `Empty` for missing/empty output, `Truncated` (with the
    /// original size) when the output exceeded the limit, and `Full`
    /// otherwise.
    fn write_single_output(
        &mut self,
        output: Option<&ChildSingleOutput>,
        kind: OutputKind,
    ) -> Result<ZipStoreOutput, StoreWriterError> {
        let Some(output) = output else {
            return Ok(ZipStoreOutput::Empty);
        };
        if output.buf().is_empty() {
            return Ok(ZipStoreOutput::Empty);
        }
        let original_len = output.buf().len();
        let (data, truncated): (Cow<'_, [u8]>, bool) = if original_len <= self.max_output_size {
            (Cow::Borrowed(output.buf()), false)
        } else {
            (truncate_output(output.buf(), self.max_output_size), true)
        };
        // The file name is derived from the (possibly truncated) content,
        // so identical outputs share one store entry.
        let file_name = OutputFileName::from_content(&data, kind);
        let file_path = Utf8PathBuf::from(format!("out/{file_name}"));
        self.store_writer.add_file(file_path, &data)?;
        if truncated {
            Ok(ZipStoreOutput::Truncated {
                file_name,
                original_size: original_len as u64,
            })
        } else {
            Ok(ZipStoreOutput::Full { file_name })
        }
    }
}
fn truncate_output(buf: &[u8], max_size: usize) -> Cow<'_, [u8]> {
if buf.len() <= max_size {
return Cow::Borrowed(buf);
}
let truncated_bytes = buf.len() - max_size;
let marker = format!("\n\n... [truncated {truncated_bytes} bytes] ...\n\n");
let marker_bytes = marker.as_bytes();
let content_space = max_size.saturating_sub(marker_bytes.len());
let head_size = content_space / 2;
let tail_size = content_space - head_size;
let mut result = Vec::with_capacity(max_size);
result.extend_from_slice(&buf[..head_size]);
result.extend_from_slice(marker_bytes);
result.extend_from_slice(&buf[buf.len() - tail_size..]);
Cow::Owned(result)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::record::dicts;

    #[test]
    fn test_truncate_output_no_truncation_needed() {
        let input = b"hello world";
        let result = truncate_output(input, 100);
        assert_eq!(&*result, input);
        assert!(matches!(result, Cow::Borrowed(_)), "should be borrowed");
    }

    #[test]
    fn test_truncate_output_exact_size() {
        let input = b"exactly100bytes";
        let result = truncate_output(input, input.len());
        assert_eq!(&*result, input);
        assert!(matches!(result, Cow::Borrowed(_)), "should be borrowed");
    }

    #[test]
    fn test_truncate_output_basic() {
        let input: Vec<u8> = (0..200).collect();
        let max_size = 100;
        let result = truncate_output(&input, max_size);
        assert!(matches!(result, Cow::Owned(_)), "should be owned");
        assert!(
            result.len() <= max_size,
            "result len {} should be <= max_size {}",
            result.len(),
            max_size
        );
        let result_str = String::from_utf8_lossy(&result);
        assert!(
            result_str.contains("[truncated"),
            "should contain truncation marker: {result_str:?}"
        );
        assert!(
            result_str.contains("bytes]"),
            "should contain 'bytes]': {result_str:?}"
        );
        assert!(
            result.starts_with(&[0, 1, 2]),
            "should start with beginning of input"
        );
        assert!(
            result.ends_with(&[197, 198, 199]),
            "should end with end of input"
        );
    }

    #[test]
    fn test_truncate_output_preserves_head_and_tail() {
        let head = b"HEAD_CONTENT_";
        let middle = vec![b'x'; 1000];
        let tail = b"_TAIL_CONTENT";
        let mut input = Vec::new();
        input.extend_from_slice(head);
        input.extend_from_slice(&middle);
        input.extend_from_slice(tail);
        let max_size = 200;
        let result = truncate_output(&input, max_size);
        assert!(result.len() <= max_size);
        assert!(
            result.starts_with(b"HEAD"),
            "should preserve head: {:?}",
            String::from_utf8_lossy(&result[..20])
        );
        assert!(
            result.ends_with(b"CONTENT"),
            "should preserve tail: {:?}",
            String::from_utf8_lossy(&result[result.len() - 20..])
        );
    }

    #[test]
    fn test_truncate_output_marker_shows_correct_count() {
        // The marker reports the overage past the cap: 1000 - 100 = 900.
        let input: Vec<u8> = vec![b'a'; 1000];
        let max_size = 100;
        let result = truncate_output(&input, max_size);
        let result_str = String::from_utf8_lossy(&result);
        assert!(
            result_str.contains("[truncated 900 bytes]"),
            "should show correct truncation count: {result_str:?}"
        );
    }

    #[test]
    fn test_truncate_output_large_input() {
        let input: Vec<u8> = vec![b'x'; 20 * 1024 * 1024];
        let max_size = 10 * 1024 * 1024;
        let result = truncate_output(&input, max_size);
        assert!(
            result.len() <= max_size,
            "result {} should be <= max_size {}",
            result.len(),
            max_size
        );
        let result_str = String::from_utf8_lossy(&result);
        assert!(
            result_str.contains("[truncated"),
            "should contain truncation marker"
        );
    }

    #[test]
    fn test_truncate_output_max_size_smaller_than_marker() {
        // When the budget can't even fit the marker, the result is the
        // marker alone (and may exceed max_size).
        let input: Vec<u8> = vec![b'x'; 100];
        let max_size = 10;
        let result = truncate_output(&input, max_size);
        let result_str = String::from_utf8_lossy(&result);
        assert!(
            result_str.contains("[truncated"),
            "should still contain truncation marker: {result_str:?}"
        );
        assert!(
            result_str.starts_with("\n\n..."),
            "should start with marker prefix"
        );
        assert!(
            result_str.ends_with("...\n\n"),
            "should end with marker suffix"
        );
    }

    #[test]
    fn test_truncate_output_max_size_zero() {
        let input: Vec<u8> = vec![b'x'; 50];
        let max_size = 0;
        let result = truncate_output(&input, max_size);
        let result_str = String::from_utf8_lossy(&result);
        assert!(
            result_str.contains("[truncated 50 bytes]"),
            "should show correct truncation count: {result_str:?}"
        );
    }

    #[test]
    fn test_compress_with_dict_stdout() {
        // Round-trip: compress with the stdout dictionary, decompress with
        // the matching decoder dictionary, and compare.
        let test_output = b"running 1 test\ntest tests::my_test ... ok\n\ntest result: ok. 1 passed; 0 failed; 0 ignored\n";
        let compressed =
            compress_with_dict(test_output, dicts::STDOUT).expect("compression failed");
        let dict = zstd::dict::DecoderDictionary::copy(dicts::STDOUT);
        let mut decoder = zstd::stream::Decoder::with_prepared_dictionary(&compressed[..], &dict)
            .expect("decoder creation failed");
        let mut decompressed = Vec::new();
        io::Read::read_to_end(&mut decoder, &mut decompressed).expect("decompression failed");
        assert_eq!(decompressed, test_output, "round-trip should preserve data");
    }
}