use super::{
format::{
CARGO_METADATA_JSON_PATH, OutputDict, PORTABLE_MANIFEST_FILE_NAME,
PORTABLE_RECORDING_FORMAT_VERSION, PortableManifest, RECORD_OPTS_JSON_PATH,
RERUN_INFO_JSON_PATH, RUN_LOG_FILE_NAME, RerunInfo, STDERR_DICT_PATH, STDOUT_DICT_PATH,
STORE_FORMAT_VERSION, STORE_ZIP_FILE_NAME, TEST_LIST_JSON_PATH, has_zip_extension,
stored_file_options,
},
reader::{StoreReader, decompress_with_dict},
store::{RecordedRunInfo, RunFilesExist, StoreRunsDir},
summary::{RecordOpts, TestEventSummary},
};
use crate::{
errors::{PortableRecordingError, PortableRecordingReadError, RecordReadError},
output_spec::RecordingSpec,
user_config::elements::MAX_MAX_OUTPUT_SIZE,
};
use atomicwrites::{AtomicFile, OverwriteBehavior};
use bytesize::ByteSize;
use camino::{Utf8Path, Utf8PathBuf};
use countio::Counter;
use debug_ignore::DebugIgnore;
use eazip::{Archive, ArchiveWriter, CompressionMethod};
use itertools::Either;
use nextest_metadata::TestListSummary;
use std::{
borrow::Cow,
fs::File,
io::{self, BufRead, BufReader, Cursor, Read, Seek, SeekFrom, Write},
};
/// Outcome of successfully writing a portable recording archive to disk.
#[derive(Debug)]
pub struct PortableRecordingResult {
    /// Path of the archive that was written.
    pub path: Utf8PathBuf,
    /// Final size of the written archive in bytes.
    pub size: u64,
}
/// Outcome of extracting a single entry from the outer archive to disk.
#[derive(Debug)]
pub struct ExtractOuterFileResult {
    /// Number of bytes actually written to the output path.
    pub bytes_written: u64,
    /// If a limit check was requested and the entry's claimed uncompressed
    /// size exceeded the limit, the claimed size in bytes; `None` otherwise.
    pub exceeded_limit: Option<u64>,
}
/// Writes a recorded run out as a single portable zip archive.
#[derive(Debug)]
pub struct PortableRecordingWriter<'a> {
    // Metadata for the run being exported; used for the manifest and the
    // default file name.
    run_info: &'a RecordedRunInfo,
    // Directory containing the run's store zip and run log files.
    run_dir: Utf8PathBuf,
}
impl<'a> PortableRecordingWriter<'a> {
    /// Creates a writer for the given run, verifying up front that the run
    /// directory and both required files (the store zip and the run log)
    /// exist.
    ///
    /// # Errors
    ///
    /// Returns `RunDirNotFound` if the run directory is missing, or
    /// `RequiredFileMissing` if either required file is absent.
    pub fn new(
        run_info: &'a RecordedRunInfo,
        runs_dir: StoreRunsDir<'_>,
    ) -> Result<Self, PortableRecordingError> {
        let run_dir = runs_dir.run_dir(run_info.run_id);
        if !run_dir.exists() {
            return Err(PortableRecordingError::RunDirNotFound { path: run_dir });
        }
        let store_zip_path = run_dir.join(STORE_ZIP_FILE_NAME);
        if !store_zip_path.exists() {
            return Err(PortableRecordingError::RequiredFileMissing {
                run_dir,
                file_name: STORE_ZIP_FILE_NAME,
            });
        }
        let run_log_path = run_dir.join(RUN_LOG_FILE_NAME);
        if !run_log_path.exists() {
            return Err(PortableRecordingError::RequiredFileMissing {
                run_dir,
                file_name: RUN_LOG_FILE_NAME,
            });
        }
        Ok(Self { run_info, run_dir })
    }

    /// Default file name for the archive: `nextest-run-<run-id>.zip`.
    pub fn default_filename(&self) -> String {
        format!("nextest-run-{}.zip", self.run_info.run_id)
    }

    /// Writes the archive into `output_dir` using [`Self::default_filename`].
    pub fn write_to_dir(
        &self,
        output_dir: &Utf8Path,
    ) -> Result<PortableRecordingResult, PortableRecordingError> {
        let output_path = output_dir.join(self.default_filename());
        self.write_to_path(&output_path)
    }

    /// Writes the archive to `output_path` atomically (write to a temporary
    /// file, then rename over the destination), overwriting any existing
    /// file. The archive contains the manifest, the run log, and the store
    /// zip.
    pub fn write_to_path(
        &self,
        output_path: &Utf8Path,
    ) -> Result<PortableRecordingResult, PortableRecordingError> {
        let atomic_file = AtomicFile::new(output_path, OverwriteBehavior::AllowOverwrite);
        let final_size = atomic_file
            .write(|temp_file| {
                // Count bytes as they are written so a size is available even
                // if querying file metadata fails afterwards.
                let counter = Counter::new(temp_file);
                let mut zip_writer = ArchiveWriter::new(counter);
                self.write_manifest(&mut zip_writer)?;
                self.copy_file(&mut zip_writer, RUN_LOG_FILE_NAME)?;
                self.copy_file(&mut zip_writer, STORE_ZIP_FILE_NAME)?;
                let counter = zip_writer
                    .finish()
                    .map_err(PortableRecordingError::ZipFinalize)?;
                let counter_bytes = counter.writer_bytes() as u64;
                let file = counter.into_inner();
                // Prefer the file's reported size; fall back to the counted
                // bytes if metadata cannot be read.
                let size = file.metadata().map(|m| m.len()).unwrap_or(counter_bytes);
                Ok(size)
            })
            .map_err(|err| match err {
                // An error from the atomic-write machinery itself.
                atomicwrites::Error::Internal(source) => PortableRecordingError::AtomicWrite {
                    path: output_path.to_owned(),
                    source,
                },
                // Errors produced by the closure above are already our error
                // type; pass them through unchanged.
                atomicwrites::Error::User(e) => e,
            })?;
        Ok(PortableRecordingResult {
            path: output_path.to_owned(),
            size: final_size,
        })
    }

    /// Serializes the manifest to pretty-printed JSON and adds it to the
    /// archive using the stored-file (no extra compression) options.
    fn write_manifest<W: Write>(
        &self,
        zip_writer: &mut ArchiveWriter<W>,
    ) -> Result<(), PortableRecordingError> {
        let manifest = PortableManifest::new(self.run_info);
        let manifest_json = serde_json::to_vec_pretty(&manifest)
            .map_err(PortableRecordingError::SerializeManifest)?;
        let options = stored_file_options();
        zip_writer
            .add_file(PORTABLE_MANIFEST_FILE_NAME, &manifest_json[..], &options)
            .map_err(|source| PortableRecordingError::ZipWrite {
                file_name: PORTABLE_MANIFEST_FILE_NAME,
                source,
            })?;
        Ok(())
    }

    /// Streams `file_name` from the run directory into the archive using the
    /// stored-file options (the sources are already compressed formats, so no
    /// additional compression is applied).
    fn copy_file<W: Write>(
        &self,
        zip_writer: &mut ArchiveWriter<W>,
        file_name: &'static str,
    ) -> Result<(), PortableRecordingError> {
        let source_path = self.run_dir.join(file_name);
        let mut file = File::open(&source_path)
            .map_err(|source| PortableRecordingError::ReadFile { file_name, source })?;
        let options = stored_file_options();
        let mut streamer = zip_writer
            .stream_file(file_name, &options)
            .map_err(|source| PortableRecordingError::ZipStartFile { file_name, source })?;
        io::copy(&mut file, &mut streamer)
            .map_err(|source| PortableRecordingError::ZipWrite { file_name, source })?;
        streamer
            .finish()
            .map_err(|source| PortableRecordingError::ZipWrite { file_name, source })?;
        Ok(())
    }
}
/// Maximum number of bytes spooled from a non-seekable input (such as a pipe)
/// into a temporary file before giving up: 4 GiB.
const SPOOL_SIZE_LIMIT: ByteSize = ByteSize(4 * 1024 * 1024 * 1024);
/// Classification of a Windows file handle, used to decide whether the input
/// must be spooled to a seekable temporary file.
#[cfg(windows)]
enum WindowsFileKind {
    /// A regular disk file.
    Disk,
    /// A pipe.
    Pipe,
    /// Any other handle type, carrying the raw `GetFileType` value.
    Other(u32),
}
/// Determines whether `file`'s handle refers to a disk file, a pipe, or
/// something else, via the Win32 `GetFileType` call.
#[cfg(windows)]
fn classify_windows_handle(file: &File) -> WindowsFileKind {
    use std::os::windows::io::AsRawHandle;
    use windows_sys::Win32::Storage::FileSystem::{FILE_TYPE_DISK, FILE_TYPE_PIPE, GetFileType};
    // SAFETY: `as_raw_handle` returns a handle that is valid for the lifetime
    // of `file`, and `GetFileType` only inspects the handle.
    let file_type = unsafe { GetFileType(file.as_raw_handle()) };
    match file_type {
        FILE_TYPE_DISK => WindowsFileKind::Disk,
        FILE_TYPE_PIPE => WindowsFileKind::Pipe,
        other => WindowsFileKind::Other(other),
    }
}
/// Returns true if `e` is the "illegal seek" error (`ESPIPE`) that Unix
/// reports when seeking on a pipe or other non-seekable descriptor.
#[cfg(unix)]
fn is_not_seekable_error(e: &io::Error) -> bool {
    e.raw_os_error().is_some_and(|code| code == libc::ESPIPE)
}
/// Ensures `file` is seekable, spooling its contents to a temporary file (up
/// to [`SPOOL_SIZE_LIMIT`]) if it is a pipe. `path` is used only for error
/// reporting.
fn ensure_seekable(file: File, path: &Utf8Path) -> Result<File, PortableRecordingReadError> {
    ensure_seekable_impl(file, path, SPOOL_SIZE_LIMIT)
}
/// Implementation of [`ensure_seekable`] with an injectable spool limit so
/// tests can exercise the limit with small inputs.
fn ensure_seekable_impl(
    file: File,
    path: &Utf8Path,
    spool_limit: ByteSize,
) -> Result<File, PortableRecordingReadError> {
    #[cfg(unix)]
    {
        let mut file = file;
        // Probe seekability by querying the current position: pipes fail with
        // ESPIPE, in which case the contents are spooled to a temp file.
        match file.stream_position() {
            Ok(_) => Ok(file),
            Err(e) if is_not_seekable_error(&e) => spool_to_temp(file, path, spool_limit),
            // Any other seek failure is surfaced as-is.
            Err(e) => {
                Err(PortableRecordingReadError::SeekProbe {
                    path: path.to_owned(),
                    error: e,
                })
            }
        }
    }
    #[cfg(windows)]
    // On Windows, classify the handle type instead of probing with a seek.
    match classify_windows_handle(&file) {
        WindowsFileKind::Disk => Ok(file),
        WindowsFileKind::Pipe => spool_to_temp(file, path, spool_limit),
        WindowsFileKind::Other(file_type) => Err(PortableRecordingReadError::SeekProbe {
            path: path.to_owned(),
            error: io::Error::other(format!(
                "unexpected file handle type {file_type:#x} (expected disk or pipe)"
            )),
        }),
    }
}
fn spool_to_temp(
file: File,
path: &Utf8Path,
spool_limit: ByteSize,
) -> Result<File, PortableRecordingReadError> {
let mut temp =
camino_tempfile::tempfile().map_err(|error| PortableRecordingReadError::SpoolTempFile {
path: path.to_owned(),
error,
})?;
let bytes_copied = io::copy(
&mut (&file).take(spool_limit.0.saturating_add(1)),
&mut temp,
)
.map_err(|error| PortableRecordingReadError::SpoolTempFile {
path: path.to_owned(),
error,
})?;
if bytes_copied > spool_limit.0 {
return Err(PortableRecordingReadError::SpoolTooLarge {
path: path.to_owned(),
limit: spool_limit,
});
}
temp.seek(SeekFrom::Start(0))
.map_err(|error| PortableRecordingReadError::SpoolTempFile {
path: path.to_owned(),
error,
})?;
Ok(temp)
}
/// Backing storage for the outer archive reader: either the file on disk (or
/// a spooled temp file), or an in-memory buffer holding an inner archive
/// extracted from a wrapper zip.
type ArchiveReadStorage = Either<File, Cursor<Vec<u8>>>;
/// A portable recording archive opened for reading.
pub struct PortableRecording {
    // Path the archive was opened from; used in error messages.
    archive_path: Utf8PathBuf,
    // Parsed and version-checked manifest.
    manifest: PortableManifest,
    // The outer zip archive containing the manifest, run log, and store zip.
    outer_archive: Archive<BufReader<ArchiveReadStorage>>,
}
impl std::fmt::Debug for PortableRecording {
    /// Manual `Debug` impl: the archive reader has no useful debug output, so
    /// only the path and manifest are shown.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut dbg = f.debug_struct("PortableRecording");
        dbg.field("archive_path", &self.archive_path);
        dbg.field("manifest", &self.manifest);
        dbg.finish_non_exhaustive()
    }
}
impl RunFilesExist for PortableRecording {
    /// True if the outer archive contains a store zip entry.
    fn store_zip_exists(&self) -> bool {
        let index = self.outer_archive.index_of(STORE_ZIP_FILE_NAME);
        index.is_some()
    }

    /// True if the outer archive contains a run log entry.
    fn run_log_exists(&self) -> bool {
        let index = self.outer_archive.index_of(RUN_LOG_FILE_NAME);
        index.is_some()
    }
}
impl PortableRecording {
    /// Opens a portable recording from `path`.
    ///
    /// The input may be the recording archive itself (manifest at the top
    /// level), or a "wrapper" zip whose single file entry is the recording
    /// archive; in the latter case the inner archive is read into memory.
    /// Non-seekable inputs such as pipes are spooled to a temporary file
    /// first.
    pub fn open(path: &Utf8Path) -> Result<Self, PortableRecordingReadError> {
        let file = File::open(path).map_err(|error| PortableRecordingReadError::OpenArchive {
            path: path.to_owned(),
            error,
        })?;
        let file = ensure_seekable(file, path)?;
        let mut outer_archive =
            Archive::new(BufReader::new(Either::Left(file))).map_err(|error| {
                PortableRecordingReadError::ReadArchive {
                    path: path.to_owned(),
                    error,
                }
            })?;
        // If the manifest is present at the top level, this is the recording
        // archive itself.
        if outer_archive
            .index_of(PORTABLE_MANIFEST_FILE_NAME)
            .is_some()
        {
            return Self::open_validated(path, outer_archive);
        }
        // Otherwise check for the wrapper shape: exactly one file entry
        // (directory entries ignored), and that entry has a zip extension.
        let mut file_count = 0;
        let mut zip_count = 0;
        let mut zip_file: Option<String> = None;
        for metadata in outer_archive.entries() {
            let name = metadata.name();
            // Skip directory entries, with either separator style.
            if name.ends_with('/') || name.ends_with('\\') {
                continue;
            }
            file_count += 1;
            if has_zip_extension(Utf8Path::new(name)) {
                zip_count += 1;
                if zip_count == 1 {
                    zip_file = Some(name.to_owned());
                }
            }
        }
        if let Some(inner_name) = zip_file.filter(|_| file_count == 1 && zip_count == 1) {
            // Load the inner zip into memory and validate it as the real
            // recording archive.
            let inner_bytes = read_outer_file(&mut outer_archive, inner_name.into(), path)?;
            let inner_archive = Archive::new(BufReader::new(Either::Right(Cursor::new(
                inner_bytes,
            ))))
            .map_err(|error| PortableRecordingReadError::ReadArchive {
                path: path.to_owned(),
                error,
            })?;
            Self::open_validated(path, inner_archive)
        } else {
            Err(PortableRecordingReadError::NotAWrapperArchive {
                path: path.to_owned(),
                file_count,
                zip_count,
            })
        }
    }

    /// Reads and parses the manifest from `outer_archive`, then checks both
    /// the portable-recording format version and the store format version for
    /// compatibility with this build.
    fn open_validated(
        path: &Utf8Path,
        mut outer_archive: Archive<BufReader<ArchiveReadStorage>>,
    ) -> Result<Self, PortableRecordingReadError> {
        let manifest_bytes =
            read_outer_file(&mut outer_archive, PORTABLE_MANIFEST_FILE_NAME.into(), path)?;
        let manifest: PortableManifest =
            serde_json::from_slice(&manifest_bytes).map_err(|error| {
                PortableRecordingReadError::ParseManifest {
                    path: path.to_owned(),
                    error,
                }
            })?;
        if let Err(incompatibility) = manifest
            .format_version
            .check_readable_by(PORTABLE_RECORDING_FORMAT_VERSION)
        {
            return Err(PortableRecordingReadError::UnsupportedFormatVersion {
                path: path.to_owned(),
                found: manifest.format_version,
                supported: PORTABLE_RECORDING_FORMAT_VERSION,
                incompatibility,
            });
        }
        let store_version = manifest.store_format_version();
        if let Err(incompatibility) = store_version.check_readable_by(STORE_FORMAT_VERSION) {
            return Err(PortableRecordingReadError::UnsupportedStoreFormatVersion {
                path: path.to_owned(),
                found: store_version,
                supported: STORE_FORMAT_VERSION,
                incompatibility,
            });
        }
        Ok(Self {
            archive_path: path.to_owned(),
            manifest,
            outer_archive,
        })
    }

    /// Path this recording was opened from.
    pub fn archive_path(&self) -> &Utf8Path {
        &self.archive_path
    }

    /// Run metadata reconstructed from the manifest.
    pub fn run_info(&self) -> RecordedRunInfo {
        self.manifest.run_info()
    }

    /// Reads the run log entry (still compressed) into memory.
    pub fn read_run_log(&mut self) -> Result<PortableRecordingRunLog, PortableRecordingReadError> {
        let run_log_bytes = read_outer_file(
            &mut self.outer_archive,
            RUN_LOG_FILE_NAME.into(),
            &self.archive_path,
        )?;
        Ok(PortableRecordingRunLog {
            archive_path: self.archive_path.clone(),
            run_log_bytes,
        })
    }

    /// Extracts a top-level archive entry to `output_path`; see
    /// [`extract_outer_file_to_path`] for the `check_limit` semantics.
    pub fn extract_outer_file_to_path(
        &mut self,
        file_name: &'static str,
        output_path: &Utf8Path,
        check_limit: bool,
    ) -> Result<ExtractOuterFileResult, PortableRecordingReadError> {
        extract_outer_file_to_path(
            &mut self.outer_archive,
            file_name,
            &self.archive_path,
            output_path,
            check_limit,
        )
    }

    /// Opens the nested store zip for reading.
    ///
    /// The store entry must itself be stored uncompressed (`STORE`) so it can
    /// be read as a zip directly out of the outer archive without copying.
    pub fn open_store(&mut self) -> Result<PortableStoreReader<'_>, PortableRecordingReadError> {
        let file = self
            .outer_archive
            .get_by_name(STORE_ZIP_FILE_NAME)
            .ok_or_else(|| PortableRecordingReadError::MissingFile {
                path: self.archive_path.clone(),
                file_name: Cow::Borrowed(STORE_ZIP_FILE_NAME),
            })?;
        let compression = file.metadata().compression_method;
        if compression != CompressionMethod::STORE {
            return Err(PortableRecordingReadError::CompressedInnerArchive {
                archive_path: self.archive_path.clone(),
                compression,
            });
        }
        let raw = file
            .read_stored()
            .map_err(|error| PortableRecordingReadError::ReadArchive {
                path: self.archive_path.clone(),
                error,
            })?;
        let store_archive =
            Archive::new(raw).map_err(|error| PortableRecordingReadError::ReadArchive {
                path: self.archive_path.clone(),
                error,
            })?;
        Ok(PortableStoreReader {
            archive_path: &self.archive_path,
            store_archive,
            // Dictionaries are loaded lazily via `load_dictionaries`.
            stdout_dict: None,
            stderr_dict: None,
        })
    }
}
/// Reads a single entry from the outer archive fully into memory, enforcing
/// the maximum output size limit.
///
/// The limit is checked twice: once against the size claimed by the zip
/// header, and once against the bytes actually produced. Zip headers are
/// untrusted input and can lie; previously an entry whose real contents
/// exceeded the limit was silently truncated to `limit` bytes, which returned
/// corrupt data instead of an error.
///
/// # Errors
///
/// Returns `MissingFile` if the entry is absent, `FileTooLarge` if either the
/// claimed or the actual size exceeds the limit, and `ReadArchive` on I/O
/// failure.
fn read_outer_file(
    archive: &mut Archive<BufReader<ArchiveReadStorage>>,
    file_name: Cow<'static, str>,
    archive_path: &Utf8Path,
) -> Result<Vec<u8>, PortableRecordingReadError> {
    let limit = MAX_MAX_OUTPUT_SIZE.as_u64();
    let mut file =
        archive
            .get_by_name(&file_name)
            .ok_or_else(|| PortableRecordingReadError::MissingFile {
                path: archive_path.to_owned(),
                file_name: file_name.clone(),
            })?;
    let claimed_size = file.metadata().uncompressed_size;
    if claimed_size > limit {
        return Err(PortableRecordingReadError::FileTooLarge {
            path: archive_path.to_owned(),
            file_name,
            size: claimed_size,
            limit,
        });
    }
    let capacity = usize::try_from(claimed_size).unwrap_or(usize::MAX);
    let mut contents = Vec::with_capacity(capacity);
    // Read one byte past the limit so an over-limit entry is detectable
    // rather than silently truncated.
    file.read()
        .and_then(|reader| {
            reader
                .take(limit.saturating_add(1))
                .read_to_end(&mut contents)
        })
        .map_err(|error| PortableRecordingReadError::ReadArchive {
            path: archive_path.to_owned(),
            error,
        })?;
    let actual_size = contents.len() as u64;
    if actual_size > limit {
        // The header under-reported the entry size; reject rather than
        // return truncated data.
        return Err(PortableRecordingReadError::FileTooLarge {
            path: archive_path.to_owned(),
            file_name,
            size: actual_size,
            limit,
        });
    }
    Ok(contents)
}
/// Extracts `file_name` from the outer archive to `output_path`.
///
/// When `check_limit` is true and the entry's claimed uncompressed size
/// exceeds the output-size limit, the claimed size is reported via
/// `exceeded_limit` -- but the file is still extracted in full; the caller
/// decides how to react.
fn extract_outer_file_to_path(
    archive: &mut Archive<BufReader<ArchiveReadStorage>>,
    file_name: &'static str,
    archive_path: &Utf8Path,
    output_path: &Utf8Path,
    check_limit: bool,
) -> Result<ExtractOuterFileResult, PortableRecordingReadError> {
    let limit = MAX_MAX_OUTPUT_SIZE.as_u64();
    let mut file =
        archive
            .get_by_name(file_name)
            .ok_or_else(|| PortableRecordingReadError::MissingFile {
                path: archive_path.to_owned(),
                file_name: Cow::Borrowed(file_name),
            })?;
    let claimed_size = file.metadata().uncompressed_size;
    // Note: this is informational only; the copy below is not capped.
    let exceeded_limit = if check_limit && claimed_size > limit {
        Some(claimed_size)
    } else {
        None
    };
    let mut output_file =
        File::create(output_path).map_err(|error| PortableRecordingReadError::ExtractFile {
            archive_path: archive_path.to_owned(),
            file_name,
            output_path: output_path.to_owned(),
            error,
        })?;
    let mut reader = file
        .read()
        .map_err(|error| PortableRecordingReadError::ReadArchive {
            path: archive_path.to_owned(),
            error,
        })?;
    let bytes_written = io::copy(&mut reader, &mut output_file).map_err(|error| {
        PortableRecordingReadError::ExtractFile {
            archive_path: archive_path.to_owned(),
            file_name,
            output_path: output_path.to_owned(),
            error,
        }
    })?;
    Ok(ExtractOuterFileResult {
        bytes_written,
        exceeded_limit,
    })
}
/// The run log extracted from a portable recording, held in memory in its
/// compressed form.
#[derive(Debug)]
pub struct PortableRecordingRunLog {
    // Archive the log came from; used in error messages.
    archive_path: Utf8PathBuf,
    // zstd-compressed run log bytes, decoded line by line by `events`.
    run_log_bytes: Vec<u8>,
}
impl PortableRecordingRunLog {
    /// Creates an iterator over the recorded test events, decompressing the
    /// run log on the fly.
    ///
    /// # Errors
    ///
    /// Returns `OpenRunLog` if the zstd decoder cannot be constructed.
    pub fn events(&self) -> Result<PortableRecordingEventIter<'_>, RecordReadError> {
        let decoder = match zstd::stream::Decoder::with_buffer(&self.run_log_bytes[..]) {
            Ok(decoder) => decoder,
            Err(error) => {
                return Err(RecordReadError::OpenRunLog {
                    path: self.archive_path.join(RUN_LOG_FILE_NAME),
                    error,
                });
            }
        };
        Ok(PortableRecordingEventIter {
            reader: DebugIgnore(BufReader::new(decoder)),
            line_buf: String::new(),
            line_number: 0,
        })
    }
}
/// Iterator over test events decoded from a portable recording's run log.
#[derive(Debug)]
pub struct PortableRecordingEventIter<'a> {
    // Buffered reader over the streaming zstd decoder.
    reader: DebugIgnore<BufReader<zstd::stream::Decoder<'static, &'a [u8]>>>,
    // Line buffer reused across iterations to avoid per-line allocations.
    line_buf: String,
    // 1-based number of the most recently read line, for error reporting.
    line_number: usize,
}
impl Iterator for PortableRecordingEventIter<'_> {
    type Item = Result<TestEventSummary<RecordingSpec>, RecordReadError>;

    /// Yields the next event, skipping blank lines. Returns `None` at end of
    /// stream; read and parse failures are yielded as `Err` items.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            // Count every physical line (including blank ones) so reported
            // line numbers match the raw log; reuse the buffer each time.
            self.line_buf.clear();
            self.line_number += 1;
            match self.reader.read_line(&mut self.line_buf) {
                // Zero bytes read means end of stream.
                Ok(0) => return None,
                Ok(_) => {
                    let trimmed = self.line_buf.trim();
                    if trimmed.is_empty() {
                        continue;
                    }
                    return Some(serde_json::from_str(trimmed).map_err(|error| {
                        RecordReadError::ParseEvent {
                            line_number: self.line_number,
                            error,
                        }
                    }));
                }
                Err(error) => {
                    return Some(Err(RecordReadError::ReadRunLog {
                        line_number: self.line_number,
                        error,
                    }));
                }
            }
        }
    }
}
/// Reader for the store zip nested inside a portable recording's outer
/// archive.
pub struct PortableStoreReader<'a> {
    // Outer archive path, for error reporting.
    archive_path: &'a Utf8Path,
    // The nested store archive, read through the outer archive's reader.
    store_archive: Archive<io::Take<&'a mut BufReader<ArchiveReadStorage>>>,
    // zstd dictionaries for stdout/stderr outputs; populated by
    // `load_dictionaries`, required before `read_output`.
    stdout_dict: Option<Vec<u8>>,
    stderr_dict: Option<Vec<u8>>,
}
impl std::fmt::Debug for PortableStoreReader<'_> {
    /// Manual `Debug` impl: shows dictionary lengths rather than their raw
    /// bytes, and omits the archive reader.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let stdout_len = self.stdout_dict.as_ref().map(|d| d.len());
        let stderr_len = self.stderr_dict.as_ref().map(|d| d.len());
        f.debug_struct("PortableStoreReader")
            .field("archive_path", &self.archive_path)
            .field("stdout_dict", &stdout_len)
            .field("stderr_dict", &stderr_len)
            .finish_non_exhaustive()
    }
}
impl PortableStoreReader<'_> {
    /// Reads a store entry fully into memory, enforcing the output-size limit
    /// and verifying that the actual byte count matches the size claimed by
    /// the zip header.
    ///
    /// # Errors
    ///
    /// Returns `FileNotFound` if the entry is absent, `FileTooLarge` if the
    /// claimed size exceeds the limit, `Decompress` on I/O failure, and
    /// `SizeMismatch` if the decompressed length differs from the claim.
    fn read_store_file(&mut self, file_name: &str) -> Result<Vec<u8>, RecordReadError> {
        let limit = MAX_MAX_OUTPUT_SIZE.as_u64();
        let mut file = self.store_archive.get_by_name(file_name).ok_or_else(|| {
            RecordReadError::FileNotFound {
                file_name: file_name.to_string(),
            }
        })?;
        let claimed_size = file.metadata().uncompressed_size;
        if claimed_size > limit {
            return Err(RecordReadError::FileTooLarge {
                file_name: file_name.to_string(),
                size: claimed_size,
                limit,
            });
        }
        let capacity = usize::try_from(claimed_size).unwrap_or(usize::MAX);
        let mut contents = Vec::with_capacity(capacity);
        // Read at most claimed_size + 1 bytes: one byte past the claim is
        // enough to detect an entry whose real contents are longer than the
        // header says. (Capping at `limit` instead, as before, silently
        // truncated -- and passed the size check -- in the edge case where
        // claimed_size == limit and the actual data was longer.)
        file.read()
            .and_then(|reader| {
                reader
                    .take(claimed_size.saturating_add(1))
                    .read_to_end(&mut contents)
            })
            .map_err(|error| RecordReadError::Decompress {
                file_name: file_name.to_string(),
                error,
            })?;
        let actual_size = contents.len() as u64;
        if actual_size != claimed_size {
            return Err(RecordReadError::SizeMismatch {
                file_name: file_name.to_string(),
                claimed_size,
                actual_size,
            });
        }
        Ok(contents)
    }

    /// Returns the decompression dictionary matching `file_name`'s output
    /// kind, or `None` if the file is not a stdout/stderr output.
    ///
    /// # Panics
    ///
    /// Panics if `load_dictionaries` has not been called first.
    fn get_dict_for_output(&self, file_name: &str) -> Option<&[u8]> {
        match OutputDict::for_output_file_name(file_name) {
            OutputDict::Stdout => Some(
                self.stdout_dict
                    .as_ref()
                    .expect("load_dictionaries must be called first"),
            ),
            OutputDict::Stderr => Some(
                self.stderr_dict
                    .as_ref()
                    .expect("load_dictionaries must be called first"),
            ),
            OutputDict::None => None,
        }
    }
}
impl StoreReader for PortableStoreReader<'_> {
    /// Reads the recorded cargo metadata JSON as a string.
    fn read_cargo_metadata(&mut self) -> Result<String, RecordReadError> {
        let bytes = self.read_store_file(CARGO_METADATA_JSON_PATH)?;
        // Invalid UTF-8 is surfaced as a Decompress error wrapping the
        // conversion failure.
        String::from_utf8(bytes).map_err(|e| RecordReadError::Decompress {
            file_name: CARGO_METADATA_JSON_PATH.to_string(),
            error: std::io::Error::new(std::io::ErrorKind::InvalidData, e),
        })
    }

    /// Reads and deserializes the recorded test list.
    fn read_test_list(&mut self) -> Result<TestListSummary, RecordReadError> {
        let bytes = self.read_store_file(TEST_LIST_JSON_PATH)?;
        serde_json::from_slice(&bytes).map_err(|error| RecordReadError::DeserializeMetadata {
            file_name: TEST_LIST_JSON_PATH.to_string(),
            error,
        })
    }

    /// Reads and deserializes the recorded options.
    fn read_record_opts(&mut self) -> Result<RecordOpts, RecordReadError> {
        let bytes = self.read_store_file(RECORD_OPTS_JSON_PATH)?;
        serde_json::from_slice(&bytes).map_err(|error| RecordReadError::DeserializeMetadata {
            file_name: RECORD_OPTS_JSON_PATH.to_string(),
            error,
        })
    }

    /// Reads rerun info if present. A missing file is not an error: it is
    /// mapped to `Ok(None)`.
    fn read_rerun_info(&mut self) -> Result<Option<RerunInfo>, RecordReadError> {
        match self.read_store_file(RERUN_INFO_JSON_PATH) {
            Ok(bytes) => {
                let info = serde_json::from_slice(&bytes).map_err(|error| {
                    RecordReadError::DeserializeMetadata {
                        file_name: RERUN_INFO_JSON_PATH.to_string(),
                        error,
                    }
                })?;
                Ok(Some(info))
            }
            Err(RecordReadError::FileNotFound { .. }) => {
                Ok(None)
            }
            Err(e) => Err(e),
        }
    }

    /// Loads the stdout and stderr dictionaries; must be called before
    /// `read_output`.
    fn load_dictionaries(&mut self) -> Result<(), RecordReadError> {
        self.stdout_dict = Some(self.read_store_file(STDOUT_DICT_PATH)?);
        self.stderr_dict = Some(self.read_store_file(STDERR_DICT_PATH)?);
        Ok(())
    }

    /// Reads and decompresses a recorded output file from the `out/`
    /// directory of the store, using the dictionary matching its output kind.
    fn read_output(&mut self, file_name: &str) -> Result<Vec<u8>, RecordReadError> {
        let path = format!("out/{file_name}");
        let compressed = self.read_store_file(&path)?;
        let limit = MAX_MAX_OUTPUT_SIZE.as_u64();
        let dict_bytes = self.get_dict_for_output(file_name).ok_or_else(|| {
            RecordReadError::UnknownOutputType {
                file_name: file_name.to_owned(),
            }
        })?;
        decompress_with_dict(&compressed, dict_bytes, limit).map_err(|error| {
            RecordReadError::Decompress {
                file_name: path,
                error,
            }
        })
    }

    /// Streams a store entry directly to `output_path`, returning the number
    /// of bytes written.
    fn extract_file_to_path(
        &mut self,
        store_path: &str,
        output_path: &Utf8Path,
    ) -> Result<u64, RecordReadError> {
        let mut file = self.store_archive.get_by_name(store_path).ok_or_else(|| {
            RecordReadError::FileNotFound {
                file_name: store_path.to_owned(),
            }
        })?;
        let mut output_file =
            File::create(output_path).map_err(|error| RecordReadError::ExtractFile {
                store_path: store_path.to_owned(),
                output_path: output_path.to_owned(),
                error,
            })?;
        let mut reader = file
            .read()
            .map_err(|error| RecordReadError::ReadArchiveFile {
                file_name: store_path.to_owned(),
                error,
            })?;
        io::copy(&mut reader, &mut output_file).map_err(|error| RecordReadError::ExtractFile {
            store_path: store_path.to_owned(),
            output_path: output_path.to_owned(),
            error,
        })
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::record::{
        format::{PORTABLE_RECORDING_FORMAT_VERSION, STORE_FORMAT_VERSION},
        store::{CompletedRunStats, RecordedRunStatus, RecordedSizes},
    };
    use camino_tempfile::{NamedUtf8TempFile, Utf8TempDir};
    use chrono::Local;
    use eazip::write::FileOptions;
    use quick_junit::ReportUuid;
    use semver::Version;
    use std::{collections::BTreeMap, io::Read};

    /// Creates a runs directory containing one run with a minimal valid store
    /// zip and zstd-compressed run log; returns the temp-dir guard and the
    /// runs dir path.
    fn create_test_run_dir(run_id: ReportUuid) -> (Utf8TempDir, Utf8PathBuf) {
        let temp_dir = camino_tempfile::tempdir().expect("create temp dir");
        let runs_dir = temp_dir.path().to_owned();
        let run_dir = runs_dir.join(run_id.to_string());
        std::fs::create_dir_all(&run_dir).expect("create run dir");
        let store_path = run_dir.join(STORE_ZIP_FILE_NAME);
        let store_file = File::create(&store_path).expect("create store.zip");
        let mut zip_writer = ArchiveWriter::new(store_file);
        let options = FileOptions::default();
        zip_writer
            .add_file("test.txt", &b"test content"[..], &options)
            .expect("add file");
        zip_writer.finish().expect("finish zip");
        let log_path = run_dir.join(RUN_LOG_FILE_NAME);
        let log_file = File::create(&log_path).expect("create run.log.zst");
        let mut encoder = zstd::stream::Encoder::new(log_file, 3).expect("create encoder");
        encoder.write_all(b"test log content").expect("write log");
        encoder.finish().expect("finish encoder");
        (temp_dir, runs_dir)
    }

    /// Builds a plausible `RecordedRunInfo` for `run_id` with fixed stats.
    fn create_test_run_info(run_id: ReportUuid) -> RecordedRunInfo {
        let now = Local::now().fixed_offset();
        RecordedRunInfo {
            run_id,
            store_format_version: STORE_FORMAT_VERSION,
            nextest_version: Version::new(0, 9, 111),
            started_at: now,
            last_written_at: now,
            duration_secs: Some(12.345),
            cli_args: vec!["cargo".to_owned(), "nextest".to_owned(), "run".to_owned()],
            build_scope_args: vec!["--workspace".to_owned()],
            env_vars: BTreeMap::from([("CARGO_TERM_COLOR".to_owned(), "always".to_owned())]),
            parent_run_id: None,
            sizes: RecordedSizes::default(),
            status: RecordedRunStatus::Completed(CompletedRunStats {
                initial_run_count: 10,
                passed: 9,
                failed: 1,
                exit_code: 100,
            }),
        }
    }

    /// The default archive name embeds the run ID.
    #[test]
    fn test_default_filename() {
        let run_id = ReportUuid::from_u128(0x550e8400_e29b_41d4_a716_446655440000);
        let (_temp_dir, runs_dir) = create_test_run_dir(run_id);
        let run_info = create_test_run_info(run_id);
        let writer = PortableRecordingWriter::new(&run_info, StoreRunsDir::new(&runs_dir))
            .expect("create writer");
        assert_eq!(
            writer.default_filename(),
            "nextest-run-550e8400-e29b-41d4-a716-446655440000.zip"
        );
    }

    /// End-to-end: the written archive exists, reports its true size, and
    /// contains exactly the manifest, store zip, and run log.
    #[test]
    fn test_write_portable_recording() {
        let run_id = ReportUuid::from_u128(0x550e8400_e29b_41d4_a716_446655440000);
        let (_temp_dir, runs_dir) = create_test_run_dir(run_id);
        let run_info = create_test_run_info(run_id);
        let writer = PortableRecordingWriter::new(&run_info, StoreRunsDir::new(&runs_dir))
            .expect("create writer");
        let output_dir = camino_tempfile::tempdir().expect("create output dir");
        let result = writer
            .write_to_dir(output_dir.path())
            .expect("write archive");
        assert!(result.path.exists());
        assert!(result.size > 0);
        let actual_size = std::fs::metadata(&result.path)
            .expect("get file metadata")
            .len();
        assert_eq!(
            result.size, actual_size,
            "reported size should match actual file size"
        );
        assert_eq!(
            result.path.file_name(),
            Some("nextest-run-550e8400-e29b-41d4-a716-446655440000.zip")
        );
        let archive_file = File::open(&result.path).expect("open archive");
        let mut archive = Archive::new(BufReader::new(archive_file)).expect("read archive");
        assert_eq!(archive.entries().len(), 3);
        {
            // The manifest must round-trip and reference this run.
            let mut manifest_file = archive
                .get_by_name(PORTABLE_MANIFEST_FILE_NAME)
                .expect("manifest");
            let mut manifest_content = String::new();
            manifest_file
                .read()
                .expect("get reader")
                .read_to_string(&mut manifest_content)
                .expect("read manifest");
            let manifest: PortableManifest =
                serde_json::from_str(&manifest_content).expect("parse manifest");
            assert_eq!(manifest.format_version, PORTABLE_RECORDING_FORMAT_VERSION);
            assert_eq!(manifest.run.run_id, run_id);
        }
        {
            let store_file = archive.get_by_name(STORE_ZIP_FILE_NAME).expect("store.zip");
            assert!(store_file.metadata().uncompressed_size > 0);
        }
        {
            let log_file = archive.get_by_name(RUN_LOG_FILE_NAME).expect("run.log.zst");
            assert!(log_file.metadata().uncompressed_size > 0);
        }
    }

    /// A missing run directory is rejected at construction time.
    #[test]
    fn test_missing_run_dir() {
        let run_id = ReportUuid::from_u128(0x550e8400_e29b_41d4_a716_446655440000);
        let temp_dir = camino_tempfile::tempdir().expect("create temp dir");
        let runs_dir = temp_dir.path().to_owned();
        let run_info = create_test_run_info(run_id);
        let result = PortableRecordingWriter::new(&run_info, StoreRunsDir::new(&runs_dir));
        assert!(matches!(
            result,
            Err(PortableRecordingError::RunDirNotFound { .. })
        ));
    }

    /// A run dir with only a run log (no store zip) is rejected.
    #[test]
    fn test_missing_store_zip() {
        let run_id = ReportUuid::from_u128(0x550e8400_e29b_41d4_a716_446655440000);
        let temp_dir = camino_tempfile::tempdir().expect("create temp dir");
        let runs_dir = temp_dir.path().to_owned();
        let run_dir = runs_dir.join(run_id.to_string());
        std::fs::create_dir_all(&run_dir).expect("create run dir");
        let log_path = run_dir.join(RUN_LOG_FILE_NAME);
        let log_file = File::create(&log_path).expect("create run.log.zst");
        let mut encoder = zstd::stream::Encoder::new(log_file, 3).expect("create encoder");
        encoder.write_all(b"test").expect("write");
        encoder.finish().expect("finish");
        let run_info = create_test_run_info(run_id);
        let result = PortableRecordingWriter::new(&run_info, StoreRunsDir::new(&runs_dir));
        assert!(
            matches!(
                &result,
                Err(PortableRecordingError::RequiredFileMissing { file_name, .. })
                if *file_name == STORE_ZIP_FILE_NAME
            ),
            "expected RequiredFileMissing for store.zip, got {result:?}"
        );
    }

    /// A run dir with only a store zip (no run log) is rejected.
    #[test]
    fn test_missing_run_log() {
        let run_id = ReportUuid::from_u128(0x550e8400_e29b_41d4_a716_446655440000);
        let temp_dir = camino_tempfile::tempdir().expect("create temp dir");
        let runs_dir = temp_dir.path().to_owned();
        let run_dir = runs_dir.join(run_id.to_string());
        std::fs::create_dir_all(&run_dir).expect("create run dir");
        let store_path = run_dir.join(STORE_ZIP_FILE_NAME);
        let store_file = File::create(&store_path).expect("create store.zip");
        let mut zip_writer = ArchiveWriter::new(store_file);
        let options = FileOptions::default();
        zip_writer
            .add_file("test.txt", &b"test"[..], &options)
            .expect("add file");
        zip_writer.finish().expect("finish");
        let run_info = create_test_run_info(run_id);
        let result = PortableRecordingWriter::new(&run_info, StoreRunsDir::new(&runs_dir));
        assert!(
            matches!(
                &result,
                Err(PortableRecordingError::RequiredFileMissing { file_name, .. })
                if *file_name == RUN_LOG_FILE_NAME
            ),
            "expected RequiredFileMissing for run.log.zst, got {result:?}"
        );
    }

    /// A seekable regular file is returned unchanged (same fd on Unix).
    #[test]
    fn test_ensure_seekable_regular_file() {
        let temp = NamedUtf8TempFile::new().expect("created temp file");
        let path = temp.path().to_owned();
        std::fs::write(&path, b"hello world").expect("wrote to temp file");
        let file = File::open(&path).expect("opened temp file");
        #[cfg(unix)]
        let original_fd = {
            use std::os::unix::io::AsRawFd;
            file.as_raw_fd()
        };
        let result = ensure_seekable(file, &path).expect("ensure_seekable succeeded");
        #[cfg(unix)]
        {
            use std::os::unix::io::AsRawFd;
            assert_eq!(
                result.as_raw_fd(),
                original_fd,
                "seekable file should be returned as-is"
            );
        }
        let mut contents = String::new();
        let mut reader = io::BufReader::new(result);
        reader
            .read_to_string(&mut contents)
            .expect("read file contents");
        assert_eq!(contents, "hello world");
    }

    /// Converts an anonymous pipe read end into a `File` (Unix fd route).
    #[cfg(unix)]
    fn pipe_reader_to_file(reader: std::io::PipeReader) -> File {
        use std::os::fd::OwnedFd;
        File::from(OwnedFd::from(reader))
    }

    /// Converts an anonymous pipe read end into a `File` (Windows handle route).
    #[cfg(windows)]
    fn pipe_reader_to_file(reader: std::io::PipeReader) -> File {
        use std::os::windows::io::OwnedHandle;
        File::from(OwnedHandle::from(reader))
    }

    /// A pipe is spooled to a temp file whose contents match the pipe data.
    #[test]
    fn test_ensure_seekable_pipe() {
        let (pipe_reader, mut pipe_writer) = std::io::pipe().expect("created pipe");
        let test_data = b"zip-like test content for pipe spooling";
        pipe_writer.write_all(test_data).expect("wrote to pipe");
        drop(pipe_writer);
        let pipe_file = pipe_reader_to_file(pipe_reader);
        let path = Utf8Path::new("/dev/fd/99");
        let result = ensure_seekable(pipe_file, path).expect("ensure_seekable succeeded");
        let mut contents = Vec::new();
        let mut reader = io::BufReader::new(result);
        reader
            .read_to_end(&mut contents)
            .expect("read spooled contents");
        assert_eq!(contents, test_data);
    }

    /// An already-closed, empty pipe spools to an empty file.
    #[test]
    fn test_ensure_seekable_empty_pipe() {
        let (pipe_reader, pipe_writer) = std::io::pipe().expect("created pipe");
        drop(pipe_writer);
        let pipe_file = pipe_reader_to_file(pipe_reader);
        let path = Utf8Path::new("/dev/fd/42");
        let mut result = ensure_seekable(pipe_file, path).expect("empty pipe should succeed");
        let mut contents = Vec::new();
        result.read_to_end(&mut contents).expect("read contents");
        assert!(contents.is_empty());
    }

    /// Pipe data well over the spool limit is rejected.
    #[test]
    fn test_ensure_seekable_spool_too_large() {
        let (pipe_reader, mut pipe_writer) = std::io::pipe().expect("created pipe");
        pipe_writer
            .write_all(b"01234567890123456789")
            .expect("wrote to pipe");
        drop(pipe_writer);
        let pipe_file = pipe_reader_to_file(pipe_reader);
        let path = Utf8Path::new("/dev/fd/42");
        let result = ensure_seekable_impl(pipe_file, path, ByteSize(10));
        assert!(
            matches!(
                &result,
                Err(PortableRecordingReadError::SpoolTooLarge {
                    limit: ByteSize(10),
                    ..
                })
            ),
            "expected SpoolTooLarge, got {result:?}"
        );
    }

    /// Exactly one byte over the spool limit is also rejected.
    #[test]
    fn test_ensure_seekable_spool_one_over_limit() {
        let (pipe_reader, mut pipe_writer) = std::io::pipe().expect("created pipe");
        pipe_writer
            .write_all(b"01234567890")
            .expect("wrote to pipe");
        drop(pipe_writer);
        let pipe_file = pipe_reader_to_file(pipe_reader);
        let path = Utf8Path::new("/dev/fd/42");
        let result = ensure_seekable_impl(pipe_file, path, ByteSize(10));
        assert!(
            matches!(
                &result,
                Err(PortableRecordingReadError::SpoolTooLarge {
                    limit: ByteSize(10),
                    ..
                })
            ),
            "expected SpoolTooLarge at limit+1 bytes, got {result:?}"
        );
    }

    /// Data exactly at the spool limit succeeds and round-trips intact.
    #[test]
    fn test_ensure_seekable_spool_exact_limit() {
        let (pipe_reader, mut pipe_writer) = std::io::pipe().expect("created pipe");
        pipe_writer.write_all(b"0123456789").expect("wrote to pipe");
        drop(pipe_writer);
        let pipe_file = pipe_reader_to_file(pipe_reader);
        let path = Utf8Path::new("/dev/fd/42");
        let mut result = ensure_seekable_impl(pipe_file, path, ByteSize(10))
            .expect("exact limit should succeed");
        let mut contents = Vec::new();
        result.read_to_end(&mut contents).expect("read contents");
        assert_eq!(contents, b"0123456789");
    }
}