1use super::{
22 format::{
23 CARGO_METADATA_JSON_PATH, OutputDict, PORTABLE_MANIFEST_FILE_NAME,
24 PORTABLE_RECORDING_FORMAT_VERSION, PortableManifest, RECORD_OPTS_JSON_PATH,
25 RERUN_INFO_JSON_PATH, RUN_LOG_FILE_NAME, RerunInfo, STDERR_DICT_PATH, STDOUT_DICT_PATH,
26 STORE_FORMAT_VERSION, STORE_ZIP_FILE_NAME, TEST_LIST_JSON_PATH, has_zip_extension,
27 stored_file_options,
28 },
29 reader::{StoreReader, decompress_with_dict},
30 store::{RecordedRunInfo, RunFilesExist, StoreRunsDir},
31 summary::{RecordOpts, TestEventSummary},
32};
33use crate::{
34 errors::{PortableRecordingError, PortableRecordingReadError, RecordReadError},
35 output_spec::RecordingSpec,
36 user_config::elements::MAX_MAX_OUTPUT_SIZE,
37};
38use atomicwrites::{AtomicFile, OverwriteBehavior};
39use bytesize::ByteSize;
40use camino::{Utf8Path, Utf8PathBuf};
41use countio::Counter;
42use debug_ignore::DebugIgnore;
43use eazip::{Archive, ArchiveWriter, CompressionMethod};
44use itertools::Either;
45use nextest_metadata::TestListSummary;
46use std::{
47 borrow::Cow,
48 fs::File,
49 io::{self, BufRead, BufReader, Cursor, Read, Seek, SeekFrom, Write},
50};
51
52#[derive(Debug)]
54pub struct PortableRecordingResult {
55 pub path: Utf8PathBuf,
57 pub size: u64,
59}
60
61#[derive(Debug)]
63pub struct ExtractOuterFileResult {
64 pub bytes_written: u64,
66 pub exceeded_limit: Option<u64>,
71}
72
73#[derive(Debug)]
75pub struct PortableRecordingWriter<'a> {
76 run_info: &'a RecordedRunInfo,
77 run_dir: Utf8PathBuf,
78}
79
80impl<'a> PortableRecordingWriter<'a> {
81 pub fn new(
85 run_info: &'a RecordedRunInfo,
86 runs_dir: StoreRunsDir<'_>,
87 ) -> Result<Self, PortableRecordingError> {
88 let run_dir = runs_dir.run_dir(run_info.run_id);
89
90 if !run_dir.exists() {
91 return Err(PortableRecordingError::RunDirNotFound { path: run_dir });
92 }
93
94 let store_zip_path = run_dir.join(STORE_ZIP_FILE_NAME);
95 if !store_zip_path.exists() {
96 return Err(PortableRecordingError::RequiredFileMissing {
97 run_dir,
98 file_name: STORE_ZIP_FILE_NAME,
99 });
100 }
101
102 let run_log_path = run_dir.join(RUN_LOG_FILE_NAME);
103 if !run_log_path.exists() {
104 return Err(PortableRecordingError::RequiredFileMissing {
105 run_dir,
106 file_name: RUN_LOG_FILE_NAME,
107 });
108 }
109
110 Ok(Self { run_info, run_dir })
111 }
112
113 pub fn default_filename(&self) -> String {
117 format!("nextest-run-{}.zip", self.run_info.run_id)
118 }
119
120 pub fn write_to_dir(
125 &self,
126 output_dir: &Utf8Path,
127 ) -> Result<PortableRecordingResult, PortableRecordingError> {
128 let output_path = output_dir.join(self.default_filename());
129 self.write_to_path(&output_path)
130 }
131
132 pub fn write_to_path(
136 &self,
137 output_path: &Utf8Path,
138 ) -> Result<PortableRecordingResult, PortableRecordingError> {
139 let atomic_file = AtomicFile::new(output_path, OverwriteBehavior::AllowOverwrite);
140
141 let final_size = atomic_file
142 .write(|temp_file| {
143 let counter = Counter::new(temp_file);
144 let mut zip_writer = ArchiveWriter::new(counter);
145
146 self.write_manifest(&mut zip_writer)?;
147 self.copy_file(&mut zip_writer, RUN_LOG_FILE_NAME)?;
148 self.copy_file(&mut zip_writer, STORE_ZIP_FILE_NAME)?;
149
150 let counter = zip_writer
151 .finish()
152 .map_err(PortableRecordingError::ZipFinalize)?;
153
154 let counter_bytes = counter.writer_bytes() as u64;
159 let file = counter.into_inner();
160 let size = file.metadata().map(|m| m.len()).unwrap_or(counter_bytes);
161
162 Ok(size)
163 })
164 .map_err(|err| match err {
165 atomicwrites::Error::Internal(source) => PortableRecordingError::AtomicWrite {
166 path: output_path.to_owned(),
167 source,
168 },
169 atomicwrites::Error::User(e) => e,
170 })?;
171
172 Ok(PortableRecordingResult {
173 path: output_path.to_owned(),
174 size: final_size,
175 })
176 }
177
178 fn write_manifest<W: Write>(
180 &self,
181 zip_writer: &mut ArchiveWriter<W>,
182 ) -> Result<(), PortableRecordingError> {
183 let manifest = PortableManifest::new(self.run_info);
184 let manifest_json = serde_json::to_vec_pretty(&manifest)
185 .map_err(PortableRecordingError::SerializeManifest)?;
186
187 let options = stored_file_options();
188
189 zip_writer
190 .add_file(PORTABLE_MANIFEST_FILE_NAME, &manifest_json[..], &options)
191 .map_err(|source| PortableRecordingError::ZipWrite {
192 file_name: PORTABLE_MANIFEST_FILE_NAME,
193 source,
194 })?;
195
196 Ok(())
197 }
198
199 fn copy_file<W: Write>(
204 &self,
205 zip_writer: &mut ArchiveWriter<W>,
206 file_name: &'static str,
207 ) -> Result<(), PortableRecordingError> {
208 let source_path = self.run_dir.join(file_name);
209 let mut file = File::open(&source_path)
210 .map_err(|source| PortableRecordingError::ReadFile { file_name, source })?;
211
212 let options = stored_file_options();
213
214 let mut streamer = zip_writer
215 .stream_file(file_name, &options)
216 .map_err(|source| PortableRecordingError::ZipStartFile { file_name, source })?;
217
218 io::copy(&mut file, &mut streamer)
219 .map_err(|source| PortableRecordingError::ZipWrite { file_name, source })?;
220
221 streamer
222 .finish()
223 .map_err(|source| PortableRecordingError::ZipWrite { file_name, source })?;
224
225 Ok(())
226 }
227}
228
229const SPOOL_SIZE_LIMIT: ByteSize = ByteSize(4 * 1024 * 1024 * 1024);
239
240#[cfg(windows)]
247enum WindowsFileKind {
248 Disk,
250 Pipe,
252 Other(u32),
255}
256
257#[cfg(windows)]
259fn classify_windows_handle(file: &File) -> WindowsFileKind {
260 use std::os::windows::io::AsRawHandle;
261 use windows_sys::Win32::Storage::FileSystem::{FILE_TYPE_DISK, FILE_TYPE_PIPE, GetFileType};
262
263 let file_type = unsafe { GetFileType(file.as_raw_handle()) };
265 match file_type {
266 FILE_TYPE_DISK => WindowsFileKind::Disk,
267 FILE_TYPE_PIPE => WindowsFileKind::Pipe,
268 other => WindowsFileKind::Other(other),
269 }
270}
271
272#[cfg(unix)]
275fn is_not_seekable_error(e: &io::Error) -> bool {
276 e.raw_os_error() == Some(libc::ESPIPE)
278}
279
280fn ensure_seekable(file: File, path: &Utf8Path) -> Result<File, PortableRecordingReadError> {
289 ensure_seekable_impl(file, path, SPOOL_SIZE_LIMIT)
290}
291
292fn ensure_seekable_impl(
296 file: File,
297 path: &Utf8Path,
298 spool_limit: ByteSize,
299) -> Result<File, PortableRecordingReadError> {
300 #[cfg(unix)]
303 {
304 let mut file = file;
305 match file.stream_position() {
306 Ok(_) => Ok(file),
307 Err(e) if is_not_seekable_error(&e) => spool_to_temp(file, path, spool_limit),
308 Err(e) => {
309 Err(PortableRecordingReadError::SeekProbe {
312 path: path.to_owned(),
313 error: e,
314 })
315 }
316 }
317 }
318
319 #[cfg(windows)]
323 match classify_windows_handle(&file) {
324 WindowsFileKind::Disk => Ok(file),
325 WindowsFileKind::Pipe => spool_to_temp(file, path, spool_limit),
326 WindowsFileKind::Other(file_type) => Err(PortableRecordingReadError::SeekProbe {
327 path: path.to_owned(),
328 error: io::Error::other(format!(
329 "unexpected file handle type {file_type:#x} (expected disk or pipe)"
330 )),
331 }),
332 }
333}
334
335fn spool_to_temp(
339 file: File,
340 path: &Utf8Path,
341 spool_limit: ByteSize,
342) -> Result<File, PortableRecordingReadError> {
343 let mut temp =
344 camino_tempfile::tempfile().map_err(|error| PortableRecordingReadError::SpoolTempFile {
345 path: path.to_owned(),
346 error,
347 })?;
348
349 let bytes_copied = io::copy(
353 &mut (&file).take(spool_limit.0.saturating_add(1)),
354 &mut temp,
355 )
356 .map_err(|error| PortableRecordingReadError::SpoolTempFile {
357 path: path.to_owned(),
358 error,
359 })?;
360
361 if bytes_copied > spool_limit.0 {
362 return Err(PortableRecordingReadError::SpoolTooLarge {
363 path: path.to_owned(),
364 limit: spool_limit,
365 });
366 }
367
368 temp.seek(SeekFrom::Start(0))
370 .map_err(|error| PortableRecordingReadError::SpoolTempFile {
371 path: path.to_owned(),
372 error,
373 })?;
374
375 Ok(temp)
376}
377
378type ArchiveReadStorage = Either<File, Cursor<Vec<u8>>>;
383
384pub struct PortableRecording {
386 archive_path: Utf8PathBuf,
387 manifest: PortableManifest,
388 outer_archive: Archive<BufReader<ArchiveReadStorage>>,
389}
390
391impl std::fmt::Debug for PortableRecording {
392 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
393 f.debug_struct("PortableRecording")
394 .field("archive_path", &self.archive_path)
395 .field("manifest", &self.manifest)
396 .finish_non_exhaustive()
397 }
398}
399
400impl RunFilesExist for PortableRecording {
401 fn store_zip_exists(&self) -> bool {
402 self.outer_archive.index_of(STORE_ZIP_FILE_NAME).is_some()
403 }
404
405 fn run_log_exists(&self) -> bool {
406 self.outer_archive.index_of(RUN_LOG_FILE_NAME).is_some()
407 }
408}
409
410impl PortableRecording {
411 pub fn open(path: &Utf8Path) -> Result<Self, PortableRecordingReadError> {
421 let file = File::open(path).map_err(|error| PortableRecordingReadError::OpenArchive {
422 path: path.to_owned(),
423 error,
424 })?;
425
426 let file = ensure_seekable(file, path)?;
429
430 let mut outer_archive =
431 Archive::new(BufReader::new(Either::Left(file))).map_err(|error| {
432 PortableRecordingReadError::ReadArchive {
433 path: path.to_owned(),
434 error,
435 }
436 })?;
437
438 if outer_archive
440 .index_of(PORTABLE_MANIFEST_FILE_NAME)
441 .is_some()
442 {
443 return Self::open_validated(path, outer_archive);
444 }
445
446 let mut file_count = 0;
450 let mut zip_count = 0;
451 let mut zip_file: Option<String> = None;
452 for metadata in outer_archive.entries() {
453 let name = metadata.name();
454 if name.ends_with('/') || name.ends_with('\\') {
455 continue;
457 }
458 file_count += 1;
459 if has_zip_extension(Utf8Path::new(name)) {
460 zip_count += 1;
461 if zip_count == 1 {
462 zip_file = Some(name.to_owned());
463 }
464 }
465 }
466
467 if let Some(inner_name) = zip_file.filter(|_| file_count == 1 && zip_count == 1) {
468 let inner_bytes = read_outer_file(&mut outer_archive, inner_name.into(), path)?;
473 let inner_archive = Archive::new(BufReader::new(Either::Right(Cursor::new(
474 inner_bytes,
475 ))))
476 .map_err(|error| PortableRecordingReadError::ReadArchive {
477 path: path.to_owned(),
478 error,
479 })?;
480 Self::open_validated(path, inner_archive)
481 } else {
482 Err(PortableRecordingReadError::NotAWrapperArchive {
483 path: path.to_owned(),
484 file_count,
485 zip_count,
486 })
487 }
488 }
489
490 fn open_validated(
492 path: &Utf8Path,
493 mut outer_archive: Archive<BufReader<ArchiveReadStorage>>,
494 ) -> Result<Self, PortableRecordingReadError> {
495 let manifest_bytes =
497 read_outer_file(&mut outer_archive, PORTABLE_MANIFEST_FILE_NAME.into(), path)?;
498 let manifest: PortableManifest =
499 serde_json::from_slice(&manifest_bytes).map_err(|error| {
500 PortableRecordingReadError::ParseManifest {
501 path: path.to_owned(),
502 error,
503 }
504 })?;
505
506 if let Err(incompatibility) = manifest
508 .format_version
509 .check_readable_by(PORTABLE_RECORDING_FORMAT_VERSION)
510 {
511 return Err(PortableRecordingReadError::UnsupportedFormatVersion {
512 path: path.to_owned(),
513 found: manifest.format_version,
514 supported: PORTABLE_RECORDING_FORMAT_VERSION,
515 incompatibility,
516 });
517 }
518
519 let store_version = manifest.store_format_version();
521 if let Err(incompatibility) = store_version.check_readable_by(STORE_FORMAT_VERSION) {
522 return Err(PortableRecordingReadError::UnsupportedStoreFormatVersion {
523 path: path.to_owned(),
524 found: store_version,
525 supported: STORE_FORMAT_VERSION,
526 incompatibility,
527 });
528 }
529
530 Ok(Self {
531 archive_path: path.to_owned(),
532 manifest,
533 outer_archive,
534 })
535 }
536
537 pub fn archive_path(&self) -> &Utf8Path {
539 &self.archive_path
540 }
541
542 pub fn run_info(&self) -> RecordedRunInfo {
544 self.manifest.run_info()
545 }
546
547 pub fn read_run_log(&mut self) -> Result<PortableRecordingRunLog, PortableRecordingReadError> {
553 let run_log_bytes = read_outer_file(
554 &mut self.outer_archive,
555 RUN_LOG_FILE_NAME.into(),
556 &self.archive_path,
557 )?;
558 Ok(PortableRecordingRunLog {
559 archive_path: self.archive_path.clone(),
560 run_log_bytes,
561 })
562 }
563
564 pub fn extract_outer_file_to_path(
573 &mut self,
574 file_name: &'static str,
575 output_path: &Utf8Path,
576 check_limit: bool,
577 ) -> Result<ExtractOuterFileResult, PortableRecordingReadError> {
578 extract_outer_file_to_path(
579 &mut self.outer_archive,
580 file_name,
581 &self.archive_path,
582 output_path,
583 check_limit,
584 )
585 }
586
587 pub fn open_store(&mut self) -> Result<PortableStoreReader<'_>, PortableRecordingReadError> {
594 let file = self
595 .outer_archive
596 .get_by_name(STORE_ZIP_FILE_NAME)
597 .ok_or_else(|| PortableRecordingReadError::MissingFile {
598 path: self.archive_path.clone(),
599 file_name: Cow::Borrowed(STORE_ZIP_FILE_NAME),
600 })?;
601
602 let metadata = file.metadata();
603 if metadata.compression_method != CompressionMethod::STORE {
604 return Err(PortableRecordingReadError::CompressedInnerArchive {
605 archive_path: self.archive_path.clone(),
606 compression: metadata.compression_method,
607 });
608 }
609
610 let reader = file.into_reader();
611 let raw =
612 metadata
613 .read_raw(reader)
614 .map_err(|error| PortableRecordingReadError::ReadArchive {
615 path: self.archive_path.clone(),
616 error,
617 })?;
618
619 let store_archive =
620 Archive::new(raw).map_err(|error| PortableRecordingReadError::ReadArchive {
621 path: self.archive_path.clone(),
622 error,
623 })?;
624
625 Ok(PortableStoreReader {
626 archive_path: &self.archive_path,
627 store_archive,
628 stdout_dict: None,
629 stderr_dict: None,
630 })
631 }
632}
633
634fn read_outer_file(
636 archive: &mut Archive<BufReader<ArchiveReadStorage>>,
637 file_name: Cow<'static, str>,
638 archive_path: &Utf8Path,
639) -> Result<Vec<u8>, PortableRecordingReadError> {
640 let limit = MAX_MAX_OUTPUT_SIZE.as_u64();
641 let mut file =
642 archive
643 .get_by_name(&file_name)
644 .ok_or_else(|| PortableRecordingReadError::MissingFile {
645 path: archive_path.to_owned(),
646 file_name: file_name.clone(),
647 })?;
648
649 let claimed_size = file.metadata().uncompressed_size;
650 if claimed_size > limit {
651 return Err(PortableRecordingReadError::FileTooLarge {
652 path: archive_path.to_owned(),
653 file_name,
654 size: claimed_size,
655 limit,
656 });
657 }
658
659 let capacity = usize::try_from(claimed_size).unwrap_or(usize::MAX);
660 let mut contents = Vec::with_capacity(capacity);
661
662 file.read()
663 .and_then(|reader| reader.take(limit).read_to_end(&mut contents))
664 .map_err(|error| PortableRecordingReadError::ReadArchive {
665 path: archive_path.to_owned(),
666 error,
667 })?;
668
669 Ok(contents)
670}
671
672fn extract_outer_file_to_path(
674 archive: &mut Archive<BufReader<ArchiveReadStorage>>,
675 file_name: &'static str,
676 archive_path: &Utf8Path,
677 output_path: &Utf8Path,
678 check_limit: bool,
679) -> Result<ExtractOuterFileResult, PortableRecordingReadError> {
680 let limit = MAX_MAX_OUTPUT_SIZE.as_u64();
681 let mut file =
682 archive
683 .get_by_name(file_name)
684 .ok_or_else(|| PortableRecordingReadError::MissingFile {
685 path: archive_path.to_owned(),
686 file_name: Cow::Borrowed(file_name),
687 })?;
688
689 let claimed_size = file.metadata().uncompressed_size;
690 let exceeded_limit = if check_limit && claimed_size > limit {
691 Some(claimed_size)
692 } else {
693 None
694 };
695
696 let mut output_file =
697 File::create(output_path).map_err(|error| PortableRecordingReadError::ExtractFile {
698 archive_path: archive_path.to_owned(),
699 file_name,
700 output_path: output_path.to_owned(),
701 error,
702 })?;
703
704 let mut reader = file
705 .read()
706 .map_err(|error| PortableRecordingReadError::ReadArchive {
707 path: archive_path.to_owned(),
708 error,
709 })?;
710
711 let bytes_written = io::copy(&mut reader, &mut output_file).map_err(|error| {
712 PortableRecordingReadError::ExtractFile {
713 archive_path: archive_path.to_owned(),
714 file_name,
715 output_path: output_path.to_owned(),
716 error,
717 }
718 })?;
719
720 Ok(ExtractOuterFileResult {
721 bytes_written,
722 exceeded_limit,
723 })
724}
725
726#[derive(Debug)]
731pub struct PortableRecordingRunLog {
732 archive_path: Utf8PathBuf,
733 run_log_bytes: Vec<u8>,
734}
735
736impl PortableRecordingRunLog {
737 pub fn events(&self) -> Result<PortableRecordingEventIter<'_>, RecordReadError> {
739 let decoder =
742 zstd::stream::Decoder::with_buffer(&self.run_log_bytes[..]).map_err(|error| {
743 RecordReadError::OpenRunLog {
744 path: self.archive_path.join(RUN_LOG_FILE_NAME),
745 error,
746 }
747 })?;
748 Ok(PortableRecordingEventIter {
749 reader: DebugIgnore(BufReader::new(decoder)),
751 line_buf: String::new(),
752 line_number: 0,
753 })
754 }
755}
756
757#[derive(Debug)]
759pub struct PortableRecordingEventIter<'a> {
760 reader: DebugIgnore<BufReader<zstd::stream::Decoder<'static, &'a [u8]>>>,
761 line_buf: String,
762 line_number: usize,
763}
764
765impl Iterator for PortableRecordingEventIter<'_> {
766 type Item = Result<TestEventSummary<RecordingSpec>, RecordReadError>;
767
768 fn next(&mut self) -> Option<Self::Item> {
769 loop {
770 self.line_buf.clear();
771 self.line_number += 1;
772
773 match self.reader.read_line(&mut self.line_buf) {
774 Ok(0) => return None,
775 Ok(_) => {
776 let trimmed = self.line_buf.trim();
777 if trimmed.is_empty() {
778 continue;
779 }
780 return Some(serde_json::from_str(trimmed).map_err(|error| {
781 RecordReadError::ParseEvent {
782 line_number: self.line_number,
783 error,
784 }
785 }));
786 }
787 Err(error) => {
788 return Some(Err(RecordReadError::ReadRunLog {
789 line_number: self.line_number,
790 error,
791 }));
792 }
793 }
794 }
795 }
796}
797
798pub struct PortableStoreReader<'a> {
803 archive_path: &'a Utf8Path,
804 store_archive: Archive<io::Take<&'a mut BufReader<ArchiveReadStorage>>>,
805 stdout_dict: Option<Vec<u8>>,
807 stderr_dict: Option<Vec<u8>>,
809}
810
811impl std::fmt::Debug for PortableStoreReader<'_> {
812 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
813 f.debug_struct("PortableStoreReader")
814 .field("archive_path", &self.archive_path)
815 .field("stdout_dict", &self.stdout_dict.as_ref().map(|d| d.len()))
816 .field("stderr_dict", &self.stderr_dict.as_ref().map(|d| d.len()))
817 .finish_non_exhaustive()
818 }
819}
820
821impl PortableStoreReader<'_> {
822 fn read_store_file(&mut self, file_name: &str) -> Result<Vec<u8>, RecordReadError> {
824 let limit = MAX_MAX_OUTPUT_SIZE.as_u64();
825 let mut file = self.store_archive.get_by_name(file_name).ok_or_else(|| {
826 RecordReadError::FileNotFound {
827 file_name: file_name.to_string(),
828 }
829 })?;
830
831 let claimed_size = file.metadata().uncompressed_size;
832 if claimed_size > limit {
833 return Err(RecordReadError::FileTooLarge {
834 file_name: file_name.to_string(),
835 size: claimed_size,
836 limit,
837 });
838 }
839
840 let capacity = usize::try_from(claimed_size).unwrap_or(usize::MAX);
841 let mut contents = Vec::with_capacity(capacity);
842
843 file.read()
844 .and_then(|reader| reader.take(limit).read_to_end(&mut contents))
845 .map_err(|error| RecordReadError::Decompress {
846 file_name: file_name.to_string(),
847 error,
848 })?;
849
850 let actual_size = contents.len() as u64;
851 if actual_size != claimed_size {
852 return Err(RecordReadError::SizeMismatch {
853 file_name: file_name.to_string(),
854 claimed_size,
855 actual_size,
856 });
857 }
858
859 Ok(contents)
860 }
861
862 fn get_dict_for_output(&self, file_name: &str) -> Option<&[u8]> {
864 match OutputDict::for_output_file_name(file_name) {
865 OutputDict::Stdout => Some(
866 self.stdout_dict
867 .as_ref()
868 .expect("load_dictionaries must be called first"),
869 ),
870 OutputDict::Stderr => Some(
871 self.stderr_dict
872 .as_ref()
873 .expect("load_dictionaries must be called first"),
874 ),
875 OutputDict::None => None,
876 }
877 }
878}
879
880impl StoreReader for PortableStoreReader<'_> {
881 fn read_cargo_metadata(&mut self) -> Result<String, RecordReadError> {
882 let bytes = self.read_store_file(CARGO_METADATA_JSON_PATH)?;
883 String::from_utf8(bytes).map_err(|e| RecordReadError::Decompress {
884 file_name: CARGO_METADATA_JSON_PATH.to_string(),
885 error: std::io::Error::new(std::io::ErrorKind::InvalidData, e),
886 })
887 }
888
889 fn read_test_list(&mut self) -> Result<TestListSummary, RecordReadError> {
890 let bytes = self.read_store_file(TEST_LIST_JSON_PATH)?;
891 serde_json::from_slice(&bytes).map_err(|error| RecordReadError::DeserializeMetadata {
892 file_name: TEST_LIST_JSON_PATH.to_string(),
893 error,
894 })
895 }
896
897 fn read_record_opts(&mut self) -> Result<RecordOpts, RecordReadError> {
898 let bytes = self.read_store_file(RECORD_OPTS_JSON_PATH)?;
899 serde_json::from_slice(&bytes).map_err(|error| RecordReadError::DeserializeMetadata {
900 file_name: RECORD_OPTS_JSON_PATH.to_string(),
901 error,
902 })
903 }
904
905 fn read_rerun_info(&mut self) -> Result<Option<RerunInfo>, RecordReadError> {
906 match self.read_store_file(RERUN_INFO_JSON_PATH) {
907 Ok(bytes) => {
908 let info = serde_json::from_slice(&bytes).map_err(|error| {
909 RecordReadError::DeserializeMetadata {
910 file_name: RERUN_INFO_JSON_PATH.to_string(),
911 error,
912 }
913 })?;
914 Ok(Some(info))
915 }
916 Err(RecordReadError::FileNotFound { .. }) => {
917 Ok(None)
919 }
920 Err(e) => Err(e),
921 }
922 }
923
924 fn load_dictionaries(&mut self) -> Result<(), RecordReadError> {
925 self.stdout_dict = Some(self.read_store_file(STDOUT_DICT_PATH)?);
926 self.stderr_dict = Some(self.read_store_file(STDERR_DICT_PATH)?);
927 Ok(())
928 }
929
930 fn read_output(&mut self, file_name: &str) -> Result<Vec<u8>, RecordReadError> {
931 let path = format!("out/{file_name}");
932 let compressed = self.read_store_file(&path)?;
933 let limit = MAX_MAX_OUTPUT_SIZE.as_u64();
934
935 let dict_bytes = self.get_dict_for_output(file_name).ok_or_else(|| {
936 RecordReadError::UnknownOutputType {
937 file_name: file_name.to_owned(),
938 }
939 })?;
940
941 decompress_with_dict(&compressed, dict_bytes, limit).map_err(|error| {
942 RecordReadError::Decompress {
943 file_name: path,
944 error,
945 }
946 })
947 }
948
949 fn extract_file_to_path(
950 &mut self,
951 store_path: &str,
952 output_path: &Utf8Path,
953 ) -> Result<u64, RecordReadError> {
954 let mut file = self.store_archive.get_by_name(store_path).ok_or_else(|| {
955 RecordReadError::FileNotFound {
956 file_name: store_path.to_owned(),
957 }
958 })?;
959
960 let mut output_file =
961 File::create(output_path).map_err(|error| RecordReadError::ExtractFile {
962 store_path: store_path.to_owned(),
963 output_path: output_path.to_owned(),
964 error,
965 })?;
966
967 let mut reader = file
968 .read()
969 .map_err(|error| RecordReadError::ReadArchiveFile {
970 file_name: store_path.to_owned(),
971 error,
972 })?;
973
974 io::copy(&mut reader, &mut output_file).map_err(|error| RecordReadError::ExtractFile {
975 store_path: store_path.to_owned(),
976 output_path: output_path.to_owned(),
977 error,
978 })
979 }
980}
981
982#[cfg(test)]
983mod tests {
984 use super::*;
985 use crate::record::{
986 format::{PORTABLE_RECORDING_FORMAT_VERSION, STORE_FORMAT_VERSION},
987 store::{CompletedRunStats, RecordedRunStatus, RecordedSizes},
988 };
989 use camino_tempfile::{NamedUtf8TempFile, Utf8TempDir};
990 use chrono::Local;
991 use eazip::write::FileOptions;
992 use quick_junit::ReportUuid;
993 use semver::Version;
994 use std::{collections::BTreeMap, io::Read};
995
996 fn create_test_run_dir(run_id: ReportUuid) -> (Utf8TempDir, Utf8PathBuf) {
997 let temp_dir = camino_tempfile::tempdir().expect("create temp dir");
998 let runs_dir = temp_dir.path().to_owned();
999 let run_dir = runs_dir.join(run_id.to_string());
1000 std::fs::create_dir_all(&run_dir).expect("create run dir");
1001
1002 let store_path = run_dir.join(STORE_ZIP_FILE_NAME);
1003 let store_file = File::create(&store_path).expect("create store.zip");
1004 let mut zip_writer = ArchiveWriter::new(store_file);
1005 let options = FileOptions::default();
1006 zip_writer
1007 .add_file("test.txt", &b"test content"[..], &options)
1008 .expect("add file");
1009 zip_writer.finish().expect("finish zip");
1010
1011 let log_path = run_dir.join(RUN_LOG_FILE_NAME);
1012 let log_file = File::create(&log_path).expect("create run.log.zst");
1013 let mut encoder = zstd::stream::Encoder::new(log_file, 3).expect("create encoder");
1014 encoder.write_all(b"test log content").expect("write log");
1015 encoder.finish().expect("finish encoder");
1016
1017 (temp_dir, runs_dir)
1018 }
1019
1020 fn create_test_run_info(run_id: ReportUuid) -> RecordedRunInfo {
1021 let now = Local::now().fixed_offset();
1022 RecordedRunInfo {
1023 run_id,
1024 store_format_version: STORE_FORMAT_VERSION,
1025 nextest_version: Version::new(0, 9, 111),
1026 started_at: now,
1027 last_written_at: now,
1028 duration_secs: Some(12.345),
1029 cli_args: vec!["cargo".to_owned(), "nextest".to_owned(), "run".to_owned()],
1030 build_scope_args: vec!["--workspace".to_owned()],
1031 env_vars: BTreeMap::from([("CARGO_TERM_COLOR".to_owned(), "always".to_owned())]),
1032 parent_run_id: None,
1033 sizes: RecordedSizes::default(),
1034 status: RecordedRunStatus::Completed(CompletedRunStats {
1035 initial_run_count: 10,
1036 passed: 9,
1037 failed: 1,
1038 exit_code: 100,
1039 }),
1040 }
1041 }
1042
1043 #[test]
1044 fn test_default_filename() {
1045 let run_id = ReportUuid::from_u128(0x550e8400_e29b_41d4_a716_446655440000);
1046 let (_temp_dir, runs_dir) = create_test_run_dir(run_id);
1047 let run_info = create_test_run_info(run_id);
1048
1049 let writer = PortableRecordingWriter::new(&run_info, StoreRunsDir::new(&runs_dir))
1050 .expect("create writer");
1051
1052 assert_eq!(
1053 writer.default_filename(),
1054 "nextest-run-550e8400-e29b-41d4-a716-446655440000.zip"
1055 );
1056 }
1057
1058 #[test]
1059 fn test_write_portable_recording() {
1060 let run_id = ReportUuid::from_u128(0x550e8400_e29b_41d4_a716_446655440000);
1061 let (_temp_dir, runs_dir) = create_test_run_dir(run_id);
1062 let run_info = create_test_run_info(run_id);
1063
1064 let writer = PortableRecordingWriter::new(&run_info, StoreRunsDir::new(&runs_dir))
1065 .expect("create writer");
1066
1067 let output_dir = camino_tempfile::tempdir().expect("create output dir");
1068
1069 let result = writer
1070 .write_to_dir(output_dir.path())
1071 .expect("write archive");
1072
1073 assert!(result.path.exists());
1074 assert!(result.size > 0);
1075
1076 let actual_size = std::fs::metadata(&result.path)
1078 .expect("get file metadata")
1079 .len();
1080 assert_eq!(
1081 result.size, actual_size,
1082 "reported size should match actual file size"
1083 );
1084
1085 assert_eq!(
1086 result.path.file_name(),
1087 Some("nextest-run-550e8400-e29b-41d4-a716-446655440000.zip")
1088 );
1089
1090 let archive_file = File::open(&result.path).expect("open archive");
1091 let mut archive = Archive::new(BufReader::new(archive_file)).expect("read archive");
1092
1093 assert_eq!(archive.entries().len(), 3);
1094
1095 {
1096 let mut manifest_file = archive
1097 .get_by_name(PORTABLE_MANIFEST_FILE_NAME)
1098 .expect("manifest");
1099 let mut manifest_content = String::new();
1100 manifest_file
1101 .read()
1102 .expect("get reader")
1103 .read_to_string(&mut manifest_content)
1104 .expect("read manifest");
1105 let manifest: PortableManifest =
1106 serde_json::from_str(&manifest_content).expect("parse manifest");
1107 assert_eq!(manifest.format_version, PORTABLE_RECORDING_FORMAT_VERSION);
1108 assert_eq!(manifest.run.run_id, run_id);
1109 }
1110
1111 {
1112 let store_file = archive.get_by_name(STORE_ZIP_FILE_NAME).expect("store.zip");
1113 assert!(store_file.metadata().uncompressed_size > 0);
1114 }
1115
1116 {
1117 let log_file = archive.get_by_name(RUN_LOG_FILE_NAME).expect("run.log.zst");
1118 assert!(log_file.metadata().uncompressed_size > 0);
1119 }
1120 }
1121
1122 #[test]
1123 fn test_missing_run_dir() {
1124 let run_id = ReportUuid::from_u128(0x550e8400_e29b_41d4_a716_446655440000);
1125 let temp_dir = camino_tempfile::tempdir().expect("create temp dir");
1126 let runs_dir = temp_dir.path().to_owned();
1127 let run_info = create_test_run_info(run_id);
1128
1129 let result = PortableRecordingWriter::new(&run_info, StoreRunsDir::new(&runs_dir));
1130
1131 assert!(matches!(
1132 result,
1133 Err(PortableRecordingError::RunDirNotFound { .. })
1134 ));
1135 }
1136
1137 #[test]
1138 fn test_missing_store_zip() {
1139 let run_id = ReportUuid::from_u128(0x550e8400_e29b_41d4_a716_446655440000);
1140 let temp_dir = camino_tempfile::tempdir().expect("create temp dir");
1141 let runs_dir = temp_dir.path().to_owned();
1142 let run_dir = runs_dir.join(run_id.to_string());
1143 std::fs::create_dir_all(&run_dir).expect("create run dir");
1144
1145 let log_path = run_dir.join(RUN_LOG_FILE_NAME);
1146 let log_file = File::create(&log_path).expect("create run.log.zst");
1147 let mut encoder = zstd::stream::Encoder::new(log_file, 3).expect("create encoder");
1148 encoder.write_all(b"test").expect("write");
1149 encoder.finish().expect("finish");
1150
1151 let run_info = create_test_run_info(run_id);
1152 let result = PortableRecordingWriter::new(&run_info, StoreRunsDir::new(&runs_dir));
1153
1154 assert!(
1155 matches!(
1156 &result,
1157 Err(PortableRecordingError::RequiredFileMissing { file_name, .. })
1158 if *file_name == STORE_ZIP_FILE_NAME
1159 ),
1160 "expected RequiredFileMissing for store.zip, got {result:?}"
1161 );
1162 }
1163
1164 #[test]
1165 fn test_missing_run_log() {
1166 let run_id = ReportUuid::from_u128(0x550e8400_e29b_41d4_a716_446655440000);
1167 let temp_dir = camino_tempfile::tempdir().expect("create temp dir");
1168 let runs_dir = temp_dir.path().to_owned();
1169 let run_dir = runs_dir.join(run_id.to_string());
1170 std::fs::create_dir_all(&run_dir).expect("create run dir");
1171
1172 let store_path = run_dir.join(STORE_ZIP_FILE_NAME);
1173 let store_file = File::create(&store_path).expect("create store.zip");
1174 let mut zip_writer = ArchiveWriter::new(store_file);
1175 let options = FileOptions::default();
1176 zip_writer
1177 .add_file("test.txt", &b"test"[..], &options)
1178 .expect("add file");
1179 zip_writer.finish().expect("finish");
1180
1181 let run_info = create_test_run_info(run_id);
1182 let result = PortableRecordingWriter::new(&run_info, StoreRunsDir::new(&runs_dir));
1183
1184 assert!(
1185 matches!(
1186 &result,
1187 Err(PortableRecordingError::RequiredFileMissing { file_name, .. })
1188 if *file_name == RUN_LOG_FILE_NAME
1189 ),
1190 "expected RequiredFileMissing for run.log.zst, got {result:?}"
1191 );
1192 }
1193
1194 #[test]
1195 fn test_ensure_seekable_regular_file() {
1196 let temp = NamedUtf8TempFile::new().expect("created temp file");
1198 let path = temp.path().to_owned();
1199
1200 std::fs::write(&path, b"hello world").expect("wrote to temp file");
1201 let file = File::open(&path).expect("opened temp file");
1202
1203 #[cfg(unix)]
1205 let original_fd = {
1206 use std::os::unix::io::AsRawFd;
1207 file.as_raw_fd()
1208 };
1209
1210 let result = ensure_seekable(file, &path).expect("ensure_seekable succeeded");
1211
1212 #[cfg(unix)]
1214 {
1215 use std::os::unix::io::AsRawFd;
1216 assert_eq!(
1217 result.as_raw_fd(),
1218 original_fd,
1219 "seekable file should be returned as-is"
1220 );
1221 }
1222
1223 let mut contents = String::new();
1225 let mut reader = io::BufReader::new(result);
1226 reader
1227 .read_to_string(&mut contents)
1228 .expect("read file contents");
1229 assert_eq!(contents, "hello world");
1230 }
1231
1232 #[cfg(unix)]
1235 fn pipe_reader_to_file(reader: std::io::PipeReader) -> File {
1236 use std::os::fd::OwnedFd;
1237 File::from(OwnedFd::from(reader))
1238 }
1239
1240 #[cfg(windows)]
1243 fn pipe_reader_to_file(reader: std::io::PipeReader) -> File {
1244 use std::os::windows::io::OwnedHandle;
1245 File::from(OwnedHandle::from(reader))
1246 }
1247
1248 #[test]
1253 fn test_ensure_seekable_pipe() {
1254 let (pipe_reader, mut pipe_writer) = std::io::pipe().expect("created pipe");
1255 let test_data = b"zip-like test content for pipe spooling";
1256
1257 pipe_writer.write_all(test_data).expect("wrote to pipe");
1259 drop(pipe_writer);
1260
1261 let pipe_file = pipe_reader_to_file(pipe_reader);
1262
1263 let path = Utf8Path::new("/dev/fd/99");
1264 let result = ensure_seekable(pipe_file, path).expect("ensure_seekable succeeded");
1265
1266 let mut contents = Vec::new();
1268 let mut reader = io::BufReader::new(result);
1269 reader
1270 .read_to_end(&mut contents)
1271 .expect("read spooled contents");
1272 assert_eq!(contents, test_data);
1273 }
1274
1275 #[test]
1281 fn test_ensure_seekable_empty_pipe() {
1282 let (pipe_reader, pipe_writer) = std::io::pipe().expect("created pipe");
1283 drop(pipe_writer);
1285
1286 let pipe_file = pipe_reader_to_file(pipe_reader);
1287 let path = Utf8Path::new("/dev/fd/42");
1288 let mut result = ensure_seekable(pipe_file, path).expect("empty pipe should succeed");
1289
1290 let mut contents = Vec::new();
1291 result.read_to_end(&mut contents).expect("read contents");
1292 assert!(contents.is_empty());
1293 }
1294
1295 #[test]
1300 fn test_ensure_seekable_spool_too_large() {
1301 let (pipe_reader, mut pipe_writer) = std::io::pipe().expect("created pipe");
1302
1303 pipe_writer
1305 .write_all(b"01234567890123456789")
1306 .expect("wrote to pipe");
1307 drop(pipe_writer);
1308
1309 let pipe_file = pipe_reader_to_file(pipe_reader);
1310
1311 let path = Utf8Path::new("/dev/fd/42");
1312 let result = ensure_seekable_impl(pipe_file, path, ByteSize(10));
1313 assert!(
1314 matches!(
1315 &result,
1316 Err(PortableRecordingReadError::SpoolTooLarge {
1317 limit: ByteSize(10),
1318 ..
1319 })
1320 ),
1321 "expected SpoolTooLarge, got {result:?}"
1322 );
1323 }
1324
1325 #[test]
1330 fn test_ensure_seekable_spool_one_over_limit() {
1331 let (pipe_reader, mut pipe_writer) = std::io::pipe().expect("created pipe");
1332
1333 pipe_writer
1335 .write_all(b"01234567890")
1336 .expect("wrote to pipe");
1337 drop(pipe_writer);
1338
1339 let pipe_file = pipe_reader_to_file(pipe_reader);
1340
1341 let path = Utf8Path::new("/dev/fd/42");
1342 let result = ensure_seekable_impl(pipe_file, path, ByteSize(10));
1343 assert!(
1344 matches!(
1345 &result,
1346 Err(PortableRecordingReadError::SpoolTooLarge {
1347 limit: ByteSize(10),
1348 ..
1349 })
1350 ),
1351 "expected SpoolTooLarge at limit+1 bytes, got {result:?}"
1352 );
1353 }
1354
1355 #[test]
1357 fn test_ensure_seekable_spool_exact_limit() {
1358 let (pipe_reader, mut pipe_writer) = std::io::pipe().expect("created pipe");
1359
1360 pipe_writer.write_all(b"0123456789").expect("wrote to pipe");
1362 drop(pipe_writer);
1363
1364 let pipe_file = pipe_reader_to_file(pipe_reader);
1365
1366 let path = Utf8Path::new("/dev/fd/42");
1367 let mut result = ensure_seekable_impl(pipe_file, path, ByteSize(10))
1368 .expect("exact limit should succeed");
1369
1370 let mut contents = Vec::new();
1372 result.read_to_end(&mut contents).expect("read contents");
1373 assert_eq!(contents, b"0123456789");
1374 }
1375}