use super::{
format::{
CARGO_METADATA_JSON_PATH, OutputDict, RECORD_OPTS_JSON_PATH, RUN_LOG_FILE_NAME,
STDERR_DICT_PATH, STDOUT_DICT_PATH, STORE_ZIP_FILE_NAME, TEST_LIST_JSON_PATH,
},
summary::{RecordOpts, TestEventSummary},
};
use crate::{
errors::RecordReadError,
output_spec::RecordingSpec,
record::format::{RERUN_INFO_JSON_PATH, RerunInfo},
user_config::elements::MAX_MAX_OUTPUT_SIZE,
};
use camino::{Utf8Path, Utf8PathBuf};
use debug_ignore::DebugIgnore;
use eazip::Archive;
use nextest_metadata::TestListSummary;
use std::{
fs::File,
io::{self, BufRead, BufReader, Read},
};
/// Read access to the contents of a recorded run's store.
///
/// In this file the trait is implemented by [`RecordReader`], which serves
/// every request from the run's store archive.
pub trait StoreReader {
/// Returns the stored cargo metadata as a string.
fn read_cargo_metadata(&mut self) -> Result<String, RecordReadError>;
/// Returns the stored test list.
fn read_test_list(&mut self) -> Result<TestListSummary, RecordReadError>;
/// Returns the options the run was recorded with.
fn read_record_opts(&mut self) -> Result<RecordOpts, RecordReadError>;
/// Returns rerun information, if any.
///
/// The [`RecordReader`] implementation returns `Ok(None)` when the store
/// has no rerun-info entry.
fn read_rerun_info(&mut self) -> Result<Option<RerunInfo>, RecordReadError>;
/// Loads the dictionaries needed by [`StoreReader::read_output`].
///
/// With [`RecordReader`], this must be called before `read_output`;
/// otherwise `read_output` panics for stdout/stderr files.
fn load_dictionaries(&mut self) -> Result<(), RecordReadError>;
/// Reads the output stored under `file_name` and returns its bytes.
fn read_output(&mut self, file_name: &str) -> Result<Vec<u8>, RecordReadError>;
/// Copies the store entry `store_path` to `output_path` on disk.
///
/// The [`RecordReader`] implementation returns the number of bytes
/// copied.
fn extract_file_to_path(
&mut self,
store_path: &str,
output_path: &Utf8Path,
) -> Result<u64, RecordReadError>;
}
/// Reader for a single recorded run that lives in a run directory.
///
/// The store archive is opened lazily on first access. The run log is opened
/// separately, each time [`RecordReader::events`] is called.
#[derive(Debug)]
pub struct RecordReader {
/// Directory containing the store archive (`STORE_ZIP_FILE_NAME`) and the
/// run log (`RUN_LOG_FILE_NAME`).
run_dir: Utf8PathBuf,
/// Store archive; `None` until the first call that needs it. Wrapped in
/// `DebugIgnore`, presumably so the derived `Debug` output skips it.
archive: Option<DebugIgnore<Archive<BufReader<File>>>>,
/// Dictionary for stdout output; `None` until `load_dictionaries` succeeds
/// in reading it.
stdout_dict: Option<Vec<u8>>,
/// Dictionary for stderr output; `None` until `load_dictionaries` succeeds
/// in reading it.
stderr_dict: Option<Vec<u8>>,
}
impl RecordReader {
/// Creates a reader for the run stored in `run_dir`.
///
/// Only the existence of `run_dir` is checked here; the store archive is
/// not opened until it is first needed, and no dictionaries are loaded.
///
/// # Errors
///
/// Returns [`RecordReadError::RunNotFound`] if `run_dir` does not exist.
pub fn open(run_dir: &Utf8Path) -> Result<Self, RecordReadError> {
    let run_dir = run_dir.to_owned();
    if run_dir.exists() {
        Ok(Self {
            run_dir,
            archive: None,
            stdout_dict: None,
            stderr_dict: None,
        })
    } else {
        Err(RecordReadError::RunNotFound { path: run_dir })
    }
}
/// Returns the run directory this reader was opened on.
pub fn run_dir(&self) -> &Utf8Path {
    self.run_dir.as_path()
}
/// Returns the store archive, opening and parsing it on first use.
///
/// The opened archive is cached in `self.archive`, so later calls reuse it.
/// If opening or parsing fails, nothing is cached and the next call tries
/// again.
///
/// # Errors
///
/// * [`RecordReadError::OpenArchive`] if the archive file cannot be opened.
/// * [`RecordReadError::ParseArchive`] if the file cannot be parsed as an
///   archive.
fn ensure_archive(&mut self) -> Result<&mut Archive<BufReader<File>>, RecordReadError> {
    if self.archive.is_none() {
        let zip_path = self.run_dir.join(STORE_ZIP_FILE_NAME);
        let handle = match File::open(&zip_path) {
            Ok(handle) => handle,
            Err(error) => {
                return Err(RecordReadError::OpenArchive {
                    path: zip_path,
                    error,
                });
            }
        };
        let parsed = match Archive::new(BufReader::new(handle)) {
            Ok(parsed) => parsed,
            Err(error) => {
                return Err(RecordReadError::ParseArchive {
                    path: zip_path,
                    error,
                });
            }
        };
        self.archive = Some(DebugIgnore(parsed));
    }
    Ok(self.archive.as_mut().expect("archive was just set"))
}
/// Reads the archive entry `file_name` fully into memory.
///
/// The entry's claimed uncompressed size is checked against
/// `MAX_MAX_OUTPUT_SIZE` before anything is read. The read itself is also
/// capped at that limit, and the number of bytes actually read must equal
/// the claimed size.
///
/// # Errors
///
/// * [`RecordReadError::FileNotFound`] if the archive has no such entry.
/// * [`RecordReadError::FileTooLarge`] if the claimed size exceeds the limit.
/// * [`RecordReadError::Decompress`] if opening or reading the entry fails.
/// * [`RecordReadError::SizeMismatch`] if the bytes read differ in count
///   from the claimed size.
fn read_archive_file(&mut self, file_name: &str) -> Result<Vec<u8>, RecordReadError> {
    let limit = MAX_MAX_OUTPUT_SIZE.as_u64();
    let archive = self.ensure_archive()?;
    let mut entry = match archive.get_by_name(file_name) {
        Some(entry) => entry,
        None => {
            return Err(RecordReadError::FileNotFound {
                file_name: file_name.to_string(),
            });
        }
    };

    let claimed_size = entry.metadata().uncompressed_size;
    if claimed_size > limit {
        return Err(RecordReadError::FileTooLarge {
            file_name: file_name.to_string(),
            size: claimed_size,
            limit,
        });
    }

    // The claimed size has been bounded by `limit` above, so it is used as
    // the allocation hint.
    let mut contents = Vec::with_capacity(usize::try_from(claimed_size).unwrap_or(usize::MAX));
    let read_outcome = match entry.read() {
        Ok(reader) => reader.take(limit).read_to_end(&mut contents),
        Err(error) => Err(error),
    };
    if let Err(error) = read_outcome {
        return Err(RecordReadError::Decompress {
            file_name: file_name.to_string(),
            error,
        });
    }

    let actual_size = contents.len() as u64;
    if actual_size == claimed_size {
        Ok(contents)
    } else {
        Err(RecordReadError::SizeMismatch {
            file_name: file_name.to_string(),
            claimed_size,
            actual_size,
        })
    }
}
pub fn read_cargo_metadata(&mut self) -> Result<String, RecordReadError> {
let bytes = self.read_archive_file(CARGO_METADATA_JSON_PATH)?;
String::from_utf8(bytes).map_err(|e| RecordReadError::Decompress {
file_name: CARGO_METADATA_JSON_PATH.to_string(),
error: std::io::Error::new(std::io::ErrorKind::InvalidData, e),
})
}
pub fn read_test_list(&mut self) -> Result<TestListSummary, RecordReadError> {
let bytes = self.read_archive_file(TEST_LIST_JSON_PATH)?;
serde_json::from_slice(&bytes).map_err(|error| RecordReadError::DeserializeMetadata {
file_name: TEST_LIST_JSON_PATH.to_string(),
error,
})
}
pub fn read_record_opts(&mut self) -> Result<RecordOpts, RecordReadError> {
let bytes = self.read_archive_file(RECORD_OPTS_JSON_PATH)?;
serde_json::from_slice(&bytes).map_err(|error| RecordReadError::DeserializeMetadata {
file_name: RECORD_OPTS_JSON_PATH.to_string(),
error,
})
}
pub fn read_rerun_info(&mut self) -> Result<Option<RerunInfo>, RecordReadError> {
match self.read_archive_file(RERUN_INFO_JSON_PATH) {
Ok(bytes) => {
let info = serde_json::from_slice(&bytes).map_err(|error| {
RecordReadError::DeserializeMetadata {
file_name: RERUN_INFO_JSON_PATH.to_string(),
error,
}
})?;
Ok(Some(info))
}
Err(RecordReadError::FileNotFound { .. }) => {
Ok(None)
}
Err(e) => Err(e),
}
}
/// Loads the stdout and stderr dictionaries from the archive.
///
/// Must be called before [`RecordReader::read_output`] is used on stdout or
/// stderr files. The stdout dictionary is read and stored first, so if the
/// stderr read fails, the stdout dictionary stays loaded.
///
/// # Errors
///
/// Propagates any error from reading either dictionary entry.
pub fn load_dictionaries(&mut self) -> Result<(), RecordReadError> {
    let stdout_bytes = self.read_archive_file(STDOUT_DICT_PATH)?;
    self.stdout_dict = Some(stdout_bytes);

    let stderr_bytes = self.read_archive_file(STDERR_DICT_PATH)?;
    self.stderr_dict = Some(stderr_bytes);

    Ok(())
}
pub fn events(&self) -> Result<RecordEventIter, RecordReadError> {
let log_path = self.run_dir.join(RUN_LOG_FILE_NAME);
let file = File::open(&log_path).map_err(|error| RecordReadError::OpenRunLog {
path: log_path.clone(),
error,
})?;
let decoder =
zstd::stream::Decoder::new(file).map_err(|error| RecordReadError::OpenRunLog {
path: log_path,
error,
})?;
Ok(RecordEventIter {
reader: DebugIgnore(BufReader::new(decoder)),
line_buf: String::new(),
line_number: 0,
})
}
pub fn read_output(&mut self, file_name: &str) -> Result<Vec<u8>, RecordReadError> {
let path = format!("out/{file_name}");
let compressed = self.read_archive_file(&path)?;
let limit = MAX_MAX_OUTPUT_SIZE.as_u64();
let dict_bytes = self.get_dict_for_output(file_name).ok_or_else(|| {
RecordReadError::UnknownOutputType {
file_name: file_name.to_owned(),
}
})?;
decompress_with_dict(&compressed, dict_bytes, limit).map_err(|error| {
RecordReadError::Decompress {
file_name: path,
error,
}
})
}
/// Returns the dictionary bytes to use for `file_name`, or `None` if the
/// file name maps to no dictionary.
///
/// # Panics
///
/// Panics if the selected dictionary has not been loaded yet.
fn get_dict_for_output(&self, file_name: &str) -> Option<&[u8]> {
    let slot = match OutputDict::for_output_file_name(file_name) {
        OutputDict::Stdout => &self.stdout_dict,
        OutputDict::Stderr => &self.stderr_dict,
        OutputDict::None => return None,
    };
    Some(
        slot.as_deref()
            .expect("load_dictionaries must be called first"),
    )
}
}
/// [`StoreReader`] implementation backed by the run's store archive.
///
/// Every method except `extract_file_to_path` delegates to the inherent
/// [`RecordReader`] method of the same name.
impl StoreReader for RecordReader {
    fn read_cargo_metadata(&mut self) -> Result<String, RecordReadError> {
        RecordReader::read_cargo_metadata(self)
    }

    fn read_test_list(&mut self) -> Result<TestListSummary, RecordReadError> {
        RecordReader::read_test_list(self)
    }

    fn read_record_opts(&mut self) -> Result<RecordOpts, RecordReadError> {
        RecordReader::read_record_opts(self)
    }

    fn read_rerun_info(&mut self) -> Result<Option<RerunInfo>, RecordReadError> {
        RecordReader::read_rerun_info(self)
    }

    fn load_dictionaries(&mut self) -> Result<(), RecordReadError> {
        RecordReader::load_dictionaries(self)
    }

    fn read_output(&mut self, file_name: &str) -> Result<Vec<u8>, RecordReadError> {
        RecordReader::read_output(self, file_name)
    }

    /// Streams the archive entry `store_path` into a newly created file at
    /// `output_path` and returns the number of bytes copied.
    ///
    /// Unlike the in-memory readers, this copies the entry as stored and
    /// applies no size limit.
    ///
    /// # Errors
    ///
    /// * [`RecordReadError::FileNotFound`] if the archive has no such entry.
    /// * [`RecordReadError::ReadArchiveFile`] if the entry cannot be opened
    ///   for reading.
    /// * [`RecordReadError::ExtractFile`] if the output file cannot be
    ///   created or the copy fails.
    fn extract_file_to_path(
        &mut self,
        store_path: &str,
        output_path: &Utf8Path,
    ) -> Result<u64, RecordReadError> {
        let archive = self.ensure_archive()?;
        let mut file =
            archive
                .get_by_name(store_path)
                .ok_or_else(|| RecordReadError::FileNotFound {
                    file_name: store_path.to_owned(),
                })?;

        // Open the entry for reading before touching the destination.
        // `File::create` truncates an existing file, so creating it first
        // would leave an empty or clobbered output behind whenever the
        // entry turns out to be unreadable.
        let mut reader = file
            .read()
            .map_err(|error| RecordReadError::ReadArchiveFile {
                file_name: store_path.to_owned(),
                error,
            })?;

        let mut output_file =
            File::create(output_path).map_err(|error| RecordReadError::ExtractFile {
                store_path: store_path.to_owned(),
                output_path: output_path.to_owned(),
                error,
            })?;

        io::copy(&mut reader, &mut output_file).map_err(|error| RecordReadError::ExtractFile {
            store_path: store_path.to_owned(),
            output_path: output_path.to_owned(),
            error,
        })
    }
}
/// Decompresses zstd data using the given dictionary.
///
/// At most `limit` bytes of decompressed output are read. Output beyond
/// that is not read, so an oversized stream yields a truncated result
/// rather than an error.
///
/// # Errors
///
/// Returns an I/O error if the decoder cannot be created or the stream
/// cannot be decoded.
pub(super) fn decompress_with_dict(
    compressed: &[u8],
    dict_bytes: &[u8],
    limit: u64,
) -> std::io::Result<Vec<u8>> {
    let prepared = zstd::dict::DecoderDictionary::copy(dict_bytes);
    let mut bounded =
        zstd::stream::Decoder::with_prepared_dictionary(compressed, &prepared)?.take(limit);
    let mut output = Vec::new();
    bounded.read_to_end(&mut output)?;
    Ok(output)
}
/// Decoder type for the run log: a zstd stream decoder over a buffered file.
type LogDecoder = zstd::stream::Decoder<'static, BufReader<File>>;
/// Iterator over the events of a run log, produced by
/// [`RecordReader::events`].
///
/// The decompressed log is read line by line. Each non-blank line is parsed
/// as one JSON event.
#[derive(Debug)]
pub struct RecordEventIter {
/// Buffered line reader over the decompressed log stream.
reader: DebugIgnore<BufReader<LogDecoder>>,
/// Scratch buffer for the current line; cleared before each read so the
/// allocation is reused.
line_buf: String,
/// Number of the line being read (1-based; blank lines are counted),
/// used in error reports.
line_number: usize,
}
impl Iterator for RecordEventIter {
type Item = Result<TestEventSummary<RecordingSpec>, RecordReadError>;
fn next(&mut self) -> Option<Self::Item> {
loop {
self.line_buf.clear();
self.line_number += 1;
match self.reader.read_line(&mut self.line_buf) {
Ok(0) => return None,
Ok(_) => {
let trimmed = self.line_buf.trim();
if trimmed.is_empty() {
continue;
}
return Some(serde_json::from_str(trimmed).map_err(|error| {
RecordReadError::ParseEvent {
line_number: self.line_number,
error,
}
}));
}
Err(error) => {
return Some(Err(RecordReadError::ReadRunLog {
line_number: self.line_number,
error,
}));
}
}
}
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Opening a run directory that does not exist must fail with
    /// `RunNotFound`.
    #[test]
    fn test_record_reader_nonexistent_dir() {
        let missing_dir = Utf8Path::new("/nonexistent/path");
        let outcome = RecordReader::open(missing_dir);
        assert!(matches!(
            outcome,
            Err(RecordReadError::RunNotFound { .. })
        ));
    }
}