Skip to main content

nextest_runner/record/
portable.rs

1// Copyright (c) The nextest Contributors
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! Portable archive creation and reading for recorded runs.
5//!
6//! A portable recording packages a single recorded run into a self-contained zip
7//! file that can be shared and imported elsewhere.
8//!
9//! # Reading portable recordings
10//!
11//! Use [`PortableRecording::open`] to open a portable recording for reading. The
12//! archive contains:
13//!
14//! - A manifest (`manifest.json`) with run metadata.
15//! - A run log (`run.log.zst`) with test events.
16//! - An inner store (`store.zip`) with metadata and test output.
17//!
18//! To read from the inner store, call [`PortableRecording::open_store`] to get a
19//! [`PortableStoreReader`] that implements [`StoreReader`](super::reader::StoreReader).
20
21use super::{
22    format::{
23        CARGO_METADATA_JSON_PATH, OutputDict, PORTABLE_MANIFEST_FILE_NAME,
24        PORTABLE_RECORDING_FORMAT_VERSION, PortableManifest, RECORD_OPTS_JSON_PATH,
25        RERUN_INFO_JSON_PATH, RUN_LOG_FILE_NAME, RerunInfo, STDERR_DICT_PATH, STDOUT_DICT_PATH,
26        STORE_FORMAT_VERSION, STORE_ZIP_FILE_NAME, TEST_LIST_JSON_PATH, has_zip_extension,
27        stored_file_options,
28    },
29    reader::{StoreReader, decompress_with_dict},
30    store::{RecordedRunInfo, RunFilesExist, StoreRunsDir},
31    summary::{RecordOpts, TestEventSummary},
32};
33use crate::{
34    errors::{PortableRecordingError, PortableRecordingReadError, RecordReadError},
35    output_spec::RecordingSpec,
36    user_config::elements::MAX_MAX_OUTPUT_SIZE,
37};
38use atomicwrites::{AtomicFile, OverwriteBehavior};
39use bytesize::ByteSize;
40use camino::{Utf8Path, Utf8PathBuf};
41use countio::Counter;
42use debug_ignore::DebugIgnore;
43use eazip::{Archive, ArchiveWriter, CompressionMethod};
44use itertools::Either;
45use nextest_metadata::TestListSummary;
46use std::{
47    borrow::Cow,
48    fs::File,
49    io::{self, BufRead, BufReader, Cursor, Read, Seek, SeekFrom, Write},
50};
51
/// Result of writing a portable recording.
///
/// Returned by [`PortableRecordingWriter::write_to_dir`] and
/// [`PortableRecordingWriter::write_to_path`].
#[derive(Debug)]
pub struct PortableRecordingResult {
    /// The path to the written archive.
    pub path: Utf8PathBuf,
    /// The total size of the archive in bytes.
    ///
    /// Taken from the written file's metadata when that is available, and from
    /// the number of bytes counted during writing otherwise.
    pub size: u64,
}
60
/// Result of extracting a file from a portable recording.
///
/// Returned by [`PortableRecording::extract_outer_file_to_path`].
#[derive(Debug)]
pub struct ExtractOuterFileResult {
    /// The number of bytes written to the output file.
    pub bytes_written: u64,
    /// If the limit check was requested and the size claimed by the archive
    /// entry's metadata exceeded [`MAX_MAX_OUTPUT_SIZE`], contains the claimed
    /// size.
    ///
    /// This is informational only; the full file is always extracted regardless
    /// of whether this is `Some`.
    pub exceeded_limit: Option<u64>,
}
72
/// Writer to create a portable recording from a recorded run.
///
/// Construct one with [`PortableRecordingWriter::new`], which checks that the
/// run directory and the files it must contain are present.
#[derive(Debug)]
pub struct PortableRecordingWriter<'a> {
    /// Metadata of the run being packaged; feeds the manifest and the default
    /// archive file name.
    run_info: &'a RecordedRunInfo,
    /// Directory of the run, from which the files named by
    /// `RUN_LOG_FILE_NAME` and `STORE_ZIP_FILE_NAME` are copied.
    run_dir: Utf8PathBuf,
}
79
80impl<'a> PortableRecordingWriter<'a> {
81    /// Creates a new writer for the given run.
82    ///
83    /// Validates that the run directory exists and contains the required files.
84    pub fn new(
85        run_info: &'a RecordedRunInfo,
86        runs_dir: StoreRunsDir<'_>,
87    ) -> Result<Self, PortableRecordingError> {
88        let run_dir = runs_dir.run_dir(run_info.run_id);
89
90        if !run_dir.exists() {
91            return Err(PortableRecordingError::RunDirNotFound { path: run_dir });
92        }
93
94        let store_zip_path = run_dir.join(STORE_ZIP_FILE_NAME);
95        if !store_zip_path.exists() {
96            return Err(PortableRecordingError::RequiredFileMissing {
97                run_dir,
98                file_name: STORE_ZIP_FILE_NAME,
99            });
100        }
101
102        let run_log_path = run_dir.join(RUN_LOG_FILE_NAME);
103        if !run_log_path.exists() {
104            return Err(PortableRecordingError::RequiredFileMissing {
105                run_dir,
106                file_name: RUN_LOG_FILE_NAME,
107            });
108        }
109
110        Ok(Self { run_info, run_dir })
111    }
112
113    /// Returns the default filename for this archive.
114    ///
115    /// Format: `nextest-run-{run_id}.zip`
116    pub fn default_filename(&self) -> String {
117        format!("nextest-run-{}.zip", self.run_info.run_id)
118    }
119
120    /// Writes the portable recording to the given directory.
121    ///
122    /// The archive is written atomically using a temporary file and rename.
123    /// The filename will be the default filename (`nextest-run-{run_id}.zip`).
124    pub fn write_to_dir(
125        &self,
126        output_dir: &Utf8Path,
127    ) -> Result<PortableRecordingResult, PortableRecordingError> {
128        let output_path = output_dir.join(self.default_filename());
129        self.write_to_path(&output_path)
130    }
131
132    /// Writes the portable recording to the given path.
133    ///
134    /// The archive is written atomically using a temporary file and rename.
135    pub fn write_to_path(
136        &self,
137        output_path: &Utf8Path,
138    ) -> Result<PortableRecordingResult, PortableRecordingError> {
139        let atomic_file = AtomicFile::new(output_path, OverwriteBehavior::AllowOverwrite);
140
141        let final_size = atomic_file
142            .write(|temp_file| {
143                let counter = Counter::new(temp_file);
144                let mut zip_writer = ArchiveWriter::new(counter);
145
146                self.write_manifest(&mut zip_writer)?;
147                self.copy_file(&mut zip_writer, RUN_LOG_FILE_NAME)?;
148                self.copy_file(&mut zip_writer, STORE_ZIP_FILE_NAME)?;
149
150                let counter = zip_writer
151                    .finish()
152                    .map_err(PortableRecordingError::ZipFinalize)?;
153
154                // Prefer the actual file size from metadata since ArchiveWriter
155                // writes data descriptors after entries, causing the counter to
156                // slightly overcount. Fall back to the counter value if metadata
157                // is unavailable.
158                let counter_bytes = counter.writer_bytes() as u64;
159                let file = counter.into_inner();
160                let size = file.metadata().map(|m| m.len()).unwrap_or(counter_bytes);
161
162                Ok(size)
163            })
164            .map_err(|err| match err {
165                atomicwrites::Error::Internal(source) => PortableRecordingError::AtomicWrite {
166                    path: output_path.to_owned(),
167                    source,
168                },
169                atomicwrites::Error::User(e) => e,
170            })?;
171
172        Ok(PortableRecordingResult {
173            path: output_path.to_owned(),
174            size: final_size,
175        })
176    }
177
178    /// Writes the manifest to the archive.
179    fn write_manifest<W: Write>(
180        &self,
181        zip_writer: &mut ArchiveWriter<W>,
182    ) -> Result<(), PortableRecordingError> {
183        let manifest = PortableManifest::new(self.run_info);
184        let manifest_json = serde_json::to_vec_pretty(&manifest)
185            .map_err(PortableRecordingError::SerializeManifest)?;
186
187        let options = stored_file_options();
188
189        zip_writer
190            .add_file(PORTABLE_MANIFEST_FILE_NAME, &manifest_json[..], &options)
191            .map_err(|source| PortableRecordingError::ZipWrite {
192                file_name: PORTABLE_MANIFEST_FILE_NAME,
193                source,
194            })?;
195
196        Ok(())
197    }
198
199    /// Copies a file from the run directory to the archive.
200    ///
201    /// The file is stored without additional compression since `run.log.zst`
202    /// and `store.zip` are already compressed.
203    fn copy_file<W: Write>(
204        &self,
205        zip_writer: &mut ArchiveWriter<W>,
206        file_name: &'static str,
207    ) -> Result<(), PortableRecordingError> {
208        let source_path = self.run_dir.join(file_name);
209        let mut file = File::open(&source_path)
210            .map_err(|source| PortableRecordingError::ReadFile { file_name, source })?;
211
212        let options = stored_file_options();
213
214        let mut streamer = zip_writer
215            .stream_file(file_name, &options)
216            .map_err(|source| PortableRecordingError::ZipStartFile { file_name, source })?;
217
218        io::copy(&mut file, &mut streamer)
219            .map_err(|source| PortableRecordingError::ZipWrite { file_name, source })?;
220
221        streamer
222            .finish()
223            .map_err(|source| PortableRecordingError::ZipWrite { file_name, source })?;
224
225        Ok(())
226    }
227}
228
229// ---
230// Portable recording reading
231// ---
232
/// Maximum size for spooling a non-seekable input to a temporary file (4 GiB).
///
/// This is a safety limit to avoid filling up disk when reading from a pipe.
/// Portable recordings are typically small (a few hundred MB at most), so this
/// is generous.
///
/// [`ensure_seekable`] passes this value down to the spooling code, which
/// fails with `SpoolTooLarge` once more than this many bytes have been copied.
const SPOOL_SIZE_LIMIT: ByteSize = ByteSize(4 * 1024 * 1024 * 1024);
239
/// Classifies a Windows file handle for seekability.
///
/// On Windows, `SetFilePointerEx` can spuriously succeed on named pipe handles
/// (returning meaningless position values), so seek-based probing is
/// unreliable. We use `GetFileType` instead, which definitively classifies the
/// handle.
#[cfg(windows)]
enum WindowsFileKind {
    /// A regular disk file (seekable).
    Disk,
    /// A pipe, FIFO, or socket (not seekable, must be spooled).
    Pipe,
    /// A character device or unknown handle type (not expected for recording
    /// files).
    ///
    /// Carries the raw value returned by `GetFileType` so it can be included
    /// in the error message.
    Other(u32),
}
256
/// Asks `GetFileType` what kind of handle backs `file` and maps the answer to
/// a [`WindowsFileKind`].
///
/// Any value other than `FILE_TYPE_DISK` or `FILE_TYPE_PIPE` is returned
/// verbatim inside [`WindowsFileKind::Other`].
#[cfg(windows)]
fn classify_windows_handle(file: &File) -> WindowsFileKind {
    use std::os::windows::io::AsRawHandle;
    use windows_sys::Win32::Storage::FileSystem::{FILE_TYPE_DISK, FILE_TYPE_PIPE, GetFileType};

    let raw_handle = file.as_raw_handle();
    // SAFETY: the handle is valid because `file` is a live `File`.
    let raw_kind = unsafe { GetFileType(raw_handle) };

    if raw_kind == FILE_TYPE_DISK {
        WindowsFileKind::Disk
    } else if raw_kind == FILE_TYPE_PIPE {
        WindowsFileKind::Pipe
    } else {
        WindowsFileKind::Other(raw_kind)
    }
}
271
272/// Returns true if the I/O error indicates that the file descriptor does not
273/// support seeking (i.e. it is a pipe, FIFO, or socket).
274#[cfg(unix)]
275fn is_not_seekable_error(e: &io::Error) -> bool {
276    // Pipes/FIFOs/sockets fail lseek with ESPIPE.
277    e.raw_os_error() == Some(libc::ESPIPE)
278}
279
280/// Ensures that a file is seekable, spooling to a temp file if necessary.
281///
282/// Process substitution paths (e.g. `/proc/self/fd/11` from `<(curl url)`)
283/// produce pipe fds that are not seekable. ZIP reading requires seeking, so we
284/// spool the pipe contents to an anonymous temporary file first.
285///
286/// Returns the original file if it's already seekable, or a new temp file
287/// containing the spooled data.
288fn ensure_seekable(file: File, path: &Utf8Path) -> Result<File, PortableRecordingReadError> {
289    ensure_seekable_impl(file, path, SPOOL_SIZE_LIMIT)
290}
291
292/// Inner implementation of [`ensure_seekable`] with a configurable size limit.
293///
294/// Separated so tests can exercise the limit enforcement without writing 4 GiB.
295fn ensure_seekable_impl(
296    file: File,
297    path: &Utf8Path,
298    spool_limit: ByteSize,
299) -> Result<File, PortableRecordingReadError> {
300    // On Unix, lseek reliably fails with ESPIPE on pipes/FIFOs/sockets, so
301    // a seek probe is sufficient.
302    #[cfg(unix)]
303    {
304        let mut file = file;
305        match file.stream_position() {
306            Ok(_) => Ok(file),
307            Err(e) if is_not_seekable_error(&e) => spool_to_temp(file, path, spool_limit),
308            Err(e) => {
309                // Unexpected seek error (e.g. EBADF, EIO): propagate rather than
310                // silently falling into the spool path.
311                Err(PortableRecordingReadError::SeekProbe {
312                    path: path.to_owned(),
313                    error: e,
314                })
315            }
316        }
317    }
318
319    // On Windows, SetFilePointerEx can spuriously succeed on named pipe
320    // handles, so seek-based probing is unreliable. Use GetFileType to
321    // definitively classify the handle.
322    #[cfg(windows)]
323    match classify_windows_handle(&file) {
324        WindowsFileKind::Disk => Ok(file),
325        WindowsFileKind::Pipe => spool_to_temp(file, path, spool_limit),
326        WindowsFileKind::Other(file_type) => Err(PortableRecordingReadError::SeekProbe {
327            path: path.to_owned(),
328            error: io::Error::other(format!(
329                "unexpected file handle type {file_type:#x} (expected disk or pipe)"
330            )),
331        }),
332    }
333}
334
335/// Spools the contents of a non-seekable file to an anonymous temporary file.
336///
337/// Returns the temp file, rewound to the beginning so callers can read it.
338fn spool_to_temp(
339    file: File,
340    path: &Utf8Path,
341    spool_limit: ByteSize,
342) -> Result<File, PortableRecordingReadError> {
343    let mut temp =
344        camino_tempfile::tempfile().map_err(|error| PortableRecordingReadError::SpoolTempFile {
345            path: path.to_owned(),
346            error,
347        })?;
348
349    // Read up to spool_limit + 1 bytes. If we get more than the limit, the
350    // input is too large. Use saturating_add to avoid wrapping if the limit
351    // is u64::MAX (not an issue in practice since the limit is 4 GiB).
352    let bytes_copied = io::copy(
353        &mut (&file).take(spool_limit.0.saturating_add(1)),
354        &mut temp,
355    )
356    .map_err(|error| PortableRecordingReadError::SpoolTempFile {
357        path: path.to_owned(),
358        error,
359    })?;
360
361    if bytes_copied > spool_limit.0 {
362        return Err(PortableRecordingReadError::SpoolTooLarge {
363            path: path.to_owned(),
364            limit: spool_limit,
365        });
366    }
367
368    // Rewind so the archive reader can read from the beginning.
369    temp.seek(SeekFrom::Start(0))
370        .map_err(|error| PortableRecordingReadError::SpoolTempFile {
371            path: path.to_owned(),
372            error,
373        })?;
374
375    Ok(temp)
376}
377
/// Backing storage for an archive.
///
/// - `Left(File)`: Direct file-backed archive (normal case).
/// - `Right(Cursor<Vec<u8>>)`: Memory-backed archive (unwrapped from a wrapper zip).
///
/// The storage is wrapped in a `BufReader` before being handed to `Archive`.
type ArchiveReadStorage = Either<File, Cursor<Vec<u8>>>;
383
/// A portable recording opened for reading.
///
/// Created by [`PortableRecording::open`], which parses the manifest and
/// checks its format and store format versions.
pub struct PortableRecording {
    /// Path the recording was opened from; used in error values and returned
    /// by `archive_path()`.
    archive_path: Utf8PathBuf,
    /// Manifest parsed from the archive when it was opened.
    manifest: PortableManifest,
    /// The archive holding the manifest, run log, and inner store. For wrapper
    /// archives this is the inner, memory-backed archive.
    outer_archive: Archive<BufReader<ArchiveReadStorage>>,
}
390
391impl std::fmt::Debug for PortableRecording {
392    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
393        f.debug_struct("PortableRecording")
394            .field("archive_path", &self.archive_path)
395            .field("manifest", &self.manifest)
396            .finish_non_exhaustive()
397    }
398}
399
400impl RunFilesExist for PortableRecording {
401    fn store_zip_exists(&self) -> bool {
402        self.outer_archive.index_of(STORE_ZIP_FILE_NAME).is_some()
403    }
404
405    fn run_log_exists(&self) -> bool {
406        self.outer_archive.index_of(RUN_LOG_FILE_NAME).is_some()
407    }
408}
409
410impl PortableRecording {
411    /// Opens a portable recording from a file path.
412    ///
413    /// Validates the format and store versions on open to fail fast if the
414    /// archive cannot be read by this version of nextest.
415    ///
416    /// This method also handles "wrapper" archives: if the archive does not
417    /// contain `manifest.json` but contains exactly one `.zip` file, that inner
418    /// file is treated as the nextest portable recording. This supports GitHub
419    /// Actions artifact downloads, which wrap archives in an outer zip.
420    pub fn open(path: &Utf8Path) -> Result<Self, PortableRecordingReadError> {
421        let file = File::open(path).map_err(|error| PortableRecordingReadError::OpenArchive {
422            path: path.to_owned(),
423            error,
424        })?;
425
426        // Ensure the file is seekable. Process substitution paths (e.g.
427        // `/proc/self/fd/11`) produce pipe fds; spool to a temp file if needed.
428        let file = ensure_seekable(file, path)?;
429
430        let mut outer_archive =
431            Archive::new(BufReader::new(Either::Left(file))).map_err(|error| {
432                PortableRecordingReadError::ReadArchive {
433                    path: path.to_owned(),
434                    error,
435                }
436            })?;
437
438        // Check if this is a direct nextest archive (has manifest.json).
439        if outer_archive
440            .index_of(PORTABLE_MANIFEST_FILE_NAME)
441            .is_some()
442        {
443            return Self::open_validated(path, outer_archive);
444        }
445
446        // No manifest.json found. Check if this is a wrapper archive containing
447        // exactly one .zip file. Filter out directory entries (names ending with
448        // '/' or '\').
449        let mut file_count = 0;
450        let mut zip_count = 0;
451        let mut zip_file: Option<String> = None;
452        for metadata in outer_archive.entries() {
453            let name = metadata.name();
454            if name.ends_with('/') || name.ends_with('\\') {
455                // This is a directory entry, skip it.
456                continue;
457            }
458            file_count += 1;
459            if has_zip_extension(Utf8Path::new(name)) {
460                zip_count += 1;
461                if zip_count == 1 {
462                    zip_file = Some(name.to_owned());
463                }
464            }
465        }
466
467        if let Some(inner_name) = zip_file.filter(|_| file_count == 1 && zip_count == 1) {
468            // We only support reading up to the MAX_MAX_OUTPUT_SIZE cap. We'll
469            // see if anyone complains -- they have to have both a wrapper zip
470            // and to exceed the cap. (Probably worth extracting to a file on
471            // disk or something at that point.)
472            let inner_bytes = read_outer_file(&mut outer_archive, inner_name.into(), path)?;
473            let inner_archive = Archive::new(BufReader::new(Either::Right(Cursor::new(
474                inner_bytes,
475            ))))
476            .map_err(|error| PortableRecordingReadError::ReadArchive {
477                path: path.to_owned(),
478                error,
479            })?;
480            Self::open_validated(path, inner_archive)
481        } else {
482            Err(PortableRecordingReadError::NotAWrapperArchive {
483                path: path.to_owned(),
484                file_count,
485                zip_count,
486            })
487        }
488    }
489
490    /// Opens and validates an archive that is known to contain `manifest.json`.
491    fn open_validated(
492        path: &Utf8Path,
493        mut outer_archive: Archive<BufReader<ArchiveReadStorage>>,
494    ) -> Result<Self, PortableRecordingReadError> {
495        // Read and parse the manifest.
496        let manifest_bytes =
497            read_outer_file(&mut outer_archive, PORTABLE_MANIFEST_FILE_NAME.into(), path)?;
498        let manifest: PortableManifest =
499            serde_json::from_slice(&manifest_bytes).map_err(|error| {
500                PortableRecordingReadError::ParseManifest {
501                    path: path.to_owned(),
502                    error,
503                }
504            })?;
505
506        // Validate format version.
507        if let Err(incompatibility) = manifest
508            .format_version
509            .check_readable_by(PORTABLE_RECORDING_FORMAT_VERSION)
510        {
511            return Err(PortableRecordingReadError::UnsupportedFormatVersion {
512                path: path.to_owned(),
513                found: manifest.format_version,
514                supported: PORTABLE_RECORDING_FORMAT_VERSION,
515                incompatibility,
516            });
517        }
518
519        // Validate store format version.
520        let store_version = manifest.store_format_version();
521        if let Err(incompatibility) = store_version.check_readable_by(STORE_FORMAT_VERSION) {
522            return Err(PortableRecordingReadError::UnsupportedStoreFormatVersion {
523                path: path.to_owned(),
524                found: store_version,
525                supported: STORE_FORMAT_VERSION,
526                incompatibility,
527            });
528        }
529
530        Ok(Self {
531            archive_path: path.to_owned(),
532            manifest,
533            outer_archive,
534        })
535    }
536
537    /// Returns the path to the archive file.
538    pub fn archive_path(&self) -> &Utf8Path {
539        &self.archive_path
540    }
541
542    /// Returns run info extracted from the manifest.
543    pub fn run_info(&self) -> RecordedRunInfo {
544        self.manifest.run_info()
545    }
546
547    /// Reads the run log into memory and returns it as an owned struct.
548    ///
549    /// The returned [`PortableRecordingRunLog`] can be used to iterate over events
550    /// independently of this archive, avoiding borrow conflicts with
551    /// [`open_store`](Self::open_store).
552    pub fn read_run_log(&mut self) -> Result<PortableRecordingRunLog, PortableRecordingReadError> {
553        let run_log_bytes = read_outer_file(
554            &mut self.outer_archive,
555            RUN_LOG_FILE_NAME.into(),
556            &self.archive_path,
557        )?;
558        Ok(PortableRecordingRunLog {
559            archive_path: self.archive_path.clone(),
560            run_log_bytes,
561        })
562    }
563
564    /// Extracts a file from the outer archive to a path, streaming directly.
565    ///
566    /// This avoids loading the entire file into memory. The full file is always
567    /// extracted regardless of size.
568    ///
569    /// If `check_limit` is true, the result will indicate whether the file
570    /// exceeded [`MAX_MAX_OUTPUT_SIZE`]. This is informational only and does
571    /// not affect extraction.
572    pub fn extract_outer_file_to_path(
573        &mut self,
574        file_name: &'static str,
575        output_path: &Utf8Path,
576        check_limit: bool,
577    ) -> Result<ExtractOuterFileResult, PortableRecordingReadError> {
578        extract_outer_file_to_path(
579            &mut self.outer_archive,
580            file_name,
581            &self.archive_path,
582            output_path,
583            check_limit,
584        )
585    }
586
587    /// Opens the inner store.zip for reading.
588    ///
589    /// The returned reader borrows from this archive via a zero-copy
590    /// `Take` window into the outer archive's reader. CRC verification of
591    /// the outer store.zip entry is skipped because each inner entry has its
592    /// own CRC check.
593    pub fn open_store(&mut self) -> Result<PortableStoreReader<'_>, PortableRecordingReadError> {
594        let file = self
595            .outer_archive
596            .get_by_name(STORE_ZIP_FILE_NAME)
597            .ok_or_else(|| PortableRecordingReadError::MissingFile {
598                path: self.archive_path.clone(),
599                file_name: Cow::Borrowed(STORE_ZIP_FILE_NAME),
600            })?;
601
602        let metadata = file.metadata();
603        if metadata.compression_method != CompressionMethod::STORE {
604            return Err(PortableRecordingReadError::CompressedInnerArchive {
605                archive_path: self.archive_path.clone(),
606                compression: metadata.compression_method,
607            });
608        }
609
610        let reader = file.into_reader();
611        let raw =
612            metadata
613                .read_raw(reader)
614                .map_err(|error| PortableRecordingReadError::ReadArchive {
615                    path: self.archive_path.clone(),
616                    error,
617                })?;
618
619        let store_archive =
620            Archive::new(raw).map_err(|error| PortableRecordingReadError::ReadArchive {
621                path: self.archive_path.clone(),
622                error,
623            })?;
624
625        Ok(PortableStoreReader {
626            archive_path: &self.archive_path,
627            store_archive,
628            stdout_dict: None,
629            stderr_dict: None,
630        })
631    }
632}
633
634/// Reads a file from the outer archive into memory, with size limits.
635fn read_outer_file(
636    archive: &mut Archive<BufReader<ArchiveReadStorage>>,
637    file_name: Cow<'static, str>,
638    archive_path: &Utf8Path,
639) -> Result<Vec<u8>, PortableRecordingReadError> {
640    let limit = MAX_MAX_OUTPUT_SIZE.as_u64();
641    let mut file =
642        archive
643            .get_by_name(&file_name)
644            .ok_or_else(|| PortableRecordingReadError::MissingFile {
645                path: archive_path.to_owned(),
646                file_name: file_name.clone(),
647            })?;
648
649    let claimed_size = file.metadata().uncompressed_size;
650    if claimed_size > limit {
651        return Err(PortableRecordingReadError::FileTooLarge {
652            path: archive_path.to_owned(),
653            file_name,
654            size: claimed_size,
655            limit,
656        });
657    }
658
659    let capacity = usize::try_from(claimed_size).unwrap_or(usize::MAX);
660    let mut contents = Vec::with_capacity(capacity);
661
662    file.read()
663        .and_then(|reader| reader.take(limit).read_to_end(&mut contents))
664        .map_err(|error| PortableRecordingReadError::ReadArchive {
665            path: archive_path.to_owned(),
666            error,
667        })?;
668
669    Ok(contents)
670}
671
672/// Extracts a file from the outer archive to a path, streaming directly.
673fn extract_outer_file_to_path(
674    archive: &mut Archive<BufReader<ArchiveReadStorage>>,
675    file_name: &'static str,
676    archive_path: &Utf8Path,
677    output_path: &Utf8Path,
678    check_limit: bool,
679) -> Result<ExtractOuterFileResult, PortableRecordingReadError> {
680    let limit = MAX_MAX_OUTPUT_SIZE.as_u64();
681    let mut file =
682        archive
683            .get_by_name(file_name)
684            .ok_or_else(|| PortableRecordingReadError::MissingFile {
685                path: archive_path.to_owned(),
686                file_name: Cow::Borrowed(file_name),
687            })?;
688
689    let claimed_size = file.metadata().uncompressed_size;
690    let exceeded_limit = if check_limit && claimed_size > limit {
691        Some(claimed_size)
692    } else {
693        None
694    };
695
696    let mut output_file =
697        File::create(output_path).map_err(|error| PortableRecordingReadError::ExtractFile {
698            archive_path: archive_path.to_owned(),
699            file_name,
700            output_path: output_path.to_owned(),
701            error,
702        })?;
703
704    let mut reader = file
705        .read()
706        .map_err(|error| PortableRecordingReadError::ReadArchive {
707            path: archive_path.to_owned(),
708            error,
709        })?;
710
711    let bytes_written = io::copy(&mut reader, &mut output_file).map_err(|error| {
712        PortableRecordingReadError::ExtractFile {
713            archive_path: archive_path.to_owned(),
714            file_name,
715            output_path: output_path.to_owned(),
716            error,
717        }
718    })?;
719
720    Ok(ExtractOuterFileResult {
721        bytes_written,
722        exceeded_limit,
723    })
724}
725
/// The run log from a portable recording, read into memory.
///
/// This struct owns the run log bytes and can create event iterators
/// independently of the [`PortableRecording`] it came from.
#[derive(Debug)]
pub struct PortableRecordingRunLog {
    /// Path of the archive the log was read from; only used to build the path
    /// reported when the log cannot be opened.
    archive_path: Utf8PathBuf,
    /// The run log entry's bytes as stored in the archive (still
    /// zstd-compressed; decoding happens in `events()`).
    run_log_bytes: Vec<u8>,
}
735
736impl PortableRecordingRunLog {
737    /// Returns an iterator over events from the run log.
738    pub fn events(&self) -> Result<PortableRecordingEventIter<'_>, RecordReadError> {
739        // The run log is zstd-compressed JSON Lines. Use with_buffer since the
740        // data is already in memory (no need for Decoder's internal BufReader).
741        let decoder =
742            zstd::stream::Decoder::with_buffer(&self.run_log_bytes[..]).map_err(|error| {
743                RecordReadError::OpenRunLog {
744                    path: self.archive_path.join(RUN_LOG_FILE_NAME),
745                    error,
746                }
747            })?;
748        Ok(PortableRecordingEventIter {
749            // BufReader is still needed for read_line().
750            reader: DebugIgnore(BufReader::new(decoder)),
751            line_buf: String::new(),
752            line_number: 0,
753        })
754    }
755}
756
/// Iterator over events from a portable recording's run log.
///
/// Created by [`PortableRecordingRunLog::events`]; borrows the log's bytes.
#[derive(Debug)]
pub struct PortableRecordingEventIter<'a> {
    /// Line-buffered zstd decoder over the borrowed run log bytes (excluded
    /// from `Debug` output via `DebugIgnore`).
    reader: DebugIgnore<BufReader<zstd::stream::Decoder<'static, &'a [u8]>>>,
    /// Scratch buffer holding the line currently being parsed; cleared and
    /// reused for every line.
    line_buf: String,
    /// Number of the line most recently read, starting at 1; reported in
    /// errors.
    line_number: usize,
}
764
765impl Iterator for PortableRecordingEventIter<'_> {
766    type Item = Result<TestEventSummary<RecordingSpec>, RecordReadError>;
767
768    fn next(&mut self) -> Option<Self::Item> {
769        loop {
770            self.line_buf.clear();
771            self.line_number += 1;
772
773            match self.reader.read_line(&mut self.line_buf) {
774                Ok(0) => return None,
775                Ok(_) => {
776                    let trimmed = self.line_buf.trim();
777                    if trimmed.is_empty() {
778                        continue;
779                    }
780                    return Some(serde_json::from_str(trimmed).map_err(|error| {
781                        RecordReadError::ParseEvent {
782                            line_number: self.line_number,
783                            error,
784                        }
785                    }));
786                }
787                Err(error) => {
788                    return Some(Err(RecordReadError::ReadRunLog {
789                        line_number: self.line_number,
790                        error,
791                    }));
792                }
793            }
794        }
795    }
796}
797
/// Reader for the inner store.zip within a portable recording.
///
/// Borrows from [`PortableRecording`] via a zero-copy `Take` window into the
/// outer archive's reader. Implements [`StoreReader`].
pub struct PortableStoreReader<'a> {
    /// Path of the portable recording this store belongs to.
    archive_path: &'a Utf8Path,
    /// The inner store archive, read through a `Take` over the outer
    /// archive's buffered reader.
    store_archive: Archive<io::Take<&'a mut BufReader<ArchiveReadStorage>>>,
    /// Cached stdout dictionary loaded from the archive.
    ///
    /// `None` until the dictionaries have been loaded.
    stdout_dict: Option<Vec<u8>>,
    /// Cached stderr dictionary loaded from the archive.
    ///
    /// `None` until the dictionaries have been loaded.
    stderr_dict: Option<Vec<u8>>,
}
810
811impl std::fmt::Debug for PortableStoreReader<'_> {
812    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
813        f.debug_struct("PortableStoreReader")
814            .field("archive_path", &self.archive_path)
815            .field("stdout_dict", &self.stdout_dict.as_ref().map(|d| d.len()))
816            .field("stderr_dict", &self.stderr_dict.as_ref().map(|d| d.len()))
817            .finish_non_exhaustive()
818    }
819}
820
821impl PortableStoreReader<'_> {
822    /// Reads a file from the store archive as bytes, with size limit.
823    fn read_store_file(&mut self, file_name: &str) -> Result<Vec<u8>, RecordReadError> {
824        let limit = MAX_MAX_OUTPUT_SIZE.as_u64();
825        let mut file = self.store_archive.get_by_name(file_name).ok_or_else(|| {
826            RecordReadError::FileNotFound {
827                file_name: file_name.to_string(),
828            }
829        })?;
830
831        let claimed_size = file.metadata().uncompressed_size;
832        if claimed_size > limit {
833            return Err(RecordReadError::FileTooLarge {
834                file_name: file_name.to_string(),
835                size: claimed_size,
836                limit,
837            });
838        }
839
840        let capacity = usize::try_from(claimed_size).unwrap_or(usize::MAX);
841        let mut contents = Vec::with_capacity(capacity);
842
843        file.read()
844            .and_then(|reader| reader.take(limit).read_to_end(&mut contents))
845            .map_err(|error| RecordReadError::Decompress {
846                file_name: file_name.to_string(),
847                error,
848            })?;
849
850        let actual_size = contents.len() as u64;
851        if actual_size != claimed_size {
852            return Err(RecordReadError::SizeMismatch {
853                file_name: file_name.to_string(),
854                claimed_size,
855                actual_size,
856            });
857        }
858
859        Ok(contents)
860    }
861
862    /// Returns the dictionary bytes for the given output file name, if known.
863    fn get_dict_for_output(&self, file_name: &str) -> Option<&[u8]> {
864        match OutputDict::for_output_file_name(file_name) {
865            OutputDict::Stdout => Some(
866                self.stdout_dict
867                    .as_ref()
868                    .expect("load_dictionaries must be called first"),
869            ),
870            OutputDict::Stderr => Some(
871                self.stderr_dict
872                    .as_ref()
873                    .expect("load_dictionaries must be called first"),
874            ),
875            OutputDict::None => None,
876        }
877    }
878}
879
880impl StoreReader for PortableStoreReader<'_> {
881    fn read_cargo_metadata(&mut self) -> Result<String, RecordReadError> {
882        let bytes = self.read_store_file(CARGO_METADATA_JSON_PATH)?;
883        String::from_utf8(bytes).map_err(|e| RecordReadError::Decompress {
884            file_name: CARGO_METADATA_JSON_PATH.to_string(),
885            error: std::io::Error::new(std::io::ErrorKind::InvalidData, e),
886        })
887    }
888
889    fn read_test_list(&mut self) -> Result<TestListSummary, RecordReadError> {
890        let bytes = self.read_store_file(TEST_LIST_JSON_PATH)?;
891        serde_json::from_slice(&bytes).map_err(|error| RecordReadError::DeserializeMetadata {
892            file_name: TEST_LIST_JSON_PATH.to_string(),
893            error,
894        })
895    }
896
897    fn read_record_opts(&mut self) -> Result<RecordOpts, RecordReadError> {
898        let bytes = self.read_store_file(RECORD_OPTS_JSON_PATH)?;
899        serde_json::from_slice(&bytes).map_err(|error| RecordReadError::DeserializeMetadata {
900            file_name: RECORD_OPTS_JSON_PATH.to_string(),
901            error,
902        })
903    }
904
905    fn read_rerun_info(&mut self) -> Result<Option<RerunInfo>, RecordReadError> {
906        match self.read_store_file(RERUN_INFO_JSON_PATH) {
907            Ok(bytes) => {
908                let info = serde_json::from_slice(&bytes).map_err(|error| {
909                    RecordReadError::DeserializeMetadata {
910                        file_name: RERUN_INFO_JSON_PATH.to_string(),
911                        error,
912                    }
913                })?;
914                Ok(Some(info))
915            }
916            Err(RecordReadError::FileNotFound { .. }) => {
917                // File doesn't exist; this is not a rerun.
918                Ok(None)
919            }
920            Err(e) => Err(e),
921        }
922    }
923
924    fn load_dictionaries(&mut self) -> Result<(), RecordReadError> {
925        self.stdout_dict = Some(self.read_store_file(STDOUT_DICT_PATH)?);
926        self.stderr_dict = Some(self.read_store_file(STDERR_DICT_PATH)?);
927        Ok(())
928    }
929
930    fn read_output(&mut self, file_name: &str) -> Result<Vec<u8>, RecordReadError> {
931        let path = format!("out/{file_name}");
932        let compressed = self.read_store_file(&path)?;
933        let limit = MAX_MAX_OUTPUT_SIZE.as_u64();
934
935        let dict_bytes = self.get_dict_for_output(file_name).ok_or_else(|| {
936            RecordReadError::UnknownOutputType {
937                file_name: file_name.to_owned(),
938            }
939        })?;
940
941        decompress_with_dict(&compressed, dict_bytes, limit).map_err(|error| {
942            RecordReadError::Decompress {
943                file_name: path,
944                error,
945            }
946        })
947    }
948
949    fn extract_file_to_path(
950        &mut self,
951        store_path: &str,
952        output_path: &Utf8Path,
953    ) -> Result<u64, RecordReadError> {
954        let mut file = self.store_archive.get_by_name(store_path).ok_or_else(|| {
955            RecordReadError::FileNotFound {
956                file_name: store_path.to_owned(),
957            }
958        })?;
959
960        let mut output_file =
961            File::create(output_path).map_err(|error| RecordReadError::ExtractFile {
962                store_path: store_path.to_owned(),
963                output_path: output_path.to_owned(),
964                error,
965            })?;
966
967        let mut reader = file
968            .read()
969            .map_err(|error| RecordReadError::ReadArchiveFile {
970                file_name: store_path.to_owned(),
971                error,
972            })?;
973
974        io::copy(&mut reader, &mut output_file).map_err(|error| RecordReadError::ExtractFile {
975            store_path: store_path.to_owned(),
976            output_path: output_path.to_owned(),
977            error,
978        })
979    }
980}
981
#[cfg(test)]
mod tests {
    use super::*;
    use crate::record::{
        format::{PORTABLE_RECORDING_FORMAT_VERSION, STORE_FORMAT_VERSION},
        store::{CompletedRunStats, RecordedRunStatus, RecordedSizes},
    };
    use camino_tempfile::{NamedUtf8TempFile, Utf8TempDir};
    use chrono::Local;
    use eazip::write::FileOptions;
    use quick_junit::ReportUuid;
    use semver::Version;
    use std::{collections::BTreeMap, io::Read};

    /// Returns the fixed run ID used by the writer tests.
    fn test_run_id() -> ReportUuid {
        ReportUuid::from_u128(0x550e8400_e29b_41d4_a716_446655440000)
    }

    /// Creates a temporary runs directory holding an empty directory for
    /// `run_id`.
    ///
    /// Returns the temp dir guard, the runs directory, and the run directory.
    fn create_empty_run_dir(run_id: ReportUuid) -> (Utf8TempDir, Utf8PathBuf, Utf8PathBuf) {
        let temp_dir = camino_tempfile::tempdir().expect("create temp dir");
        let runs_dir = temp_dir.path().to_owned();
        let run_dir = runs_dir.join(run_id.to_string());
        std::fs::create_dir_all(&run_dir).expect("create run dir");
        (temp_dir, runs_dir, run_dir)
    }

    /// Creates a run directory for `run_id` containing a minimal `store.zip`
    /// and a zstd-compressed run log.
    fn create_test_run_dir(run_id: ReportUuid) -> (Utf8TempDir, Utf8PathBuf) {
        let (temp_dir, runs_dir, run_dir) = create_empty_run_dir(run_id);

        let store_file =
            File::create(run_dir.join(STORE_ZIP_FILE_NAME)).expect("create store.zip");
        let mut zip_writer = ArchiveWriter::new(store_file);
        let options = FileOptions::default();
        zip_writer
            .add_file("test.txt", &b"test content"[..], &options)
            .expect("add file");
        zip_writer.finish().expect("finish zip");

        let log_file = File::create(run_dir.join(RUN_LOG_FILE_NAME)).expect("create run.log.zst");
        let mut encoder = zstd::stream::Encoder::new(log_file, 3).expect("create encoder");
        encoder.write_all(b"test log content").expect("write log");
        encoder.finish().expect("finish encoder");

        (temp_dir, runs_dir)
    }

    /// Builds a completed-run `RecordedRunInfo` fixture for `run_id`.
    fn create_test_run_info(run_id: ReportUuid) -> RecordedRunInfo {
        let timestamp = Local::now().fixed_offset();
        let stats = CompletedRunStats {
            initial_run_count: 10,
            passed: 9,
            failed: 1,
            exit_code: 100,
        };

        RecordedRunInfo {
            run_id,
            store_format_version: STORE_FORMAT_VERSION,
            nextest_version: Version::new(0, 9, 111),
            started_at: timestamp,
            last_written_at: timestamp,
            duration_secs: Some(12.345),
            cli_args: vec!["cargo".to_owned(), "nextest".to_owned(), "run".to_owned()],
            build_scope_args: vec!["--workspace".to_owned()],
            env_vars: BTreeMap::from([("CARGO_TERM_COLOR".to_owned(), "always".to_owned())]),
            parent_run_id: None,
            sizes: RecordedSizes::default(),
            status: RecordedRunStatus::Completed(stats),
        }
    }

    #[test]
    fn test_default_filename() {
        let run_id = test_run_id();
        let (_temp_dir, runs_dir) = create_test_run_dir(run_id);
        let run_info = create_test_run_info(run_id);

        let writer = PortableRecordingWriter::new(&run_info, StoreRunsDir::new(&runs_dir))
            .expect("create writer");

        assert_eq!(
            writer.default_filename(),
            "nextest-run-550e8400-e29b-41d4-a716-446655440000.zip"
        );
    }

    #[test]
    fn test_write_portable_recording() {
        let run_id = test_run_id();
        let (_temp_dir, runs_dir) = create_test_run_dir(run_id);
        let run_info = create_test_run_info(run_id);

        let writer = PortableRecordingWriter::new(&run_info, StoreRunsDir::new(&runs_dir))
            .expect("create writer");

        let output_dir = camino_tempfile::tempdir().expect("create output dir");
        let written = writer
            .write_to_dir(output_dir.path())
            .expect("write archive");

        assert!(written.path.exists());
        assert!(written.size > 0);

        // The size reported by the writer must agree with what is on disk.
        let size_on_disk = std::fs::metadata(&written.path)
            .expect("get file metadata")
            .len();
        assert_eq!(
            written.size, size_on_disk,
            "reported size should match actual file size"
        );

        assert_eq!(
            written.path.file_name(),
            Some("nextest-run-550e8400-e29b-41d4-a716-446655440000.zip")
        );

        let archive_file = File::open(&written.path).expect("open archive");
        let mut archive = Archive::new(BufReader::new(archive_file)).expect("read archive");

        // Manifest, inner store, and run log.
        assert_eq!(archive.entries().len(), 3);

        {
            let mut manifest_entry = archive
                .get_by_name(PORTABLE_MANIFEST_FILE_NAME)
                .expect("manifest");
            let mut manifest_json = String::new();
            let mut manifest_reader = manifest_entry.read().expect("get reader");
            manifest_reader
                .read_to_string(&mut manifest_json)
                .expect("read manifest");

            let manifest: PortableManifest =
                serde_json::from_str(&manifest_json).expect("parse manifest");
            assert_eq!(manifest.format_version, PORTABLE_RECORDING_FORMAT_VERSION);
            assert_eq!(manifest.run.run_id, run_id);
        }

        {
            let store_entry = archive.get_by_name(STORE_ZIP_FILE_NAME).expect("store.zip");
            assert!(store_entry.metadata().uncompressed_size > 0);
        }

        {
            let log_entry = archive.get_by_name(RUN_LOG_FILE_NAME).expect("run.log.zst");
            assert!(log_entry.metadata().uncompressed_size > 0);
        }
    }

    #[test]
    fn test_missing_run_dir() {
        // The runs directory exists, but no directory for this run was created.
        let temp_dir = camino_tempfile::tempdir().expect("create temp dir");
        let runs_dir = temp_dir.path().to_owned();
        let run_info = create_test_run_info(test_run_id());

        let result = PortableRecordingWriter::new(&run_info, StoreRunsDir::new(&runs_dir));

        assert!(matches!(
            result,
            Err(PortableRecordingError::RunDirNotFound { .. })
        ));
    }

    #[test]
    fn test_missing_store_zip() {
        let run_id = test_run_id();
        let (_temp_dir, runs_dir, run_dir) = create_empty_run_dir(run_id);

        // Only the run log is present.
        let log_file = File::create(run_dir.join(RUN_LOG_FILE_NAME)).expect("create run.log.zst");
        let mut encoder = zstd::stream::Encoder::new(log_file, 3).expect("create encoder");
        encoder.write_all(b"test").expect("write");
        encoder.finish().expect("finish");

        let run_info = create_test_run_info(run_id);
        let result = PortableRecordingWriter::new(&run_info, StoreRunsDir::new(&runs_dir));

        assert!(
            matches!(
                &result,
                Err(PortableRecordingError::RequiredFileMissing { file_name, .. })
                if *file_name == STORE_ZIP_FILE_NAME
            ),
            "expected RequiredFileMissing for store.zip, got {result:?}"
        );
    }

    #[test]
    fn test_missing_run_log() {
        let run_id = test_run_id();
        let (_temp_dir, runs_dir, run_dir) = create_empty_run_dir(run_id);

        // Only the store archive is present.
        let store_file =
            File::create(run_dir.join(STORE_ZIP_FILE_NAME)).expect("create store.zip");
        let mut zip_writer = ArchiveWriter::new(store_file);
        let options = FileOptions::default();
        zip_writer
            .add_file("test.txt", &b"test"[..], &options)
            .expect("add file");
        zip_writer.finish().expect("finish");

        let run_info = create_test_run_info(run_id);
        let result = PortableRecordingWriter::new(&run_info, StoreRunsDir::new(&runs_dir));

        assert!(
            matches!(
                &result,
                Err(PortableRecordingError::RequiredFileMissing { file_name, .. })
                if *file_name == RUN_LOG_FILE_NAME
            ),
            "expected RequiredFileMissing for run.log.zst, got {result:?}"
        );
    }

    #[test]
    fn test_ensure_seekable_regular_file() {
        // A regular file is already seekable and should be handed back
        // untouched.
        let temp = NamedUtf8TempFile::new().expect("created temp file");
        let path = temp.path().to_owned();

        std::fs::write(&path, b"hello world").expect("wrote to temp file");
        let file = File::open(&path).expect("opened temp file");

        // Remember the OS-level descriptor so identity can be checked below.
        #[cfg(unix)]
        let original_fd = {
            use std::os::unix::io::AsRawFd;
            file.as_raw_fd()
        };

        let result = ensure_seekable(file, &path).expect("ensure_seekable succeeded");

        // Same descriptor means no spooling took place.
        #[cfg(unix)]
        {
            use std::os::unix::io::AsRawFd;
            assert_eq!(
                result.as_raw_fd(),
                original_fd,
                "seekable file should be returned as-is"
            );
        }

        // The content must still be readable through the returned handle.
        let mut reader = io::BufReader::new(result);
        let mut contents = String::new();
        reader
            .read_to_string(&mut contents)
            .expect("read file contents");
        assert_eq!(contents, "hello world");
    }

    /// Converts a `PipeReader` into a `File` using platform-specific owned
    /// I/O types.
    #[cfg(unix)]
    fn pipe_reader_to_file(reader: std::io::PipeReader) -> File {
        use std::os::fd::OwnedFd;
        let fd = OwnedFd::from(reader);
        File::from(fd)
    }

    /// Converts a `PipeReader` into a `File` using platform-specific owned
    /// I/O types.
    #[cfg(windows)]
    fn pipe_reader_to_file(reader: std::io::PipeReader) -> File {
        use std::os::windows::io::OwnedHandle;
        let handle = OwnedHandle::from(reader);
        File::from(handle)
    }

    /// Returns the read end of a fresh pipe as a `File`, after writing `data`
    /// to it and closing the write end so that the reader reaches EOF.
    ///
    /// Passing an empty slice yields an empty pipe.
    fn pipe_file_with(data: &[u8]) -> File {
        let (pipe_reader, mut pipe_writer) = std::io::pipe().expect("created pipe");
        pipe_writer.write_all(data).expect("wrote to pipe");
        drop(pipe_writer);
        pipe_reader_to_file(pipe_reader)
    }

    /// Asserts that `result` is a `SpoolTooLarge` error carrying a limit of
    /// 10 bytes; `expectation` is the leading part of the failure message.
    fn assert_spool_too_large<T: std::fmt::Debug>(
        result: &Result<T, PortableRecordingReadError>,
        expectation: &str,
    ) {
        assert!(
            matches!(
                result,
                Err(PortableRecordingReadError::SpoolTooLarge {
                    limit: ByteSize(10),
                    ..
                })
            ),
            "{expectation}, got {result:?}"
        );
    }

    /// Tests that non-seekable inputs (pipes) are spooled to a temp file.
    ///
    /// This test uses `std::io::pipe()` to create a real pipe, which is the
    /// same mechanism the OS uses for process substitution (`<(command)`).
    #[test]
    fn test_ensure_seekable_pipe() {
        let test_data = b"zip-like test content for pipe spooling";
        let pipe_file = pipe_file_with(test_data);

        let path = Utf8Path::new("/dev/fd/99");
        let result = ensure_seekable(pipe_file, path).expect("ensure_seekable succeeded");

        // The result should be a seekable temp file containing the pipe data.
        let mut reader = io::BufReader::new(result);
        let mut contents = Vec::new();
        reader
            .read_to_end(&mut contents)
            .expect("read spooled contents");
        assert_eq!(contents, test_data);
    }

    /// Tests that an empty pipe (zero bytes) is handled correctly.
    ///
    /// This simulates a download failure where the source produces no data.
    /// `ensure_seekable` should succeed (the temp file is created and rewound),
    /// and the downstream ZIP reader will report a proper error.
    #[test]
    fn test_ensure_seekable_empty_pipe() {
        // Nothing is written, so the reader sees EOF immediately.
        let pipe_file = pipe_file_with(b"");

        let path = Utf8Path::new("/dev/fd/42");
        let mut result = ensure_seekable(pipe_file, path).expect("empty pipe should succeed");

        let mut contents = Vec::new();
        result.read_to_end(&mut contents).expect("read contents");
        assert!(contents.is_empty());
    }

    /// Tests that the spool size limit is enforced for pipes.
    ///
    /// Uses `ensure_seekable_impl` with a small limit so we can trigger the
    /// `SpoolTooLarge` error without writing gigabytes.
    #[test]
    fn test_ensure_seekable_spool_too_large() {
        // 20 bytes of input against a limit of 10.
        let pipe_file = pipe_file_with(b"01234567890123456789");

        let path = Utf8Path::new("/dev/fd/42");
        let result = ensure_seekable_impl(pipe_file, path, ByteSize(10));
        assert_spool_too_large(&result, "expected SpoolTooLarge");
    }

    /// Tests that data exactly one byte over the spool limit fails.
    ///
    /// This is the precise boundary: `take(limit + 1)` reads exactly
    /// `limit + 1` bytes, and `bytes_copied > limit` triggers the error.
    #[test]
    fn test_ensure_seekable_spool_one_over_limit() {
        // Exactly limit + 1 = 11 bytes against a limit of 10.
        let pipe_file = pipe_file_with(b"01234567890");

        let path = Utf8Path::new("/dev/fd/42");
        let result = ensure_seekable_impl(pipe_file, path, ByteSize(10));
        assert_spool_too_large(&result, "expected SpoolTooLarge at limit+1 bytes");
    }

    /// Tests that data exactly at the spool limit succeeds.
    #[test]
    fn test_ensure_seekable_spool_exact_limit() {
        // Exactly 10 bytes against a limit of 10.
        let pipe_file = pipe_file_with(b"0123456789");

        let path = Utf8Path::new("/dev/fd/42");
        let mut result = ensure_seekable_impl(pipe_file, path, ByteSize(10))
            .expect("exact limit should succeed");

        // Verify the spooled content is correct.
        let mut contents = Vec::new();
        result.read_to_end(&mut contents).expect("read contents");
        assert_eq!(contents, b"0123456789");
    }
}