Skip to main content

nextest_runner/record/
reader.rs

1// Copyright (c) The nextest Contributors
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! Reading logic for recorded test runs.
5//!
6//! The [`RecordReader`] reads a recorded test run from disk, providing access
7//! to metadata and events stored during the run.
8//!
9//! The [`StoreReader`] trait provides a unified interface for reading from
10//! either on-disk stores or portable archives.
11
12use super::{
13    format::{
14        CARGO_METADATA_JSON_PATH, OutputDict, RECORD_OPTS_JSON_PATH, RUN_LOG_FILE_NAME,
15        STDERR_DICT_PATH, STDOUT_DICT_PATH, STORE_ZIP_FILE_NAME, TEST_LIST_JSON_PATH,
16    },
17    summary::{RecordOpts, TestEventSummary},
18};
19use crate::{
20    errors::RecordReadError,
21    output_spec::RecordingSpec,
22    record::format::{RERUN_INFO_JSON_PATH, RerunInfo},
23    user_config::elements::MAX_MAX_OUTPUT_SIZE,
24};
25use camino::{Utf8Path, Utf8PathBuf};
26use debug_ignore::DebugIgnore;
27use eazip::Archive;
28use nextest_metadata::TestListSummary;
29use std::{
30    fs::File,
31    io::{self, BufRead, BufReader, Read},
32};
33
/// Trait for reading from a recorded run's store.
///
/// This trait abstracts over reading from either an on-disk store directory
/// (via [`RecordReader`]) or from an inner store.zip within a portable archive
/// (via [`PortableStoreReader`](super::portable::PortableStoreReader)).
///
/// All methods take `&mut self` because implementations may lazily open and
/// cache the underlying archive handle on first use.
pub trait StoreReader {
    /// Returns the cargo metadata JSON from the store, as a UTF-8 string.
    fn read_cargo_metadata(&mut self) -> Result<String, RecordReadError>;

    /// Returns the test list summary from the store.
    fn read_test_list(&mut self) -> Result<TestListSummary, RecordReadError>;

    /// Returns the record options from the store.
    fn read_record_opts(&mut self) -> Result<RecordOpts, RecordReadError>;

    /// Returns the rerun info from the store, if this is a rerun.
    ///
    /// Returns `Ok(None)` if this run is not a rerun (the file doesn't exist).
    fn read_rerun_info(&mut self) -> Result<Option<RerunInfo>, RecordReadError>;

    /// Loads the dictionaries from the store.
    ///
    /// This must be called before reading output files.
    fn load_dictionaries(&mut self) -> Result<(), RecordReadError>;

    /// Reads output for a specific file from the store.
    ///
    /// The `file_name` should be the value from `ZipStoreOutput::file_name`,
    /// e.g., "test-abc123-1-stdout".
    ///
    /// # Panics
    ///
    /// Panics if [`load_dictionaries`](Self::load_dictionaries) has not been called first.
    fn read_output(&mut self, file_name: &str) -> Result<Vec<u8>, RecordReadError>;

    /// Extracts a file from the store to a path, streaming directly.
    ///
    /// The `store_path` is relative to the store root (e.g., `meta/test-list.json`).
    /// Returns the number of bytes written.
    fn extract_file_to_path(
        &mut self,
        store_path: &str,
        output_path: &Utf8Path,
    ) -> Result<u64, RecordReadError>;
}
79
/// Reader for a recorded test run.
///
/// Provides access to the metadata and events stored during a test run.
/// The archive is opened lazily when methods are called.
#[derive(Debug)]
pub struct RecordReader {
    /// Directory containing `store.zip` and `run.log.zst` for this run.
    run_dir: Utf8PathBuf,
    /// Lazily opened store archive; `None` until first accessed via
    /// `ensure_archive`. `DebugIgnore` keeps it out of `Debug` output.
    archive: Option<DebugIgnore<Archive<BufReader<File>>>>,
    /// Cached stdout dictionary loaded from the archive.
    stdout_dict: Option<Vec<u8>>,
    /// Cached stderr dictionary loaded from the archive.
    stderr_dict: Option<Vec<u8>>,
}
93
impl RecordReader {
    /// Opens a recorded run from its directory.
    ///
    /// The directory should contain `store.zip` and `run.log.zst`.
    ///
    /// Only the directory's existence is checked here; the store archive
    /// itself is opened lazily on first use (see `ensure_archive`).
    pub fn open(run_dir: &Utf8Path) -> Result<Self, RecordReadError> {
        if !run_dir.exists() {
            return Err(RecordReadError::RunNotFound {
                path: run_dir.to_owned(),
            });
        }

        Ok(Self {
            run_dir: run_dir.to_owned(),
            archive: None,
            stdout_dict: None,
            stderr_dict: None,
        })
    }

    /// Returns the path to the run directory.
    pub fn run_dir(&self) -> &Utf8Path {
        &self.run_dir
    }

    /// Opens the zip archive if not already open.
    ///
    /// Subsequent calls reuse the cached handle stored in `self.archive`.
    fn ensure_archive(&mut self) -> Result<&mut Archive<BufReader<File>>, RecordReadError> {
        if self.archive.is_none() {
            let store_path = self.run_dir.join(STORE_ZIP_FILE_NAME);
            let file = File::open(&store_path).map_err(|error| RecordReadError::OpenArchive {
                path: store_path.clone(),
                error,
            })?;
            let archive = Archive::new(BufReader::new(file)).map_err(|error| {
                RecordReadError::ParseArchive {
                    path: store_path,
                    error,
                }
            })?;
            self.archive = Some(DebugIgnore(archive));
        }
        Ok(self.archive.as_mut().expect("archive was just set"))
    }

    /// Reads a file from the archive as bytes, with size limit.
    ///
    /// The size limit prevents malicious archives from causing OOM by
    /// specifying a huge decompressed size. The limit is checked against the
    /// claimed size in the ZIP header, and `take()` is used during decompression
    /// to guard against spoofed headers.
    ///
    /// Since nextest controls archive creation, any mismatch between the header
    /// size and actual size indicates corruption or tampering.
    fn read_archive_file(&mut self, file_name: &str) -> Result<Vec<u8>, RecordReadError> {
        let limit = MAX_MAX_OUTPUT_SIZE.as_u64();
        let archive = self.ensure_archive()?;
        let mut file =
            archive
                .get_by_name(file_name)
                .ok_or_else(|| RecordReadError::FileNotFound {
                    file_name: file_name.to_string(),
                })?;

        // Reject up front based on the header's claimed size, before
        // allocating or decompressing anything.
        let claimed_size = file.metadata().uncompressed_size;
        if claimed_size > limit {
            return Err(RecordReadError::FileTooLarge {
                file_name: file_name.to_string(),
                size: claimed_size,
                limit,
            });
        }

        // claimed_size <= limit at this point, so the allocation is bounded.
        // The unwrap_or fallback only matters on targets where u64 doesn't
        // fit in usize.
        let capacity = usize::try_from(claimed_size).unwrap_or(usize::MAX);
        let mut contents = Vec::with_capacity(capacity);

        // file.read() returns a reader that decompresses and verifies CRC32 +
        // size. The take(limit) is a safety belt against spoofed headers.
        file.read()
            .and_then(|reader| reader.take(limit).read_to_end(&mut contents))
            .map_err(|error| RecordReadError::Decompress {
                file_name: file_name.to_string(),
                error,
            })?;

        // A mismatch here means the header lied about the size (see doc
        // comment above): treat it as corruption rather than returning
        // possibly-truncated data.
        let actual_size = contents.len() as u64;
        if actual_size != claimed_size {
            return Err(RecordReadError::SizeMismatch {
                file_name: file_name.to_string(),
                claimed_size,
                actual_size,
            });
        }

        Ok(contents)
    }

    /// Returns the cargo metadata JSON from the archive.
    pub fn read_cargo_metadata(&mut self) -> Result<String, RecordReadError> {
        let bytes = self.read_archive_file(CARGO_METADATA_JSON_PATH)?;
        // Invalid UTF-8 is surfaced through the Decompress variant, with the
        // utf8 error wrapped as an io::Error.
        String::from_utf8(bytes).map_err(|e| RecordReadError::Decompress {
            file_name: CARGO_METADATA_JSON_PATH.to_string(),
            error: std::io::Error::new(std::io::ErrorKind::InvalidData, e),
        })
    }

    /// Returns the test list from the archive.
    pub fn read_test_list(&mut self) -> Result<TestListSummary, RecordReadError> {
        let bytes = self.read_archive_file(TEST_LIST_JSON_PATH)?;
        serde_json::from_slice(&bytes).map_err(|error| RecordReadError::DeserializeMetadata {
            file_name: TEST_LIST_JSON_PATH.to_string(),
            error,
        })
    }

    /// Returns the record options from the archive.
    pub fn read_record_opts(&mut self) -> Result<RecordOpts, RecordReadError> {
        let bytes = self.read_archive_file(RECORD_OPTS_JSON_PATH)?;
        serde_json::from_slice(&bytes).map_err(|error| RecordReadError::DeserializeMetadata {
            file_name: RECORD_OPTS_JSON_PATH.to_string(),
            error,
        })
    }

    /// Returns the rerun info from the archive, if this is a rerun.
    ///
    /// Returns `Ok(None)` if this run is not a rerun (the file doesn't exist).
    /// Returns `Err` if the file exists but cannot be read or parsed.
    pub fn read_rerun_info(&mut self) -> Result<Option<RerunInfo>, RecordReadError> {
        match self.read_archive_file(RERUN_INFO_JSON_PATH) {
            Ok(bytes) => {
                let info = serde_json::from_slice(&bytes).map_err(|error| {
                    RecordReadError::DeserializeMetadata {
                        file_name: RERUN_INFO_JSON_PATH.to_string(),
                        error,
                    }
                })?;
                Ok(Some(info))
            }
            Err(RecordReadError::FileNotFound { .. }) => {
                // File doesn't exist; this is not a rerun.
                Ok(None)
            }
            Err(e) => Err(e),
        }
    }

    /// Loads the dictionaries from the archive.
    ///
    /// This must be called before reading output files. The dictionaries are
    /// used for decompressing test output.
    ///
    /// Note: The store format version is checked before opening the archive,
    /// using the `store_format_version` field in runs.json.zst. This method
    /// assumes the version has already been validated.
    pub fn load_dictionaries(&mut self) -> Result<(), RecordReadError> {
        self.stdout_dict = Some(self.read_archive_file(STDOUT_DICT_PATH)?);
        self.stderr_dict = Some(self.read_archive_file(STDERR_DICT_PATH)?);
        Ok(())
    }

    /// Returns an iterator over events in the run log.
    ///
    /// Events are read one at a time from the zstd-compressed JSON Lines file.
    ///
    /// Takes `&self`: the iterator opens its own file handle for the run log
    /// and is independent of the store archive state.
    pub fn events(&self) -> Result<RecordEventIter, RecordReadError> {
        let log_path = self.run_dir.join(RUN_LOG_FILE_NAME);
        let file = File::open(&log_path).map_err(|error| RecordReadError::OpenRunLog {
            path: log_path.clone(),
            error,
        })?;
        let decoder =
            zstd::stream::Decoder::new(file).map_err(|error| RecordReadError::OpenRunLog {
                path: log_path,
                error,
            })?;
        Ok(RecordEventIter {
            reader: DebugIgnore(BufReader::new(decoder)),
            line_buf: String::new(),
            line_number: 0,
        })
    }

    /// Reads output for a specific file from the archive.
    ///
    /// The `file_name` should be the value from `ZipStoreOutput::file_name`,
    /// e.g., "test-abc123-1-stdout".
    ///
    /// The [`OutputFileName`](crate::record::OutputFileName) type ensures that
    /// file names are validated during deserialization, preventing path traversal.
    ///
    /// # Panics
    ///
    /// Panics if [`load_dictionaries`](Self::load_dictionaries) has not been called first.
    pub fn read_output(&mut self, file_name: &str) -> Result<Vec<u8>, RecordReadError> {
        // Output files live under the `out/` prefix inside the store zip.
        let path = format!("out/{file_name}");
        let compressed = self.read_archive_file(&path)?;
        let limit = MAX_MAX_OUTPUT_SIZE.as_u64();

        // Output files are stored pre-compressed with zstd dictionaries.
        // Unknown file types indicate a format revision that should have been
        // rejected during version validation.
        let dict_bytes = self.get_dict_for_output(file_name).ok_or_else(|| {
            RecordReadError::UnknownOutputType {
                file_name: file_name.to_owned(),
            }
        })?;

        decompress_with_dict(&compressed, dict_bytes, limit).map_err(|error| {
            RecordReadError::Decompress {
                file_name: path,
                error,
            }
        })
    }

    /// Returns the dictionary bytes for the given output file name, if known.
    ///
    /// Returns `None` for unknown file types, which indicates a format revision
    /// that should have been rejected during version validation.
    ///
    /// # Panics
    ///
    /// Panics if [`load_dictionaries`](Self::load_dictionaries) has not been called first.
    fn get_dict_for_output(&self, file_name: &str) -> Option<&[u8]> {
        match OutputDict::for_output_file_name(file_name) {
            OutputDict::Stdout => Some(
                self.stdout_dict
                    .as_ref()
                    .expect("load_dictionaries must be called first"),
            ),
            OutputDict::Stderr => Some(
                self.stderr_dict
                    .as_ref()
                    .expect("load_dictionaries must be called first"),
            ),
            OutputDict::None => None,
        }
    }
}
331
332impl StoreReader for RecordReader {
333    fn read_cargo_metadata(&mut self) -> Result<String, RecordReadError> {
334        RecordReader::read_cargo_metadata(self)
335    }
336
337    fn read_test_list(&mut self) -> Result<TestListSummary, RecordReadError> {
338        RecordReader::read_test_list(self)
339    }
340
341    fn read_record_opts(&mut self) -> Result<RecordOpts, RecordReadError> {
342        RecordReader::read_record_opts(self)
343    }
344
345    fn read_rerun_info(&mut self) -> Result<Option<RerunInfo>, RecordReadError> {
346        RecordReader::read_rerun_info(self)
347    }
348
349    fn load_dictionaries(&mut self) -> Result<(), RecordReadError> {
350        RecordReader::load_dictionaries(self)
351    }
352
353    fn read_output(&mut self, file_name: &str) -> Result<Vec<u8>, RecordReadError> {
354        RecordReader::read_output(self, file_name)
355    }
356
357    fn extract_file_to_path(
358        &mut self,
359        store_path: &str,
360        output_path: &Utf8Path,
361    ) -> Result<u64, RecordReadError> {
362        let archive = self.ensure_archive()?;
363        let mut file =
364            archive
365                .get_by_name(store_path)
366                .ok_or_else(|| RecordReadError::FileNotFound {
367                    file_name: store_path.to_owned(),
368                })?;
369
370        let mut output_file =
371            File::create(output_path).map_err(|error| RecordReadError::ExtractFile {
372                store_path: store_path.to_owned(),
373                output_path: output_path.to_owned(),
374                error,
375            })?;
376
377        // file.read() decompresses and verifies CRC32 + size.
378        let mut reader = file
379            .read()
380            .map_err(|error| RecordReadError::ReadArchiveFile {
381                file_name: store_path.to_owned(),
382                error,
383            })?;
384
385        io::copy(&mut reader, &mut output_file).map_err(|error| RecordReadError::ExtractFile {
386            store_path: store_path.to_owned(),
387            output_path: output_path.to_owned(),
388            error,
389        })
390    }
391}
392
393/// Decompresses data using a pre-trained zstd dictionary, with a size limit.
394///
395/// The limit prevents compression bombs where a small compressed payload
396/// expands to an extremely large decompressed output.
397pub(super) fn decompress_with_dict(
398    compressed: &[u8],
399    dict_bytes: &[u8],
400    limit: u64,
401) -> std::io::Result<Vec<u8>> {
402    let dict = zstd::dict::DecoderDictionary::copy(dict_bytes);
403    let decoder = zstd::stream::Decoder::with_prepared_dictionary(compressed, &dict)?;
404    let mut decompressed = Vec::new();
405    decoder.take(limit).read_to_end(&mut decompressed)?;
406    Ok(decompressed)
407}
408
/// Zstd decoder reading from a file, used for the run log. The `'static`
/// lifetime indicates the decoder borrows no external dictionary data.
type LogDecoder = zstd::stream::Decoder<'static, BufReader<File>>;
411
/// Iterator over recorded events.
///
/// Reads events one at a time from the zstd-compressed JSON Lines run log.
#[derive(Debug)]
pub struct RecordEventIter {
    /// Buffered reader over the zstd-decoded run log stream.
    reader: DebugIgnore<BufReader<LogDecoder>>,
    /// Line buffer reused across `next()` calls to avoid per-line allocations.
    line_buf: String,
    /// 1-based line number of the most recently read line, used in errors.
    line_number: usize,
}
421
422impl Iterator for RecordEventIter {
423    type Item = Result<TestEventSummary<RecordingSpec>, RecordReadError>;
424
425    fn next(&mut self) -> Option<Self::Item> {
426        loop {
427            self.line_buf.clear();
428            self.line_number += 1;
429
430            match self.reader.read_line(&mut self.line_buf) {
431                Ok(0) => return None,
432                Ok(_) => {
433                    let trimmed = self.line_buf.trim();
434                    if trimmed.is_empty() {
435                        continue;
436                    }
437                    return Some(serde_json::from_str(trimmed).map_err(|error| {
438                        RecordReadError::ParseEvent {
439                            line_number: self.line_number,
440                            error,
441                        }
442                    }));
443                }
444                Err(error) => {
445                    return Some(Err(RecordReadError::ReadRunLog {
446                        line_number: self.line_number,
447                        error,
448                    }));
449                }
450            }
451        }
452    }
453}
454
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_record_reader_nonexistent_dir() {
        // Opening a path that does not exist must surface `RunNotFound`.
        let err = RecordReader::open(Utf8Path::new("/nonexistent/path"))
            .expect_err("open should fail for a missing directory");
        assert!(matches!(err, RecordReadError::RunNotFound { .. }));
    }
}
464}