1use super::{
13 format::{
14 CARGO_METADATA_JSON_PATH, OutputDict, RECORD_OPTS_JSON_PATH, RUN_LOG_FILE_NAME,
15 STDERR_DICT_PATH, STDOUT_DICT_PATH, STORE_ZIP_FILE_NAME, TEST_LIST_JSON_PATH,
16 },
17 summary::{RecordOpts, TestEventSummary, ZipStoreOutput},
18};
19use crate::{
20 errors::RecordReadError,
21 record::format::{RERUN_INFO_JSON_PATH, RerunInfo},
22 user_config::elements::MAX_MAX_OUTPUT_SIZE,
23};
24use camino::{Utf8Path, Utf8PathBuf};
25use debug_ignore::DebugIgnore;
26use nextest_metadata::TestListSummary;
27use std::{
28 fs::File,
29 io::{self, BufRead, BufReader, Read},
30};
31use zip::{ZipArchive, result::ZipError};
32
33pub trait StoreReader {
39 fn read_cargo_metadata(&mut self) -> Result<String, RecordReadError>;
41
42 fn read_test_list(&mut self) -> Result<TestListSummary, RecordReadError>;
44
45 fn read_record_opts(&mut self) -> Result<RecordOpts, RecordReadError>;
47
48 fn read_rerun_info(&mut self) -> Result<Option<RerunInfo>, RecordReadError>;
52
53 fn load_dictionaries(&mut self) -> Result<(), RecordReadError>;
57
58 fn read_output(&mut self, file_name: &str) -> Result<Vec<u8>, RecordReadError>;
67
68 fn extract_file_to_path(
73 &mut self,
74 store_path: &str,
75 output_path: &Utf8Path,
76 ) -> Result<u64, RecordReadError>;
77}
78
79#[derive(Debug)]
84pub struct RecordReader {
85 run_dir: Utf8PathBuf,
86 archive: Option<ZipArchive<File>>,
87 stdout_dict: Option<Vec<u8>>,
89 stderr_dict: Option<Vec<u8>>,
91}
92
93impl RecordReader {
94 pub fn open(run_dir: &Utf8Path) -> Result<Self, RecordReadError> {
98 if !run_dir.exists() {
99 return Err(RecordReadError::RunNotFound {
100 path: run_dir.to_owned(),
101 });
102 }
103
104 Ok(Self {
105 run_dir: run_dir.to_owned(),
106 archive: None,
107 stdout_dict: None,
108 stderr_dict: None,
109 })
110 }
111
112 pub fn run_dir(&self) -> &Utf8Path {
114 &self.run_dir
115 }
116
117 fn ensure_archive(&mut self) -> Result<&mut ZipArchive<File>, RecordReadError> {
119 if self.archive.is_none() {
120 let store_path = self.run_dir.join(STORE_ZIP_FILE_NAME);
121 let file = File::open(&store_path).map_err(|error| RecordReadError::OpenArchive {
122 path: store_path,
123 error,
124 })?;
125 let archive =
126 ZipArchive::new(file).map_err(|error| RecordReadError::ReadArchiveFile {
127 file_name: STORE_ZIP_FILE_NAME.to_string(),
128 error,
129 })?;
130 self.archive = Some(archive);
131 }
132 Ok(self.archive.as_mut().expect("archive was just set"))
133 }
134
135 fn read_archive_file(&mut self, file_name: &str) -> Result<Vec<u8>, RecordReadError> {
145 let limit = MAX_MAX_OUTPUT_SIZE.as_u64();
146 let archive = self.ensure_archive()?;
147 let file =
148 archive
149 .by_name(file_name)
150 .map_err(|error| RecordReadError::ReadArchiveFile {
151 file_name: file_name.to_string(),
152 error,
153 })?;
154
155 let claimed_size = file.size();
156 if claimed_size > limit {
157 return Err(RecordReadError::FileTooLarge {
158 file_name: file_name.to_string(),
159 size: claimed_size,
160 limit,
161 });
162 }
163
164 let capacity = usize::try_from(claimed_size).unwrap_or(usize::MAX);
165 let mut contents = Vec::with_capacity(capacity);
166
167 file.take(limit)
168 .read_to_end(&mut contents)
169 .map_err(|error| RecordReadError::Decompress {
170 file_name: file_name.to_string(),
171 error,
172 })?;
173
174 let actual_size = contents.len() as u64;
175 if actual_size != claimed_size {
176 return Err(RecordReadError::SizeMismatch {
177 file_name: file_name.to_string(),
178 claimed_size,
179 actual_size,
180 });
181 }
182
183 Ok(contents)
184 }
185
186 pub fn read_cargo_metadata(&mut self) -> Result<String, RecordReadError> {
188 let bytes = self.read_archive_file(CARGO_METADATA_JSON_PATH)?;
189 String::from_utf8(bytes).map_err(|e| RecordReadError::Decompress {
190 file_name: CARGO_METADATA_JSON_PATH.to_string(),
191 error: std::io::Error::new(std::io::ErrorKind::InvalidData, e),
192 })
193 }
194
195 pub fn read_test_list(&mut self) -> Result<TestListSummary, RecordReadError> {
197 let bytes = self.read_archive_file(TEST_LIST_JSON_PATH)?;
198 serde_json::from_slice(&bytes).map_err(|error| RecordReadError::DeserializeMetadata {
199 file_name: TEST_LIST_JSON_PATH.to_string(),
200 error,
201 })
202 }
203
204 pub fn read_record_opts(&mut self) -> Result<RecordOpts, RecordReadError> {
206 let bytes = self.read_archive_file(RECORD_OPTS_JSON_PATH)?;
207 serde_json::from_slice(&bytes).map_err(|error| RecordReadError::DeserializeMetadata {
208 file_name: RECORD_OPTS_JSON_PATH.to_string(),
209 error,
210 })
211 }
212
213 pub fn read_rerun_info(&mut self) -> Result<Option<RerunInfo>, RecordReadError> {
218 match self.read_archive_file(RERUN_INFO_JSON_PATH) {
219 Ok(bytes) => {
220 let info = serde_json::from_slice(&bytes).map_err(|error| {
221 RecordReadError::DeserializeMetadata {
222 file_name: RERUN_INFO_JSON_PATH.to_string(),
223 error,
224 }
225 })?;
226 Ok(Some(info))
227 }
228 Err(RecordReadError::ReadArchiveFile {
229 error: ZipError::FileNotFound,
230 ..
231 }) => {
232 Ok(None)
234 }
235 Err(e) => Err(e),
236 }
237 }
238
239 pub fn load_dictionaries(&mut self) -> Result<(), RecordReadError> {
248 self.stdout_dict = Some(self.read_archive_file(STDOUT_DICT_PATH)?);
249 self.stderr_dict = Some(self.read_archive_file(STDERR_DICT_PATH)?);
250 Ok(())
251 }
252
253 pub fn events(&self) -> Result<RecordEventIter, RecordReadError> {
257 let log_path = self.run_dir.join(RUN_LOG_FILE_NAME);
258 let file = File::open(&log_path).map_err(|error| RecordReadError::OpenRunLog {
259 path: log_path.clone(),
260 error,
261 })?;
262 let decoder =
263 zstd::stream::Decoder::new(file).map_err(|error| RecordReadError::OpenRunLog {
264 path: log_path,
265 error,
266 })?;
267 Ok(RecordEventIter {
268 reader: DebugIgnore(BufReader::new(decoder)),
269 line_buf: String::new(),
270 line_number: 0,
271 })
272 }
273
274 pub fn read_output(&mut self, file_name: &str) -> Result<Vec<u8>, RecordReadError> {
286 let path = format!("out/{file_name}");
287 let compressed = self.read_archive_file(&path)?;
288 let limit = MAX_MAX_OUTPUT_SIZE.as_u64();
289
290 let dict_bytes = self.get_dict_for_output(file_name).ok_or_else(|| {
294 RecordReadError::UnknownOutputType {
295 file_name: file_name.to_owned(),
296 }
297 })?;
298
299 decompress_with_dict(&compressed, dict_bytes, limit).map_err(|error| {
300 RecordReadError::Decompress {
301 file_name: path,
302 error,
303 }
304 })
305 }
306
307 fn get_dict_for_output(&self, file_name: &str) -> Option<&[u8]> {
316 match OutputDict::for_output_file_name(file_name) {
317 OutputDict::Stdout => Some(
318 self.stdout_dict
319 .as_ref()
320 .expect("load_dictionaries must be called first"),
321 ),
322 OutputDict::Stderr => Some(
323 self.stderr_dict
324 .as_ref()
325 .expect("load_dictionaries must be called first"),
326 ),
327 OutputDict::None => None,
328 }
329 }
330}
331
332impl StoreReader for RecordReader {
333 fn read_cargo_metadata(&mut self) -> Result<String, RecordReadError> {
334 RecordReader::read_cargo_metadata(self)
335 }
336
337 fn read_test_list(&mut self) -> Result<TestListSummary, RecordReadError> {
338 RecordReader::read_test_list(self)
339 }
340
341 fn read_record_opts(&mut self) -> Result<RecordOpts, RecordReadError> {
342 RecordReader::read_record_opts(self)
343 }
344
345 fn read_rerun_info(&mut self) -> Result<Option<RerunInfo>, RecordReadError> {
346 RecordReader::read_rerun_info(self)
347 }
348
349 fn load_dictionaries(&mut self) -> Result<(), RecordReadError> {
350 RecordReader::load_dictionaries(self)
351 }
352
353 fn read_output(&mut self, file_name: &str) -> Result<Vec<u8>, RecordReadError> {
354 RecordReader::read_output(self, file_name)
355 }
356
357 fn extract_file_to_path(
358 &mut self,
359 store_path: &str,
360 output_path: &Utf8Path,
361 ) -> Result<u64, RecordReadError> {
362 let archive = self.ensure_archive()?;
363 let mut file =
364 archive
365 .by_name(store_path)
366 .map_err(|error| RecordReadError::ReadArchiveFile {
367 file_name: store_path.to_owned(),
368 error,
369 })?;
370
371 let mut output_file =
372 File::create(output_path).map_err(|error| RecordReadError::ExtractFile {
373 store_path: store_path.to_owned(),
374 output_path: output_path.to_owned(),
375 error,
376 })?;
377
378 io::copy(&mut file, &mut output_file).map_err(|error| RecordReadError::ExtractFile {
379 store_path: store_path.to_owned(),
380 output_path: output_path.to_owned(),
381 error,
382 })
383 }
384}
385
386pub(super) fn decompress_with_dict(
391 compressed: &[u8],
392 dict_bytes: &[u8],
393 limit: u64,
394) -> std::io::Result<Vec<u8>> {
395 let dict = zstd::dict::DecoderDictionary::copy(dict_bytes);
396 let decoder = zstd::stream::Decoder::with_prepared_dictionary(compressed, &dict)?;
397 let mut decompressed = Vec::new();
398 decoder.take(limit).read_to_end(&mut decompressed)?;
399 Ok(decompressed)
400}
401
402type LogDecoder = zstd::stream::Decoder<'static, BufReader<File>>;
404
405#[derive(Debug)]
409pub struct RecordEventIter {
410 reader: DebugIgnore<BufReader<LogDecoder>>,
411 line_buf: String,
412 line_number: usize,
413}
414
415impl Iterator for RecordEventIter {
416 type Item = Result<TestEventSummary<ZipStoreOutput>, RecordReadError>;
417
418 fn next(&mut self) -> Option<Self::Item> {
419 loop {
420 self.line_buf.clear();
421 self.line_number += 1;
422
423 match self.reader.read_line(&mut self.line_buf) {
424 Ok(0) => return None,
425 Ok(_) => {
426 let trimmed = self.line_buf.trim();
427 if trimmed.is_empty() {
428 continue;
429 }
430 return Some(serde_json::from_str(trimmed).map_err(|error| {
431 RecordReadError::ParseEvent {
432 line_number: self.line_number,
433 error,
434 }
435 }));
436 }
437 Err(error) => {
438 return Some(Err(RecordReadError::ReadRunLog {
439 line_number: self.line_number,
440 error,
441 }));
442 }
443 }
444 }
445 }
446}
447
448#[cfg(test)]
449mod tests {
450 use super::*;
451
452 #[test]
453 fn test_record_reader_nonexistent_dir() {
454 let result = RecordReader::open(Utf8Path::new("/nonexistent/path"));
455 assert!(matches!(result, Err(RecordReadError::RunNotFound { .. })));
456 }
457}