Skip to main content

archive_trait/
builder.rs

1//! Format-neutral archive construction.
2//!
3//! Archive formats implement [`ArchiveBuilder`] and wrap the resulting writer
4//! in a stateful [`Builder`] to use the format-neutral construction APIs.
5
6mod traversal;
7
8use std::{
9    collections::VecDeque,
10    io::{self, Read},
11    mem,
12    ops::Range,
13    path::{Path, PathBuf},
14    pin::Pin,
15};
16
17use thiserror::Error;
18use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeekExt};
19
20pub use self::traversal::TraversalError;
21use self::traversal::{TraversalEntry, TraversalKind, TraversalStream, stream_directory_entries};
22use crate::{
23    NameValidator,
24    component_tree::{ComponentTree, ROOT_NODE},
25    name::NameValidation,
26};
27
// Largest source file (1 MiB) that a recursive build copies into the shared
// batch buffer; anything larger is handed to the backend as a streaming file.
const BUFFERED_SOURCE_FILE_BYTES: usize = 1024 * 1024;
// Upper bound (2 MiB) on one chunk yielded by a streaming `FilePayload`. The
// same value caps the tokio file buffer used for streamed source files.
const FILE_PAYLOAD_CHUNK_BYTES: usize = 2 * 1024 * 1024;
// Number of buffered bytes after which a preparation batch is closed.
// A preparation batch may exceed this target by one buffered file, so its
// payload storage remains below twice this value.
const SOURCE_FILE_PREPARATION_BATCH_BYTES: usize = BUFFERED_SOURCE_FILE_BYTES;
33
/// Minimal regular-file metadata accepted by [`Builder::add_file`].
///
/// Values are built fluently starting from [`Default`], which describes a
/// file without executable intent.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct EntryMetadata {
    // Whether the regular file carries executable intent.
    executable: bool,
}

impl EntryMetadata {
    /// Configures whether the regular file carries executable intent.
    ///
    /// This consumes a copy of the metadata and returns the updated value; the
    /// receiver itself is never modified, so the result must be used.
    #[must_use = "this returns the updated metadata and leaves the receiver unchanged"]
    pub fn executable(mut self, executable: bool) -> Self {
        self.executable = executable;
        self
    }

    /// Returns whether this entry carries executable intent.
    #[must_use]
    pub fn is_executable(self) -> bool {
        self.executable
    }
}
52
/// Controls format-neutral archive construction behavior.
///
/// A policy is fixed when the builder is created through
/// [`ArchiveBuilder::builder_with_policy`]; [`ArchiveBuilder::builder`] uses
/// the [`Default`] policy.
#[derive(Clone, Copy, Debug, Default)]
pub struct BuilderPolicy {
    // Validation applied to member names and preserved symbolic-link targets.
    name_validation: NameValidation,
    // How recursive builds treat symbolic links found in the source tree.
    symlink_policy: SymlinkPolicy,
}
59
/// Controls how source symbolic links are handled during recursive builds.
///
/// Select a policy with [`BuilderPolicy::symlink_policy`]; the default is
/// [`SymlinkPolicy::Reject`].
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum SymlinkPolicy {
    /// Reject recursive sources containing symbolic links.
    #[default]
    Reject,
    /// Preserve symbolic links as link members in the resulting archive.
    Preserve,
    // TODO: Consider adding some kind of "Dereference" policy in the future,
    // where symlinks get followed and replaced with their normal file/directory
    // contents.
}
72
73impl BuilderPolicy {
74    /// Configures validation for member names and preserved symbolic-link targets.
75    ///
76    /// Passing [`None`] disables configurable name validation. UTF-8 and
77    /// archive-format requirements still apply.
78    pub fn name_validator(mut self, validator: Option<NameValidator>) -> Self {
79        self.name_validation = NameValidation::from_validator(validator);
80        self
81    }
82
83    /// Configures how recursive builds handle source symbolic links.
84    ///
85    /// Symbolic links are **rejected by default**. Use
86    /// [`SymlinkPolicy::Preserve`] to write link members instead.
87    pub fn symlink_policy(mut self, policy: SymlinkPolicy) -> Self {
88        self.symlink_policy = policy;
89        self
90    }
91}
92
93struct BuilderState {
94    policy: BuilderPolicy,
95    entries: BuildEntries,
96    source_buffer: Vec<u8>,
97    poisoned: bool,
98}
99
100impl BuilderState {
101    fn new(policy: BuilderPolicy) -> Self {
102        Self {
103            policy,
104            entries: BuildEntries::new(),
105            source_buffer: Vec::new(),
106            poisoned: false,
107        }
108    }
109
110    fn ensure_active<E>(&self) -> Result<(), BuildError<E>> {
111        if self.poisoned {
112            return Err(BuildError::Poisoned);
113        }
114        Ok(())
115    }
116
117    // A backend write is provisionally poisoning. Completion clears this flag
118    // before the returned failure is classified; cancellation leaves it set.
119    fn begin_write(&mut self) {
120        self.poisoned = true;
121    }
122
123    fn complete_write(&mut self) {
124        self.poisoned = false;
125    }
126
127    fn poison(&mut self) {
128        self.poisoned = true;
129    }
130}
131
/// A format-neutral, uncompressed file payload supplied to an [`ArchiveBuilder`]
/// implementation.
///
/// A payload is backed either by a borrowed byte slice or by a sized
/// [`AsyncRead`] source; both are drained through [`FilePayload::next_chunk`].
pub struct FilePayload<'a> {
    // Declared logical size in bytes, as reported by `size()`.
    size: u64,
    // Set by the first `next_chunk` call. `Builder::add_file` refuses payloads
    // for which this is already true.
    started: bool,
    // The buffered slice or streaming reader that supplies the bytes.
    inner: FilePayloadInner<'a>,
}
139
/// Backing storage of a [`FilePayload`].
enum FilePayloadInner<'a> {
    /// A pre-buffered payload.
    ///
    /// Observe that this is stored as an `Option` so that we can use the `None`
    /// state to communicate that the payload has been consumed.
    Buffered(Option<&'a [u8]>),
    /// A payload read incrementally from an asynchronous source.
    Reader {
        // Boxed and pinned so any `AsyncRead` can be stored behind one type.
        source: Pin<Box<dyn AsyncRead + 'a>>,
        // Source path, if known; forwarded to the read-error constructor.
        filesystem_path: Option<&'a Path>,
        // Chunk storage, resized to the current chunk length on every read.
        buffer: Vec<u8>,
        // Bytes of the declared size that have not been yielded yet.
        remaining: u64,
        // Bytes of the current chunk already read into `buffer`. Kept here,
        // not in the read future, so a cancelled read can be resumed.
        filled: usize,
    },
}
154
155impl<'a> FilePayload<'a> {
156    /// Creates a payload with a declared logical size and [`AsyncRead`] source.
157    ///
158    /// The builder reads exactly `size` bytes from `source`. Additional bytes
159    /// remain unread, while a source that ends early causes the addition to
160    /// fail.
161    pub fn new<R>(size: u64, source: R) -> Self
162    where
163        R: AsyncRead + 'a,
164    {
165        Self::streaming(size, source, Vec::new(), None)
166    }
167
168    /// Returns the logical, uncompressed source size in bytes.
169    ///
170    /// This is the total number of bytes yielded by [`Self::next_chunk`], not
171    /// necessarily the size ultimately stored by the archive format.
172    pub fn size(&self) -> u64 {
173        self.size
174    }
175
176    /// Returns the next chunk of logical, uncompressed source bytes.
177    ///
178    /// Once this method has been called, the payload cannot be passed to
179    /// [`Builder::add_file`].
180    pub async fn next_chunk<E>(&mut self) -> Result<Option<&[u8]>, BuildError<E>> {
181        self.started = true;
182        match &mut self.inner {
183            FilePayloadInner::Buffered(bytes) => Ok(bytes.take().filter(|bytes| !bytes.is_empty())),
184            FilePayloadInner::Reader {
185                source,
186                filesystem_path,
187                buffer,
188                remaining,
189                filled,
190            } => read_streaming_chunk(source, buffer, remaining, filled, *filesystem_path).await,
191        }
192    }
193
194    fn streaming<R>(
195        size: u64,
196        source: R,
197        buffer: Vec<u8>,
198        filesystem_path: Option<&'a Path>,
199    ) -> Self
200    where
201        R: AsyncRead + 'a,
202    {
203        Self {
204            size,
205            started: false,
206            inner: FilePayloadInner::Reader {
207                source: Box::pin(source),
208                filesystem_path,
209                buffer,
210                remaining: size,
211                filled: 0,
212            },
213        }
214    }
215
216    fn swap_buffer(&mut self, buffer: &mut Vec<u8>) {
217        if let FilePayloadInner::Reader {
218            buffer: payload_buffer,
219            ..
220        } = &mut self.inner
221        {
222            mem::swap(payload_buffer, buffer);
223        }
224    }
225}
226
227impl FilePayload<'static> {
228    /// Opens `path` and creates a payload from the complete regular file.
229    pub async fn from_path<P: AsRef<Path>>(path: P) -> io::Result<Self> {
230        let file = tokio::fs::File::open(path).await?;
231        Self::from_file(file).await
232    }
233
234    /// Creates a payload from the unread contents of a [`tokio::fs::File`].
235    ///
236    /// The declared size is the file length minus its current stream position.
237    /// This constructor rejects filesystem objects that are not regular files.
238    pub async fn from_file(mut file: tokio::fs::File) -> io::Result<Self> {
239        let metadata = file.metadata().await?;
240        if !metadata.is_file() {
241            return Err(io::Error::other(
242                "file payload source is not a regular file",
243            ));
244        }
245        let position = file.stream_position().await?;
246        Ok(Self::new(metadata.len().saturating_sub(position), file))
247    }
248}
249
250impl<'a> From<&'a [u8]> for FilePayload<'a> {
251    fn from(bytes: &'a [u8]) -> Self {
252        let size = bytes.len() as u64;
253        Self {
254            size,
255            started: false,
256            inner: FilePayloadInner::Buffered(Some(bytes)),
257        }
258    }
259}
260
261async fn read_streaming_chunk<'a, E, R>(
262    source: &mut R,
263    buffer: &'a mut Vec<u8>,
264    remaining: &mut u64,
265    filled: &mut usize,
266    filesystem_path: Option<&Path>,
267) -> Result<Option<&'a [u8]>, BuildError<E>>
268where
269    R: AsyncRead + Unpin + ?Sized,
270{
271    if *remaining == 0 {
272        return Ok(None);
273    }
274
275    let chunk_size = (*remaining).min(FILE_PAYLOAD_CHUNK_BYTES as u64);
276    let chunk_len = usize::try_from(chunk_size)
277        .map_err(|_| arithmetic_overflow("file payload read buffer size"))?;
278    buffer.resize(chunk_len, 0);
279    // Progress lives in the payload rather than this future, so cancelling and
280    // retrying `FilePayload::next_chunk` cannot discard completed reads.
281    while *filled < chunk_len {
282        let read = source
283            .read(&mut buffer[*filled..])
284            .await
285            .map_err(|source| file_payload_read_error(filesystem_path, source))?;
286        if read == 0 {
287            return Err(file_payload_read_error(
288                filesystem_path,
289                io::Error::new(
290                    io::ErrorKind::UnexpectedEof,
291                    "file payload source ended before its declared size",
292                ),
293            ));
294        }
295        *filled += read;
296    }
297    *remaining -= chunk_size;
298    *filled = 0;
299    Ok(Some(buffer))
300}
301
302/// A failure returned by an [`ArchiveBuilder`] format hook.
303///
304/// This distinguishes errors known to precede output from errors that may have
305/// left a partial member in the output archive.
306#[derive(Debug)]
307pub struct BuildFailure<E> {
308    error: BuildError<E>,
309    // TODO: Maybe make all failures poisoning?
310    // I'm not sure we really need the distinction here.
311    poisons_builder: bool,
312}
313
314impl<E> BuildFailure<E> {
315    /// Reports a failure that occurred before the hook wrote any output.
316    pub fn recoverable(error: BuildError<E>) -> Self {
317        Self {
318            error,
319            poisons_builder: false,
320        }
321    }
322
323    /// Reports a failure that may have left partial output.
324    pub fn poisoned(error: BuildError<E>) -> Self {
325        Self {
326            error,
327            poisons_builder: true,
328        }
329    }
330
331    fn into_parts(self) -> (BuildError<E>, bool) {
332        (self.error, self.poisons_builder)
333    }
334}
335
336/// A format-specific archive writer that can create a stateful [`Builder`].
337///
338/// The asynchronous methods on this trait are implementation hooks for
339/// [`Builder`]. Archive construction callers must not invoke them directly;
340/// doing so bypasses builder policy, collision tracking, and cancellation
341/// poisoning. Use [`Self::builder`] or [`Self::builder_with_policy`] and then
342/// the [`Builder`] APIs instead.
343///
344/// Hook implementations must return [`BuildFailure::recoverable`] only when the
345/// failed invocation wrote no output. Any failure after output may have begun
346/// must use [`BuildFailure::poisoned`].
347#[expect(
348    async_fn_in_trait,
349    reason = "archive writers may be !Send and run on a local executor"
350)]
351pub trait ArchiveBuilder: Sized {
352    /// The archive-format error returned while encoding entries.
353    type Error;
354
355    /// Wraps this format writer in a builder using default policy.
356    ///
357    /// Implementors should not override this default implementation.
358    fn builder(self) -> Builder<Self> {
359        Builder {
360            backend: self,
361            state: BuilderState::new(BuilderPolicy::default()),
362        }
363    }
364
365    /// Wraps this format writer in a builder using `policy`.
366    ///
367    /// Implementors should not override this default implementation.
368    fn builder_with_policy(self, policy: BuilderPolicy) -> Builder<Self> {
369        Builder {
370            backend: self,
371            state: BuilderState::new(policy),
372        }
373    }
374
375    /// Writes any format-specific archive terminator or index.
376    async fn finish_archive(&mut self) -> Result<(), BuildFailure<Self::Error>>;
377
378    /// Writes one regular-file member and its complete payload.
379    ///
380    /// Implementations must call [`FilePayload::next_chunk`] through
381    /// completion and classify failures using [`BuildFailure`].
382    async fn write_file_member(
383        &mut self,
384        path: &str,
385        payload: &mut FilePayload<'_>,
386        metadata: EntryMetadata,
387    ) -> Result<(), BuildFailure<Self::Error>>;
388
389    /// Writes one directory member.
390    async fn write_directory_member(&mut self, path: &str)
391    -> Result<(), BuildFailure<Self::Error>>;
392
393    /// Writes one symbolic-link member.
394    async fn write_symbolic_link_member(
395        &mut self,
396        path: &str,
397        target: &str,
398    ) -> Result<(), BuildFailure<Self::Error>>;
399}
400
/// A stateful format-neutral archive construction engine.
///
/// Create this wrapper with [`ArchiveBuilder::builder`] or
/// [`ArchiveBuilder::builder_with_policy`].
pub struct Builder<B> {
    // The format-specific writer whose hooks produce the archive output.
    backend: B,
    // Policy, collision tracking, reusable read buffer, and poison flag.
    state: BuilderState,
}
409
410impl<B: ArchiveBuilder> Builder<B> {
411    /// Adds one regular file from a [`FilePayload`].
412    ///
413    /// If the payload ends before its declared size or returns an error, the
414    /// addition fails and the builder is poisoned if the archive member's
415    /// output may already have begun.
416    ///
417    /// The payload must not have been read through [`FilePayload::next_chunk`]
418    /// before this method is called.
419    pub async fn add_file<'a, P>(
420        &mut self,
421        path: P,
422        payload: impl Into<FilePayload<'a>>,
423        metadata: EntryMetadata,
424    ) -> Result<(), BuildError<B::Error>>
425    where
426        P: AsRef<Path>,
427    {
428        self.state.ensure_active()?;
429        let archive_path = path.as_ref();
430        let Some(path) = archive_path.to_str() else {
431            return Err(BuildError::InvalidArchivePath {
432                path: archive_path.to_path_buf(),
433                reason: "path is not valid UTF-8",
434            });
435        };
436        if !self.state.policy.name_validation.accepts(path) {
437            return Err(BuildError::NameRejected {
438                context: "member path",
439                value: path.to_owned(),
440            });
441        }
442        let path = path.to_owned();
443        let reservation = self
444            .state
445            .entries
446            .preflight_entry(&path, ArchivedEntry::NonDirectory)?;
447        let mut payload = payload.into();
448        if payload.started {
449            return Err(BuildError::FilePayloadAlreadyRead);
450        }
451        payload.swap_buffer(&mut self.state.source_buffer);
452        self.state.begin_write();
453        let result = self
454            .backend
455            .write_file_member(&path, &mut payload, metadata)
456            .await;
457        self.state.complete_write();
458        payload.swap_buffer(&mut self.state.source_buffer);
459        self.resolve_hook(result)?;
460        self.state.entries.commit_entry(&path, reservation);
461        Ok(())
462    }
463
464    /// Adds one directory member without reading from the filesystem.
465    ///
466    /// This creates only the named directory member. It does not add any child
467    /// members; use [`Self::add_directory_all`] to recursively add a filesystem
468    /// directory and its contents.
469    pub async fn add_directory<P: AsRef<Path>>(
470        &mut self,
471        path: P,
472    ) -> Result<(), BuildError<B::Error>> {
473        self.state.ensure_active()?;
474        let archive_path = path.as_ref();
475        let Some(path) = archive_path.to_str() else {
476            return Err(BuildError::InvalidArchivePath {
477                path: archive_path.to_path_buf(),
478                reason: "path is not valid UTF-8",
479            });
480        };
481        if !self.state.policy.name_validation.accepts(path) {
482            return Err(BuildError::NameRejected {
483                context: "member path",
484                value: path.to_owned(),
485            });
486        }
487        let path = path.to_owned();
488        let reservation = self
489            .state
490            .entries
491            .preflight_entry(&path, ArchivedEntry::Directory { explicit: true })?;
492        self.state.begin_write();
493        let result = self.backend.write_directory_member(&path).await;
494        self.state.complete_write();
495        self.resolve_hook(result)?;
496        self.state.entries.commit_entry(&path, reservation);
497        Ok(())
498    }
499
500    /// Recursively adds a filesystem directory beneath its UTF-8 basename.
501    ///
502    /// Entries are visited in deterministic sorted order and files are streamed
503    /// with bounded memory. Source symbolic links are rejected by default;
504    /// [`BuilderPolicy::symlink_policy`] can instead preserve them. A late
505    /// traversal or validation failure may leave partial output and poison
506    /// this builder.
507    pub async fn add_directory_all<P: AsRef<Path>>(
508        &mut self,
509        source: P,
510    ) -> Result<(), BuildError<B::Error>> {
511        self.state.ensure_active()?;
512        let source = source.as_ref().to_path_buf();
513        let mut entries = stream_directory_entries(
514            source,
515            self.state.policy.name_validation,
516            self.state.policy.symlink_policy,
517        )
518        .map_err(BuildError::Traversal)?;
519        self.state.begin_write();
520        let mut traversal = DirectoryBuild {
521            entries: &mut self.state.entries,
522            source_buffer: mem::take(&mut self.state.source_buffer),
523            emitted: false,
524        };
525        let write_result =
526            write_directory_entries(&mut self.backend, &mut entries, &mut traversal).await;
527        let traversal_result = entries
528            .finish()
529            .await
530            .map_err(BuildError::Traversal)
531            .map_err(BuildFailure::recoverable);
532        let result = write_result.and(traversal_result);
533        let DirectoryBuild {
534            entries: _,
535            source_buffer,
536            emitted,
537        } = traversal;
538        self.state.complete_write();
539        self.state.source_buffer = source_buffer;
540        match result {
541            Ok(()) => Ok(()),
542            Err(error) => {
543                let (error, hook_poisoned) = error.into_parts();
544                if emitted || hook_poisoned {
545                    self.state.poison();
546                }
547                Err(error)
548            }
549        }
550    }
551
552    /// Finalizes and consumes this archive builder.
553    ///
554    /// Callers that need to retain access to an output sink should lend it to
555    /// the format writer before wrapping it rather than transferring ownership.
556    pub async fn finish(mut self) -> Result<(), BuildError<B::Error>> {
557        self.state.ensure_active()?;
558        let result = self.backend.finish_archive().await;
559        self.resolve_hook(result)
560    }
561
562    fn resolve_hook<T>(
563        &mut self,
564        result: Result<T, BuildFailure<B::Error>>,
565    ) -> Result<T, BuildError<B::Error>> {
566        match result {
567            Ok(value) => Ok(value),
568            Err(error) => {
569                let (error, poisons_builder) = error.into_parts();
570                if poisons_builder {
571                    self.state.poison();
572                }
573                Err(error)
574            }
575        }
576    }
577}
578
579async fn write_directory_entries<B: ArchiveBuilder>(
580    builder: &mut B,
581    entries: &mut TraversalStream,
582    traversal: &mut DirectoryBuild<'_>,
583) -> Result<(), BuildFailure<B::Error>> {
584    while let Some(entries) = entries.recv().await {
585        let mut entries = VecDeque::from(entries);
586        while !entries.is_empty() {
587            let buffer = mem::take(&mut traversal.source_buffer);
588            let (prepared, remaining) = prepare_directory_entries(entries, buffer)
589                .await
590                .map_err(SourceError::into_build_error)
591                .map_err(BuildFailure::recoverable)?;
592            entries = remaining;
593            let PreparedDirectoryBatch {
594                entries: prepared_entries,
595                mut buffer,
596            } = prepared;
597            let result =
598                write_prepared_directory_entries(builder, prepared_entries, &mut buffer, traversal)
599                    .await;
600            traversal.source_buffer = buffer;
601            result?;
602        }
603    }
604    Ok(())
605}
606
607async fn write_prepared_directory_entries<B: ArchiveBuilder>(
608    builder: &mut B,
609    entries: Vec<PreparedTraversalEntry>,
610    buffer: &mut Vec<u8>,
611    traversal: &mut DirectoryBuild<'_>,
612) -> Result<(), BuildFailure<B::Error>> {
613    for entry in entries {
614        let reservation = traversal
615            .entries
616            .preflight_entry(
617                &entry.archive_path,
618                if matches!(&entry.kind, PreparedTraversalKind::Directory) {
619                    ArchivedEntry::Directory { explicit: true }
620                } else {
621                    ArchivedEntry::NonDirectory
622                },
623            )
624            .map_err(BuildFailure::recoverable)?;
625        match entry.kind {
626            PreparedTraversalKind::Directory => {
627                builder.write_directory_member(&entry.archive_path).await?;
628            }
629            PreparedTraversalKind::BufferedFile { range, executable } => {
630                let data = buffer.get(range).ok_or_else(|| {
631                    BuildFailure::recoverable(arithmetic_overflow(
632                        "prepared source file buffer range",
633                    ))
634                })?;
635                let mut payload = FilePayload::from(data);
636                builder
637                    .write_file_member(
638                        &entry.archive_path,
639                        &mut payload,
640                        EntryMetadata::default().executable(executable),
641                    )
642                    .await?;
643            }
644            PreparedTraversalKind::StreamingFile {
645                file,
646                path,
647                size,
648                executable,
649            } => {
650                let mut file = tokio::fs::File::from_std(file);
651                file.set_max_buf_size(FILE_PAYLOAD_CHUNK_BYTES);
652                let mut payload =
653                    FilePayload::streaming(size, file, mem::take(buffer), Some(path.as_path()));
654                let result = builder
655                    .write_file_member(
656                        &entry.archive_path,
657                        &mut payload,
658                        EntryMetadata::default().executable(executable),
659                    )
660                    .await;
661                payload.swap_buffer(buffer);
662                result?;
663            }
664            PreparedTraversalKind::SymbolicLink { target } => {
665                builder
666                    .write_symbolic_link_member(&entry.archive_path, &target)
667                    .await?;
668            }
669        }
670        traversal
671            .entries
672            .commit_entry(&entry.archive_path, reservation);
673        traversal.emitted = true;
674    }
675    Ok(())
676}
677
/// Working state of one recursive directory addition.
struct DirectoryBuild<'entries> {
    // Collision state borrowed from the builder for the whole traversal.
    entries: &'entries mut BuildEntries,
    // Reusable byte buffer taken from the builder state and returned to it
    // when the traversal ends.
    source_buffer: Vec<u8>,
    // Becomes true once a member has been written and committed.
    emitted: bool,
}
683
/// One batch of traversal entries that is ready to be written.
struct PreparedDirectoryBatch {
    // Entries in the order they were taken from the traversal queue.
    entries: Vec<PreparedTraversalEntry>,
    // Contents of the batch's buffered files; each
    // `PreparedTraversalKind::BufferedFile` range indexes into this buffer.
    buffer: Vec<u8>,
}
688
/// A traversal entry whose source has been opened or read as needed.
struct PreparedTraversalEntry {
    // Member path passed to the collision check and to the backend hook.
    archive_path: String,
    // What kind of member to write, with the data needed to write it.
    kind: PreparedTraversalKind,
}
693
/// The member kinds a prepared traversal entry can produce.
enum PreparedTraversalKind {
    // Written through `write_directory_member`.
    Directory,
    // A file whose contents were already read into the batch buffer.
    BufferedFile {
        // Location of the file contents inside the batch buffer.
        range: Range<usize>,
        executable: bool,
    },
    // A file larger than `BUFFERED_SOURCE_FILE_BYTES`; it is opened during
    // preparation and read while the member is written.
    StreamingFile {
        file: std::fs::File,
        // Source path, forwarded to the payload for read-error reporting.
        path: PathBuf,
        // File length taken from its metadata during preparation.
        size: u64,
        executable: bool,
    },
    // Written through `write_symbolic_link_member`.
    SymbolicLink {
        target: String,
    },
}
710
711async fn prepare_directory_entries(
712    mut entries: VecDeque<TraversalEntry>,
713    mut buffer: Vec<u8>,
714) -> Result<(PreparedDirectoryBatch, VecDeque<TraversalEntry>), SourceError> {
715    tokio::task::spawn_blocking(move || {
716        buffer.clear();
717        let mut prepared = Vec::with_capacity(entries.len());
718        while let Some(entry) = entries.pop_front() {
719            let TraversalEntry {
720                source,
721                archive_path,
722                kind,
723            } = entry;
724            let (kind, batch_complete) = match kind {
725                TraversalKind::Directory => (PreparedTraversalKind::Directory, false),
726                TraversalKind::Regular => prepare_regular_file(source, &mut buffer)?,
727                TraversalKind::SymbolicLink { target } => {
728                    (PreparedTraversalKind::SymbolicLink { target }, false)
729                }
730            };
731            prepared.push(PreparedTraversalEntry { archive_path, kind });
732            if batch_complete {
733                break;
734            }
735        }
736        Ok((
737            PreparedDirectoryBatch {
738                entries: prepared,
739                buffer,
740            },
741            entries,
742        ))
743    })
744    .await
745    .map_err(SourceError::BlockingTask)?
746}
747
748fn prepare_regular_file(
749    path: PathBuf,
750    buffer: &mut Vec<u8>,
751) -> Result<(PreparedTraversalKind, bool), SourceError> {
752    let file = std::fs::File::open(&path)
753        .map_err(|source| SourceError::filesystem("open source file", &path, source))?;
754    let metadata = file
755        .metadata()
756        .map_err(|source| SourceError::filesystem("inspect source file", &path, source))?;
757    if !metadata.is_file() {
758        return Err(SourceError::filesystem(
759            "inspect source file",
760            &path,
761            io::Error::other("source is not a regular file"),
762        ));
763    }
764    let size = metadata.len();
765    let executable = is_executable(&metadata);
766    if size > BUFFERED_SOURCE_FILE_BYTES as u64 {
767        return Ok((
768            PreparedTraversalKind::StreamingFile {
769                file,
770                path,
771                size,
772                executable,
773            },
774            true,
775        ));
776    }
777    let payload_size = usize::try_from(size).map_err(|_| SourceError::ArithmeticOverflow {
778        context: "buffered source file size",
779    })?;
780    let start = buffer.len();
781    let end = start
782        .checked_add(payload_size)
783        .ok_or(SourceError::ArithmeticOverflow {
784            context: "buffered source batch size",
785        })?;
786    buffer.resize(end, 0);
787    (&file)
788        .read_exact(&mut buffer[start..end])
789        .map_err(|source| SourceError::filesystem("read source file", &path, source))?;
790    Ok((
791        PreparedTraversalKind::BufferedFile {
792            range: start..end,
793            executable,
794        },
795        buffer.len() >= SOURCE_FILE_PREPARATION_BATCH_BYTES,
796    ))
797}
798
/// Failure raised while preparing source files. It carries no archive-format
/// error type and is converted with `SourceError::into_build_error`.
enum SourceError {
    // A filesystem call on `path` failed; `operation` names the attempted
    // action for the error message.
    Filesystem {
        operation: &'static str,
        path: PathBuf,
        source: io::Error,
    },
    // The blocking preparation task could not be joined.
    BlockingTask(tokio::task::JoinError),
    // A size computation did not fit; `context` names the computation.
    ArithmeticOverflow {
        context: &'static str,
    },
}
810
811impl SourceError {
812    fn filesystem(operation: &'static str, path: &Path, source: io::Error) -> Self {
813        Self::Filesystem {
814            operation,
815            path: path.to_path_buf(),
816            source,
817        }
818    }
819
820    fn into_build_error<E>(self) -> BuildError<E> {
821        match self {
822            Self::Filesystem {
823                operation,
824                path,
825                source,
826            } => BuildError::Filesystem {
827                operation,
828                path,
829                source,
830            },
831            Self::BlockingTask(error) => BuildError::BlockingTask(error),
832            Self::ArithmeticOverflow { context } => BuildError::ArithmeticOverflow { context },
833        }
834    }
835}
836
/// A failure while constructing an archive.
///
/// `E` is the archive-format error type; [`Builder`] methods instantiate it
/// with the backend's [`ArchiveBuilder::Error`].
#[derive(Debug, Error)]
pub enum BuildError<E> {
    /// The archive format encoder failed.
    #[error(transparent)]
    Encoder(E),
    /// Traversing a recursive source failed.
    #[error(transparent)]
    Traversal(#[from] TraversalError),
    /// A requested archive path cannot be represented by the UTF-8 builder.
    #[error("invalid archive path {path:?}: {reason}")]
    InvalidArchivePath {
        /// The rejected archive path.
        path: PathBuf,
        /// The reason the path cannot be represented.
        reason: &'static str,
    },
    /// An archive name was rejected by the configured [`BuilderPolicy`].
    #[error("archive {context} rejected by builder policy: {value:?}")]
    NameRejected {
        /// The role of the rejected archive text.
        context: &'static str,
        /// The rejected UTF-8 value.
        value: String,
    },
    /// An archive path collides with a previously reserved entry.
    #[error("archive entry collides with existing path {path}")]
    PathCollision {
        /// The conflicting normalized archive path.
        path: String,
    },
    /// A file payload was read before it was passed to [`Builder::add_file`].
    #[error("file payload was already read before being added to the archive")]
    FilePayloadAlreadyRead,
    /// A source filesystem operation failed.
    // NOTE(review): the message already embeds `{source}`, and the same error
    // is exposed through `#[source]`, so reporters that print the whole chain
    // may show the I/O error twice — confirm this is intended.
    #[error("failed to {operation} {path}: {source}")]
    Filesystem {
        /// The operation that failed.
        operation: &'static str,
        /// The affected source filesystem path.
        path: PathBuf,
        /// The underlying I/O error.
        #[source]
        source: io::Error,
    },
    /// Reading an asynchronous file payload source failed.
    #[error("failed to read archive file payload source")]
    SourceRead {
        /// The underlying I/O error.
        #[source]
        source: io::Error,
    },
    /// A blocking filesystem operation failed to complete.
    #[error("failed to complete blocking archive filesystem operation: {0}")]
    BlockingTask(#[from] tokio::task::JoinError),
    /// The builder cannot continue because a prior failure may have written bytes.
    ///
    /// Once poisoned, every later [`Builder`] operation returns this error
    /// before doing any work.
    #[error("archive builder is poisoned after a previous partial write")]
    Poisoned,
    /// A size computation exceeded this API's range.
    #[error("arithmetic overflow while computing {context}")]
    ArithmeticOverflow {
        /// The failed computation.
        context: &'static str,
    },
}
902
/// Kind of entry recorded for one node of the builder's collision state.
#[derive(Clone, Copy, Debug)]
enum ArchivedEntry {
    /// The path is a directory.
    Directory {
        /// `false` when the directory was recorded only because a deeper
        /// path passed through it while being committed.
        explicit: bool,
    },
    /// The path holds an entry that is not a directory and therefore cannot
    /// have children.
    NonDirectory,
}
908
909/// Builder collision state keyed by literal `/`-separated archive components.
910#[derive(Debug)]
911struct BuildEntries(ComponentTree<Box<str>, ArchivedEntry>);
912
913/// Proof that an entry was checked against the current collision state.
914struct EntryReservation {
915    entry: ArchivedEntry,
916}
917
918impl BuildEntries {
919    fn new() -> Self {
920        Self(ComponentTree::new(None))
921    }
922
923    fn preflight_entry<E>(
924        &self,
925        path: &str,
926        entry: ArchivedEntry,
927    ) -> Result<EntryReservation, BuildError<E>> {
928        let mut parent = ROOT_NODE;
929        let mut components = archive_path_components(path).peekable();
930        while let Some((component, prefix)) = components.next() {
931            let Some(node) = self.0.child(parent, component) else {
932                return Ok(EntryReservation { entry });
933            };
934            if components.peek().is_some() {
935                match self.0.state(node) {
936                    Some(ArchivedEntry::Directory { .. }) => parent = node,
937                    Some(ArchivedEntry::NonDirectory) => return Err(path_collision(prefix)),
938                    None => return Ok(EntryReservation { entry }),
939                }
940            } else {
941                match (self.0.state(node), entry) {
942                    (
943                        Some(ArchivedEntry::Directory { explicit: false }),
944                        ArchivedEntry::Directory { .. },
945                    )
946                    | (None, _) => return Ok(EntryReservation { entry }),
947                    (Some(_), _) => return Err(path_collision(prefix)),
948                }
949            }
950        }
951        Ok(EntryReservation { entry })
952    }
953
954    fn commit_entry(&mut self, path: &str, reservation: EntryReservation) {
955        // The builder holds exclusive state access while the backend hook is
956        // awaited, so a successful reservation remains valid until this commit.
957        let mut parent = ROOT_NODE;
958        let mut components = archive_path_components(path).peekable();
959        while let Some((component, _)) = components.next() {
960            let node = self
961                .0
962                .ensure_child_with(parent, component, || component.into());
963            if components.peek().is_some() {
964                if self.0.state(node).is_none() {
965                    self.0
966                        .set_state(node, ArchivedEntry::Directory { explicit: false });
967                }
968            } else {
969                self.0.set_state(node, reservation.entry);
970            }
971            parent = node;
972        }
973    }
974
975    #[cfg(test)]
976    fn node_count(&self) -> usize {
977        self.0.node_count()
978    }
979
980    #[cfg(test)]
981    fn component_bytes(&self) -> usize {
982        self.0.components().map(|component| component.len()).sum()
983    }
984}
985
/// Splits `path` at every `/` and yields, for each piece, the literal
/// component text together with the prefix of `path` that ends at that
/// component.
///
/// Empty components are preserved, so `"a//b"` yields `("a", "a")`,
/// `("", "a/")` and `("b", "a//b")`.
fn archive_path_components(path: &str) -> impl Iterator<Item = (&str, &str)> {
    // The scan state is the byte offset at which the next component starts.
    path.split('/').scan(0usize, move |next_start, component| {
        let end = *next_start + component.len();
        // Step over the separator that follows this component, if any.
        *next_start = end + 1;
        Some((component, &path[..end]))
    })
}
1000
1001fn file_payload_read_error<E>(filesystem_path: Option<&Path>, source: io::Error) -> BuildError<E> {
1002    if let Some(path) = filesystem_path {
1003        BuildError::Filesystem {
1004            operation: "read source file",
1005            path: path.to_path_buf(),
1006            source,
1007        }
1008    } else {
1009        BuildError::SourceRead { source }
1010    }
1011}
1012
1013fn arithmetic_overflow<E>(context: &'static str) -> BuildError<E> {
1014    BuildError::ArithmeticOverflow { context }
1015}
1016
1017fn path_collision<E>(path: &str) -> BuildError<E> {
1018    BuildError::PathCollision {
1019        path: path.to_owned(),
1020    }
1021}
1022
/// Reports whether any execute permission bit (owner, group, or other) is
/// set in the file's Unix mode.
#[cfg(unix)]
fn is_executable(metadata: &std::fs::Metadata) -> bool {
    use std::os::unix::fs::PermissionsExt;

    // Owner, group, and other execute bits.
    const ANY_EXECUTE_BIT: u32 = 0o111;

    let mode = metadata.permissions().mode();
    (mode & ANY_EXECUTE_BIT) != 0
}
1029
/// Non-Unix fallback: the metadata is not inspected and the file is always
/// reported as not executable.
#[cfg(not(unix))]
fn is_executable(_metadata: &std::fs::Metadata) -> bool {
    false
}
1034
#[cfg(test)]
mod tests {
    use std::fs;

    use tempfile::tempdir;

    use super::*;

    /// Opaque backend error used to simulate encoder failures.
    #[derive(Debug)]
    struct TestError;

    /// Backend that discards every member and can be told to fail the next
    /// file or directory write exactly once.
    #[derive(Default)]
    struct NoopArchiveBuilder {
        fail_next_file: bool,
        fail_next_directory: bool,
    }

    impl ArchiveBuilder for NoopArchiveBuilder {
        type Error = TestError;

        async fn finish_archive(&mut self) -> Result<(), BuildFailure<Self::Error>> {
            Ok(())
        }

        async fn write_file_member(
            &mut self,
            _path: &str,
            payload: &mut FilePayload<'_>,
            _metadata: EntryMetadata,
        ) -> Result<(), BuildFailure<Self::Error>> {
            // Taking the flag clears it, so only a single write fails.
            let should_fail = mem::take(&mut self.fail_next_file);
            if should_fail {
                return Err(BuildFailure::recoverable(BuildError::Encoder(TestError)));
            }
            // Pull chunks until the payload reports the end or an error.
            loop {
                let chunk = payload.next_chunk::<TestError>().await;
                match chunk {
                    Err(error) => return Err(BuildFailure::recoverable(error)),
                    Ok(None) => return Ok(()),
                    Ok(Some(_)) => continue,
                }
            }
        }

        async fn write_directory_member(
            &mut self,
            _path: &str,
        ) -> Result<(), BuildFailure<Self::Error>> {
            // Taking the flag clears it, so only a single write fails.
            let should_fail = mem::take(&mut self.fail_next_directory);
            if should_fail {
                Err(BuildFailure::recoverable(BuildError::Encoder(TestError)))
            } else {
                Ok(())
            }
        }

        async fn write_symbolic_link_member(
            &mut self,
            _path: &str,
            _target: &str,
        ) -> Result<(), BuildFailure<Self::Error>> {
            Ok(())
        }
    }

    /// A single very deep path stores each component once: one node per
    /// component plus the root.
    #[tokio::test]
    async fn deep_manual_entry_uses_linear_component_storage() {
        const COMPONENT: &str = "segment";
        const LEAF: &str = "file";
        const DEPTH: usize = 4_096;

        // `DEPTH` directory components followed by the file name.
        let deep_path = std::iter::repeat(COMPONENT)
            .take(DEPTH)
            .chain(std::iter::once(LEAF))
            .collect::<Vec<_>>()
            .join("/");

        let mut archive = NoopArchiveBuilder::default().builder();
        archive
            .add_file(&deep_path, b"".as_slice(), EntryMetadata::default())
            .await
            .expect("deep manual file should be added");

        let entries = &archive.state.entries;
        assert_eq!(entries.node_count(), DEPTH + 2);
        assert_eq!(
            entries.component_bytes(),
            DEPTH * COMPONENT.len() + LEAF.len()
        );
    }

    /// Paths are compared by their literal `/`-separated components, so empty
    /// and dot components are distinct names rather than being normalized.
    #[tokio::test]
    async fn collision_state_preserves_literal_slash_components() {
        let mut archive = NoopArchiveBuilder::default().builder();

        let distinct_paths = ["a//b", "a/b", "/absolute", "absolute", ".", ".."];
        for accepted in distinct_paths {
            archive
                .add_file(accepted, b"".as_slice(), EntryMetadata::default())
                .await
                .expect("distinct textual path should be added");
        }

        // Each attempt is paired with the prefix the collision must report.
        let colliding_paths = [("a//b", "a//b"), ("a/", "a/"), ("", ""), ("./child", ".")];
        for (attempted, collision) in colliding_paths {
            let outcome = archive
                .add_file(attempted, b"".as_slice(), EntryMetadata::default())
                .await;
            assert!(matches!(
                outcome,
                Err(BuildError::PathCollision { path: reported }) if reported == collision
            ));
        }
    }

    /// A file write that fails recoverably must leave its path free for reuse.
    #[tokio::test]
    async fn recoverable_write_failure_does_not_commit_reservation() {
        let backend = NoopArchiveBuilder {
            fail_next_file: true,
            ..Default::default()
        };
        let mut archive = backend.builder();

        let first_attempt = archive
            .add_file("parent/file", b"".as_slice(), EntryMetadata::default())
            .await;
        assert!(matches!(
            first_attempt,
            Err(BuildError::Encoder(TestError))
        ));

        archive
            .add_file("parent/file", b"".as_slice(), EntryMetadata::default())
            .await
            .expect("a recoverable failure should not reserve the path");
    }

    /// A directory write that fails recoverably must not add any node.
    #[tokio::test]
    async fn recoverable_directory_write_failure_does_not_commit_reservation() {
        let backend = NoopArchiveBuilder {
            fail_next_directory: true,
            ..Default::default()
        };
        let mut archive = backend.builder();

        let first_attempt = archive.add_directory("directory").await;
        assert!(matches!(
            first_attempt,
            Err(BuildError::Encoder(TestError))
        ));
        // Only the root node exists after the failed attempt.
        assert_eq!(archive.state.entries.node_count(), 1);

        archive
            .add_directory("directory")
            .await
            .expect("a recoverable failure should not reserve the directory");
        assert_eq!(archive.state.entries.node_count(), 2);
    }

    /// Adding many empty source directories records one node per directory
    /// plus the root.
    #[tokio::test]
    async fn repeated_directory_additions_use_linear_component_storage() {
        const DIRECTORIES: usize = 256;

        let scratch = tempdir().expect("temporary directory should be created");
        let mut archive = NoopArchiveBuilder::default().builder();
        for index in 0..DIRECTORIES {
            let source_dir = scratch.path().join(format!("directory-{index}"));
            fs::create_dir(&source_dir).expect("source directory should be created");
            archive
                .add_directory_all(&source_dir)
                .await
                .expect("empty source directory should be added");
        }

        assert_eq!(archive.state.entries.node_count(), DIRECTORIES + 1);
    }
}