Skip to main content

nextest_runner/record/
reader.rs

1// Copyright (c) The nextest Contributors
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! Reading logic for recorded test runs.
5//!
6//! The [`RecordReader`] reads a recorded test run from disk, providing access
7//! to metadata and events stored during the run.
8//!
9//! The [`StoreReader`] trait provides a unified interface for reading from
10//! either on-disk stores or portable archives.
11
12use super::{
13    format::{
14        CARGO_METADATA_JSON_PATH, OutputDict, RECORD_OPTS_JSON_PATH, RUN_LOG_FILE_NAME,
15        STDERR_DICT_PATH, STDOUT_DICT_PATH, STORE_ZIP_FILE_NAME, TEST_LIST_JSON_PATH,
16    },
17    summary::{RecordOpts, TestEventSummary, ZipStoreOutput},
18};
19use crate::{
20    errors::RecordReadError,
21    record::format::{RERUN_INFO_JSON_PATH, RerunInfo},
22    user_config::elements::MAX_MAX_OUTPUT_SIZE,
23};
24use camino::{Utf8Path, Utf8PathBuf};
25use debug_ignore::DebugIgnore;
26use nextest_metadata::TestListSummary;
27use std::{
28    fs::File,
29    io::{self, BufRead, BufReader, Read},
30};
31use zip::{ZipArchive, result::ZipError};
32
33/// Trait for reading from a recorded run's store.
34///
35/// This trait abstracts over reading from either an on-disk store directory
36/// (via [`RecordReader`]) or from an inner store.zip within a portable archive
37/// (via [`PortableStoreReader`](super::portable::PortableStoreReader)).
38pub trait StoreReader {
39    /// Returns the cargo metadata JSON from the store.
40    fn read_cargo_metadata(&mut self) -> Result<String, RecordReadError>;
41
42    /// Returns the test list summary from the store.
43    fn read_test_list(&mut self) -> Result<TestListSummary, RecordReadError>;
44
45    /// Returns the record options from the store.
46    fn read_record_opts(&mut self) -> Result<RecordOpts, RecordReadError>;
47
48    /// Returns the rerun info from the store, if this is a rerun.
49    ///
50    /// Returns `Ok(None)` if this run is not a rerun (the file doesn't exist).
51    fn read_rerun_info(&mut self) -> Result<Option<RerunInfo>, RecordReadError>;
52
53    /// Loads the dictionaries from the store.
54    ///
55    /// This must be called before reading output files.
56    fn load_dictionaries(&mut self) -> Result<(), RecordReadError>;
57
58    /// Reads output for a specific file from the store.
59    ///
60    /// The `file_name` should be the value from `ZipStoreOutput::file_name`,
61    /// e.g., "test-abc123-1-stdout".
62    ///
63    /// # Panics
64    ///
65    /// Panics if [`load_dictionaries`](Self::load_dictionaries) has not been called first.
66    fn read_output(&mut self, file_name: &str) -> Result<Vec<u8>, RecordReadError>;
67
68    /// Extracts a file from the store to a path, streaming directly.
69    ///
70    /// The `store_path` is relative to the store root (e.g., `meta/test-list.json`).
71    /// Returns the number of bytes written.
72    fn extract_file_to_path(
73        &mut self,
74        store_path: &str,
75        output_path: &Utf8Path,
76    ) -> Result<u64, RecordReadError>;
77}
78
79/// Reader for a recorded test run.
80///
81/// Provides access to the metadata and events stored during a test run.
82/// The archive is opened lazily when methods are called.
83#[derive(Debug)]
84pub struct RecordReader {
85    run_dir: Utf8PathBuf,
86    archive: Option<ZipArchive<File>>,
87    /// Cached stdout dictionary loaded from the archive.
88    stdout_dict: Option<Vec<u8>>,
89    /// Cached stderr dictionary loaded from the archive.
90    stderr_dict: Option<Vec<u8>>,
91}
92
93impl RecordReader {
94    /// Opens a recorded run from its directory.
95    ///
96    /// The directory should contain `store.zip` and `run.log.zst`.
97    pub fn open(run_dir: &Utf8Path) -> Result<Self, RecordReadError> {
98        if !run_dir.exists() {
99            return Err(RecordReadError::RunNotFound {
100                path: run_dir.to_owned(),
101            });
102        }
103
104        Ok(Self {
105            run_dir: run_dir.to_owned(),
106            archive: None,
107            stdout_dict: None,
108            stderr_dict: None,
109        })
110    }
111
112    /// Returns the path to the run directory.
113    pub fn run_dir(&self) -> &Utf8Path {
114        &self.run_dir
115    }
116
117    /// Opens the zip archive if not already open.
118    fn ensure_archive(&mut self) -> Result<&mut ZipArchive<File>, RecordReadError> {
119        if self.archive.is_none() {
120            let store_path = self.run_dir.join(STORE_ZIP_FILE_NAME);
121            let file = File::open(&store_path).map_err(|error| RecordReadError::OpenArchive {
122                path: store_path,
123                error,
124            })?;
125            let archive =
126                ZipArchive::new(file).map_err(|error| RecordReadError::ReadArchiveFile {
127                    file_name: STORE_ZIP_FILE_NAME.to_string(),
128                    error,
129                })?;
130            self.archive = Some(archive);
131        }
132        Ok(self.archive.as_mut().expect("archive was just set"))
133    }
134
135    /// Reads a file from the archive as bytes, with size limit.
136    ///
137    /// The size limit prevents malicious archives from causing OOM by
138    /// specifying a huge decompressed size. The limit is checked against the
139    /// claimed size in the ZIP header, and `take()` is used during decompression
140    /// to guard against spoofed headers.
141    ///
142    /// Since nextest controls archive creation, any mismatch between the header
143    /// size and actual size indicates corruption or tampering.
144    fn read_archive_file(&mut self, file_name: &str) -> Result<Vec<u8>, RecordReadError> {
145        let limit = MAX_MAX_OUTPUT_SIZE.as_u64();
146        let archive = self.ensure_archive()?;
147        let file =
148            archive
149                .by_name(file_name)
150                .map_err(|error| RecordReadError::ReadArchiveFile {
151                    file_name: file_name.to_string(),
152                    error,
153                })?;
154
155        let claimed_size = file.size();
156        if claimed_size > limit {
157            return Err(RecordReadError::FileTooLarge {
158                file_name: file_name.to_string(),
159                size: claimed_size,
160                limit,
161            });
162        }
163
164        let capacity = usize::try_from(claimed_size).unwrap_or(usize::MAX);
165        let mut contents = Vec::with_capacity(capacity);
166
167        file.take(limit)
168            .read_to_end(&mut contents)
169            .map_err(|error| RecordReadError::Decompress {
170                file_name: file_name.to_string(),
171                error,
172            })?;
173
174        let actual_size = contents.len() as u64;
175        if actual_size != claimed_size {
176            return Err(RecordReadError::SizeMismatch {
177                file_name: file_name.to_string(),
178                claimed_size,
179                actual_size,
180            });
181        }
182
183        Ok(contents)
184    }
185
186    /// Returns the cargo metadata JSON from the archive.
187    pub fn read_cargo_metadata(&mut self) -> Result<String, RecordReadError> {
188        let bytes = self.read_archive_file(CARGO_METADATA_JSON_PATH)?;
189        String::from_utf8(bytes).map_err(|e| RecordReadError::Decompress {
190            file_name: CARGO_METADATA_JSON_PATH.to_string(),
191            error: std::io::Error::new(std::io::ErrorKind::InvalidData, e),
192        })
193    }
194
195    /// Returns the test list from the archive.
196    pub fn read_test_list(&mut self) -> Result<TestListSummary, RecordReadError> {
197        let bytes = self.read_archive_file(TEST_LIST_JSON_PATH)?;
198        serde_json::from_slice(&bytes).map_err(|error| RecordReadError::DeserializeMetadata {
199            file_name: TEST_LIST_JSON_PATH.to_string(),
200            error,
201        })
202    }
203
204    /// Returns the record options from the archive.
205    pub fn read_record_opts(&mut self) -> Result<RecordOpts, RecordReadError> {
206        let bytes = self.read_archive_file(RECORD_OPTS_JSON_PATH)?;
207        serde_json::from_slice(&bytes).map_err(|error| RecordReadError::DeserializeMetadata {
208            file_name: RECORD_OPTS_JSON_PATH.to_string(),
209            error,
210        })
211    }
212
213    /// Returns the rerun info from the archive, if this is a rerun.
214    ///
215    /// Returns `Ok(None)` if this run is not a rerun (the file doesn't exist).
216    /// Returns `Err` if the file exists but cannot be read or parsed.
217    pub fn read_rerun_info(&mut self) -> Result<Option<RerunInfo>, RecordReadError> {
218        match self.read_archive_file(RERUN_INFO_JSON_PATH) {
219            Ok(bytes) => {
220                let info = serde_json::from_slice(&bytes).map_err(|error| {
221                    RecordReadError::DeserializeMetadata {
222                        file_name: RERUN_INFO_JSON_PATH.to_string(),
223                        error,
224                    }
225                })?;
226                Ok(Some(info))
227            }
228            Err(RecordReadError::ReadArchiveFile {
229                error: ZipError::FileNotFound,
230                ..
231            }) => {
232                // File doesn't exist; this is not a rerun.
233                Ok(None)
234            }
235            Err(e) => Err(e),
236        }
237    }
238
239    /// Loads the dictionaries from the archive.
240    ///
241    /// This must be called before reading output files. The dictionaries are
242    /// used for decompressing test output.
243    ///
244    /// Note: The store format version is checked before opening the archive,
245    /// using the `store_format_version` field in runs.json.zst. This method
246    /// assumes the version has already been validated.
247    pub fn load_dictionaries(&mut self) -> Result<(), RecordReadError> {
248        self.stdout_dict = Some(self.read_archive_file(STDOUT_DICT_PATH)?);
249        self.stderr_dict = Some(self.read_archive_file(STDERR_DICT_PATH)?);
250        Ok(())
251    }
252
253    /// Returns an iterator over events in the run log.
254    ///
255    /// Events are read one at a time from the zstd-compressed JSON Lines file.
256    pub fn events(&self) -> Result<RecordEventIter, RecordReadError> {
257        let log_path = self.run_dir.join(RUN_LOG_FILE_NAME);
258        let file = File::open(&log_path).map_err(|error| RecordReadError::OpenRunLog {
259            path: log_path.clone(),
260            error,
261        })?;
262        let decoder =
263            zstd::stream::Decoder::new(file).map_err(|error| RecordReadError::OpenRunLog {
264                path: log_path,
265                error,
266            })?;
267        Ok(RecordEventIter {
268            reader: DebugIgnore(BufReader::new(decoder)),
269            line_buf: String::new(),
270            line_number: 0,
271        })
272    }
273
274    /// Reads output for a specific file from the archive.
275    ///
276    /// The `file_name` should be the value from `ZipStoreOutput::file_name`,
277    /// e.g., "test-abc123-1-stdout".
278    ///
279    /// The [`OutputFileName`](crate::record::OutputFileName) type ensures that
280    /// file names are validated during deserialization, preventing path traversal.
281    ///
282    /// # Panics
283    ///
284    /// Panics if [`load_dictionaries`](Self::load_dictionaries) has not been called first.
285    pub fn read_output(&mut self, file_name: &str) -> Result<Vec<u8>, RecordReadError> {
286        let path = format!("out/{file_name}");
287        let compressed = self.read_archive_file(&path)?;
288        let limit = MAX_MAX_OUTPUT_SIZE.as_u64();
289
290        // Output files are stored pre-compressed with zstd dictionaries.
291        // Unknown file types indicate a format revision that should have been
292        // rejected during version validation.
293        let dict_bytes = self.get_dict_for_output(file_name).ok_or_else(|| {
294            RecordReadError::UnknownOutputType {
295                file_name: file_name.to_owned(),
296            }
297        })?;
298
299        decompress_with_dict(&compressed, dict_bytes, limit).map_err(|error| {
300            RecordReadError::Decompress {
301                file_name: path,
302                error,
303            }
304        })
305    }
306
307    /// Returns the dictionary bytes for the given output file name, if known.
308    ///
309    /// Returns `None` for unknown file types, which indicates a format revision
310    /// that should have been rejected during version validation.
311    ///
312    /// # Panics
313    ///
314    /// Panics if [`load_dictionaries`](Self::load_dictionaries) has not been called first.
315    fn get_dict_for_output(&self, file_name: &str) -> Option<&[u8]> {
316        match OutputDict::for_output_file_name(file_name) {
317            OutputDict::Stdout => Some(
318                self.stdout_dict
319                    .as_ref()
320                    .expect("load_dictionaries must be called first"),
321            ),
322            OutputDict::Stderr => Some(
323                self.stderr_dict
324                    .as_ref()
325                    .expect("load_dictionaries must be called first"),
326            ),
327            OutputDict::None => None,
328        }
329    }
330}
331
332impl StoreReader for RecordReader {
333    fn read_cargo_metadata(&mut self) -> Result<String, RecordReadError> {
334        RecordReader::read_cargo_metadata(self)
335    }
336
337    fn read_test_list(&mut self) -> Result<TestListSummary, RecordReadError> {
338        RecordReader::read_test_list(self)
339    }
340
341    fn read_record_opts(&mut self) -> Result<RecordOpts, RecordReadError> {
342        RecordReader::read_record_opts(self)
343    }
344
345    fn read_rerun_info(&mut self) -> Result<Option<RerunInfo>, RecordReadError> {
346        RecordReader::read_rerun_info(self)
347    }
348
349    fn load_dictionaries(&mut self) -> Result<(), RecordReadError> {
350        RecordReader::load_dictionaries(self)
351    }
352
353    fn read_output(&mut self, file_name: &str) -> Result<Vec<u8>, RecordReadError> {
354        RecordReader::read_output(self, file_name)
355    }
356
357    fn extract_file_to_path(
358        &mut self,
359        store_path: &str,
360        output_path: &Utf8Path,
361    ) -> Result<u64, RecordReadError> {
362        let archive = self.ensure_archive()?;
363        let mut file =
364            archive
365                .by_name(store_path)
366                .map_err(|error| RecordReadError::ReadArchiveFile {
367                    file_name: store_path.to_owned(),
368                    error,
369                })?;
370
371        let mut output_file =
372            File::create(output_path).map_err(|error| RecordReadError::ExtractFile {
373                store_path: store_path.to_owned(),
374                output_path: output_path.to_owned(),
375                error,
376            })?;
377
378        io::copy(&mut file, &mut output_file).map_err(|error| RecordReadError::ExtractFile {
379            store_path: store_path.to_owned(),
380            output_path: output_path.to_owned(),
381            error,
382        })
383    }
384}
385
386/// Decompresses data using a pre-trained zstd dictionary, with a size limit.
387///
388/// The limit prevents compression bombs where a small compressed payload
389/// expands to an extremely large decompressed output.
390pub(super) fn decompress_with_dict(
391    compressed: &[u8],
392    dict_bytes: &[u8],
393    limit: u64,
394) -> std::io::Result<Vec<u8>> {
395    let dict = zstd::dict::DecoderDictionary::copy(dict_bytes);
396    let decoder = zstd::stream::Decoder::with_prepared_dictionary(compressed, &dict)?;
397    let mut decompressed = Vec::new();
398    decoder.take(limit).read_to_end(&mut decompressed)?;
399    Ok(decompressed)
400}
401
402/// Zstd decoder reading from a file.
403type LogDecoder = zstd::stream::Decoder<'static, BufReader<File>>;
404
405/// Iterator over recorded events.
406///
407/// Reads events one at a time from the zstd-compressed JSON Lines run log.
408#[derive(Debug)]
409pub struct RecordEventIter {
410    reader: DebugIgnore<BufReader<LogDecoder>>,
411    line_buf: String,
412    line_number: usize,
413}
414
415impl Iterator for RecordEventIter {
416    type Item = Result<TestEventSummary<ZipStoreOutput>, RecordReadError>;
417
418    fn next(&mut self) -> Option<Self::Item> {
419        loop {
420            self.line_buf.clear();
421            self.line_number += 1;
422
423            match self.reader.read_line(&mut self.line_buf) {
424                Ok(0) => return None,
425                Ok(_) => {
426                    let trimmed = self.line_buf.trim();
427                    if trimmed.is_empty() {
428                        continue;
429                    }
430                    return Some(serde_json::from_str(trimmed).map_err(|error| {
431                        RecordReadError::ParseEvent {
432                            line_number: self.line_number,
433                            error,
434                        }
435                    }));
436                }
437                Err(error) => {
438                    return Some(Err(RecordReadError::ReadRunLog {
439                        line_number: self.line_number,
440                        error,
441                    }));
442                }
443            }
444        }
445    }
446}
447
#[cfg(test)]
mod tests {
    use super::*;

    /// Opening a run directory that does not exist must report `RunNotFound`.
    #[test]
    fn test_record_reader_nonexistent_dir() {
        let missing_dir = Utf8Path::new("/nonexistent/path");
        let is_run_not_found = matches!(
            RecordReader::open(missing_dir),
            Err(RecordReadError::RunNotFound { .. })
        );
        assert!(is_run_not_found);
    }
}
457}