1use super::{
13 format::{
14 CARGO_METADATA_JSON_PATH, OutputDict, RECORD_OPTS_JSON_PATH, RUN_LOG_FILE_NAME,
15 STDERR_DICT_PATH, STDOUT_DICT_PATH, STORE_ZIP_FILE_NAME, TEST_LIST_JSON_PATH,
16 },
17 summary::{RecordOpts, TestEventSummary},
18};
19use crate::{
20 errors::RecordReadError,
21 output_spec::RecordingSpec,
22 record::format::{RERUN_INFO_JSON_PATH, RerunInfo},
23 user_config::elements::MAX_MAX_OUTPUT_SIZE,
24};
25use camino::{Utf8Path, Utf8PathBuf};
26use debug_ignore::DebugIgnore;
27use eazip::Archive;
28use nextest_metadata::TestListSummary;
29use std::{
30 fs::File,
31 io::{self, BufRead, BufReader, Read},
32};
33
/// Read-side interface to the contents of a recorded run's store.
///
/// `RecordReader` (defined in this file) implements this trait by reading
/// entries out of the run's store archive.
pub trait StoreReader {
    /// Returns the cargo metadata stored with the run, as a string.
    fn read_cargo_metadata(&mut self) -> Result<String, RecordReadError>;

    /// Returns the stored test list, deserialized into a `TestListSummary`.
    fn read_test_list(&mut self) -> Result<TestListSummary, RecordReadError>;

    /// Returns the stored `RecordOpts`.
    fn read_record_opts(&mut self) -> Result<RecordOpts, RecordReadError>;

    /// Returns the stored `RerunInfo`, if any.
    ///
    /// The `RecordReader` implementation returns `Ok(None)` when the store has
    /// no rerun-info entry.
    fn read_rerun_info(&mut self) -> Result<Option<RerunInfo>, RecordReadError>;

    /// Loads the dictionaries used to decompress recorded output.
    ///
    /// With `RecordReader`, this must be called before `read_output`;
    /// otherwise `read_output` panics for stdout/stderr outputs.
    fn load_dictionaries(&mut self) -> Result<(), RecordReadError>;

    /// Reads the recorded output named `file_name` and returns its bytes.
    ///
    /// The `RecordReader` implementation looks the entry up under `out/` in the
    /// store and returns the dictionary-decompressed contents.
    fn read_output(&mut self, file_name: &str) -> Result<Vec<u8>, RecordReadError>;

    /// Copies the store entry `store_path` into a file at `output_path`.
    ///
    /// The `RecordReader` implementation returns the number of bytes copied.
    fn extract_file_to_path(
        &mut self,
        store_path: &str,
        output_path: &Utf8Path,
    ) -> Result<u64, RecordReadError>;
}
79
/// Reader for a single recorded run stored in a run directory.
///
/// The store archive is opened on first use rather than at construction, and
/// the output dictionaries are only available after `load_dictionaries` has
/// been called.
#[derive(Debug)]
pub struct RecordReader {
    /// Directory holding the run's store archive and run log.
    run_dir: Utf8PathBuf,
    /// The opened store archive; `None` until `ensure_archive` opens it.
    archive: Option<DebugIgnore<Archive<BufReader<File>>>>,
    /// Raw bytes of the stdout dictionary; `None` until `load_dictionaries`.
    stdout_dict: Option<Vec<u8>>,
    /// Raw bytes of the stderr dictionary; `None` until `load_dictionaries`.
    stderr_dict: Option<Vec<u8>>,
}
93
94impl RecordReader {
95 pub fn open(run_dir: &Utf8Path) -> Result<Self, RecordReadError> {
99 if !run_dir.exists() {
100 return Err(RecordReadError::RunNotFound {
101 path: run_dir.to_owned(),
102 });
103 }
104
105 Ok(Self {
106 run_dir: run_dir.to_owned(),
107 archive: None,
108 stdout_dict: None,
109 stderr_dict: None,
110 })
111 }
112
113 pub fn run_dir(&self) -> &Utf8Path {
115 &self.run_dir
116 }
117
118 fn ensure_archive(&mut self) -> Result<&mut Archive<BufReader<File>>, RecordReadError> {
120 if self.archive.is_none() {
121 let store_path = self.run_dir.join(STORE_ZIP_FILE_NAME);
122 let file = File::open(&store_path).map_err(|error| RecordReadError::OpenArchive {
123 path: store_path.clone(),
124 error,
125 })?;
126 let archive = Archive::new(BufReader::new(file)).map_err(|error| {
127 RecordReadError::ParseArchive {
128 path: store_path,
129 error,
130 }
131 })?;
132 self.archive = Some(DebugIgnore(archive));
133 }
134 Ok(self.archive.as_mut().expect("archive was just set"))
135 }
136
137 fn read_archive_file(&mut self, file_name: &str) -> Result<Vec<u8>, RecordReadError> {
147 let limit = MAX_MAX_OUTPUT_SIZE.as_u64();
148 let archive = self.ensure_archive()?;
149 let mut file =
150 archive
151 .get_by_name(file_name)
152 .ok_or_else(|| RecordReadError::FileNotFound {
153 file_name: file_name.to_string(),
154 })?;
155
156 let claimed_size = file.metadata().uncompressed_size;
157 if claimed_size > limit {
158 return Err(RecordReadError::FileTooLarge {
159 file_name: file_name.to_string(),
160 size: claimed_size,
161 limit,
162 });
163 }
164
165 let capacity = usize::try_from(claimed_size).unwrap_or(usize::MAX);
166 let mut contents = Vec::with_capacity(capacity);
167
168 file.read()
171 .and_then(|reader| reader.take(limit).read_to_end(&mut contents))
172 .map_err(|error| RecordReadError::Decompress {
173 file_name: file_name.to_string(),
174 error,
175 })?;
176
177 let actual_size = contents.len() as u64;
178 if actual_size != claimed_size {
179 return Err(RecordReadError::SizeMismatch {
180 file_name: file_name.to_string(),
181 claimed_size,
182 actual_size,
183 });
184 }
185
186 Ok(contents)
187 }
188
189 pub fn read_cargo_metadata(&mut self) -> Result<String, RecordReadError> {
191 let bytes = self.read_archive_file(CARGO_METADATA_JSON_PATH)?;
192 String::from_utf8(bytes).map_err(|e| RecordReadError::Decompress {
193 file_name: CARGO_METADATA_JSON_PATH.to_string(),
194 error: std::io::Error::new(std::io::ErrorKind::InvalidData, e),
195 })
196 }
197
198 pub fn read_test_list(&mut self) -> Result<TestListSummary, RecordReadError> {
200 let bytes = self.read_archive_file(TEST_LIST_JSON_PATH)?;
201 serde_json::from_slice(&bytes).map_err(|error| RecordReadError::DeserializeMetadata {
202 file_name: TEST_LIST_JSON_PATH.to_string(),
203 error,
204 })
205 }
206
207 pub fn read_record_opts(&mut self) -> Result<RecordOpts, RecordReadError> {
209 let bytes = self.read_archive_file(RECORD_OPTS_JSON_PATH)?;
210 serde_json::from_slice(&bytes).map_err(|error| RecordReadError::DeserializeMetadata {
211 file_name: RECORD_OPTS_JSON_PATH.to_string(),
212 error,
213 })
214 }
215
216 pub fn read_rerun_info(&mut self) -> Result<Option<RerunInfo>, RecordReadError> {
221 match self.read_archive_file(RERUN_INFO_JSON_PATH) {
222 Ok(bytes) => {
223 let info = serde_json::from_slice(&bytes).map_err(|error| {
224 RecordReadError::DeserializeMetadata {
225 file_name: RERUN_INFO_JSON_PATH.to_string(),
226 error,
227 }
228 })?;
229 Ok(Some(info))
230 }
231 Err(RecordReadError::FileNotFound { .. }) => {
232 Ok(None)
234 }
235 Err(e) => Err(e),
236 }
237 }
238
239 pub fn load_dictionaries(&mut self) -> Result<(), RecordReadError> {
248 self.stdout_dict = Some(self.read_archive_file(STDOUT_DICT_PATH)?);
249 self.stderr_dict = Some(self.read_archive_file(STDERR_DICT_PATH)?);
250 Ok(())
251 }
252
253 pub fn events(&self) -> Result<RecordEventIter, RecordReadError> {
257 let log_path = self.run_dir.join(RUN_LOG_FILE_NAME);
258 let file = File::open(&log_path).map_err(|error| RecordReadError::OpenRunLog {
259 path: log_path.clone(),
260 error,
261 })?;
262 let decoder =
263 zstd::stream::Decoder::new(file).map_err(|error| RecordReadError::OpenRunLog {
264 path: log_path,
265 error,
266 })?;
267 Ok(RecordEventIter {
268 reader: DebugIgnore(BufReader::new(decoder)),
269 line_buf: String::new(),
270 line_number: 0,
271 })
272 }
273
274 pub fn read_output(&mut self, file_name: &str) -> Result<Vec<u8>, RecordReadError> {
286 let path = format!("out/{file_name}");
287 let compressed = self.read_archive_file(&path)?;
288 let limit = MAX_MAX_OUTPUT_SIZE.as_u64();
289
290 let dict_bytes = self.get_dict_for_output(file_name).ok_or_else(|| {
294 RecordReadError::UnknownOutputType {
295 file_name: file_name.to_owned(),
296 }
297 })?;
298
299 decompress_with_dict(&compressed, dict_bytes, limit).map_err(|error| {
300 RecordReadError::Decompress {
301 file_name: path,
302 error,
303 }
304 })
305 }
306
307 fn get_dict_for_output(&self, file_name: &str) -> Option<&[u8]> {
316 match OutputDict::for_output_file_name(file_name) {
317 OutputDict::Stdout => Some(
318 self.stdout_dict
319 .as_ref()
320 .expect("load_dictionaries must be called first"),
321 ),
322 OutputDict::Stderr => Some(
323 self.stderr_dict
324 .as_ref()
325 .expect("load_dictionaries must be called first"),
326 ),
327 OutputDict::None => None,
328 }
329 }
330}
331
332impl StoreReader for RecordReader {
333 fn read_cargo_metadata(&mut self) -> Result<String, RecordReadError> {
334 RecordReader::read_cargo_metadata(self)
335 }
336
337 fn read_test_list(&mut self) -> Result<TestListSummary, RecordReadError> {
338 RecordReader::read_test_list(self)
339 }
340
341 fn read_record_opts(&mut self) -> Result<RecordOpts, RecordReadError> {
342 RecordReader::read_record_opts(self)
343 }
344
345 fn read_rerun_info(&mut self) -> Result<Option<RerunInfo>, RecordReadError> {
346 RecordReader::read_rerun_info(self)
347 }
348
349 fn load_dictionaries(&mut self) -> Result<(), RecordReadError> {
350 RecordReader::load_dictionaries(self)
351 }
352
353 fn read_output(&mut self, file_name: &str) -> Result<Vec<u8>, RecordReadError> {
354 RecordReader::read_output(self, file_name)
355 }
356
357 fn extract_file_to_path(
358 &mut self,
359 store_path: &str,
360 output_path: &Utf8Path,
361 ) -> Result<u64, RecordReadError> {
362 let archive = self.ensure_archive()?;
363 let mut file =
364 archive
365 .get_by_name(store_path)
366 .ok_or_else(|| RecordReadError::FileNotFound {
367 file_name: store_path.to_owned(),
368 })?;
369
370 let mut output_file =
371 File::create(output_path).map_err(|error| RecordReadError::ExtractFile {
372 store_path: store_path.to_owned(),
373 output_path: output_path.to_owned(),
374 error,
375 })?;
376
377 let mut reader = file
379 .read()
380 .map_err(|error| RecordReadError::ReadArchiveFile {
381 file_name: store_path.to_owned(),
382 error,
383 })?;
384
385 io::copy(&mut reader, &mut output_file).map_err(|error| RecordReadError::ExtractFile {
386 store_path: store_path.to_owned(),
387 output_path: output_path.to_owned(),
388 error,
389 })
390 }
391}
392
393pub(super) fn decompress_with_dict(
398 compressed: &[u8],
399 dict_bytes: &[u8],
400 limit: u64,
401) -> std::io::Result<Vec<u8>> {
402 let dict = zstd::dict::DecoderDictionary::copy(dict_bytes);
403 let decoder = zstd::stream::Decoder::with_prepared_dictionary(compressed, &dict)?;
404 let mut decompressed = Vec::new();
405 decoder.take(limit).read_to_end(&mut decompressed)?;
406 Ok(decompressed)
407}
408
/// Streaming zstd decoder over the buffered run log file, as stored in
/// `RecordEventIter`.
type LogDecoder = zstd::stream::Decoder<'static, BufReader<File>>;
411
/// Iterator over the events in a run log, one JSON document per line.
///
/// Blank lines are skipped; each remaining line is parsed into a
/// `TestEventSummary<RecordingSpec>`.
#[derive(Debug)]
pub struct RecordEventIter {
    /// Buffered reader over the zstd-decompressed run log.
    reader: DebugIgnore<BufReader<LogDecoder>>,
    /// Line buffer, cleared and refilled on every read.
    line_buf: String,
    /// Number of read attempts made so far; reported in errors as the line
    /// number.
    line_number: usize,
}
421
422impl Iterator for RecordEventIter {
423 type Item = Result<TestEventSummary<RecordingSpec>, RecordReadError>;
424
425 fn next(&mut self) -> Option<Self::Item> {
426 loop {
427 self.line_buf.clear();
428 self.line_number += 1;
429
430 match self.reader.read_line(&mut self.line_buf) {
431 Ok(0) => return None,
432 Ok(_) => {
433 let trimmed = self.line_buf.trim();
434 if trimmed.is_empty() {
435 continue;
436 }
437 return Some(serde_json::from_str(trimmed).map_err(|error| {
438 RecordReadError::ParseEvent {
439 line_number: self.line_number,
440 error,
441 }
442 }));
443 }
444 Err(error) => {
445 return Some(Err(RecordReadError::ReadRunLog {
446 line_number: self.line_number,
447 error,
448 }));
449 }
450 }
451 }
452 }
453}
454
#[cfg(test)]
mod tests {
    use super::*;

    /// Opening a run directory that does not exist must fail with
    /// `RunNotFound`.
    #[test]
    fn test_record_reader_nonexistent_dir() {
        let missing_dir = Utf8Path::new("/nonexistent/path");
        let result = RecordReader::open(missing_dir);
        assert!(matches!(result, Err(RecordReadError::RunNotFound { .. })));
    }
}