1use super::{
22 format::{
23 CARGO_METADATA_JSON_PATH, OutputDict, PORTABLE_MANIFEST_FILE_NAME,
24 PORTABLE_RECORDING_FORMAT_VERSION, PortableManifest, RECORD_OPTS_JSON_PATH,
25 RERUN_INFO_JSON_PATH, RUN_LOG_FILE_NAME, RerunInfo, STDERR_DICT_PATH, STDOUT_DICT_PATH,
26 STORE_FORMAT_VERSION, STORE_ZIP_FILE_NAME, TEST_LIST_JSON_PATH, has_zip_extension,
27 },
28 reader::{StoreReader, decompress_with_dict},
29 store::{RecordedRunInfo, RunFilesExist, StoreRunsDir},
30 summary::{RecordOpts, TestEventSummary, ZipStoreOutput},
31};
32use crate::{
33 errors::{PortableRecordingError, PortableRecordingReadError, RecordReadError},
34 user_config::elements::MAX_MAX_OUTPUT_SIZE,
35};
36use atomicwrites::{AtomicFile, OverwriteBehavior};
37use bytesize::ByteSize;
38use camino::{Utf8Path, Utf8PathBuf};
39use countio::Counter;
40use debug_ignore::DebugIgnore;
41use itertools::Either;
42use nextest_metadata::TestListSummary;
43use std::{
44 borrow::Cow,
45 fs::File,
46 io::{self, BufRead, BufReader, Cursor, Read, Seek, SeekFrom, Write},
47};
48use zip::{
49 CompressionMethod, ZipArchive, ZipWriter, read::ZipFileSeek, result::ZipError,
50 write::SimpleFileOptions,
51};
52
53#[derive(Debug)]
55pub struct PortableRecordingResult {
56 pub path: Utf8PathBuf,
58 pub size: u64,
60}
61
/// Outcome of extracting a file from the outer archive to a path on disk.
#[derive(Debug)]
pub struct ExtractOuterFileResult {
    /// Number of bytes copied to the output path.
    pub bytes_written: u64,
    /// `Some(size)` when limit checking was requested and the size recorded in
    /// the archive for the entry is above the maximum; `None` otherwise.
    pub exceeded_limit: Option<u64>,
}
73
74#[derive(Debug)]
76pub struct PortableRecordingWriter<'a> {
77 run_info: &'a RecordedRunInfo,
78 run_dir: Utf8PathBuf,
79}
80
81impl<'a> PortableRecordingWriter<'a> {
82 pub fn new(
86 run_info: &'a RecordedRunInfo,
87 runs_dir: StoreRunsDir<'_>,
88 ) -> Result<Self, PortableRecordingError> {
89 let run_dir = runs_dir.run_dir(run_info.run_id);
90
91 if !run_dir.exists() {
92 return Err(PortableRecordingError::RunDirNotFound { path: run_dir });
93 }
94
95 let store_zip_path = run_dir.join(STORE_ZIP_FILE_NAME);
96 if !store_zip_path.exists() {
97 return Err(PortableRecordingError::RequiredFileMissing {
98 run_dir,
99 file_name: STORE_ZIP_FILE_NAME,
100 });
101 }
102
103 let run_log_path = run_dir.join(RUN_LOG_FILE_NAME);
104 if !run_log_path.exists() {
105 return Err(PortableRecordingError::RequiredFileMissing {
106 run_dir,
107 file_name: RUN_LOG_FILE_NAME,
108 });
109 }
110
111 Ok(Self { run_info, run_dir })
112 }
113
114 pub fn default_filename(&self) -> String {
118 format!("nextest-run-{}.zip", self.run_info.run_id)
119 }
120
121 pub fn write_to_dir(
126 &self,
127 output_dir: &Utf8Path,
128 ) -> Result<PortableRecordingResult, PortableRecordingError> {
129 let output_path = output_dir.join(self.default_filename());
130 self.write_to_path(&output_path)
131 }
132
133 pub fn write_to_path(
137 &self,
138 output_path: &Utf8Path,
139 ) -> Result<PortableRecordingResult, PortableRecordingError> {
140 let atomic_file = AtomicFile::new(output_path, OverwriteBehavior::AllowOverwrite);
141
142 let final_size = atomic_file
143 .write(|temp_file| {
144 let counter = Counter::new(temp_file);
145 let mut zip_writer = ZipWriter::new(counter);
146
147 self.write_manifest(&mut zip_writer)?;
148 self.copy_file(&mut zip_writer, RUN_LOG_FILE_NAME)?;
149 self.copy_file(&mut zip_writer, STORE_ZIP_FILE_NAME)?;
150
151 let counter = zip_writer
152 .finish()
153 .map_err(PortableRecordingError::ZipFinalize)?;
154
155 let counter_bytes = counter.writer_bytes() as u64;
160 let file = counter.into_inner();
161 let size = file.metadata().map(|m| m.len()).unwrap_or(counter_bytes);
162
163 Ok(size)
164 })
165 .map_err(|err| match err {
166 atomicwrites::Error::Internal(source) => PortableRecordingError::AtomicWrite {
167 path: output_path.to_owned(),
168 source,
169 },
170 atomicwrites::Error::User(e) => e,
171 })?;
172
173 Ok(PortableRecordingResult {
174 path: output_path.to_owned(),
175 size: final_size,
176 })
177 }
178
179 fn write_manifest<W: Write + io::Seek>(
181 &self,
182 zip_writer: &mut ZipWriter<W>,
183 ) -> Result<(), PortableRecordingError> {
184 let manifest = PortableManifest::new(self.run_info);
185 let manifest_json = serde_json::to_vec_pretty(&manifest)
186 .map_err(PortableRecordingError::SerializeManifest)?;
187
188 let options = SimpleFileOptions::default().compression_method(CompressionMethod::Stored);
189
190 zip_writer
191 .start_file(PORTABLE_MANIFEST_FILE_NAME, options)
192 .map_err(|source| PortableRecordingError::ZipStartFile {
193 file_name: PORTABLE_MANIFEST_FILE_NAME,
194 source,
195 })?;
196
197 zip_writer.write_all(&manifest_json).map_err(|source| {
198 PortableRecordingError::ZipWrite {
199 file_name: PORTABLE_MANIFEST_FILE_NAME,
200 source,
201 }
202 })?;
203
204 Ok(())
205 }
206
207 fn copy_file<W: Write + io::Seek>(
212 &self,
213 zip_writer: &mut ZipWriter<W>,
214 file_name: &'static str,
215 ) -> Result<(), PortableRecordingError> {
216 let source_path = self.run_dir.join(file_name);
217 let mut file = File::open(&source_path)
218 .map_err(|source| PortableRecordingError::ReadFile { file_name, source })?;
219
220 let options = SimpleFileOptions::default().compression_method(CompressionMethod::Stored);
221
222 zip_writer
223 .start_file(file_name, options)
224 .map_err(|source| PortableRecordingError::ZipStartFile { file_name, source })?;
225
226 io::copy(&mut file, zip_writer)
227 .map_err(|source| PortableRecordingError::ZipWrite { file_name, source })?;
228
229 Ok(())
230 }
231}
232
233const SPOOL_SIZE_LIMIT: ByteSize = ByteSize(4 * 1024 * 1024 * 1024);
243
/// Classification of a Windows file handle, as reported by `GetFileType`.
#[cfg(windows)]
enum WindowsFileKind {
    /// The handle type is `FILE_TYPE_DISK`.
    Disk,
    /// The handle type is `FILE_TYPE_PIPE`.
    Pipe,
    /// Any other handle type; carries the raw value returned by `GetFileType`.
    Other(u32),
}
260
/// Queries the handle type of `file` and maps it to a [`WindowsFileKind`].
#[cfg(windows)]
fn classify_windows_handle(file: &File) -> WindowsFileKind {
    use std::os::windows::io::AsRawHandle;
    use windows_sys::Win32::Storage::FileSystem::{FILE_TYPE_DISK, FILE_TYPE_PIPE, GetFileType};

    let raw_handle = file.as_raw_handle();
    // SAFETY: `raw_handle` comes from `file`, which is borrowed for the whole
    // call, so the handle is open while `GetFileType` runs.
    let raw_kind = unsafe { GetFileType(raw_handle) };

    if raw_kind == FILE_TYPE_DISK {
        WindowsFileKind::Disk
    } else if raw_kind == FILE_TYPE_PIPE {
        WindowsFileKind::Pipe
    } else {
        WindowsFileKind::Other(raw_kind)
    }
}
275
276#[cfg(unix)]
279fn is_not_seekable_error(e: &io::Error) -> bool {
280 e.raw_os_error() == Some(libc::ESPIPE)
282}
283
284fn ensure_seekable(file: File, path: &Utf8Path) -> Result<File, PortableRecordingReadError> {
293 ensure_seekable_impl(file, path, SPOOL_SIZE_LIMIT)
294}
295
296fn ensure_seekable_impl(
300 file: File,
301 path: &Utf8Path,
302 spool_limit: ByteSize,
303) -> Result<File, PortableRecordingReadError> {
304 #[cfg(unix)]
307 {
308 let mut file = file;
309 match file.stream_position() {
310 Ok(_) => Ok(file),
311 Err(e) if is_not_seekable_error(&e) => spool_to_temp(file, path, spool_limit),
312 Err(e) => {
313 Err(PortableRecordingReadError::SeekProbe {
316 path: path.to_owned(),
317 error: e,
318 })
319 }
320 }
321 }
322
323 #[cfg(windows)]
327 match classify_windows_handle(&file) {
328 WindowsFileKind::Disk => Ok(file),
329 WindowsFileKind::Pipe => spool_to_temp(file, path, spool_limit),
330 WindowsFileKind::Other(file_type) => Err(PortableRecordingReadError::SeekProbe {
331 path: path.to_owned(),
332 error: io::Error::other(format!(
333 "unexpected file handle type {file_type:#x} (expected disk or pipe)"
334 )),
335 }),
336 }
337}
338
339fn spool_to_temp(
343 file: File,
344 path: &Utf8Path,
345 spool_limit: ByteSize,
346) -> Result<File, PortableRecordingReadError> {
347 let mut temp =
348 camino_tempfile::tempfile().map_err(|error| PortableRecordingReadError::SpoolTempFile {
349 path: path.to_owned(),
350 error,
351 })?;
352
353 let bytes_copied = io::copy(
357 &mut (&file).take(spool_limit.0.saturating_add(1)),
358 &mut temp,
359 )
360 .map_err(|error| PortableRecordingReadError::SpoolTempFile {
361 path: path.to_owned(),
362 error,
363 })?;
364
365 if bytes_copied > spool_limit.0 {
366 return Err(PortableRecordingReadError::SpoolTooLarge {
367 path: path.to_owned(),
368 limit: spool_limit,
369 });
370 }
371
372 temp.seek(SeekFrom::Start(0))
374 .map_err(|error| PortableRecordingReadError::SpoolTempFile {
375 path: path.to_owned(),
376 error,
377 })?;
378
379 Ok(temp)
380}
381
382type ArchiveReadStorage = Either<File, Cursor<Vec<u8>>>;
387
388#[derive(Debug)]
390pub struct PortableRecording {
391 archive_path: Utf8PathBuf,
392 manifest: PortableManifest,
393 outer_archive: ZipArchive<ArchiveReadStorage>,
394}
395
396impl RunFilesExist for PortableRecording {
397 fn store_zip_exists(&self) -> bool {
398 self.outer_archive
399 .index_for_name(STORE_ZIP_FILE_NAME)
400 .is_some()
401 }
402
403 fn run_log_exists(&self) -> bool {
404 self.outer_archive
405 .index_for_name(RUN_LOG_FILE_NAME)
406 .is_some()
407 }
408}
409
410impl PortableRecording {
411 pub fn open(path: &Utf8Path) -> Result<Self, PortableRecordingReadError> {
421 let file = File::open(path).map_err(|error| PortableRecordingReadError::OpenArchive {
422 path: path.to_owned(),
423 error,
424 })?;
425
426 let file = ensure_seekable(file, path)?;
429
430 let mut outer_archive = ZipArchive::new(Either::Left(file)).map_err(|error| {
431 PortableRecordingReadError::ReadArchive {
432 path: path.to_owned(),
433 error,
434 }
435 })?;
436
437 if outer_archive
439 .index_for_name(PORTABLE_MANIFEST_FILE_NAME)
440 .is_some()
441 {
442 return Self::open_validated(path, outer_archive);
443 }
444
445 let mut file_count = 0;
449 let mut zip_count = 0;
450 let mut zip_file: Option<String> = None;
451 for name in outer_archive.file_names() {
452 if name.ends_with('/') || name.ends_with('\\') {
453 continue;
455 }
456 file_count += 1;
457 if has_zip_extension(Utf8Path::new(name)) {
458 zip_count += 1;
459 if zip_count == 1 {
460 zip_file = Some(name.to_owned());
461 }
462 }
463 }
464
465 if let Some(inner_name) = zip_file.filter(|_| file_count == 1 && zip_count == 1) {
466 let inner_bytes = read_outer_file(&mut outer_archive, inner_name.into(), path)?;
471 let inner_archive =
472 ZipArchive::new(Either::Right(Cursor::new(inner_bytes))).map_err(|error| {
473 PortableRecordingReadError::ReadArchive {
474 path: path.to_owned(),
475 error,
476 }
477 })?;
478 Self::open_validated(path, inner_archive)
479 } else {
480 Err(PortableRecordingReadError::NotAWrapperArchive {
481 path: path.to_owned(),
482 file_count,
483 zip_count,
484 })
485 }
486 }
487
488 fn open_validated(
490 path: &Utf8Path,
491 mut outer_archive: ZipArchive<ArchiveReadStorage>,
492 ) -> Result<Self, PortableRecordingReadError> {
493 let manifest_bytes =
495 read_outer_file(&mut outer_archive, PORTABLE_MANIFEST_FILE_NAME.into(), path)?;
496 let manifest: PortableManifest =
497 serde_json::from_slice(&manifest_bytes).map_err(|error| {
498 PortableRecordingReadError::ParseManifest {
499 path: path.to_owned(),
500 error,
501 }
502 })?;
503
504 if let Err(incompatibility) = manifest
506 .format_version
507 .check_readable_by(PORTABLE_RECORDING_FORMAT_VERSION)
508 {
509 return Err(PortableRecordingReadError::UnsupportedFormatVersion {
510 path: path.to_owned(),
511 found: manifest.format_version,
512 supported: PORTABLE_RECORDING_FORMAT_VERSION,
513 incompatibility,
514 });
515 }
516
517 let store_version = manifest.store_format_version();
519 if let Err(incompatibility) = store_version.check_readable_by(STORE_FORMAT_VERSION) {
520 return Err(PortableRecordingReadError::UnsupportedStoreFormatVersion {
521 path: path.to_owned(),
522 found: store_version,
523 supported: STORE_FORMAT_VERSION,
524 incompatibility,
525 });
526 }
527
528 Ok(Self {
529 archive_path: path.to_owned(),
530 manifest,
531 outer_archive,
532 })
533 }
534
535 pub fn archive_path(&self) -> &Utf8Path {
537 &self.archive_path
538 }
539
540 pub fn run_info(&self) -> RecordedRunInfo {
542 self.manifest.run_info()
543 }
544
545 pub fn read_run_log(&mut self) -> Result<PortableRecordingRunLog, PortableRecordingReadError> {
551 let run_log_bytes = read_outer_file(
552 &mut self.outer_archive,
553 RUN_LOG_FILE_NAME.into(),
554 &self.archive_path,
555 )?;
556 Ok(PortableRecordingRunLog {
557 archive_path: self.archive_path.clone(),
558 run_log_bytes,
559 })
560 }
561
562 pub fn extract_outer_file_to_path(
571 &mut self,
572 file_name: &'static str,
573 output_path: &Utf8Path,
574 check_limit: bool,
575 ) -> Result<ExtractOuterFileResult, PortableRecordingReadError> {
576 extract_outer_file_to_path(
577 &mut self.outer_archive,
578 file_name,
579 &self.archive_path,
580 output_path,
581 check_limit,
582 )
583 }
584
585 pub fn open_store(&mut self) -> Result<PortableStoreReader<'_>, PortableRecordingReadError> {
589 let store_handle = self
591 .outer_archive
592 .by_name_seek(STORE_ZIP_FILE_NAME)
593 .map_err(|error| match error {
594 ZipError::FileNotFound => PortableRecordingReadError::MissingFile {
595 path: self.archive_path.clone(),
596 file_name: Cow::Borrowed(STORE_ZIP_FILE_NAME),
597 },
598 _ => PortableRecordingReadError::ReadArchive {
599 path: self.archive_path.clone(),
600 error,
601 },
602 })?;
603
604 let store_archive = ZipArchive::new(store_handle).map_err(|error| {
605 PortableRecordingReadError::ReadArchive {
606 path: self.archive_path.clone(),
607 error,
608 }
609 })?;
610
611 Ok(PortableStoreReader {
612 archive_path: &self.archive_path,
613 store_archive,
614 stdout_dict: None,
615 stderr_dict: None,
616 })
617 }
618}
619
620fn read_outer_file(
622 archive: &mut ZipArchive<ArchiveReadStorage>,
623 file_name: Cow<'static, str>,
624 archive_path: &Utf8Path,
625) -> Result<Vec<u8>, PortableRecordingReadError> {
626 let limit = MAX_MAX_OUTPUT_SIZE.as_u64();
627 let file = archive.by_name(&file_name).map_err(|error| match error {
628 ZipError::FileNotFound => PortableRecordingReadError::MissingFile {
629 path: archive_path.to_owned(),
630 file_name: file_name.clone(),
631 },
632 _ => PortableRecordingReadError::ReadArchive {
633 path: archive_path.to_owned(),
634 error,
635 },
636 })?;
637
638 let claimed_size = file.size();
639 if claimed_size > limit {
640 return Err(PortableRecordingReadError::FileTooLarge {
641 path: archive_path.to_owned(),
642 file_name,
643 size: claimed_size,
644 limit,
645 });
646 }
647
648 let capacity = usize::try_from(claimed_size).unwrap_or(usize::MAX);
649 let mut contents = Vec::with_capacity(capacity);
650
651 file.take(limit)
652 .read_to_end(&mut contents)
653 .map_err(|error| PortableRecordingReadError::ReadArchive {
654 path: archive_path.to_owned(),
655 error: ZipError::Io(error),
656 })?;
657
658 Ok(contents)
659}
660
661fn extract_outer_file_to_path(
663 archive: &mut ZipArchive<ArchiveReadStorage>,
664 file_name: &'static str,
665 archive_path: &Utf8Path,
666 output_path: &Utf8Path,
667 check_limit: bool,
668) -> Result<ExtractOuterFileResult, PortableRecordingReadError> {
669 let limit = MAX_MAX_OUTPUT_SIZE.as_u64();
670 let mut file = archive.by_name(file_name).map_err(|error| match error {
671 ZipError::FileNotFound => PortableRecordingReadError::MissingFile {
672 path: archive_path.to_owned(),
673 file_name: Cow::Borrowed(file_name),
674 },
675 _ => PortableRecordingReadError::ReadArchive {
676 path: archive_path.to_owned(),
677 error,
678 },
679 })?;
680
681 let claimed_size = file.size();
682 let exceeded_limit = if check_limit && claimed_size > limit {
683 Some(claimed_size)
684 } else {
685 None
686 };
687
688 let mut output_file =
689 File::create(output_path).map_err(|error| PortableRecordingReadError::ExtractFile {
690 archive_path: archive_path.to_owned(),
691 file_name,
692 output_path: output_path.to_owned(),
693 error,
694 })?;
695
696 let bytes_written = io::copy(&mut file, &mut output_file).map_err(|error| {
697 PortableRecordingReadError::ExtractFile {
698 archive_path: archive_path.to_owned(),
699 file_name,
700 output_path: output_path.to_owned(),
701 error,
702 }
703 })?;
704
705 Ok(ExtractOuterFileResult {
706 bytes_written,
707 exceeded_limit,
708 })
709}
710
711#[derive(Debug)]
716pub struct PortableRecordingRunLog {
717 archive_path: Utf8PathBuf,
718 run_log_bytes: Vec<u8>,
719}
720
721impl PortableRecordingRunLog {
722 pub fn events(&self) -> Result<PortableRecordingEventIter<'_>, RecordReadError> {
724 let decoder =
727 zstd::stream::Decoder::with_buffer(&self.run_log_bytes[..]).map_err(|error| {
728 RecordReadError::OpenRunLog {
729 path: self.archive_path.join(RUN_LOG_FILE_NAME),
730 error,
731 }
732 })?;
733 Ok(PortableRecordingEventIter {
734 reader: DebugIgnore(BufReader::new(decoder)),
736 line_buf: String::new(),
737 line_number: 0,
738 })
739 }
740}
741
742#[derive(Debug)]
744pub struct PortableRecordingEventIter<'a> {
745 reader: DebugIgnore<BufReader<zstd::stream::Decoder<'static, &'a [u8]>>>,
746 line_buf: String,
747 line_number: usize,
748}
749
750impl Iterator for PortableRecordingEventIter<'_> {
751 type Item = Result<TestEventSummary<ZipStoreOutput>, RecordReadError>;
752
753 fn next(&mut self) -> Option<Self::Item> {
754 loop {
755 self.line_buf.clear();
756 self.line_number += 1;
757
758 match self.reader.read_line(&mut self.line_buf) {
759 Ok(0) => return None,
760 Ok(_) => {
761 let trimmed = self.line_buf.trim();
762 if trimmed.is_empty() {
763 continue;
764 }
765 return Some(serde_json::from_str(trimmed).map_err(|error| {
766 RecordReadError::ParseEvent {
767 line_number: self.line_number,
768 error,
769 }
770 }));
771 }
772 Err(error) => {
773 return Some(Err(RecordReadError::ReadRunLog {
774 line_number: self.line_number,
775 error,
776 }));
777 }
778 }
779 }
780 }
781}
782
783pub struct PortableStoreReader<'a> {
787 archive_path: &'a Utf8Path,
788 store_archive: ZipArchive<ZipFileSeek<'a, ArchiveReadStorage>>,
789 stdout_dict: Option<Vec<u8>>,
791 stderr_dict: Option<Vec<u8>>,
793}
794
795impl std::fmt::Debug for PortableStoreReader<'_> {
796 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
797 f.debug_struct("PortableStoreReader")
798 .field("archive_path", &self.archive_path)
799 .field("stdout_dict", &self.stdout_dict.as_ref().map(|d| d.len()))
800 .field("stderr_dict", &self.stderr_dict.as_ref().map(|d| d.len()))
801 .finish_non_exhaustive()
802 }
803}
804
805impl PortableStoreReader<'_> {
806 fn read_store_file(&mut self, file_name: &str) -> Result<Vec<u8>, RecordReadError> {
808 let limit = MAX_MAX_OUTPUT_SIZE.as_u64();
809 let file = self.store_archive.by_name(file_name).map_err(|error| {
810 RecordReadError::ReadArchiveFile {
811 file_name: file_name.to_string(),
812 error,
813 }
814 })?;
815
816 let claimed_size = file.size();
817 if claimed_size > limit {
818 return Err(RecordReadError::FileTooLarge {
819 file_name: file_name.to_string(),
820 size: claimed_size,
821 limit,
822 });
823 }
824
825 let capacity = usize::try_from(claimed_size).unwrap_or(usize::MAX);
826 let mut contents = Vec::with_capacity(capacity);
827
828 file.take(limit)
829 .read_to_end(&mut contents)
830 .map_err(|error| RecordReadError::Decompress {
831 file_name: file_name.to_string(),
832 error,
833 })?;
834
835 let actual_size = contents.len() as u64;
836 if actual_size != claimed_size {
837 return Err(RecordReadError::SizeMismatch {
838 file_name: file_name.to_string(),
839 claimed_size,
840 actual_size,
841 });
842 }
843
844 Ok(contents)
845 }
846
847 fn get_dict_for_output(&self, file_name: &str) -> Option<&[u8]> {
849 match OutputDict::for_output_file_name(file_name) {
850 OutputDict::Stdout => Some(
851 self.stdout_dict
852 .as_ref()
853 .expect("load_dictionaries must be called first"),
854 ),
855 OutputDict::Stderr => Some(
856 self.stderr_dict
857 .as_ref()
858 .expect("load_dictionaries must be called first"),
859 ),
860 OutputDict::None => None,
861 }
862 }
863}
864
865impl StoreReader for PortableStoreReader<'_> {
866 fn read_cargo_metadata(&mut self) -> Result<String, RecordReadError> {
867 let bytes = self.read_store_file(CARGO_METADATA_JSON_PATH)?;
868 String::from_utf8(bytes).map_err(|e| RecordReadError::Decompress {
869 file_name: CARGO_METADATA_JSON_PATH.to_string(),
870 error: std::io::Error::new(std::io::ErrorKind::InvalidData, e),
871 })
872 }
873
874 fn read_test_list(&mut self) -> Result<TestListSummary, RecordReadError> {
875 let bytes = self.read_store_file(TEST_LIST_JSON_PATH)?;
876 serde_json::from_slice(&bytes).map_err(|error| RecordReadError::DeserializeMetadata {
877 file_name: TEST_LIST_JSON_PATH.to_string(),
878 error,
879 })
880 }
881
882 fn read_record_opts(&mut self) -> Result<RecordOpts, RecordReadError> {
883 let bytes = self.read_store_file(RECORD_OPTS_JSON_PATH)?;
884 serde_json::from_slice(&bytes).map_err(|error| RecordReadError::DeserializeMetadata {
885 file_name: RECORD_OPTS_JSON_PATH.to_string(),
886 error,
887 })
888 }
889
890 fn read_rerun_info(&mut self) -> Result<Option<RerunInfo>, RecordReadError> {
891 match self.read_store_file(RERUN_INFO_JSON_PATH) {
892 Ok(bytes) => {
893 let info = serde_json::from_slice(&bytes).map_err(|error| {
894 RecordReadError::DeserializeMetadata {
895 file_name: RERUN_INFO_JSON_PATH.to_string(),
896 error,
897 }
898 })?;
899 Ok(Some(info))
900 }
901 Err(RecordReadError::ReadArchiveFile {
902 error: ZipError::FileNotFound,
903 ..
904 }) => {
905 Ok(None)
907 }
908 Err(e) => Err(e),
909 }
910 }
911
912 fn load_dictionaries(&mut self) -> Result<(), RecordReadError> {
913 self.stdout_dict = Some(self.read_store_file(STDOUT_DICT_PATH)?);
914 self.stderr_dict = Some(self.read_store_file(STDERR_DICT_PATH)?);
915 Ok(())
916 }
917
918 fn read_output(&mut self, file_name: &str) -> Result<Vec<u8>, RecordReadError> {
919 let path = format!("out/{file_name}");
920 let compressed = self.read_store_file(&path)?;
921 let limit = MAX_MAX_OUTPUT_SIZE.as_u64();
922
923 let dict_bytes = self.get_dict_for_output(file_name).ok_or_else(|| {
924 RecordReadError::UnknownOutputType {
925 file_name: file_name.to_owned(),
926 }
927 })?;
928
929 decompress_with_dict(&compressed, dict_bytes, limit).map_err(|error| {
930 RecordReadError::Decompress {
931 file_name: path,
932 error,
933 }
934 })
935 }
936
937 fn extract_file_to_path(
938 &mut self,
939 store_path: &str,
940 output_path: &Utf8Path,
941 ) -> Result<u64, RecordReadError> {
942 let mut file = self.store_archive.by_name(store_path).map_err(|error| {
943 RecordReadError::ReadArchiveFile {
944 file_name: store_path.to_owned(),
945 error,
946 }
947 })?;
948
949 let mut output_file =
950 File::create(output_path).map_err(|error| RecordReadError::ExtractFile {
951 store_path: store_path.to_owned(),
952 output_path: output_path.to_owned(),
953 error,
954 })?;
955
956 io::copy(&mut file, &mut output_file).map_err(|error| RecordReadError::ExtractFile {
957 store_path: store_path.to_owned(),
958 output_path: output_path.to_owned(),
959 error,
960 })
961 }
962}
963
964#[cfg(test)]
965mod tests {
966 use super::*;
967 use crate::record::{
968 format::{PORTABLE_RECORDING_FORMAT_VERSION, STORE_FORMAT_VERSION},
969 store::{CompletedRunStats, RecordedRunStatus, RecordedSizes},
970 };
971 use camino_tempfile::{NamedUtf8TempFile, Utf8TempDir};
972 use chrono::Local;
973 use quick_junit::ReportUuid;
974 use semver::Version;
975 use std::{collections::BTreeMap, io::Read};
976 use zip::ZipArchive;
977
978 fn create_test_run_dir(run_id: ReportUuid) -> (Utf8TempDir, Utf8PathBuf) {
979 let temp_dir = camino_tempfile::tempdir().expect("create temp dir");
980 let runs_dir = temp_dir.path().to_owned();
981 let run_dir = runs_dir.join(run_id.to_string());
982 std::fs::create_dir_all(&run_dir).expect("create run dir");
983
984 let store_path = run_dir.join(STORE_ZIP_FILE_NAME);
985 let store_file = File::create(&store_path).expect("create store.zip");
986 let mut zip_writer = ZipWriter::new(store_file);
987 zip_writer
988 .start_file("test.txt", SimpleFileOptions::default())
989 .expect("start file");
990 zip_writer
991 .write_all(b"test content")
992 .expect("write content");
993 zip_writer.finish().expect("finish zip");
994
995 let log_path = run_dir.join(RUN_LOG_FILE_NAME);
996 let log_file = File::create(&log_path).expect("create run.log.zst");
997 let mut encoder = zstd::stream::Encoder::new(log_file, 3).expect("create encoder");
998 encoder.write_all(b"test log content").expect("write log");
999 encoder.finish().expect("finish encoder");
1000
1001 (temp_dir, runs_dir)
1002 }
1003
1004 fn create_test_run_info(run_id: ReportUuid) -> RecordedRunInfo {
1005 let now = Local::now().fixed_offset();
1006 RecordedRunInfo {
1007 run_id,
1008 store_format_version: STORE_FORMAT_VERSION,
1009 nextest_version: Version::new(0, 9, 111),
1010 started_at: now,
1011 last_written_at: now,
1012 duration_secs: Some(12.345),
1013 cli_args: vec!["cargo".to_owned(), "nextest".to_owned(), "run".to_owned()],
1014 build_scope_args: vec!["--workspace".to_owned()],
1015 env_vars: BTreeMap::from([("CARGO_TERM_COLOR".to_owned(), "always".to_owned())]),
1016 parent_run_id: None,
1017 sizes: RecordedSizes::default(),
1018 status: RecordedRunStatus::Completed(CompletedRunStats {
1019 initial_run_count: 10,
1020 passed: 9,
1021 failed: 1,
1022 exit_code: 100,
1023 }),
1024 }
1025 }
1026
1027 #[test]
1028 fn test_default_filename() {
1029 let run_id = ReportUuid::from_u128(0x550e8400_e29b_41d4_a716_446655440000);
1030 let (_temp_dir, runs_dir) = create_test_run_dir(run_id);
1031 let run_info = create_test_run_info(run_id);
1032
1033 let writer = PortableRecordingWriter::new(&run_info, StoreRunsDir::new(&runs_dir))
1034 .expect("create writer");
1035
1036 assert_eq!(
1037 writer.default_filename(),
1038 "nextest-run-550e8400-e29b-41d4-a716-446655440000.zip"
1039 );
1040 }
1041
1042 #[test]
1043 fn test_write_portable_recording() {
1044 let run_id = ReportUuid::from_u128(0x550e8400_e29b_41d4_a716_446655440000);
1045 let (_temp_dir, runs_dir) = create_test_run_dir(run_id);
1046 let run_info = create_test_run_info(run_id);
1047
1048 let writer = PortableRecordingWriter::new(&run_info, StoreRunsDir::new(&runs_dir))
1049 .expect("create writer");
1050
1051 let output_dir = camino_tempfile::tempdir().expect("create output dir");
1052
1053 let result = writer
1054 .write_to_dir(output_dir.path())
1055 .expect("write archive");
1056
1057 assert!(result.path.exists());
1058 assert!(result.size > 0);
1059
1060 let actual_size = std::fs::metadata(&result.path)
1062 .expect("get file metadata")
1063 .len();
1064 assert_eq!(
1065 result.size, actual_size,
1066 "reported size should match actual file size"
1067 );
1068
1069 assert_eq!(
1070 result.path.file_name(),
1071 Some("nextest-run-550e8400-e29b-41d4-a716-446655440000.zip")
1072 );
1073
1074 let archive_file = File::open(&result.path).expect("open archive");
1075 let mut archive = ZipArchive::new(archive_file).expect("read archive");
1076
1077 assert_eq!(archive.len(), 3);
1078
1079 {
1080 let mut manifest_file = archive
1081 .by_name(PORTABLE_MANIFEST_FILE_NAME)
1082 .expect("manifest");
1083 let mut manifest_content = String::new();
1084 manifest_file
1085 .read_to_string(&mut manifest_content)
1086 .expect("read manifest");
1087 let manifest: PortableManifest =
1088 serde_json::from_str(&manifest_content).expect("parse manifest");
1089 assert_eq!(manifest.format_version, PORTABLE_RECORDING_FORMAT_VERSION);
1090 assert_eq!(manifest.run.run_id, run_id);
1091 }
1092
1093 {
1094 let store_file = archive.by_name(STORE_ZIP_FILE_NAME).expect("store.zip");
1095 assert!(store_file.size() > 0);
1096 }
1097
1098 {
1099 let log_file = archive.by_name(RUN_LOG_FILE_NAME).expect("run.log.zst");
1100 assert!(log_file.size() > 0);
1101 }
1102 }
1103
1104 #[test]
1105 fn test_missing_run_dir() {
1106 let run_id = ReportUuid::from_u128(0x550e8400_e29b_41d4_a716_446655440000);
1107 let temp_dir = camino_tempfile::tempdir().expect("create temp dir");
1108 let runs_dir = temp_dir.path().to_owned();
1109 let run_info = create_test_run_info(run_id);
1110
1111 let result = PortableRecordingWriter::new(&run_info, StoreRunsDir::new(&runs_dir));
1112
1113 assert!(matches!(
1114 result,
1115 Err(PortableRecordingError::RunDirNotFound { .. })
1116 ));
1117 }
1118
1119 #[test]
1120 fn test_missing_store_zip() {
1121 let run_id = ReportUuid::from_u128(0x550e8400_e29b_41d4_a716_446655440000);
1122 let temp_dir = camino_tempfile::tempdir().expect("create temp dir");
1123 let runs_dir = temp_dir.path().to_owned();
1124 let run_dir = runs_dir.join(run_id.to_string());
1125 std::fs::create_dir_all(&run_dir).expect("create run dir");
1126
1127 let log_path = run_dir.join(RUN_LOG_FILE_NAME);
1128 let log_file = File::create(&log_path).expect("create run.log.zst");
1129 let mut encoder = zstd::stream::Encoder::new(log_file, 3).expect("create encoder");
1130 encoder.write_all(b"test").expect("write");
1131 encoder.finish().expect("finish");
1132
1133 let run_info = create_test_run_info(run_id);
1134 let result = PortableRecordingWriter::new(&run_info, StoreRunsDir::new(&runs_dir));
1135
1136 assert!(
1137 matches!(
1138 &result,
1139 Err(PortableRecordingError::RequiredFileMissing { file_name, .. })
1140 if *file_name == STORE_ZIP_FILE_NAME
1141 ),
1142 "expected RequiredFileMissing for store.zip, got {result:?}"
1143 );
1144 }
1145
1146 #[test]
1147 fn test_missing_run_log() {
1148 let run_id = ReportUuid::from_u128(0x550e8400_e29b_41d4_a716_446655440000);
1149 let temp_dir = camino_tempfile::tempdir().expect("create temp dir");
1150 let runs_dir = temp_dir.path().to_owned();
1151 let run_dir = runs_dir.join(run_id.to_string());
1152 std::fs::create_dir_all(&run_dir).expect("create run dir");
1153
1154 let store_path = run_dir.join(STORE_ZIP_FILE_NAME);
1155 let store_file = File::create(&store_path).expect("create store.zip");
1156 let mut zip_writer = ZipWriter::new(store_file);
1157 zip_writer
1158 .start_file("test.txt", SimpleFileOptions::default())
1159 .expect("start");
1160 zip_writer.write_all(b"test").expect("write");
1161 zip_writer.finish().expect("finish");
1162
1163 let run_info = create_test_run_info(run_id);
1164 let result = PortableRecordingWriter::new(&run_info, StoreRunsDir::new(&runs_dir));
1165
1166 assert!(
1167 matches!(
1168 &result,
1169 Err(PortableRecordingError::RequiredFileMissing { file_name, .. })
1170 if *file_name == RUN_LOG_FILE_NAME
1171 ),
1172 "expected RequiredFileMissing for run.log.zst, got {result:?}"
1173 );
1174 }
1175
1176 #[test]
1177 fn test_ensure_seekable_regular_file() {
1178 let temp = NamedUtf8TempFile::new().expect("created temp file");
1180 let path = temp.path().to_owned();
1181
1182 std::fs::write(&path, b"hello world").expect("wrote to temp file");
1183 let file = File::open(&path).expect("opened temp file");
1184
1185 #[cfg(unix)]
1187 let original_fd = {
1188 use std::os::unix::io::AsRawFd;
1189 file.as_raw_fd()
1190 };
1191
1192 let result = ensure_seekable(file, &path).expect("ensure_seekable succeeded");
1193
1194 #[cfg(unix)]
1196 {
1197 use std::os::unix::io::AsRawFd;
1198 assert_eq!(
1199 result.as_raw_fd(),
1200 original_fd,
1201 "seekable file should be returned as-is"
1202 );
1203 }
1204
1205 let mut contents = String::new();
1207 let mut reader = io::BufReader::new(result);
1208 reader
1209 .read_to_string(&mut contents)
1210 .expect("read file contents");
1211 assert_eq!(contents, "hello world");
1212 }
1213
1214 #[cfg(unix)]
1217 fn pipe_reader_to_file(reader: std::io::PipeReader) -> File {
1218 use std::os::fd::OwnedFd;
1219 File::from(OwnedFd::from(reader))
1220 }
1221
1222 #[cfg(windows)]
1225 fn pipe_reader_to_file(reader: std::io::PipeReader) -> File {
1226 use std::os::windows::io::OwnedHandle;
1227 File::from(OwnedHandle::from(reader))
1228 }
1229
1230 #[test]
1235 fn test_ensure_seekable_pipe() {
1236 let (pipe_reader, mut pipe_writer) = std::io::pipe().expect("created pipe");
1237 let test_data = b"zip-like test content for pipe spooling";
1238
1239 pipe_writer.write_all(test_data).expect("wrote to pipe");
1241 drop(pipe_writer);
1242
1243 let pipe_file = pipe_reader_to_file(pipe_reader);
1244
1245 let path = Utf8Path::new("/dev/fd/99");
1246 let result = ensure_seekable(pipe_file, path).expect("ensure_seekable succeeded");
1247
1248 let mut contents = Vec::new();
1250 let mut reader = io::BufReader::new(result);
1251 reader
1252 .read_to_end(&mut contents)
1253 .expect("read spooled contents");
1254 assert_eq!(contents, test_data);
1255 }
1256
1257 #[test]
1263 fn test_ensure_seekable_empty_pipe() {
1264 let (pipe_reader, pipe_writer) = std::io::pipe().expect("created pipe");
1265 drop(pipe_writer);
1267
1268 let pipe_file = pipe_reader_to_file(pipe_reader);
1269 let path = Utf8Path::new("/dev/fd/42");
1270 let mut result = ensure_seekable(pipe_file, path).expect("empty pipe should succeed");
1271
1272 let mut contents = Vec::new();
1273 result.read_to_end(&mut contents).expect("read contents");
1274 assert!(contents.is_empty());
1275 }
1276
1277 #[test]
1282 fn test_ensure_seekable_spool_too_large() {
1283 let (pipe_reader, mut pipe_writer) = std::io::pipe().expect("created pipe");
1284
1285 pipe_writer
1287 .write_all(b"01234567890123456789")
1288 .expect("wrote to pipe");
1289 drop(pipe_writer);
1290
1291 let pipe_file = pipe_reader_to_file(pipe_reader);
1292
1293 let path = Utf8Path::new("/dev/fd/42");
1294 let result = ensure_seekable_impl(pipe_file, path, ByteSize(10));
1295 assert!(
1296 matches!(
1297 &result,
1298 Err(PortableRecordingReadError::SpoolTooLarge {
1299 limit: ByteSize(10),
1300 ..
1301 })
1302 ),
1303 "expected SpoolTooLarge, got {result:?}"
1304 );
1305 }
1306
1307 #[test]
1312 fn test_ensure_seekable_spool_one_over_limit() {
1313 let (pipe_reader, mut pipe_writer) = std::io::pipe().expect("created pipe");
1314
1315 pipe_writer
1317 .write_all(b"01234567890")
1318 .expect("wrote to pipe");
1319 drop(pipe_writer);
1320
1321 let pipe_file = pipe_reader_to_file(pipe_reader);
1322
1323 let path = Utf8Path::new("/dev/fd/42");
1324 let result = ensure_seekable_impl(pipe_file, path, ByteSize(10));
1325 assert!(
1326 matches!(
1327 &result,
1328 Err(PortableRecordingReadError::SpoolTooLarge {
1329 limit: ByteSize(10),
1330 ..
1331 })
1332 ),
1333 "expected SpoolTooLarge at limit+1 bytes, got {result:?}"
1334 );
1335 }
1336
1337 #[test]
1339 fn test_ensure_seekable_spool_exact_limit() {
1340 let (pipe_reader, mut pipe_writer) = std::io::pipe().expect("created pipe");
1341
1342 pipe_writer.write_all(b"0123456789").expect("wrote to pipe");
1344 drop(pipe_writer);
1345
1346 let pipe_file = pipe_reader_to_file(pipe_reader);
1347
1348 let path = Utf8Path::new("/dev/fd/42");
1349 let mut result = ensure_seekable_impl(pipe_file, path, ByteSize(10))
1350 .expect("exact limit should succeed");
1351
1352 let mut contents = Vec::new();
1354 result.read_to_end(&mut contents).expect("read contents");
1355 assert_eq!(contents, b"0123456789");
1356 }
1357}