Skip to main content

tar_framing/
stream.rs

1//! Lossless, block-oriented tar streaming.
2//!
3//! This API emits one frame for each accepted non-terminator physical
4//! tar block and preserves each source block verbatim.
5//!
//! The following Mermaid diagram describes the state machine:
7//!
8//! ```mermaid
9//! ---
10//! config:
11//!   layout: elk
12//! ---
13//!
14//! stateDiagram-v2
15//!   state "AwaitingHeader (unclassified)" as Unclassified
16//!   state SelectFormat <<choice>>
17//!
18//!   [*] --> Unclassified
19//!   Unclassified --> AwaitingSecondZero: first zero block
20//!   Unclassified --> SelectFormat: first nonzero header
21//!   SelectFormat --> PosixHeader: ustar identity
22//!   SelectFormat --> GnuHeader: GNU identity
23//!   SelectFormat --> Failed: unsupported identity
24//!
25//!   state "POSIX-pax selected" as Pax {
26//!     direction TB
27//!     state "AwaitingHeader" as PosixBoundary
28//!     state PosixHeader <<choice>>
29//!     state "ReadingMember" as PosixMemberData
30//!
31//!     PosixBoundary --> PosixHeader: next header
32//!     PosixHeader --> ReadingPax: x or g
33//!     PosixHeader --> PosixMemberData: member data
34//!     PosixHeader --> PosixBoundary: empty member
35//!     ReadingPax --> AwaitingUstarHeader: local x payload complete
36//!     ReadingPax --> PosixBoundary: global g payload complete
37//!     AwaitingUstarHeader --> PosixMemberData: member data
38//!     AwaitingUstarHeader --> PosixBoundary: empty member
39//!     PosixMemberData --> PosixBoundary: payload complete
40//!   }
41//!
42//!   state "GNU selected" as Gnu {
43//!     direction TB
44//!     state "AwaitingHeader" as GnuBoundary
45//!     state GnuHeader <<choice>>
46//!     state "ReadingMember" as GnuMemberData
47//!
48//!     GnuBoundary --> GnuHeader: next header
49//!     GnuHeader --> ReadingGnu: L or K data
50//!     GnuHeader --> AwaitingGnuMember: empty L or K
51//!     GnuHeader --> GnuMemberData: member data
52//!     GnuHeader --> GnuBoundary: empty member
53//!     ReadingGnu --> AwaitingGnuMember: metadata payload complete
54//!     AwaitingGnuMember --> ReadingGnu: another L or K data
55//!     AwaitingGnuMember --> AwaitingGnuMember: another empty L or K
56//!     AwaitingGnuMember --> GnuMemberData: member data
57//!     AwaitingGnuMember --> GnuBoundary: empty member
58//!     GnuMemberData --> GnuBoundary: payload complete
59//!   }
60//!
61//! PosixBoundary --> AwaitingSecondZero: first zero block
62//! GnuBoundary --> AwaitingSecondZero: first zero block
63//! AwaitingSecondZero --> Complete: second zero block
64//! AwaitingSecondZero --> Failed: nonzero block or EOF
65//! Complete --> [*]
66//! Failed --> [*]
67//!
68//! note right of Failed
69//!   Validation, ordering, and family-mismatch
70//!   errors also enter Failed; arrows omitted.
71//! end note
72//! ```
73
74use std::{
75    future::poll_fn,
76    ops::Range,
77    pin::Pin,
78    sync::Arc,
79    task::{Context, Poll},
80};
81
82use tokio::io::{AsyncRead, ReadBuf};
83use tokio_stream::Stream;
84
85use crate::{
86    ArchiveFormat, BLOCK_SIZE, Block, DEFAULT_MAX_GLOBAL_PAX_EXTENSIONS_SIZE,
87    DEFAULT_MAX_GNU_EXTENSION_SIZE, DEFAULT_MAX_PAX_EXTENSION_SIZE, FrameError, FrameErrorInner,
88    GnuKind, HdrCharset, PaxError, PaxKind, PaxRecord, PaxState, PaxValue, UstarKind,
89    header::{
90        CHECKSUM_RANGE, GID_RANGE, GNAME_RANGE, GNU_IDENTITY, IDENTITY_RANGE, MODE_RANGE,
91        MTIME_RANGE, NAME_RANGE, PREFIX_RANGE, SIZE_RANGE, TYPEFLAG_OFFSET, UID_RANGE, UNAME_RANGE,
92        USTAR_IDENTITY, checksum, is_all_nul, parse_number, parse_octal,
93    },
94    pax::{GlobalPaxRecords, PaxRecords, SharedPaxRecords},
95};
96
/// A physical block paired with the stream position reported for it
/// (position first, block bytes second).
type PositionedBlock = (u64, Block);
98
/// Represents a single non-terminator physical block in a tar stream.
///
/// Each variant wraps a frame struct that carries the block's position and
/// its lossless bytes alongside the classification-specific details.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum Frame {
    /// A local or global pax extended header block.
    Pax(PaxFrame),
    /// A GNU long-name or long-link extension header block.
    Gnu(GnuFrame),
    /// An ordinary POSIX-ustar or GNU member header block.
    Header(HeaderFrame),
    /// A payload block belonging to a pax header, a GNU metadata extension,
    /// or an ordinary member; see [`DataOwner`].
    Data(DataFrame),
}
111
/// A pax extended header block.
///
/// This frame describes only the header block itself; the extended header
/// records are carried by the payload frames that belong to it.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct PaxFrame {
    /// The absolute byte position of this block in the source stream.
    pub position: u64,
    /// The lossless header block bytes.
    pub block: Block,
    /// Whether this header is local or global.
    pub kind: PaxKind,
    /// The number of bytes occupied by the extended header records.
    pub payload_size: u64,
}
124
/// A GNU metadata extension header block.
///
/// This frame describes only the extension's header block; the metadata
/// payload it announces follows in separate payload frames.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct GnuFrame {
    /// The absolute byte position of this block in the source stream.
    pub position: u64,
    /// The lossless header block bytes.
    pub block: Block,
    /// The GNU extension kind.
    pub kind: GnuKind,
    /// The number of metadata payload bytes following the header.
    pub payload_size: u64,
}
137
/// An ordinary physical member header block in the selected archive family.
///
/// PAX records remain on their physical payload frames. Use
/// [`crate::logical::TarReader`] for assembled member metadata.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct HeaderFrame {
    /// The absolute byte position of this block in the source stream.
    pub position: u64,
    /// The lossless header block bytes.
    pub block: Block,
    /// The selected archive family of this member header.
    pub format: ArchiveFormat,
    /// The member type identified by the header.
    pub kind: UstarKind,
    /// The size encoded directly in the ustar or GNU member header field.
    pub declared_size: u64,
    /// The size after applying applicable pax `size` records.
    ///
    /// This is also the number of payload bytes for which data frames will be
    /// emitted. Member kinds that cannot carry payload are rejected when either
    /// their declared or effective size is nonzero.
    pub effective_size: u64,
    /// The parsed `mode` header field, or `None` when the field was wholly NUL
    /// and the all-NUL compatibility policy accepted it.
    pub(crate) mode: Option<u64>,
    /// The parsed `uid` header field, or `None` when the field was wholly NUL
    /// and the all-NUL compatibility policy accepted it.
    pub(crate) uid: Option<u64>,
    /// The parsed `gid` header field, or `None` when the field was wholly NUL
    /// and the all-NUL compatibility policy accepted it.
    pub(crate) gid: Option<u64>,
    /// The parsed `mtime` header field, or `None` when the field was wholly NUL
    /// and the all-NUL compatibility policy accepted it.
    pub(crate) mtime: Option<u64>,
}
165
166impl HeaderFrame {
167    fn ustar(
168        position: u64,
169        block: Block,
170        typeflag: u8,
171        declared_size: u64,
172        local_records: Option<&PaxRecords>,
173        global_records: Option<&GlobalPaxRecords>,
174        allow_all_nul_numeric_fields: bool,
175    ) -> Result<Self, FrameError> {
176        let kind = UstarKind::try_from_framed(position, typeflag)?;
177
178        // Some real-world pax writers encode absent ordinary-header metadata as
179        // all NULs. The compatibility policy may accept these empty fields; every
180        // populated fallback remains subject to strict validation.
181        let parse_numeric_field = |field, range: Range<usize>| {
182            Self::parse_numeric_field(
183                position,
184                ArchiveFormat::Pax,
185                field,
186                &block[range],
187                allow_all_nul_numeric_fields,
188            )
189        };
190        let mode = parse_numeric_field("mode", MODE_RANGE)?;
191        let uid = parse_numeric_field("uid", UID_RANGE)?;
192        let gid = parse_numeric_field("gid", GID_RANGE)?;
193        let mtime = parse_numeric_field("mtime", MTIME_RANGE)?;
194
195        let validate_string_field = |field: &'static str, bytes: &[u8]| {
196            if bytes.contains(&0) {
197                Ok(())
198            } else {
199                Err(FrameError::at(
200                    position,
201                    FrameErrorInner::UnterminatedUstarStringField { field },
202                ))
203            }
204        };
205        validate_string_field("uname", &block[UNAME_RANGE])?;
206        validate_string_field("gname", &block[GNAME_RANGE])?;
207
208        // POSIX pax deliberately leaves the representation of device numbers unspecified.
209        // We do not consume those fields, so devmajor and devminor remain opaque.
210
211        // Determine our member's actual (i.e. effective) size. This takes any pax `size`
212        // records into account, in addition to the normal header-declared size.
213        let effective_size = PaxState::effective_size(local_records, global_records).map_or(
214            Ok(declared_size),
215            |size| match size {
216                PaxValue::Value(size) => Ok(*size),
217                PaxValue::Deleted => Err(FrameError::deleted_pax_metadata(position, "size")),
218            },
219        )?;
220        validate_pax_member_size(position, kind, declared_size, effective_size)?;
221
222        Ok(Self {
223            position,
224            block,
225            format: ArchiveFormat::Pax,
226            kind,
227            declared_size,
228            effective_size,
229            mode,
230            uid,
231            gid,
232            mtime,
233        })
234    }
235
236    fn gnu(
237        position: u64,
238        block: Block,
239        typeflag: u8,
240        declared_size: u64,
241        require_link_kind: bool,
242        allow_all_nul_numeric_fields: bool,
243    ) -> Result<Self, FrameError> {
244        let kind = UstarKind::try_from_framed(position, typeflag)?;
245        if require_link_kind && !matches!(kind, UstarKind::HardLink | UstarKind::SymbolicLink) {
246            return Err(FrameError::unexpected_order(
247                position,
248                "hard-link or symbolic-link member after GNU long-link extension",
249                "non-link ordinary member",
250            ));
251        }
252        validate_gnu_member_size(position, kind, declared_size)?;
253        let parse_numeric_field = |field, range: Range<usize>| {
254            Self::parse_numeric_field(
255                position,
256                ArchiveFormat::Gnu,
257                field,
258                &block[range],
259                allow_all_nul_numeric_fields,
260            )
261        };
262        let mode = parse_numeric_field("mode", MODE_RANGE)?;
263        let uid = parse_numeric_field("uid", UID_RANGE)?;
264        let gid = parse_numeric_field("gid", GID_RANGE)?;
265        let mtime = parse_numeric_field("mtime", MTIME_RANGE)?;
266
267        Ok(Self {
268            position,
269            block,
270            format: ArchiveFormat::Gnu,
271            kind,
272            declared_size,
273            effective_size: declared_size,
274            mode,
275            uid,
276            gid,
277            mtime,
278        })
279    }
280
281    fn parse_numeric_field(
282        position: u64,
283        format: ArchiveFormat,
284        field: &'static str,
285        bytes: &[u8],
286        allow_all_nul_numeric_fields: bool,
287    ) -> Result<Option<u64>, FrameError> {
288        if allow_all_nul_numeric_fields && is_all_nul(bytes) {
289            return Ok(None);
290        }
291        parse_number(format, bytes).map(Some).ok_or_else(|| {
292            FrameError::at(
293                position,
294                FrameErrorInner::InvalidNumericField {
295                    field,
296                    found: bytes.to_vec(),
297                },
298            )
299        })
300    }
301
302    pub(crate) fn copy_header_path_into(&self, path: &mut Vec<u8>) {
303        path.clear();
304        let name = trim_nul(&self.block[NAME_RANGE]);
305        if self.format == ArchiveFormat::Gnu {
306            path.extend_from_slice(name);
307            return;
308        }
309        let prefix = trim_nul(&self.block[PREFIX_RANGE]);
310        if !prefix.is_empty() {
311            path.extend_from_slice(prefix);
312            path.push(b'/');
313        }
314        path.extend_from_slice(name);
315    }
316}
317
/// The payload entry to which a data block belongs.
///
/// Carried by every [`DataFrame`] so consumers can tell metadata-extension
/// payload apart from ordinary member payload.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum DataOwner {
    /// Payload bytes belonging to a pax extended header.
    Pax(PaxKind),
    /// Payload bytes belonging to a GNU metadata extension.
    Gnu(GnuKind),
    /// Payload bytes belonging to an ordinary archive member.
    Member,
}
328
/// A payload physical block.
///
/// This can be "real" data for e.g. a file member, or it can be the payload of a pax
/// or GNU header. The [`DataOwner`] in `owner` says which.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct DataFrame {
    /// The absolute byte position of this block in the source stream.
    pub position: u64,
    /// The lossless payload block bytes, including any final padding.
    pub block: Block,
    /// The number of meaningful payload bytes in this block.
    pub len: usize,
    /// Whether this block carries metadata-extension or member data.
    pub owner: DataOwner,
    /// Parsed records completed by this final pax payload block.
    ///
    /// This is `Some` only for the last data block belonging to a local or
    /// global pax header; other payload data carries `None`. Read it through
    /// [`DataFrame::completed_pax_records`].
    completed_pax_records: Option<SharedPaxRecords>,
}
349
350impl DataFrame {
351    /// Returns parsed records completed by this final pax payload block.
352    ///
353    /// This returns `Some` only for the last data block belonging to a local
354    /// or global pax header.
355    pub fn completed_pax_records(&self) -> Option<&[PaxRecord]> {
356        self.completed_pax_records
357            .as_deref()
358            .map(PaxRecords::as_slice)
359    }
360
361    pub(crate) fn into_completed_pax_records(self) -> Option<SharedPaxRecords> {
362        self.completed_pax_records
363    }
364}
365
/// The parser phase required before the next physical frame can be emitted.
///
/// `Complete` and `Failed` are terminal. The module-level diagram shows the
/// transitions between the remaining phases.
#[derive(Debug)]
pub(super) enum State {
    /// No payload is pending; accept a header or the first zero end marker.
    AwaitingHeader,
    /// Consume the payload blocks declared by a local or global pax header.
    ReadingPax {
        /// Whether the pax header being read is local or global.
        kind: PaxKind,
        /// Position of the pax header block that declared this payload.
        header_position: u64,
        /// Payload bytes still to be consumed.
        remaining: u64,
        /// Payload bytes collected so far.
        // NOTE(review): presumably accumulated for record parsing once the
        // payload completes — confirm against the pax handling code.
        payload: Vec<u8>,
    },
    /// A local pax header has completed; require its ordinary ustar header.
    AwaitingUstarHeader { records: SharedPaxRecords },
    /// Consume uninterpreted payload blocks for a GNU `L` or `K` extension.
    ReadingGnu {
        /// The GNU extension kind whose payload is being read.
        kind: GnuKind,
        /// Payload bytes still to be consumed.
        remaining: u64,
        /// GNU metadata pending for the next member.
        pending: PendingGnu,
    },
    /// GNU metadata is pending; accept another distinct extension or its member.
    AwaitingGnuMember { pending: PendingGnu },
    /// Consume the payload blocks declared for an ordinary member.
    ReadingMember { remaining: u64 },
    /// The first zero end marker was read; require the second zero block.
    AwaitingSecondZero,
    /// A valid two-block end marker was consumed; no further input is examined.
    Complete,
    /// An error has been emitted; subsequent polls return end-of-stream.
    Failed,
}
397
/// Flags describing the GNU metadata pending for the next ordinary member.
///
/// Both flags default to `false`.
// NOTE(review): presumably each flag is set once the matching `L` (long name)
// or `K` (long link) extension has been seen — confirm against the parser.
#[derive(Clone, Copy, Debug, Default)]
pub(super) struct PendingGnu {
    /// A GNU long-name extension is pending.
    pub(super) long_name: bool,
    /// A GNU long-link extension is pending.
    pub(super) long_link: bool,
}
403
/// Ordinary-member chunk storage retained across cancellation and API changes.
///
/// The chunk lives on the stream rather than in a future, so a read that is
/// dropped part-way keeps its buffered bytes and progress.
#[derive(Default)]
struct MemberChunk {
    /// Backing storage for the chunk's physical bytes. It is swapped with the
    /// caller's buffer when a chunk starts and again when the chunk is taken.
    buffer: Vec<u8>,
    /// Stream position at which this chunk began.
    start_position: u64,
    /// Number of physical bytes the chunk spans, a whole number of blocks.
    physical_len: usize,
    /// Number of payload bytes in the chunk, excluding any final padding.
    meaningful_len: usize,
    /// Progress of the pending chunk, or `None` when no chunk is pending.
    state: Option<MemberChunkState>,
}
413
/// Progress of a pending ordinary-member chunk.
#[derive(Clone, Copy)]
enum MemberChunkState {
    /// The chunk is still being filled from the reader.
    Reading {
        /// Member payload bytes that remained when the chunk was started.
        member_remaining: u64,
        /// Physical bytes read into the chunk buffer so far.
        filled: usize,
    },
    /// The chunk is fully read and waiting to be handed to the caller.
    Ready {
        /// Physical bytes already handed out as individual blocks.
        delivered: usize,
    },
}
424
/// A strict stream of POSIX-pax or GNU frames sourced from an underlying reader.
pub struct TarStream<R> {
    /// Our current stream position.
    pub(super) position: u64,
    /// Our interior source.
    pub(super) inner: R,
    /// Storage for the physical block currently being read.
    pub(super) block: Block,
    /// Number of bytes of `block` filled so far; nonzero while a partially
    /// read block is buffered.
    pub(super) block_len: usize,
    /// The selected archive family; `None` until the first header is read.
    pub(super) format: Option<ArchiveFormat>,
    /// The currently effective global pax records, if any.
    pub(super) global_pax_records: Option<GlobalPaxRecords>,
    /// Maximum size accepted for each pax extension.
    max_pax_extension_size: u64,
    /// Maximum cumulative size accepted for global pax extensions before one
    /// ordinary member.
    max_global_pax_extensions_size: u64,
    /// Running size of global pax extensions; starts at zero.
    // NOTE(review): presumably the pending total checked against
    // `max_global_pax_extensions_size` and reset after each ordinary member —
    // confirm where it is updated.
    global_pax_extensions_size: u64,
    /// Whether wholly NUL `mode`, `uid`, `gid`, and `mtime` fields are accepted.
    allow_all_nul_numeric_fields: bool,
    /// Maximum size accepted for each GNU extension.
    max_gnu_extension_size: u64,
    /// Persistent storage and progress for direct member chunk reads.
    member_chunk: MemberChunk,
    /// The current parser phase.
    pub(super) state: State,
}
444
445impl<R> TarStream<R> {
446    /// Creates a new [`TarStream`] from the given reader.
447    pub fn new(reader: R) -> Self {
448        Self {
449            position: 0,
450            inner: reader,
451            block: [0; BLOCK_SIZE],
452            block_len: 0,
453            format: None,
454            global_pax_records: None,
455            max_pax_extension_size: DEFAULT_MAX_PAX_EXTENSION_SIZE,
456            max_global_pax_extensions_size: DEFAULT_MAX_GLOBAL_PAX_EXTENSIONS_SIZE,
457            global_pax_extensions_size: 0,
458            allow_all_nul_numeric_fields: true,
459            max_gnu_extension_size: DEFAULT_MAX_GNU_EXTENSION_SIZE,
460            member_chunk: MemberChunk::default(),
461            state: State::AwaitingHeader,
462        }
463    }
464
465    /// Sets the maximum size accepted for each subsequent pax extension.
466    ///
467    /// A local or global header that declares a larger payload is rejected
468    /// before its payload is consumed. Setting the maximum to zero rejects
469    /// every nonempty extension. Setting it to [`u64::MAX`] removes the
470    /// per-extension bound; global extensions remain subject to their
471    /// cumulative limit.
472    pub fn set_max_pax_extension_size(&mut self, max_pax_extension_size: u64) {
473        self.max_pax_extension_size = max_pax_extension_size;
474    }
475
476    /// Sets the maximum cumulative size accepted for global pax extensions
477    /// before one ordinary member.
478    ///
479    /// The total resets after each ordinary member. A global header that would
480    /// increase the pending total beyond this limit is rejected before its
481    /// payload is consumed. Setting the maximum to zero rejects every nonempty
482    /// global extension. Setting it to [`u64::MAX`] removes the cumulative
483    /// bound; each extension remains subject to its individual limit.
484    pub fn set_max_global_pax_extensions_size(&mut self, max_global_pax_extensions_size: u64) {
485        self.max_global_pax_extensions_size = max_global_pax_extensions_size;
486    }
487
488    /// Sets whether wholly NUL numeric metadata fields may be accepted.
489    ///
490    /// This compatibility option applies to `mode`, `uid`, `gid`, and `mtime`
491    /// in both pax/ustar and GNU ordinary member headers. It is enabled by
492    /// default. Disabling it requires each field to use a valid numeric encoding
493    /// for its archive family.
494    pub fn set_allow_all_nul_numeric_fields(&mut self, allow: bool) {
495        self.allow_all_nul_numeric_fields = allow;
496    }
497
498    /// Sets the maximum size accepted for each GNU extension.
499    ///
500    /// A GNU extension member that declares a larger payload is rejected before
501    /// its payload is consumed. Setting the maximum to zero rejects every nonempty
502    /// GNU extension member. Setting it to [`u64::MAX`] removes the per-extension bound.
503    pub fn set_max_gnu_extension_size(&mut self, max_gnu_extension_size: u64) {
504        self.max_gnu_extension_size = max_gnu_extension_size;
505    }
506
507    /// Returns the selected archive family after the first header is read.
508    pub fn format(&self) -> Option<ArchiveFormat> {
509        self.format
510    }
511}
512
513impl<R: AsyncRead + Unpin> TarStream<R> {
514    /// Reads one ordinary-member payload block without constructing a [`Frame`].
515    ///
516    /// Returns the block's position, lossless bytes, and meaningful length.
517    pub(crate) async fn read_member_block(&mut self) -> Result<(u64, Block, usize), FrameError> {
518        if self.member_chunk.state.is_some() {
519            self.complete_member_chunk().await?;
520            return self.take_member_block_from_chunk();
521        }
522        let remaining = match &self.state {
523            State::ReadingMember { remaining } => *remaining,
524            _ => {
525                self.state = State::Failed;
526                return Err(FrameError::unexpected_order(
527                    self.position,
528                    "ordinary member payload",
529                    "parser state without member payload",
530                ));
531            }
532        };
533        let (position, block) = match poll_fn(|context| self.poll_read_block(context)).await {
534            Ok(Some(block)) => block,
535            Ok(None) => {
536                let error = self.handle_eof();
537                self.state = State::Failed;
538                return Err(error);
539            }
540            Err(error) => {
541                self.state = State::Failed;
542                return Err(error);
543            }
544        };
545        let meaningful_len = remaining.min(BLOCK_SIZE as u64) as usize;
546        self.state = member_payload_state(remaining - meaningful_len as u64);
547        Ok((position, block, meaningful_len))
548    }
549
550    /// Reads aligned ordinary-member payload blocks directly into `buffer`.
551    ///
552    /// This internal path preserves exact physical-block completion checks
553    /// while avoiding lossless [`Frame`] construction for chunk consumers.
554    pub(crate) async fn read_member_chunk(
555        &mut self,
556        buffer: &mut Vec<u8>,
557        target_len: usize,
558    ) -> Result<usize, FrameError> {
559        // A cancelled block read retains its partial physical block here. Finish
560        // and deliver it before starting a direct chunk so no bytes are lost.
561        if self.member_chunk.state.is_none() && self.block_len != 0 {
562            let (_, block, meaningful_len) = self.read_member_block().await?;
563            buffer.clear();
564            buffer.extend_from_slice(&block[..meaningful_len]);
565            return Ok(meaningful_len);
566        }
567        if self.member_chunk.state.is_none() {
568            self.start_member_chunk(buffer, target_len)?;
569        }
570        self.complete_member_chunk().await?;
571        self.take_member_chunk(buffer)
572    }
573
574    fn start_member_chunk(
575        &mut self,
576        buffer: &mut Vec<u8>,
577        target_len: usize,
578    ) -> Result<(), FrameError> {
579        let member_remaining = match &self.state {
580            State::ReadingMember { remaining } => *remaining,
581            _ => {
582                self.state = State::Failed;
583                return Err(FrameError::unexpected_order(
584                    self.position,
585                    "ordinary member payload",
586                    "parser state without member payload",
587                ));
588            }
589        };
590        if self.block_len != 0 {
591            self.state = State::Failed;
592            return Err(FrameError::unexpected_order(
593                self.position,
594                "aligned ordinary member payload",
595                "partially buffered physical block",
596            ));
597        }
598
599        let target_len = u64::try_from(target_len.max(BLOCK_SIZE)).map_err(|_| {
600            FrameError::arithmetic_overflow(self.position, "member payload chunk target length")
601        })?;
602        let physical_len = member_remaining
603            .min(target_len)
604            .div_ceil(BLOCK_SIZE as u64)
605            .checked_mul(BLOCK_SIZE as u64)
606            .ok_or_else(|| {
607                FrameError::arithmetic_overflow(
608                    self.position,
609                    "member payload chunk physical length",
610                )
611            })?;
612        let meaningful_len = member_remaining.min(physical_len);
613        let physical_len = usize::try_from(physical_len).map_err(|_| {
614            FrameError::arithmetic_overflow(self.position, "member payload chunk physical length")
615        })?;
616        let meaningful_len = usize::try_from(meaningful_len).map_err(|_| {
617            FrameError::arithmetic_overflow(self.position, "member payload chunk meaningful length")
618        })?;
619
620        // Move the caller's reusable allocation into persistent storage before
621        // reading so cancellation cannot discard partial bytes or progress.
622        self.member_chunk.buffer.clear();
623        std::mem::swap(buffer, &mut self.member_chunk.buffer);
624        if self.member_chunk.buffer.len() != physical_len {
625            self.member_chunk.buffer.resize(physical_len, 0);
626        }
627        self.member_chunk.start_position = self.position;
628        self.member_chunk.physical_len = physical_len;
629        self.member_chunk.meaningful_len = meaningful_len;
630        self.member_chunk.state = Some(MemberChunkState::Reading {
631            member_remaining,
632            filled: 0,
633        });
634        Ok(())
635    }
636
    /// Drives the pending member chunk until all of its physical bytes are read.
    ///
    /// Returns immediately when the chunk is already `Ready`. Otherwise it
    /// reads from the inner reader into the persistent chunk buffer, recording
    /// progress in `self.member_chunk.state` after every successful read so
    /// that a dropped future can be resumed by a later call. Once the chunk is
    /// full, the stream position and parser state are advanced and the chunk
    /// becomes `Ready`.
    ///
    /// # Errors
    ///
    /// Fails when no chunk is pending, on an I/O error, on end of input before
    /// the chunk is full (an incomplete block or a truncated payload), or on
    /// position/length overflow. Every failure sets [`State::Failed`]; all but
    /// the missing-chunk case also clear the pending chunk.
    async fn complete_member_chunk(&mut self) -> Result<(), FrameError> {
        loop {
            // Re-read the progress each iteration; it is the only copy that
            // survives cancellation.
            let (member_remaining, filled) = match self.member_chunk.state {
                Some(MemberChunkState::Reading {
                    member_remaining,
                    filled,
                }) => (member_remaining, filled),
                Some(MemberChunkState::Ready { .. }) => return Ok(()),
                None => {
                    self.state = State::Failed;
                    return Err(FrameError::unexpected_order(
                        self.position,
                        "pending member payload chunk",
                        "parser state without a pending chunk",
                    ));
                }
            };
            let start_position = self.member_chunk.start_position;
            let physical_len = self.member_chunk.physical_len;
            let meaningful_len = self.member_chunk.meaningful_len;
            if filled == physical_len {
                // The chunk is full: advance past its physical bytes and
                // deduct its meaningful bytes from the member's payload.
                self.position =
                    checked_position(start_position, physical_len).inspect_err(|_| {
                        self.state = State::Failed;
                        self.member_chunk.state = None;
                    })?;
                let remaining = member_remaining
                    .checked_sub(meaningful_len as u64)
                    .ok_or_else(|| {
                        self.state = State::Failed;
                        self.member_chunk.state = None;
                        FrameError::arithmetic_overflow(
                            start_position,
                            "remaining member payload length",
                        )
                    })?;
                self.state = member_payload_state(remaining);
                self.member_chunk.state = Some(MemberChunkState::Ready { delivered: 0 });
                return Ok(());
            }

            // Read directly into the unfilled tail of the chunk buffer.
            let read = match poll_fn(|context| {
                let mut read_buffer =
                    ReadBuf::new(&mut self.member_chunk.buffer[filled..physical_len]);
                match Pin::new(&mut self.inner).poll_read(context, &mut read_buffer) {
                    Poll::Pending => Poll::Pending,
                    Poll::Ready(Ok(())) => Poll::Ready(Ok(read_buffer.filled().len())),
                    Poll::Ready(Err(source)) => Poll::Ready(Err(source)),
                }
            })
            .await
            {
                Ok(read) => read,
                Err(source) => {
                    self.state = State::Failed;
                    self.member_chunk.state = None;
                    // The error is reported at the exact byte reached, while
                    // the stream position rewinds to the last whole block.
                    let error_position = checked_position(start_position, filled)?;
                    self.position = checked_position(start_position, filled - filled % BLOCK_SIZE)?;
                    return Err(FrameError::at(
                        error_position,
                        FrameErrorInner::Io { source },
                    ));
                }
            };
            if read == 0 {
                // End of input before the chunk was full.
                self.state = State::Failed;
                self.member_chunk.state = None;
                let partial_len = filled % BLOCK_SIZE;
                let completed_len = filled - partial_len;
                self.position = checked_position(start_position, completed_len)?;
                if partial_len != 0 {
                    // Input ended in the middle of a physical block.
                    return Err(FrameError::at(
                        self.position,
                        FrameErrorInner::IncompleteBlock { read: partial_len },
                    ));
                }
                // Input ended on a block boundary: report the payload bytes
                // still owed after the whole blocks that were read.
                let completed_len = u64::try_from(completed_len).map_err(|_| {
                    FrameError::arithmetic_overflow(
                        self.position,
                        "completed member payload chunk length",
                    )
                })?;
                return Err(FrameError::truncated_payload(
                    self.position,
                    DataOwner::Member,
                    member_remaining - member_remaining.min(completed_len),
                ));
            }
            // Persist the progress before the next await point.
            if let Some(MemberChunkState::Reading { filled, .. }) = &mut self.member_chunk.state {
                *filled += read;
            }
        }
    }
730
731    fn take_member_chunk(&mut self, buffer: &mut Vec<u8>) -> Result<usize, FrameError> {
732        let Some(MemberChunkState::Ready { delivered }) = self.member_chunk.state.take() else {
733            self.state = State::Failed;
734            return Err(FrameError::unexpected_order(
735                self.position,
736                "completed member payload chunk",
737                "incomplete member payload chunk",
738            ));
739        };
740        let meaningful_len = self.member_chunk.meaningful_len;
741        let remaining_len = meaningful_len.checked_sub(delivered).ok_or_else(|| {
742            self.state = State::Failed;
743            FrameError::arithmetic_overflow(self.position, "undelivered member payload length")
744        })?;
745        if delivered != 0 {
746            self.member_chunk
747                .buffer
748                .copy_within(delivered..meaningful_len, 0);
749        }
750        self.member_chunk.buffer.truncate(remaining_len);
751        std::mem::swap(buffer, &mut self.member_chunk.buffer);
752        Ok(remaining_len)
753    }
754
755    fn take_member_block_from_chunk(&mut self) -> Result<(u64, Block, usize), FrameError> {
756        let Some(MemberChunkState::Ready { delivered }) = self.member_chunk.state else {
757            self.state = State::Failed;
758            return Err(FrameError::unexpected_order(
759                self.position,
760                "completed member payload chunk",
761                "incomplete member payload chunk",
762            ));
763        };
764        let start_position = self.member_chunk.start_position;
765        let physical_len = self.member_chunk.physical_len;
766        let total_meaningful_len = self.member_chunk.meaningful_len;
767        let position = checked_position(start_position, delivered).inspect_err(|_| {
768            self.state = State::Failed;
769            self.member_chunk.state = None;
770        })?;
771        let mut block = [0; BLOCK_SIZE];
772        block.copy_from_slice(&self.member_chunk.buffer[delivered..delivered + BLOCK_SIZE]);
773        let meaningful_len = total_meaningful_len
774            .checked_sub(delivered)
775            .ok_or_else(|| {
776                self.state = State::Failed;
777                self.member_chunk.state = None;
778                FrameError::arithmetic_overflow(self.position, "undelivered member payload length")
779            })?
780            .min(BLOCK_SIZE);
781        let delivered = delivered + BLOCK_SIZE;
782        if delivered == physical_len {
783            self.member_chunk.state = None;
784        } else {
785            self.member_chunk.state = Some(MemberChunkState::Ready { delivered });
786        }
787        Ok((position, block, meaningful_len))
788    }
789
790    fn poll_read_block(
791        &mut self,
792        cx: &mut Context<'_>,
793    ) -> Poll<Result<Option<PositionedBlock>, FrameError>> {
794        while self.block_len < BLOCK_SIZE {
795            let mut read_buf = ReadBuf::new(&mut self.block[self.block_len..]);
796            match Pin::new(&mut self.inner).poll_read(cx, &mut read_buf) {
797                Poll::Pending => return Poll::Pending,
798                Poll::Ready(Err(source)) => {
799                    return Poll::Ready(Err(FrameError::at(
800                        self.position + self.block_len as u64,
801                        FrameErrorInner::Io { source },
802                    )));
803                }
804                Poll::Ready(Ok(())) => {
805                    let read = read_buf.filled().len();
806                    if read == 0 {
807                        if self.block_len == 0 {
808                            return Poll::Ready(Ok(None));
809                        }
810                        return Poll::Ready(Err(FrameError::at(
811                            self.position,
812                            FrameErrorInner::IncompleteBlock {
813                                read: self.block_len,
814                            },
815                        )));
816                    }
817                    self.block_len += read;
818                }
819            }
820        }
821
822        let position = self.position;
823        self.position = self
824            .position
825            .checked_add(BLOCK_SIZE as u64)
826            .ok_or_else(|| FrameError::arithmetic_overflow(position, "stream position"))?;
827        self.block_len = 0;
828        let block = std::mem::replace(&mut self.block, [0; BLOCK_SIZE]);
829        Poll::Ready(Ok(Some((position, block))))
830    }
831
832    fn handle_eof(&mut self) -> FrameError {
833        let inner = match &self.state {
834            State::AwaitingHeader | State::AwaitingSecondZero => FrameErrorInner::MissingEndMarker,
835            State::ReadingPax {
836                kind, remaining, ..
837            } => FrameErrorInner::TruncatedPayload {
838                owner: DataOwner::Pax(*kind),
839                remaining: *remaining,
840            },
841            State::AwaitingUstarHeader { .. } => FrameErrorInner::UnexpectedEof {
842                expected: "ordinary ustar member header after a local pax header",
843            },
844            State::ReadingGnu {
845                kind, remaining, ..
846            } => FrameErrorInner::TruncatedPayload {
847                owner: DataOwner::Gnu(*kind),
848                remaining: *remaining,
849            },
850            State::AwaitingGnuMember { .. } => FrameErrorInner::UnexpectedEof {
851                expected: "ordinary GNU member header after a GNU metadata extension",
852            },
853            State::ReadingMember { remaining } => FrameErrorInner::TruncatedPayload {
854                owner: DataOwner::Member,
855                remaining: *remaining,
856            },
857            State::Complete | State::Failed => FrameErrorInner::UnexpectedEof {
858                expected: "no further input",
859            },
860        };
861        FrameError::at(self.position, inner)
862    }
863
    /// Advances the state machine by one physical block.
    ///
    /// `position` is the stream offset of `block`. Returns `Ok(Some(frame))`
    /// when the block produces a frame, and `Ok(None)` when it does not (the
    /// zero blocks of the end-of-archive marker, or any block seen after the
    /// stream has completed or failed).
    ///
    /// The current state is taken out and [`State::Failed`] is left in its
    /// place, so every early `return Err(..)` / `?` below leaves the stream
    /// failed unless an arm has explicitly installed the next state.
    fn process_block(&mut self, position: u64, block: Block) -> Result<Option<Frame>, FrameError> {
        let state = std::mem::replace(&mut self.state, State::Failed);
        match state {
            State::AwaitingHeader => {
                // A zero block at a member boundary may start the end marker;
                // anything else must be a header.
                if is_zero_block(&block) {
                    self.state = State::AwaitingSecondZero;
                    Ok(None)
                } else {
                    self.process_boundary_header(position, block).map(Some)
                }
            }
            State::ReadingPax {
                kind,
                header_position,
                mut remaining,
                mut payload,
            } => {
                // Only the first `len` bytes of this block belong to the
                // payload; the rest of the block is not appended.
                let len = remaining.min(BLOCK_SIZE as u64) as usize;
                payload.extend_from_slice(&block[..len]);
                remaining -= len as u64;
                let completed_pax_records = if remaining == 0 {
                    // Payload complete: parse it using the charset of the
                    // current global records, or UTF-8 when there are none.
                    // Parse errors are reported at the extension header.
                    let records = Arc::new(
                        PaxRecords::parse(
                            &payload,
                            self.global_pax_records
                                .as_ref()
                                .map_or(HdrCharset::Utf8, GlobalPaxRecords::hdrcharset),
                        )
                        .map_err(|source| {
                            FrameError::invalid_pax_record(header_position, source)
                        })?,
                    );
                    match kind {
                        // Local records must be followed by an ordinary
                        // member header, which receives them.
                        PaxKind::Local => {
                            self.state = State::AwaitingUstarHeader {
                                records: records.clone(),
                            };
                        }
                        // Global records are folded into the stream-wide set
                        // and the stream returns to the member boundary.
                        PaxKind::Global => {
                            records.apply_global(&mut self.global_pax_records);
                            self.state = State::AwaitingHeader;
                        }
                    }
                    Some(records)
                } else {
                    self.state = State::ReadingPax {
                        kind,
                        header_position,
                        remaining,
                        payload,
                    };
                    None
                };
                Ok(Some(Frame::Data(DataFrame {
                    position,
                    block,
                    len,
                    owner: DataOwner::Pax(kind),
                    completed_pax_records,
                })))
            }
            State::AwaitingUstarHeader { records } => {
                // Local pax records may not be followed by the end marker...
                if is_zero_block(&block) {
                    return Err(FrameError::unexpected_order(
                        position,
                        "ordinary ustar member header after a local pax header",
                        "end-of-archive marker",
                    ));
                }
                let parsed = self.parse_format_checked_header(position, &block)?;
                // ...nor by another pax extension header.
                if matches!(parsed.typeflag, b'x' | b'g') {
                    return Err(FrameError::unexpected_order(
                        position,
                        "ordinary ustar member header after a local pax header",
                        "another pax extended header",
                    ));
                }
                self.process_ustar_header(position, block, parsed, Some(records))
                    .map(Some)
            }
            State::ReadingGnu {
                kind,
                mut remaining,
                pending,
            } => {
                // GNU extension payload bytes are emitted as data frames and
                // are not accumulated here.
                let len = remaining.min(BLOCK_SIZE as u64) as usize;
                remaining -= len as u64;
                if remaining == 0 {
                    self.state = State::AwaitingGnuMember { pending };
                } else {
                    self.state = State::ReadingGnu {
                        kind,
                        remaining,
                        pending,
                    };
                }
                Ok(Some(Frame::Data(DataFrame {
                    position,
                    block,
                    len,
                    owner: DataOwner::Gnu(kind),
                    completed_pax_records: None,
                })))
            }
            State::AwaitingGnuMember { pending } => {
                // A GNU metadata extension may not be followed by the end marker.
                if is_zero_block(&block) {
                    return Err(FrameError::unexpected_order(
                        position,
                        "ordinary GNU member header after a GNU metadata extension",
                        "end-of-archive marker",
                    ));
                }
                let parsed = self.parse_format_checked_header(position, &block)?;
                // `pending` carries which extensions were already seen so a
                // duplicate can be rejected by `process_gnu_header`.
                self.process_gnu_header(position, block, parsed, pending)
                    .map(Some)
            }
            State::ReadingMember { mut remaining } => {
                let len = remaining.min(BLOCK_SIZE as u64) as usize;
                remaining -= len as u64;
                self.state = member_payload_state(remaining);
                Ok(Some(Frame::Data(DataFrame {
                    position,
                    block,
                    len,
                    owner: DataOwner::Member,
                    completed_pax_records: None,
                })))
            }
            State::AwaitingSecondZero => {
                // The end marker is two consecutive zero blocks; a nonzero
                // block after the first one is an error.
                if !is_zero_block(&block) {
                    return Err(FrameError::at(position, FrameErrorInner::InvalidEndMarker));
                }
                self.state = State::Complete;
                Ok(None)
            }
            State::Complete => {
                // Restore the state that was swapped out above.
                self.state = State::Complete;
                Ok(None)
            }
            // `self.state` is already `Failed` from the swap above.
            State::Failed => Ok(None),
        }
    }
1006
1007    fn process_boundary_header(
1008        &mut self,
1009        position: u64,
1010        block: Block,
1011    ) -> Result<Frame, FrameError> {
1012        let parsed = self.parse_format_checked_header(position, &block)?;
1013        match parsed.format {
1014            ArchiveFormat::Pax => self.process_pax_boundary_header(position, block, parsed),
1015            ArchiveFormat::Gnu => {
1016                self.process_gnu_header(position, block, parsed, PendingGnu::default())
1017            }
1018        }
1019    }
1020
1021    /// Parses a header and enforces the archive's single selected format.
1022    ///
1023    /// The first non-terminator header selects the format; later headers must
1024    /// decode as valid headers of that same family.
1025    fn parse_format_checked_header(
1026        &mut self,
1027        position: u64,
1028        block: &Block,
1029    ) -> Result<ParsedHeader, FrameError> {
1030        let parsed = ParsedHeader::try_from_framed(position, block)?;
1031        if let Some(expected) = self.format
1032            && parsed.format != expected
1033        {
1034            return Err(FrameError::at(
1035                position,
1036                FrameErrorInner::FormatMismatch {
1037                    expected,
1038                    found: parsed.format,
1039                },
1040            ));
1041        }
1042        self.format.get_or_insert(parsed.format);
1043        Ok(parsed)
1044    }
1045
1046    /// Processes a pax/ustar header at an archive-member boundary, where a new
1047    /// pax extension or an ordinary ustar member may begin.
1048    ///
1049    /// Pax extension headers enter [`State::ReadingPax`]; ordinary ustar
1050    /// headers are delegated to [`Self::process_ustar_header`].
1051    fn process_pax_boundary_header(
1052        &mut self,
1053        position: u64,
1054        block: Block,
1055        parsed: ParsedHeader,
1056    ) -> Result<Frame, FrameError> {
1057        match parsed.typeflag {
1058            b'x' => self.process_pax_header(position, block, parsed.size, PaxKind::Local),
1059            b'g' => self.process_pax_header(position, block, parsed.size, PaxKind::Global),
1060            _ => self.process_ustar_header(position, block, parsed, None),
1061        }
1062    }
1063
1064    /// Emits a pax extension header and enters its payload-reading state.
1065    ///
1066    /// This is reached only from the POSIX boundary state, before any local
1067    /// pax records require an ordinary member header.
1068    fn process_pax_header(
1069        &mut self,
1070        position: u64,
1071        block: Block,
1072        payload_size: u64,
1073        kind: PaxKind,
1074    ) -> Result<Frame, FrameError> {
1075        if payload_size > self.max_pax_extension_size {
1076            return Err(FrameError::at(
1077                position,
1078                FrameErrorInner::ExtensionTooLarge {
1079                    format: ArchiveFormat::Pax,
1080                    size: payload_size,
1081                    limit: self.max_pax_extension_size,
1082                },
1083            ));
1084        }
1085        if kind == PaxKind::Global {
1086            let size = self
1087                .global_pax_extensions_size
1088                .checked_add(payload_size)
1089                .ok_or_else(|| {
1090                    FrameError::arithmetic_overflow(position, "global pax extension payload total")
1091                })?;
1092            if size > self.max_global_pax_extensions_size {
1093                return Err(FrameError::at(
1094                    position,
1095                    FrameErrorInner::GlobalPaxExtensionsTooLarge {
1096                        size,
1097                        limit: self.max_global_pax_extensions_size,
1098                    },
1099                ));
1100            }
1101            self.global_pax_extensions_size = size;
1102        }
1103        if payload_size == 0 {
1104            return Err(FrameError::invalid_pax_record(
1105                position,
1106                PaxError::InvalidRecords {
1107                    reason: "extended header payload contains no records",
1108                },
1109            ));
1110        }
1111        self.state = State::ReadingPax {
1112            kind,
1113            header_position: position,
1114            remaining: payload_size,
1115            payload: Vec::new(),
1116        };
1117        Ok(Frame::Pax(PaxFrame {
1118            position,
1119            block,
1120            kind,
1121            payload_size,
1122        }))
1123    }
1124
1125    /// Emits an ordinary ustar member header after applying pax size state.
1126    ///
1127    /// This handles both bare members and members required by
1128    /// [`State::AwaitingUstarHeader`], then enters member data reading when
1129    /// the effective member size requires payload blocks.
1130    fn process_ustar_header(
1131        &mut self,
1132        position: u64,
1133        block: Block,
1134        parsed: ParsedHeader,
1135        local_pax_records: Option<SharedPaxRecords>,
1136    ) -> Result<Frame, FrameError> {
1137        let frame = HeaderFrame::ustar(
1138            position,
1139            block,
1140            parsed.typeflag,
1141            parsed.size,
1142            local_pax_records.as_deref(),
1143            self.global_pax_records.as_ref(),
1144            self.allow_all_nul_numeric_fields,
1145        )?;
1146        self.global_pax_extensions_size = 0;
1147        self.state = member_payload_state(frame.effective_size);
1148        Ok(Frame::Header(frame))
1149    }
1150
1151    fn process_gnu_header(
1152        &mut self,
1153        position: u64,
1154        block: Block,
1155        parsed: ParsedHeader,
1156        mut pending: PendingGnu,
1157    ) -> Result<Frame, FrameError> {
1158        let extension = match parsed.typeflag {
1159            b'L' => Some(GnuKind::LongName),
1160            b'K' => Some(GnuKind::LongLink),
1161            _ => None,
1162        };
1163        if let Some(kind) = extension {
1164            let already_seen = match kind {
1165                GnuKind::LongName => &mut pending.long_name,
1166                GnuKind::LongLink => &mut pending.long_link,
1167            };
1168            if *already_seen {
1169                return Err(FrameError::unexpected_order(
1170                    position,
1171                    "ordinary GNU member header or the other GNU metadata extension",
1172                    "duplicate GNU metadata extension",
1173                ));
1174            }
1175            if parsed.size > self.max_gnu_extension_size {
1176                return Err(FrameError::at(
1177                    position,
1178                    FrameErrorInner::ExtensionTooLarge {
1179                        format: ArchiveFormat::Gnu,
1180                        size: parsed.size,
1181                        limit: self.max_gnu_extension_size,
1182                    },
1183                ));
1184            }
1185            *already_seen = true;
1186            self.state = if parsed.size == 0 {
1187                State::AwaitingGnuMember { pending }
1188            } else {
1189                State::ReadingGnu {
1190                    kind,
1191                    remaining: parsed.size,
1192                    pending,
1193                }
1194            };
1195            return Ok(Frame::Gnu(GnuFrame {
1196                position,
1197                block,
1198                kind,
1199                payload_size: parsed.size,
1200            }));
1201        }
1202
1203        let frame = HeaderFrame::gnu(
1204            position,
1205            block,
1206            parsed.typeflag,
1207            parsed.size,
1208            pending.long_link,
1209            self.allow_all_nul_numeric_fields,
1210        )?;
1211        self.state = member_payload_state(frame.effective_size);
1212        Ok(Frame::Header(frame))
1213    }
1214}
1215
1216impl<R: AsyncRead + Unpin> Stream for TarStream<R> {
1217    type Item = Result<Frame, FrameError>;
1218
1219    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1220        let this = self.get_mut();
1221        loop {
1222            if matches!(this.state, State::Complete | State::Failed) {
1223                return Poll::Ready(None);
1224            }
1225
1226            let (position, block) = match this.poll_read_block(cx) {
1227                Poll::Pending => return Poll::Pending,
1228                Poll::Ready(Ok(Some(block))) => block,
1229                Poll::Ready(Ok(None)) => {
1230                    let error = this.handle_eof();
1231                    this.state = State::Failed;
1232                    return Poll::Ready(Some(Err(error)));
1233                }
1234                Poll::Ready(Err(error)) => {
1235                    this.state = State::Failed;
1236                    return Poll::Ready(Some(Err(error)));
1237                }
1238            };
1239
1240            match this.process_block(position, block) {
1241                Ok(Some(frame)) => return Poll::Ready(Some(Ok(frame))),
1242                Ok(None) => continue,
1243                Err(error) => {
1244                    this.state = State::Failed;
1245                    return Poll::Ready(Some(Err(error)));
1246                }
1247            }
1248        }
1249    }
1250}
1251
/// The fields of a tar header block that framing decisions depend on.
struct ParsedHeader {
    /// Archive format selected by the header's identity bytes.
    format: ArchiveFormat,
    /// Raw typeflag byte, not yet interpreted.
    typeflag: u8,
    /// Value decoded from the header's size field.
    size: u64,
}
1257
/// Converts raw tar input into a typed value while retaining source position
/// for any framing error produced by the conversion.
trait TryFromFramed<T>: Sized {
    /// Converts `source`, reporting any failure as a [`FrameError`] located
    /// at `position`.
    fn try_from_framed(position: u64, source: T) -> Result<Self, FrameError>;
}
1263
1264fn is_zero_block(block: &Block) -> bool {
1265    block.iter().all(|byte| *byte == 0)
1266}
1267
/// Returns the prefix of `bytes` that precedes the first NUL byte, or all of
/// `bytes` when it contains no NUL.
fn trim_nul(bytes: &[u8]) -> &[u8] {
    // Splitting a slice always yields at least one piece, and the first piece
    // is exactly the run of bytes before the first separator.
    bytes.split(|&byte| byte == 0).next().unwrap_or(bytes)
}
1275
1276fn member_payload_state(remaining: u64) -> State {
1277    if remaining == 0 {
1278        State::AwaitingHeader
1279    } else {
1280        State::ReadingMember { remaining }
1281    }
1282}
1283
1284fn checked_position(position: u64, len: usize) -> Result<u64, FrameError> {
1285    let len = u64::try_from(len)
1286        .map_err(|_| FrameError::arithmetic_overflow(position, "stream position"))?;
1287    position
1288        .checked_add(len)
1289        .ok_or_else(|| FrameError::arithmetic_overflow(position, "stream position"))
1290}
1291
1292impl TryFromFramed<&Block> for ParsedHeader {
1293    fn try_from_framed(position: u64, block: &Block) -> Result<Self, FrameError> {
1294        let format = match &block[IDENTITY_RANGE] {
1295            identity if identity == USTAR_IDENTITY => ArchiveFormat::Pax,
1296            identity if identity == GNU_IDENTITY => ArchiveFormat::Gnu,
1297            identity => {
1298                return Err(FrameError::at(
1299                    position,
1300                    FrameErrorInner::InvalidIdentity {
1301                        found: identity.try_into().expect("fixed header range"),
1302                    },
1303                ));
1304            }
1305        };
1306
1307        let actual_checksum = checksum(block);
1308        let expected_checksum = parse_octal(&block[CHECKSUM_RANGE]);
1309        if expected_checksum != Some(actual_checksum) {
1310            return Err(FrameError::at(
1311                position,
1312                FrameErrorInner::InvalidChecksum {
1313                    expected: expected_checksum,
1314                    actual: actual_checksum,
1315                },
1316            ));
1317        }
1318
1319        let size_bytes: [u8; 12] = block[SIZE_RANGE].try_into().expect("fixed header range");
1320        let size = parse_number(format, &size_bytes).ok_or_else(|| {
1321            FrameError::at(position, FrameErrorInner::InvalidSize { found: size_bytes })
1322        })?;
1323
1324        Ok(Self {
1325            format,
1326            typeflag: block[TYPEFLAG_OFFSET],
1327            size,
1328        })
1329    }
1330}
1331
1332impl TryFromFramed<u8> for UstarKind {
1333    fn try_from_framed(position: u64, typeflag: u8) -> Result<Self, FrameError> {
1334        match typeflag {
1335            0 | b'0' => Ok(Self::Regular),
1336            b'1' => Ok(Self::HardLink),
1337            b'2' => Ok(Self::SymbolicLink),
1338            b'3' => Ok(Self::CharacterDevice),
1339            b'4' => Ok(Self::BlockDevice),
1340            b'5' => Ok(Self::Directory),
1341            b'6' => Ok(Self::Fifo),
1342            b'7' => Ok(Self::Contiguous),
1343            _ => Err(FrameError::at(
1344                position,
1345                FrameErrorInner::UnsupportedTypeflag { typeflag },
1346            )),
1347        }
1348    }
1349}
1350
1351fn validate_pax_member_size(
1352    position: u64,
1353    kind: UstarKind,
1354    declared_size: u64,
1355    effective_size: u64,
1356) -> Result<(), FrameError> {
1357    match kind {
1358        // PAX permits a nonzero physical hardlink size and allows pax `size`
1359        // records to override it, so the effective size controls framing.
1360        // This is a broadening of what ustar allows; ustar requires
1361        // hardlink members to have `size=0`.
1362        UstarKind::Regular | UstarKind::HardLink | UstarKind::Contiguous => Ok(()),
1363        UstarKind::SymbolicLink
1364        | UstarKind::CharacterDevice
1365        | UstarKind::BlockDevice
1366        | UstarKind::Directory
1367        | UstarKind::Fifo => {
1368            // NOTE: Observe that we're strict about directory entries having
1369            // `size=0`, even though ustar/pax says that they may have a nonzero
1370            // size as an allocation hint (which, in turn, does not affect framing).
1371            // We do this to avoid a common differential where some parsers incorrectly
1372            // honor the directory entry's size during framing.
1373            // TODO: Make this configurable? Doing so seems very risky.
1374            validate_payload_free_size(position, kind, declared_size)?;
1375            validate_payload_free_size(position, kind, effective_size)
1376        }
1377    }
1378}
1379
1380fn validate_gnu_member_size(position: u64, kind: UstarKind, size: u64) -> Result<(), FrameError> {
1381    match kind {
1382        UstarKind::Regular | UstarKind::Contiguous => Ok(()),
1383        UstarKind::HardLink
1384        | UstarKind::SymbolicLink
1385        | UstarKind::CharacterDevice
1386        | UstarKind::BlockDevice
1387        | UstarKind::Directory
1388        | UstarKind::Fifo => validate_payload_free_size(position, kind, size),
1389    }
1390}
1391
1392fn validate_payload_free_size(position: u64, kind: UstarKind, size: u64) -> Result<(), FrameError> {
1393    if size == 0 {
1394        Ok(())
1395    } else {
1396        Err(FrameError::at(
1397            position,
1398            FrameErrorInner::InvalidMemberSize { kind, size },
1399        ))
1400    }
1401}
1402
1403#[cfg(test)]
1404mod tests {
1405    use std::{
1406        cell::Cell,
1407        pin::Pin,
1408        rc::Rc,
1409        task::{Context, Poll},
1410    };
1411
1412    use tokio::io::ReadBuf;
1413    use tokio_stream::{Stream, StreamExt};
1414
1415    use super::*;
1416    use crate::{
1417        ArchiveFormat, FrameError, FrameErrorInner, HdrCharset, PaxString, PaxValue,
1418        header::{DEVMAJOR_RANGE, DEVMINOR_RANGE},
1419        test_support::{
1420            ChunkedReader, append_block, append_gnu, append_pax, append_payload, append_terminator,
1421            gnu_base256_header, gnu_header, header, ready, record, set_checksum,
1422        },
1423    };
1424
1425    fn collect(bytes: Vec<u8>, max_chunk: usize) -> Vec<Result<Frame, FrameError>> {
1426        ready(TarStream::new(ChunkedReader::new(bytes, max_chunk)).collect())
1427    }
1428
1429    fn collect_with_max_pax_extension_size(
1430        bytes: Vec<u8>,
1431        max_chunk: usize,
1432        max_pax_extension_size: u64,
1433    ) -> Vec<Result<Frame, FrameError>> {
1434        let mut stream = TarStream::new(ChunkedReader::new(bytes, max_chunk));
1435        stream.set_max_pax_extension_size(max_pax_extension_size);
1436        ready(stream.collect())
1437    }
1438
1439    fn header_frame(frames: &[Result<Frame, FrameError>], index: usize) -> &HeaderFrame {
1440        let Ok(Frame::Header(frame)) = &frames[index] else {
1441            panic!("expected header frame");
1442        };
1443        frame
1444    }
1445
1446    fn data_frame(frames: &[Result<Frame, FrameError>], index: usize) -> &DataFrame {
1447        let Ok(Frame::Data(frame)) = &frames[index] else {
1448            panic!("expected data frame");
1449        };
1450        frame
1451    }
1452
1453    fn last_error(frames: &[Result<Frame, FrameError>]) -> &FrameError {
1454        frames
1455            .last()
1456            .expect("stream should emit an item")
1457            .as_ref()
1458            .expect_err("last item should be an error")
1459    }
1460
1461    fn last_error_inner(frames: &[Result<Frame, FrameError>]) -> &FrameErrorInner {
1462        &last_error(frames).inner
1463    }
1464
    /// In-memory reader that records how many bytes have been read from it.
    struct CountingReader {
        /// The data served to readers.
        bytes: Vec<u8>,
        /// Index into `bytes` of the next byte to serve.
        position: usize,
        /// Shared counter, increased by the byte count of every read, so a
        /// test can observe consumption from outside the reader.
        consumed: Rc<Cell<usize>>,
    }
1470
1471    impl AsyncRead for CountingReader {
1472        fn poll_read(
1473            mut self: Pin<&mut Self>,
1474            _context: &mut Context<'_>,
1475            buffer: &mut ReadBuf<'_>,
1476        ) -> Poll<std::io::Result<()>> {
1477            let len = buffer
1478                .remaining()
1479                .min(self.bytes.len().saturating_sub(self.position));
1480            let end = self.position + len;
1481            buffer.put_slice(&self.bytes[self.position..end]);
1482            self.position = end;
1483            self.consumed.set(self.consumed.get() + len);
1484            Poll::Ready(Ok(()))
1485        }
1486    }
1487
    /// The error a deliberately malformed header is expected to produce.
    ///
    /// This is a test-side mirror of the relevant `FrameErrorInner` variants,
    /// keeping only the fields the tests compare (see `matches`).
    #[derive(Clone, Copy)]
    enum ExpectedHeaderError {
        /// Expect `FrameErrorInner::InvalidIdentity`, with any payload.
        InvalidIdentity,
        /// Expect `FrameErrorInner::InvalidChecksum`, with any payload.
        InvalidChecksum,
        /// Expect `FrameErrorInner::InvalidSize`, with any payload.
        InvalidSize,
        /// Expect `FrameErrorInner::InvalidNumericField` for the named field.
        InvalidNumericField(&'static str),
        /// Expect `FrameErrorInner::UnterminatedUstarStringField` for the named field.
        UnterminatedUstarStringField(&'static str),
        /// Expect `FrameErrorInner::UnsupportedTypeflag` for this typeflag byte.
        UnsupportedTypeflag(u8),
    }
1497
1498    impl ExpectedHeaderError {
1499        fn matches(self, error: &FrameErrorInner) -> bool {
1500            match (self, error) {
1501                (Self::InvalidIdentity, FrameErrorInner::InvalidIdentity { .. })
1502                | (Self::InvalidChecksum, FrameErrorInner::InvalidChecksum { .. })
1503                | (Self::InvalidSize, FrameErrorInner::InvalidSize { .. }) => true,
1504                (
1505                    Self::InvalidNumericField(field),
1506                    FrameErrorInner::InvalidNumericField { field: found, .. },
1507                )
1508                | (
1509                    Self::UnterminatedUstarStringField(field),
1510                    FrameErrorInner::UnterminatedUstarStringField { field: found },
1511                ) => field == *found,
1512                (
1513                    Self::UnsupportedTypeflag(typeflag),
1514                    FrameErrorInner::UnsupportedTypeflag { typeflag: found },
1515                ) => typeflag == *found,
1516                _ => false,
1517            }
1518        }
1519    }
1520
1521    fn checksummed_header(mutate: impl FnOnce(&mut Block)) -> Block {
1522        let mut block = header(b'0', 0);
1523        mutate(&mut block);
1524        set_checksum(&mut block);
1525        block
1526    }
1527
    /// Table of malformed headers for the invalid-header tests.
    ///
    /// Each entry is `(case name, header block, expected error)`.
    fn invalid_header_cases() -> Vec<(&'static str, Block, ExpectedHeaderError)> {
        // These three blocks are edited after `header` built them and are
        // not passed through `set_checksum` again.
        // NOTE(review): presumably `header` returns a block with a valid
        // checksum, so the identity cases rely on identity being checked
        // before the checksum; confirm against `test_support::header`.
        let mut bad_magic = header(b'0', 0);
        bad_magic[IDENTITY_RANGE.start] = b'g';
        let mut bad_version = header(b'0', 0);
        bad_version[IDENTITY_RANGE.end - 2..IDENTITY_RANGE.end].copy_from_slice(b"  ");
        let mut bad_checksum = header(b'0', 0);
        bad_checksum[0] = b'X';

        vec![
            ("magic", bad_magic, ExpectedHeaderError::InvalidIdentity),
            ("version", bad_version, ExpectedHeaderError::InvalidIdentity),
            (
                "checksum",
                bad_checksum,
                ExpectedHeaderError::InvalidChecksum,
            ),
            // The remaining field cases go through `checksummed_header`, so
            // the checksum is set after the field has been corrupted.
            (
                "octal size",
                // `8` is not an octal digit.
                checksummed_header(|block| {
                    block[SIZE_RANGE].copy_from_slice(b"00000000008\0");
                }),
                ExpectedHeaderError::InvalidSize,
            ),
            (
                "base256 size",
                // Sets the high bit of the first size byte.
                checksummed_header(|block| block[SIZE_RANGE.start] = 0x80),
                ExpectedHeaderError::InvalidSize,
            ),
            (
                "octal mode",
                checksummed_header(|block| {
                    block[MODE_RANGE].copy_from_slice(b"0000080\0");
                }),
                ExpectedHeaderError::InvalidNumericField("mode"),
            ),
            (
                "uid",
                checksummed_header(|block| {
                    block[UID_RANGE].copy_from_slice(b"invalid\0");
                }),
                ExpectedHeaderError::InvalidNumericField("uid"),
            ),
            (
                "gid",
                checksummed_header(|block| block[GID_RANGE.start] = b'8'),
                ExpectedHeaderError::InvalidNumericField("gid"),
            ),
            (
                "mtime",
                checksummed_header(|block| {
                    block[MTIME_RANGE].copy_from_slice(b"00000000008\0");
                }),
                ExpectedHeaderError::InvalidNumericField("mtime"),
            ),
            (
                "uname",
                // Filling the whole field leaves no room for a terminator.
                checksummed_header(|block| block[UNAME_RANGE].fill(b'u')),
                ExpectedHeaderError::UnterminatedUstarStringField("uname"),
            ),
            (
                "gname",
                checksummed_header(|block| block[GNAME_RANGE].fill(b'g')),
                ExpectedHeaderError::UnterminatedUstarStringField("gname"),
            ),
            // Typeflag cases use `header` directly with the unsupported flag.
            (
                "POSIX typeflag",
                header(b'X', 0),
                ExpectedHeaderError::UnsupportedTypeflag(b'X'),
            ),
            (
                "GNU typeflag",
                header(b'L', 0),
                ExpectedHeaderError::UnsupportedTypeflag(b'L'),
            ),
        ]
    }
1604
/// A regular member declaring 513 bytes yields a header frame, one full data
/// frame and a one-byte data frame when collected with a chunk argument of 7.
#[test]
fn frames_bare_member_across_fragmented_reads() {
    let mut archive = Vec::new();
    append_block(&mut archive, &header(b'0', 513));
    append_payload(&mut archive, &[b'a'; BLOCK_SIZE]);
    append_payload(&mut archive, b"b");
    append_terminator(&mut archive);

    let frames = collect(archive, 7);
    assert_eq!(frames.len(), 3);

    // With no pax override, declared and effective sizes agree.
    let member = header_frame(&frames, 0);
    assert_eq!(member.kind, UstarKind::Regular);
    assert_eq!(member.declared_size, 513);
    assert_eq!(member.effective_size, 513);

    let full_block = data_frame(&frames, 1);
    let tail_block = data_frame(&frames, 2);
    assert_eq!(full_block.len, BLOCK_SIZE);
    assert_eq!(tail_block.len, 1);
    assert_eq!(tail_block.owner, DataOwner::Member);

    // Member payload frames never carry completed pax records.
    for data in [full_block, tail_block] {
        assert!(data.completed_pax_records().is_none());
    }
}
1627
/// A local pax payload longer than one block completes its records on the last
/// payload frame, and its `size` record overrides the member's declared size.
#[test]
fn frames_multiblock_pax_records_and_applies_size_override() {
    let long_comment = "x".repeat(BLOCK_SIZE);
    let mut pax_payload = record("comment", &long_comment);
    pax_payload.extend_from_slice(&record("size", "513"));
    // The scenario only makes sense if the pax payload needs more than one block.
    assert!(pax_payload.len() > BLOCK_SIZE);

    let mut archive = Vec::new();
    append_pax(&mut archive, b'x', &pax_payload);
    append_block(&mut archive, &header(b'0', 1));
    append_payload(&mut archive, &[b'a'; BLOCK_SIZE]);
    append_payload(&mut archive, b"b");
    append_terminator(&mut archive);

    let frames = collect(archive, 19);
    assert_eq!(frames.len(), 6);

    let pax = match frames[0].as_ref().unwrap() {
        Frame::Pax(pax) => pax,
        _ => panic!("expected pax header"),
    };
    assert_eq!(pax.kind, PaxKind::Local);
    assert_eq!(pax.payload_size, pax_payload.len() as u64);

    // The first pax payload frame is incomplete; the records appear on the second.
    let leading = data_frame(&frames, 1);
    assert_eq!(leading.owner, DataOwner::Pax(PaxKind::Local));
    assert!(leading.completed_pax_records().is_none());

    let closing = data_frame(&frames, 2);
    assert_eq!(closing.owner, DataOwner::Pax(PaxKind::Local));
    let last_record = closing
        .completed_pax_records()
        .and_then(|parsed| parsed.last());
    assert_eq!(last_record, Some(&PaxRecord::Size(PaxValue::Value(513))));

    // The physical header says 1 byte, the pax record says 513.
    let member = header_frame(&frames, 3);
    assert_eq!(member.declared_size, 1);
    assert_eq!(member.effective_size, 513);

    let tail = data_frame(&frames, 5);
    assert_eq!(tail.len, 1);
}
1665
/// Pax extension headers whose declared size exceeds the configured limit are
/// rejected with `ExtensionTooLarge` at position 0, as the only collected frame.
///
/// Covers local (`x`) and global (`g`) headers against an explicit limit of
/// one byte less than the payload, then a local header against
/// `DEFAULT_MAX_PAX_EXTENSION_SIZE` through plain `collect`.
#[test]
fn rejects_oversized_pax_extensions_before_consuming_payload() {
    let mut payload = record("comment", "metadata");
    payload.extend_from_slice(&record("mtime", "1"));
    let declared_size = u64::try_from(payload.len()).expect("payload size should fit u64");
    for (case, typeflag) in [("local", b'x'), ("global", b'g')] {
        let mut bytes = Vec::new();
        append_pax(&mut bytes, typeflag, &payload);
        let frames = collect_with_max_pax_extension_size(bytes, BLOCK_SIZE, declared_size - 1);
        assert_eq!(frames.len(), 1, "{case}");
        // Label this assertion too, so a failure names the typeflag case that broke.
        assert!(
            matches!(
                last_error(&frames),
                FrameError {
                    position: 0,
                    inner: FrameErrorInner::ExtensionTooLarge {
                        format: ArchiveFormat::Pax,
                        size,
                        limit,
                    },
                } if *size == declared_size && *limit == declared_size - 1
            ),
            "{case}"
        );
    }

    // Default limit: only the header block is supplied, declaring one byte too many.
    let frames = collect(
        header(b'x', DEFAULT_MAX_PAX_EXTENSION_SIZE + 1).to_vec(),
        BLOCK_SIZE,
    );
    assert_eq!(frames.len(), 1);
    assert!(matches!(
        last_error(&frames),
        FrameError {
            position: 0,
            inner: FrameErrorInner::ExtensionTooLarge {
                format: ArchiveFormat::Pax,
                size,
                limit: DEFAULT_MAX_PAX_EXTENSION_SIZE,
            },
        } if *size == DEFAULT_MAX_PAX_EXTENSION_SIZE + 1
    ));
}
1706
/// With the pax extension limit set to 0, a one-byte local pax header fails
/// with `ExtensionTooLarge` and the counting reader reports exactly
/// `BLOCK_SIZE` consumed.
#[test]
fn oversized_pax_extension_does_not_read_its_payload_block() {
    // Header block followed by zero padding up to two blocks in total.
    let mut archive = header(b'x', 1).to_vec();
    archive.resize(BLOCK_SIZE * 2, 0);

    let consumed = Rc::new(Cell::new(0));
    let mut stream = TarStream::new(CountingReader {
        bytes: archive,
        position: 0,
        consumed: Rc::clone(&consumed),
    });
    stream.set_max_pax_extension_size(0);

    let first = ready(stream.next());
    assert!(matches!(
        first,
        Some(Err(FrameError {
            position: 0,
            inner: FrameErrorInner::ExtensionTooLarge {
                format: ArchiveFormat::Pax,
                size: 1,
                limit: 0,
            },
        }))
    ));
    assert_eq!(consumed.get(), BLOCK_SIZE);
}
1733
/// A pax payload whose length equals the configured limit is accepted for both
/// local and global extension headers.
#[test]
fn accepts_pax_extensions_at_the_configured_limit() {
    let mut payload = record("comment", "metadata");
    payload.extend_from_slice(&record("ACME.attribute", "value"));
    let limit = u64::try_from(payload.len()).expect("payload size should fit u64");

    // Only the local (`x`) case appends a member header after the extension.
    for (case, typeflag, needs_member) in [("local", b'x', true), ("global", b'g', false)] {
        let mut archive = Vec::new();
        append_pax(&mut archive, typeflag, &payload);
        if needs_member {
            append_block(&mut archive, &header(b'0', 0));
        }
        append_terminator(&mut archive);

        let frames = collect_with_max_pax_extension_size(archive, 7, limit);
        assert!(frames.iter().all(|frame| frame.is_ok()), "{case}");
    }
}
1757
/// Exercises global pax records, a local override, and a global deletion of
/// `size` in one archive, and expects the stream to end with
/// `DeletedPaxMetadata { keyword: "size" }`.
#[test]
fn applies_global_pax_records_overrides_and_rejects_size_deletions() {
    let mut first_global = record("comment", "old");
    first_global.extend_from_slice(&record("size", "2"));
    let second_global = record("comment", "new");
    let mut local = record("comment", "local");
    local.extend_from_slice(&record("size", "3"));
    // Empty values: the parsed records for this payload are asserted as `Deleted` below.
    let mut deleting_global = record("comment", "");
    deleting_global.extend_from_slice(&record("size", ""));

    let mut archive = Vec::new();
    append_pax(&mut archive, b'g', &first_global);
    append_pax(&mut archive, b'g', &second_global);
    append_block(&mut archive, &header(b'0', 1));
    append_payload(&mut archive, b"ab");
    append_pax(&mut archive, b'x', &local);
    append_block(&mut archive, &header(b'0', 1));
    append_payload(&mut archive, b"abc");
    append_pax(&mut archive, b'g', &deleting_global);
    append_block(&mut archive, &header(b'5', 1));
    append_terminator(&mut archive);

    let frames = collect(archive, 31);

    let saw_global_header = frames.iter().any(|frame| {
        matches!(
            frame,
            Ok(Frame::Pax(PaxFrame {
                kind: PaxKind::Global,
                ..
            }))
        )
    });
    assert!(saw_global_header);

    let saw_global_data = frames.iter().any(|frame| {
        matches!(
            frame,
            Ok(Frame::Data(DataFrame {
                owner: DataOwner::Pax(PaxKind::Global),
                ..
            }))
        )
    });
    assert!(saw_global_data);

    // Gather the completed record sets of every global pax payload, in stream order.
    let mut completed_global_payloads: Vec<&[PaxRecord]> = Vec::new();
    for frame in &frames {
        let Ok(Frame::Data(data)) = frame else {
            continue;
        };
        if data.owner != DataOwner::Pax(PaxKind::Global) {
            continue;
        }
        if let Some(parsed) = data.completed_pax_records() {
            completed_global_payloads.push(parsed);
        }
    }
    assert_eq!(completed_global_payloads.len(), 3);
    assert_eq!(
        completed_global_payloads[2],
        [
            PaxRecord::Comment(PaxValue::Deleted),
            PaxRecord::Size(PaxValue::Deleted),
        ]
    );

    // Only the two regular members produce header frames; the third header errors.
    let mut headers: Vec<&HeaderFrame> = Vec::new();
    for frame in &frames {
        if let Ok(Frame::Header(member)) = frame {
            headers.push(member);
        }
    }
    assert_eq!(headers.len(), 2);
    assert_eq!(headers[0].effective_size, 2);
    assert_eq!(headers[1].effective_size, 3);

    let expected_local = local_records("local", 3);
    let saw_local_records = frames.iter().any(|frame| match frame {
        Ok(Frame::Data(data)) => {
            data.owner == DataOwner::Pax(PaxKind::Local)
                && data.completed_pax_records() == Some(expected_local.as_slice())
        }
        _ => false,
    });
    assert!(saw_local_records);

    assert!(matches!(
        last_error_inner(&frames),
        FrameErrorInner::DeletedPaxMetadata { keyword: "size" }
    ));
}
1835
1836    fn local_records(comment: &str, size: u64) -> Vec<PaxRecord> {
1837        vec![
1838            PaxRecord::Comment(PaxValue::Value(comment.into())),
1839            PaxRecord::Size(PaxValue::Value(size)),
1840        ]
1841    }
1842
/// A local pax payload that deletes `size` and then sets it again yields an
/// effective member size equal to the later value.
#[test]
fn allows_local_size_deletion_when_a_later_record_restores_size() {
    let mut local = record("size", "");
    local.extend_from_slice(&record("size", "2"));

    let mut archive = Vec::new();
    append_pax(&mut archive, b'x', &local);
    append_block(&mut archive, &header(b'0', 1));
    append_payload(&mut archive, b"ab");
    append_terminator(&mut archive);

    let frames = collect(archive, BLOCK_SIZE);
    let member = header_frame(&frames, 2);
    assert_eq!(member.effective_size, 2);

    // Both records are reported, in the order they were written.
    let expected = [
        PaxRecord::Size(PaxValue::Deleted),
        PaxRecord::Size(PaxValue::Value(2)),
    ];
    assert_eq!(
        data_frame(&frames, 1).completed_pax_records(),
        Some(expected.as_slice())
    );
}
1867
/// A pax record naming a field does not excuse a malformed value for that same
/// field in the following ordinary header: the header error is still reported.
#[test]
fn pax_records_do_not_make_malformed_ordinary_header_fields_valid() {
    // (label, pax typeflag, pax payload, malformed member header, expected error)
    let cases = [
        (
            "local uid",
            b'x',
            record("uid", "1"),
            checksummed_header(|block| block[UID_RANGE].fill(b'u')),
            ExpectedHeaderError::InvalidNumericField("uid"),
        ),
        (
            "global gid",
            b'g',
            record("gid", "2"),
            checksummed_header(|block| block[GID_RANGE].fill(b'g')),
            ExpectedHeaderError::InvalidNumericField("gid"),
        ),
        (
            "local mtime",
            b'x',
            record("mtime", "3"),
            checksummed_header(|block| block[MTIME_RANGE].fill(b'm')),
            ExpectedHeaderError::InvalidNumericField("mtime"),
        ),
        (
            "global uname",
            b'g',
            record("uname", "user"),
            checksummed_header(|block| block[UNAME_RANGE].fill(b'u')),
            ExpectedHeaderError::UnterminatedUstarStringField("uname"),
        ),
        (
            "local gname",
            b'x',
            record("gname", "group"),
            checksummed_header(|block| block[GNAME_RANGE].fill(b'g')),
            ExpectedHeaderError::UnterminatedUstarStringField("gname"),
        ),
    ];

    for (case, typeflag, pax_payload, member_header, expected) in cases {
        let mut archive = Vec::new();
        append_pax(&mut archive, typeflag, &pax_payload);
        append_block(&mut archive, &member_header);
        append_terminator(&mut archive);

        let frames = collect(archive, BLOCK_SIZE);
        let reported = last_error_inner(&frames);
        assert!(expected.matches(reported), "{case}: {frames:?}");
    }
}
1921
/// A regular-file header whose device fields do not parse as octal
/// (`parse_octal` returns `None`) is still framed without error.
#[test]
fn accepts_all_nul_unused_device_fields() {
    let member_header = header(b'0', 0);
    // Precondition: neither device field of the helper-built header parses as octal.
    assert_eq!(parse_octal(&member_header[DEVMAJOR_RANGE]), None);
    assert_eq!(parse_octal(&member_header[DEVMINOR_RANGE]), None);

    let mut archive = Vec::new();
    append_block(&mut archive, &member_header);
    append_terminator(&mut archive);

    let frames = collect(archive, BLOCK_SIZE);
    assert!(frames.iter().all(|frame| frame.is_ok()));
}
1933
/// A local deletion of `size` placed after a global `size` record makes the
/// following directory header fail with `DeletedPaxMetadata`.
#[test]
fn rejects_local_size_deletion_for_payload_free_members() {
    let global_size = record("size", "7");
    let local_deletion = record("size", "");

    let mut archive = Vec::new();
    append_pax(&mut archive, b'g', &global_size);
    append_pax(&mut archive, b'x', &local_deletion);
    append_block(&mut archive, &header(b'5', 3));
    append_terminator(&mut archive);

    let frames = collect(archive, BLOCK_SIZE);
    let rejected = matches!(
        last_error_inner(&frames),
        FrameErrorInner::DeletedPaxMetadata { keyword: "size" }
    );
    assert!(rejected);
}
1949
/// Deleting `size` through either a local or a global pax header makes the
/// following regular-file header fail with `DeletedPaxMetadata`.
#[test]
fn rejects_deleted_size_when_member_payload_cannot_be_framed() {
    let deletion = record("size", "");
    for typeflag in [b'x', b'g'] {
        let mut archive = Vec::new();
        append_pax(&mut archive, typeflag, &deletion);
        append_block(&mut archive, &header(b'0', 0));

        let frames = collect(archive, BLOCK_SIZE);
        let rejected = matches!(
            last_error_inner(&frames),
            FrameErrorInner::DeletedPaxMetadata { keyword: "size" }
        );
        assert!(rejected, "{typeflag:?}");
    }
}
1967
/// A local `size` record supplies the effective member size even though the
/// preceding global header deleted `size`.
#[test]
fn allows_local_size_to_restore_an_active_global_deletion() {
    let global_deletion = record("size", "");
    let local_size = record("size", "2");

    let mut archive = Vec::new();
    append_pax(&mut archive, b'g', &global_deletion);
    append_pax(&mut archive, b'x', &local_size);
    append_block(&mut archive, &header(b'0', 1));
    append_payload(&mut archive, b"ab");
    append_terminator(&mut archive);

    let frames = collect(archive, BLOCK_SIZE);
    let member = header_frame(&frames, 4);
    assert_eq!(member.effective_size, 2);

    // Frame 1 completes the global payload, frame 3 the local one.
    let expected_global = [PaxRecord::Size(PaxValue::Deleted)];
    assert_eq!(
        data_frame(&frames, 1).completed_pax_records(),
        Some(expected_global.as_slice())
    );
    let expected_local = [PaxRecord::Size(PaxValue::Value(2))];
    assert_eq!(
        data_frame(&frames, 3).completed_pax_records(),
        Some(expected_local.as_slice())
    );
}
1991
/// In a pax archive a hard-link member gets a three-byte data frame whether
/// the size comes from the physical header or from a local pax `size` record.
#[test]
fn frames_pax_hard_link_bodies_from_header_or_size_override() {
    // (label, physical size, optional pax size, header frame index, data frame index)
    let cases = [
        ("physical size", 3, None, 0, 1),
        ("pax size", 0, Some("3"), 2, 3),
        ("pax size overrides physical size", 1, Some("3"), 2, 3),
    ];

    for (case, physical_size, pax_size, header_at, data_at) in cases {
        let mut archive = Vec::new();
        if let Some(pax_size) = pax_size {
            append_pax(&mut archive, b'x', &record("size", pax_size));
        }
        append_block(&mut archive, &header(b'1', physical_size));
        append_payload(&mut archive, b"abc");
        append_terminator(&mut archive);

        let frames = collect(archive, BLOCK_SIZE);
        let member = header_frame(&frames, header_at);
        assert_eq!(member.format, ArchiveFormat::Pax, "{case}");
        assert_eq!(member.kind, UstarKind::HardLink, "{case}");
        assert_eq!(member.declared_size, physical_size, "{case}");
        assert_eq!(member.effective_size, 3, "{case}");
        assert_eq!(data_frame(&frames, data_at).len, 3, "{case}");
    }
}
2016
/// An all-zero block inside a member's declared payload is emitted as a data
/// frame, not treated as the start of the archive terminator.
#[test]
fn zero_data_block_is_not_a_terminator() {
    let mut archive = Vec::new();
    append_block(&mut archive, &header(b'0', BLOCK_SIZE as u64));
    append_block(&mut archive, &[0; BLOCK_SIZE]);
    append_terminator(&mut archive);

    let frames = collect(archive, BLOCK_SIZE);
    assert_eq!(frames.len(), 2);
    let second_is_data = matches!(&frames[1], Ok(Frame::Data(_)));
    assert!(second_is_data);
}
2028
/// A local pax payload whose value is a long run of NUL bytes produces at
/// least one all-zero data frame owned by the local pax extension.
#[test]
fn zero_filled_block_inside_pax_payload_is_data() {
    let nul_run = "\0".repeat(BLOCK_SIZE * 3);
    let pax_payload = record("comment", &nul_run);

    let mut archive = Vec::new();
    append_pax(&mut archive, b'x', &pax_payload);
    append_block(&mut archive, &header(b'0', 0));
    append_terminator(&mut archive);

    let frames = collect(archive, BLOCK_SIZE);
    let saw_zero_pax_block = frames.iter().any(|frame| match frame {
        Ok(Frame::Data(data)) => {
            data.owner == DataOwner::Pax(PaxKind::Local) && is_zero_block(&data.block)
        }
        _ => false,
    });
    assert!(saw_zero_pax_block);
}
2047
/// A GNU archive with an `L` header built by `gnu_base256_header` (513-byte
/// payload), a `K` header, and a symlink member frames into six frames.
#[test]
fn frames_gnu_long_metadata_and_base256_payloads() {
    let mut archive = Vec::new();
    append_block(&mut archive, &gnu_base256_header(b'L', 513));
    append_payload(&mut archive, &[b'n'; BLOCK_SIZE]);
    append_payload(&mut archive, b"\0");
    append_gnu(&mut archive, b'K', b"link\0");
    append_block(&mut archive, &gnu_header(b'2', 0));
    append_terminator(&mut archive);

    let frames = collect(archive, 13);
    assert_eq!(frames.len(), 6);

    let long_name_header = frames[0].as_ref().unwrap();
    assert!(matches!(
        long_name_header,
        Frame::Gnu(GnuFrame {
            kind: GnuKind::LongName,
            payload_size: 513,
            ..
        })
    ));

    // The final long-name payload frame holds the single trailing byte.
    let name_tail = data_frame(&frames, 2);
    assert_eq!(name_tail.owner, DataOwner::Gnu(GnuKind::LongName));
    assert_eq!(name_tail.len, 1);
    assert!(name_tail.completed_pax_records().is_none());

    let long_link_header = frames[3].as_ref().unwrap();
    assert!(matches!(
        long_link_header,
        Frame::Gnu(GnuFrame {
            kind: GnuKind::LongLink,
            ..
        })
    ));

    let member = header_frame(&frames, 5);
    assert_eq!(member.kind, UstarKind::SymbolicLink);
}
2082
/// Every block from `invalid_header_cases`, fed as the only input, ends the
/// stream with the error that the table expects for it.
#[test]
fn rejects_header_format_type_and_field_errors() {
    for (case, malformed, expected) in invalid_header_cases() {
        let outcome = collect(malformed.to_vec(), BLOCK_SIZE);
        let error = last_error_inner(&outcome);
        let accepted = expected.matches(error);
        assert!(accepted, "{case}: {error:?}");
    }
}
2091
/// Filling any of the mode/uid/gid/mtime fields of a GNU header with `x`
/// bytes (and re-checksumming) is rejected as `InvalidNumericField` naming
/// that field.
#[test]
fn rejects_malformed_gnu_numeric_fields() {
    for (field, range) in [
        ("mode", MODE_RANGE),
        ("uid", UID_RANGE),
        ("gid", GID_RANGE),
        ("mtime", MTIME_RANGE),
    ] {
        let mut block = gnu_header(b'0', 0);
        block[range].fill(b'x');
        // Recompute the checksum after corrupting the field.
        set_checksum(&mut block);

        // Label the assertion so a failure names the field case that broke.
        assert!(
            matches!(
                last_error_inner(&collect(block.to_vec(), BLOCK_SIZE)),
                FrameErrorInner::InvalidNumericField { field: found, .. } if *found == field
            ),
            "{field}"
        );
    }
}
2110
/// Headers of the listed kinds that declare a physical size of 1 are rejected
/// with `InvalidMemberSize` for that kind.
///
/// The table has no pax hard-link entry; GNU hard links are included.
#[test]
fn rejects_nonzero_physical_sizes_for_payload_free_members() {
    // `format` is only used to label a failing case.
    let cases = [
        (ArchiveFormat::Pax, header(b'2', 1), UstarKind::SymbolicLink),
        (ArchiveFormat::Gnu, gnu_header(b'1', 1), UstarKind::HardLink),
        (
            ArchiveFormat::Gnu,
            gnu_header(b'2', 1),
            UstarKind::SymbolicLink,
        ),
        (
            ArchiveFormat::Pax,
            header(b'3', 1),
            UstarKind::CharacterDevice,
        ),
        (
            ArchiveFormat::Gnu,
            gnu_header(b'3', 1),
            UstarKind::CharacterDevice,
        ),
        (ArchiveFormat::Pax, header(b'4', 1), UstarKind::BlockDevice),
        (
            ArchiveFormat::Gnu,
            gnu_header(b'4', 1),
            UstarKind::BlockDevice,
        ),
        (ArchiveFormat::Pax, header(b'5', 1), UstarKind::Directory),
        (
            ArchiveFormat::Gnu,
            gnu_header(b'5', 1),
            UstarKind::Directory,
        ),
        (ArchiveFormat::Pax, header(b'6', 1), UstarKind::Fifo),
        (ArchiveFormat::Gnu, gnu_header(b'6', 1), UstarKind::Fifo),
    ];

    for (format, member_header, kind) in cases {
        let frames = collect(member_header.to_vec(), BLOCK_SIZE);
        let rejected = matches!(
            last_error_inner(&frames),
            FrameErrorInner::InvalidMemberSize {
                kind: found,
                size: 1,
            } if *found == kind
        );
        assert!(rejected, "{format:?} {kind:?}");
    }
}
2159
/// For each listed kind, a size of 1 is rejected with `InvalidMemberSize`
/// whether it comes from the local pax `size` record or from the physical header.
#[test]
fn rejects_nonzero_declared_or_effective_pax_sizes_for_payload_free_members() {
    // "effective": header says 0, pax says 1. "declared": header says 1, pax says 0.
    let size_sources = [("effective", 0, "1"), ("declared", 1, "0")];
    let kinds = [
        (b'2', UstarKind::SymbolicLink),
        (b'3', UstarKind::CharacterDevice),
        (b'4', UstarKind::BlockDevice),
        (b'5', UstarKind::Directory),
        (b'6', UstarKind::Fifo),
    ];

    for (case, physical_size, pax_size) in size_sources {
        for (typeflag, kind) in kinds {
            let mut archive = Vec::new();
            append_pax(&mut archive, b'x', &record("size", pax_size));
            append_block(&mut archive, &header(typeflag, physical_size));

            let frames = collect(archive, BLOCK_SIZE);
            let rejected = matches!(
                last_error_inner(&frames),
                FrameErrorInner::InvalidMemberSize {
                    kind: found,
                    size: 1,
                } if *found == kind
            );
            assert!(rejected, "{case} {kind:?}");
        }
    }
}
2187
/// When a malformed header follows one valid empty member, the reported error
/// position equals `BLOCK_SIZE`.
#[test]
fn header_errors_preserve_later_header_positions() {
    let second_block_offset = BLOCK_SIZE as u64;

    for (case, malformed, expected) in invalid_header_cases() {
        let mut archive = Vec::new();
        append_block(&mut archive, &header(b'0', 0));
        append_block(&mut archive, &malformed);

        let frames = collect(archive, BLOCK_SIZE);
        let error = last_error(&frames);
        assert_eq!(error.position, second_block_offset, "{case}");
        assert!(expected.matches(&error.inner), "{case}: {error:?}");
    }
}
2202
/// Covers three invalid local pax sequences: an empty payload, two consecutive
/// local pax headers, and a pax extension with no member after it.
#[test]
fn rejects_invalid_pax_sequences() {
    // A zero-size local pax header.
    let empty_frames = collect(header(b'x', 0).to_vec(), BLOCK_SIZE);
    assert!(matches!(
        last_error_inner(&empty_frames),
        FrameErrorInner::InvalidPaxRecord {
            source: PaxError::InvalidRecords { .. },
        }
    ));

    let valid = record("path", "name");

    // A complete local extension followed directly by another local pax header.
    let mut back_to_back = Vec::new();
    append_pax(&mut back_to_back, b'x', &valid);
    append_block(&mut back_to_back, &header(b'x', valid.len() as u64));
    let back_to_back_frames = collect(back_to_back, BLOCK_SIZE);
    assert!(matches!(
        last_error_inner(&back_to_back_frames),
        FrameErrorInner::UnexpectedOrder { .. }
    ));

    // A complete local extension and then end of input.
    let mut orphaned = Vec::new();
    append_pax(&mut orphaned, b'x', &valid);
    let orphaned_frames = collect(orphaned, BLOCK_SIZE);
    assert!(matches!(
        last_error_inner(&orphaned_frames),
        FrameErrorInner::UnexpectedEof { .. }
    ));
}
2228
/// A pax record with a non-numeric `size`, placed after one valid empty
/// member, is reported as `InvalidInteger` at position `BLOCK_SIZE`.
#[test]
fn preserves_pax_parse_error_positions_in_stream() {
    let bad_size = record("size", "bad");

    let mut archive = Vec::new();
    append_block(&mut archive, &header(b'0', 0));
    append_pax(&mut archive, b'x', &bad_size);

    let frames = collect(archive, BLOCK_SIZE);
    let expected_position = BLOCK_SIZE as u64;
    let reported_at_pax_header = matches!(
        frames.last(),
        Some(Err(FrameError {
            position,
            inner: FrameErrorInner::InvalidPaxRecord {
                source: PaxError::InvalidInteger { .. },
            },
        })) if *position == expected_position
    );
    assert!(reported_at_pax_header);
}
2247
/// A global `hdrcharset=BINARY` record makes later `path` values, global and
/// local, parse as `PaxString::Binary`; an unrecognised charset value is
/// rejected with `UnsupportedCharset`.
#[test]
fn accepts_binary_and_rejects_unknown_pax_charsets() {
    let mut global = record("hdrcharset", "BINARY");
    global.extend_from_slice(&record("path", "global"));
    let local = record("path", "local");

    let mut archive = Vec::new();
    append_pax(&mut archive, b'g', &global);
    append_pax(&mut archive, b'x', &local);
    append_block(&mut archive, &header(b'0', 0));
    append_terminator(&mut archive);

    let frames = collect(archive, BLOCK_SIZE);
    let member = header_frame(&frames, 4);
    assert_eq!(member.kind, UstarKind::Regular);

    // Frame 1 completes the global payload, frame 3 the local one.
    let expected_global = [
        PaxRecord::HdrCharset(PaxValue::Value(HdrCharset::Binary)),
        PaxRecord::Path(PaxValue::Value(PaxString::Binary(
            b"global".to_vec().into(),
        ))),
    ];
    assert_eq!(
        data_frame(&frames, 1).completed_pax_records(),
        Some(expected_global.as_slice())
    );
    let expected_local = [PaxRecord::Path(PaxValue::Value(PaxString::Binary(
        b"local".to_vec().into(),
    )))];
    assert_eq!(
        data_frame(&frames, 3).completed_pax_records(),
        Some(expected_local.as_slice())
    );

    // An unknown charset value in a local extension is an error.
    let unknown_charset = record("hdrcharset", "ISO-IR 8859 1 1998");
    let mut rejected_archive = Vec::new();
    append_pax(&mut rejected_archive, b'x', &unknown_charset);
    let rejected_frames = collect(rejected_archive, BLOCK_SIZE);
    assert!(matches!(
        last_error_inner(&rejected_frames),
        FrameErrorInner::InvalidPaxRecord {
            source: PaxError::UnsupportedCharset { value },
        } if value == "ISO-IR 8859 1 1998"
    ));
}
2293
/// GNU archives are rejected for misordered `L`/`K` metadata headers, for the
/// `S` typeflag, and for a size field filled with `0xff` bytes.
#[test]
fn rejects_invalid_gnu_sequences_and_sizes() {
    // Two `L` headers in a row.
    let mut repeated_long_name = Vec::new();
    append_block(&mut repeated_long_name, &gnu_header(b'L', 0));
    append_block(&mut repeated_long_name, &gnu_header(b'L', 0));

    // A `K` header followed by a regular file.
    let mut long_link_then_regular = Vec::new();
    append_block(&mut long_link_then_regular, &gnu_header(b'K', 0));
    append_block(&mut long_link_then_regular, &gnu_header(b'0', 0));

    // An `L` header followed by the archive terminator.
    let mut long_name_then_end = Vec::new();
    append_block(&mut long_name_then_end, &gnu_header(b'L', 0));
    append_terminator(&mut long_name_then_end);

    let order_cases = [
        ("duplicate", repeated_long_name),
        ("long-link-for-regular", long_link_then_regular),
        ("dangling", long_name_then_end),
    ];
    for (case, archive) in order_cases {
        let frames = collect(archive, BLOCK_SIZE);
        let misordered = matches!(
            last_error_inner(&frames),
            FrameErrorInner::UnexpectedOrder { .. }
        );
        assert!(misordered, "{case}");
    }

    let sparse_frames = collect(gnu_header(b'S', 0).to_vec(), BLOCK_SIZE);
    assert!(matches!(
        last_error_inner(&sparse_frames),
        FrameErrorInner::UnsupportedTypeflag { typeflag: b'S' }
    ));

    // Every size byte set to 0xff, with the checksum recomputed afterwards.
    let mut negative_size = gnu_header(b'0', 0);
    negative_size[SIZE_RANGE].fill(0xff);
    set_checksum(&mut negative_size);
    let negative_frames = collect(negative_size.to_vec(), BLOCK_SIZE);
    assert!(matches!(
        last_error_inner(&negative_frames),
        FrameErrorInner::InvalidSize { .. }
    ));
}
2332
/// The first header fixes the archive family: a later header of the other
/// family is a `FormatMismatch`, pax typeflags under a GNU header are
/// unsupported, and a terminator-only archive leaves `format()` as `None`.
#[test]
fn detects_one_archive_family_and_rejects_mixing() {
    let mut pax_then_gnu = Vec::new();
    append_block(&mut pax_then_gnu, &header(b'0', 0));
    append_block(&mut pax_then_gnu, &gnu_header(b'0', 0));
    let pax_then_gnu_frames = collect(pax_then_gnu, BLOCK_SIZE);
    assert!(matches!(
        last_error_inner(&pax_then_gnu_frames),
        FrameErrorInner::FormatMismatch {
            expected: ArchiveFormat::Pax,
            found: ArchiveFormat::Gnu,
        }
    ));

    // A family mismatch applies only to a successfully decoded header.
    let mut broken_gnu = gnu_header(b'0', 0);
    broken_gnu[0] = b'X';
    let mut pax_then_broken_gnu = Vec::new();
    append_block(&mut pax_then_broken_gnu, &header(b'0', 0));
    append_block(&mut pax_then_broken_gnu, &broken_gnu);
    let pax_then_broken_gnu_frames = collect(pax_then_broken_gnu, BLOCK_SIZE);
    assert!(matches!(
        last_error_inner(&pax_then_broken_gnu_frames),
        FrameErrorInner::InvalidChecksum { .. }
    ));

    let mut gnu_then_pax = Vec::new();
    append_block(&mut gnu_then_pax, &gnu_header(b'0', 0));
    append_block(&mut gnu_then_pax, &header(b'0', 0));
    let gnu_then_pax_frames = collect(gnu_then_pax, BLOCK_SIZE);
    assert!(matches!(
        last_error_inner(&gnu_then_pax_frames),
        FrameErrorInner::FormatMismatch {
            expected: ArchiveFormat::Gnu,
            found: ArchiveFormat::Pax,
        }
    ));

    // Pax extension typeflags carried by a GNU header are rejected.
    for typeflag in [b'x', b'g'] {
        let frames = collect(gnu_header(typeflag, 0).to_vec(), BLOCK_SIZE);
        let unsupported = matches!(
            last_error_inner(&frames),
            FrameErrorInner::UnsupportedTypeflag { typeflag: found } if *found == typeflag
        );
        assert!(unsupported, "{typeflag:?}");
    }

    // A terminator-only archive ends immediately and never selects a format.
    let mut terminator_only = Vec::new();
    append_terminator(&mut terminator_only);
    let mut stream = TarStream::new(ChunkedReader::new(terminator_only, BLOCK_SIZE));
    let waker = std::task::Waker::noop();
    let mut cx = Context::from_waker(waker);
    let first_poll = Pin::new(&mut stream).poll_next(&mut cx);
    assert!(matches!(first_poll, Poll::Ready(None)));
    assert_eq!(stream.format(), None);
}
2389
/// Checks the errors reported for input that ends too early and for
/// malformed end-of-archive markers.
///
/// Scenarios, in order: an incomplete block, a member payload that is
/// missing, a local pax payload that is cut short, a single zero block with
/// nothing after it, and a zero block followed by a non-zero block.
#[test]
fn rejects_truncation_and_invalid_termination() {
    // Three bytes in total (second argument 1, presumably the reader chunk
    // size): the input ends before a block is complete.
    {
        let frames = collect(vec![0; 3], 1);
        assert!(matches!(
            last_error_inner(&frames),
            FrameErrorInner::IncompleteBlock { read: 3 }
        ));
    }

    // A member header built with a second argument of 1 and nothing after it.
    {
        let mut archive = Vec::new();
        append_block(&mut archive, &header(b'0', 1));
        let frames = collect(archive, BLOCK_SIZE);
        assert!(matches!(
            last_error_inner(&frames),
            FrameErrorInner::TruncatedPayload {
                owner: DataOwner::Member,
                ..
            }
        ));
    }

    // An `x` header built with 513 as its second argument, followed by a
    // payload far shorter than that.
    {
        let mut archive = Vec::new();
        append_block(&mut archive, &header(b'x', 513));
        append_payload(&mut archive, b"11 path=x\n");
        let frames = collect(archive, BLOCK_SIZE);
        assert!(matches!(
            last_error_inner(&frames),
            FrameErrorInner::TruncatedPayload {
                owner: DataOwner::Pax(PaxKind::Local),
                ..
            }
        ));
    }

    // One zero block and then end of input: the second half of the end
    // marker never arrives.
    {
        let mut archive = Vec::new();
        append_block(&mut archive, &header(b'0', 0));
        append_block(&mut archive, &[0; BLOCK_SIZE]);
        let frames = collect(archive, BLOCK_SIZE);
        assert!(matches!(
            last_error_inner(&frames),
            FrameErrorInner::MissingEndMarker
        ));
    }

    // One zero block followed by a regular header instead of a second zero
    // block.
    {
        let mut archive = Vec::new();
        append_block(&mut archive, &header(b'0', 0));
        append_block(&mut archive, &[0; BLOCK_SIZE]);
        append_block(&mut archive, &header(b'0', 0));
        let frames = collect(archive, BLOCK_SIZE);
        assert!(matches!(
            last_error_inner(&frames),
            FrameErrorInner::InvalidEndMarker
        ));
    }
}
2435
/// Confirms that after the stream has yielded an error, the next poll
/// reports the end of the stream.
#[test]
fn stream_is_fused_after_first_error() {
    let input = header(b'L', 0).to_vec();
    let mut stream = TarStream::new(ChunkedReader::new(input, BLOCK_SIZE));
    let mut cx = Context::from_waker(std::task::Waker::noop());

    // First poll: the `L` typeflag on a `header` block is rejected at
    // position 0.
    let first = Pin::new(&mut stream).poll_next(&mut cx);
    assert!(matches!(
        first,
        Poll::Ready(Some(Err(FrameError {
            position: 0,
            inner: FrameErrorInner::UnsupportedTypeflag { typeflag: b'L' },
        })))
    ));

    // Second poll: the stream is finished and yields nothing further.
    let second = Pin::new(&mut stream).poll_next(&mut cx);
    assert!(matches!(second, Poll::Ready(None)));
}
2453}