Skip to main content

tar_framing/
logical.rs

1//! Member-oriented reading above the lossless physical frame stream.
2//!
3//! This API assembles PAX and GNU extension payloads with the ordinary members
4//! they describe. Each member carries a compact borrowed [`Header`], and each
5//! PAX member carries one unified [`PaxState`].
6
7use std::{borrow::Cow, mem, ops::Range};
8
9use tokio::io::AsyncRead;
10use tokio_stream::StreamExt;
11
12use crate::{
13    ArchiveFormat, Block, FrameError, FrameErrorInner, GnuKind, PaxKeyword, PaxKind, PaxRecord,
14    PaxString, PaxValue, UstarKind,
15    header::{GNAME_RANGE, LINK_NAME_RANGE, UNAME_RANGE},
16    pax::GlobalPaxRecords,
17    stream::{DataFrame, DataOwner, Frame, HeaderFrame, TarStream},
18};
19
20pub use crate::{PaxExtension, PaxState};
21
/// Target chunk length, in bytes (1 MiB), requested for each read while
/// draining a member payload that the caller did not finish consuming.
const PAYLOAD_DRAIN_CHUNK_BYTES: usize = 1 << 20;
23
/// The buffered value of one GNU long-name or long-link extension, kept so the
/// member it precedes can be interpreted.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GnuMetadata {
    /// Absolute byte offset of the header block of the GNU extension.
    pub position: u64,
    /// Meaningful bytes of the metadata payload; tar padding is not included.
    pub payload: Vec<u8>,
}
32
33/// Extension metadata attached to one ordinary archive member.
34#[derive(Clone, Debug, Eq, PartialEq)]
35pub enum MemberExtensions<'a> {
36    /// Unified pax metadata applicable to an ordinary ustar member, borrowing
37    /// effective global values from the logical reader.
38    Pax(PaxState<'a>),
39    /// GNU metadata applying to an ordinary GNU member.
40    Gnu {
41        /// Optional GNU long-name metadata.
42        long_name: Option<GnuMetadata>,
43        /// Optional GNU long-link metadata.
44        long_link: Option<GnuMetadata>,
45    },
46}
47
48/// Extracted ordinary-header metadata for one logical archive member.
49///
50/// Unlike [`HeaderFrame`], this type does not retain the lossless physical
51/// header block. Its ordinary path and link-path fallbacks borrow reusable
52/// storage owned by [`TarReader`].
53#[derive(Clone, Copy, Debug, Eq, PartialEq)]
54pub struct Header<'a> {
55    /// The absolute byte position of the ordinary member header block.
56    pub position: u64,
57    /// The selected archive family of this member header.
58    pub format: ArchiveFormat,
59    /// The member type identified by the header.
60    pub kind: UstarKind,
61    /// The size encoded directly in the member header field.
62    pub declared_size: u64,
63    /// The size after applying applicable pax `size` records.
64    ///
65    /// This is also the number of payload bytes exposed through
66    /// [`MemberPayload`]. Member kinds that cannot carry payload are rejected
67    /// when either their declared or effective size is nonzero.
68    pub effective_size: u64,
69    /// Permission and mode bits decoded from the ordinary header, if present.
70    ///
71    /// Note that pax only defines the semantics of the lower 12 bits of this
72    /// field. Higher bits may or may not be set, and have no assigned semantics.
73    ///
74    /// This is [`None`] only when the field is wholly NUL and the framing policy
75    /// permits missing numeric metadata.
76    pub mode: Option<u64>,
77    /// Numeric user identifier from the ordinary header, if present.
78    ///
79    /// This is [`None`] only when the field is wholly NUL and the framing policy
80    /// permits missing numeric metadata.
81    ///
82    /// Applicable pax metadata may override or delete this fallback.
83    pub uid: Option<u64>,
84    /// Numeric group identifier from the ordinary header, if present.
85    ///
86    /// This is [`None`] only when the field is wholly NUL and the framing policy
87    /// permits missing numeric metadata.
88    ///
89    /// Applicable pax metadata may override or delete this fallback.
90    pub gid: Option<u64>,
91    /// Modification time in seconds from the ordinary header, if present.
92    ///
93    /// This is [`None`] only when the field is wholly NUL and the framing policy
94    /// permits missing numeric metadata.
95    ///
96    /// Applicable pax metadata may override or delete this fallback.
97    pub mtime: Option<u64>,
98    /// User name bytes from the ordinary header, empty if absent or unusable.
99    ///
100    /// Applicable pax metadata may override or delete this fallback.
101    pub uname: &'a [u8],
102    /// Group name bytes from the ordinary header, empty if absent or unusable.
103    ///
104    /// Applicable pax metadata may override or delete this fallback.
105    pub gname: &'a [u8],
106    header_path: &'a [u8],
107    link_name: &'a [u8],
108}
109
110/// One meaningful payload block belonging to an ordinary archive member.
111#[derive(Clone, Debug, Eq, PartialEq)]
112pub struct PayloadBlock {
113    /// The absolute byte position of this payload block.
114    pub position: u64,
115    /// The lossless payload block bytes, including any final padding.
116    pub block: Block,
117    /// The number of meaningful bytes in this block.
118    pub len: usize,
119}
120
121/// An ordinary archive member and its streaming payload cursor.
122pub struct MemberFrame<'a, R> {
123    /// The ordinary member header.
124    pub header: Header<'a>,
125    /// Extension metadata applying to this member.
126    pub extensions: MemberExtensions<'a>,
127    /// A cursor over the member payload bytes.
128    pub payload: MemberPayload<'a, R>,
129}
130
131impl<R> MemberFrame<'_, R> {
132    /// Returns the effective member path after applying pax or GNU metadata.
133    ///
134    /// An explicit pax deletion is an error because it also removes the
135    /// ordinary-header fallback required to identify this member. Empty paths
136    /// and paths containing embedded NUL bytes are also rejected.
137    pub fn effective_path(&self) -> Result<Cow<'_, [u8]>, FrameError> {
138        let path = effective_member_path(&self.header, &self.extensions)?;
139        if path.is_empty() {
140            return Err(FrameError::at(
141                self.header.position,
142                FrameErrorInner::EmptyMemberPath,
143            ));
144        }
145        reject_nul(self.header.position, "path", path.as_ref())?;
146        Ok(path)
147    }
148
149    /// Returns the effective member link target after applying pax or GNU metadata.
150    ///
151    /// An explicit pax deletion is an error because it also removes the
152    /// ordinary-header fallback required to identify a link target. Link
153    /// targets containing embedded NUL bytes are also rejected.
154    pub fn effective_link_path(&self) -> Result<Cow<'_, [u8]>, FrameError> {
155        let path = match &self.extensions {
156            MemberExtensions::Pax(state) => resolve_pax_text(
157                self.header.position,
158                state,
159                &PaxKeyword::LinkPath,
160                "linkpath",
161                Cow::Borrowed(self.header.link_name),
162                |record| match record {
163                    PaxRecord::LinkPath(value) => Some(value),
164                    _ => None,
165                },
166            ),
167            MemberExtensions::Gnu { long_link, .. } => match long_link {
168                Some(metadata) => Ok(Cow::Borrowed(parse_gnu_metadata(
169                    metadata,
170                    GnuKind::LongLink,
171                )?)),
172                None => Ok(Cow::Borrowed(self.header.link_name)),
173            },
174        }?;
175        reject_nul(self.header.position, "link path", path.as_ref())?;
176        Ok(path)
177    }
178}
179
180/// A streaming, typed cursor over one member's payload blocks.
181pub struct MemberPayload<'a, R> {
182    reader: &'a mut PayloadReader<R>,
183}
184
185/// A logical reader that assembles physical frames into archive-level items.
186///
187/// Unlike [`TarStream`], this API attaches PAX or GNU extension metadata to the
188/// ordinary member it describes. Each PAX member carries one [`PaxState`] with
189/// effective metadata and newly encountered positioned extensions. Ordinary
190/// header path and link-path fallbacks are copied into reusable storage and
191/// borrowed by the returned [`Header`].
192pub struct TarReader<R> {
193    // Keep the logical effective state outside `payload` so a returned
194    // `PaxState` can borrow it while `MemberPayload` mutably borrows only the
195    // independent payload machinery. `TarStream` maintains its own physical
196    // copy for framing decisions.
197    global_pax_records: Option<GlobalPaxRecords>,
198    payload: PayloadReader<R>,
199    header_storage: HeaderStorage,
200    pending_extensions: PendingExtensions,
201    extension_payload: Option<ExtensionPayload>,
202}
203
204/// Payload state kept separate so [`MemberPayload`] can borrow it mutably while
205/// the logical [`Header`] borrows reusable header storage.
206struct PayloadReader<R> {
207    stream: TarStream<R>,
208    remaining: u64,
209    drain_buffer: Vec<u8>,
210}
211
212/// Logical member metadata retained across cancellation of [`TarReader::next_frame`].
213#[derive(Default)]
214struct PendingExtensions {
215    global_pax: Vec<PaxExtension>,
216    local_pax: Option<PaxExtension>,
217    gnu_long_name: Option<GnuMetadata>,
218    gnu_long_link: Option<GnuMetadata>,
219}
220
221impl PendingExtensions {
222    fn set_gnu(&mut self, kind: GnuKind, metadata: GnuMetadata) {
223        *match kind {
224            GnuKind::LongName => &mut self.gnu_long_name,
225            GnuKind::LongLink => &mut self.gnu_long_link,
226        } = Some(metadata);
227    }
228}
229
230/// An extension payload being assembled across physical frames.
231enum ExtensionPayload {
232    Pax {
233        position: u64,
234        kind: PaxKind,
235    },
236    Gnu {
237        position: u64,
238        kind: GnuKind,
239        remaining: u64,
240        payload: Vec<u8>,
241    },
242}
243
/// Reusable byte buffers that back the borrowed fields of [`Header`].
#[derive(Default)]
struct HeaderStorage {
    /// Backing storage for the ordinary header path.
    path: Vec<u8>,
    /// Backing storage for the ordinary link name.
    link_name: Vec<u8>,
    /// Backing storage for the user name.
    uname: Vec<u8>,
    /// Backing storage for the group name.
    gname: Vec<u8>,
}
251
252impl HeaderStorage {
253    fn update<'a>(&'a mut self, frame: &HeaderFrame) -> Header<'a> {
254        frame.copy_header_path_into(&mut self.path);
255        copy_string_field_into(&frame.block, LINK_NAME_RANGE, &mut self.link_name);
256        copy_string_field_into(&frame.block, UNAME_RANGE, &mut self.uname);
257        copy_string_field_into(&frame.block, GNAME_RANGE, &mut self.gname);
258        Header {
259            position: frame.position,
260            format: frame.format,
261            kind: frame.kind,
262            declared_size: frame.declared_size,
263            effective_size: frame.effective_size,
264            mode: frame.mode,
265            uid: frame.uid,
266            gid: frame.gid,
267            mtime: frame.mtime,
268            uname: &self.uname,
269            gname: &self.gname,
270            header_path: &self.path,
271            link_name: &self.link_name,
272        }
273    }
274}
275
276fn copy_string_field_into(block: &Block, range: Range<usize>, destination: &mut Vec<u8>) {
277    let field = &block[range];
278    let len = field
279        .iter()
280        .position(|byte| *byte == 0)
281        .unwrap_or(field.len());
282    destination.clear();
283    destination.extend_from_slice(&field[..len]);
284}
285
286impl<R> TarReader<R> {
287    /// Creates a new logical reader from an uncompressed tar reader.
288    pub fn new(reader: R) -> Self {
289        Self {
290            global_pax_records: None,
291            payload: PayloadReader {
292                stream: TarStream::new(reader),
293                remaining: 0,
294                drain_buffer: Vec::new(),
295            },
296            header_storage: HeaderStorage::default(),
297            pending_extensions: PendingExtensions::default(),
298            extension_payload: None,
299        }
300    }
301
302    /// Sets the maximum size accepted for each subsequent pax extension.
303    ///
304    /// A local or global pax header that declares a larger payload is rejected
305    /// before any of its payload blocks are consumed. Setting the maximum to
306    /// [`u64::MAX`] removes the per-extension bound; global extensions remain
307    /// subject to their cumulative limit.
308    ///
309    /// See [`TarStream::set_max_pax_extension_size`].
310    pub fn set_max_pax_extension_size(&mut self, max_pax_extension_size: u64) {
311        self.payload
312            .stream
313            .set_max_pax_extension_size(max_pax_extension_size);
314    }
315
316    /// Sets the maximum cumulative size of global pax extensions before one member.
317    ///
318    /// A global header that would increase the pending total beyond this limit
319    /// is rejected before its payload is consumed. Setting the maximum to
320    /// [`u64::MAX`] removes the cumulative bound; each extension remains
321    /// subject to its individual limit.
322    ///
323    /// See [`TarStream::set_max_global_pax_extensions_size`].
324    pub fn set_max_global_pax_extensions_size(&mut self, max_global_pax_extensions_size: u64) {
325        self.payload
326            .stream
327            .set_max_global_pax_extensions_size(max_global_pax_extensions_size);
328    }
329
330    /// Sets whether wholly NUL numeric metadata fields may be accepted.
331    ///
332    /// See [`TarStream::set_allow_all_nul_numeric_fields`].
333    pub fn set_allow_all_nul_numeric_fields(&mut self, allow: bool) {
334        self.payload.stream.set_allow_all_nul_numeric_fields(allow);
335    }
336
337    /// Sets the maximum size accepted for each subsequent GNU metadata extension.
338    ///
339    /// A long-name or long-link header that declares a larger payload is
340    /// rejected before any of its payload blocks are consumed. Setting the
341    /// maximum to [`u64::MAX`] permits unbounded metadata buffering.
342    pub fn set_max_gnu_extension_size(&mut self, max_gnu_extension_size: u64) {
343        self.payload
344            .stream
345            .set_max_gnu_extension_size(max_gnu_extension_size);
346    }
347}
348
impl<R: AsyncRead + Unpin> TarReader<R> {
    /// Returns the next ordinary archive member.
    ///
    /// If the preceding member payload was not fully consumed, it is first
    /// drained and validated. Extension metadata is then consumed and attached
    /// before the next member is returned. Global pax updates not followed by
    /// an ordinary member are consumed and ignored. A returned pax state is a
    /// view borrowing this reader; it must be dropped before requesting another
    /// member.
    ///
    /// Returns `Ok(None)` once the underlying frame stream is exhausted.
    ///
    /// # Errors
    ///
    /// An error from draining the previous payload, from the physical frame
    /// stream, or from assembling extension payload data is returned as-is.
    /// In each of those cases the pending extension state is cleared first.
    pub async fn next_frame(&mut self) -> Result<Option<MemberFrame<'_, R>>, FrameError> {
        // Consume whatever the caller left unread of the previous member
        // before asking the stream for further frames.
        if let Err(error) = self.payload.drain_payload().await {
            self.clear_extension_state();
            return Err(error);
        }

        loop {
            let frame = match self.payload.stream.next().await {
                Some(Ok(frame)) => frame,
                Some(Err(error)) => {
                    self.clear_extension_state();
                    return Err(error);
                }
                None => {
                    // End of stream: extensions gathered without a following
                    // member are discarded.
                    self.clear_extension_state();
                    return Ok(None);
                }
            };
            match frame {
                Frame::Pax(frame) => {
                    // Remember which pax extension the upcoming data frames
                    // belong to; the records arrive with those frames.
                    self.extension_payload = Some(ExtensionPayload::Pax {
                        position: frame.position,
                        kind: frame.kind,
                    });
                }
                Frame::Gnu(frame) => {
                    if frame.payload_size == 0 {
                        // Nothing to buffer, so the (empty) metadata is
                        // recorded immediately.
                        // NOTE(review): presumably no data frame follows a
                        // zero-sized extension — confirm against `TarStream`.
                        let metadata = GnuMetadata {
                            position: frame.position,
                            payload: Vec::new(),
                        };
                        self.pending_extensions.set_gnu(frame.kind, metadata);
                    } else {
                        self.extension_payload = Some(ExtensionPayload::Gnu {
                            position: frame.position,
                            kind: frame.kind,
                            remaining: frame.payload_size,
                            payload: Vec::new(),
                        });
                    }
                }
                Frame::Header(header) => {
                    // An ordinary member: hand over everything gathered so far
                    // and leave an empty pending set behind for the next one.
                    let pending_extensions = mem::take(&mut self.pending_extensions);
                    // Only the metadata matching the header's format is
                    // attached; the other family's pending values are dropped.
                    let extensions = match header.format {
                        ArchiveFormat::Pax => MemberExtensions::Pax(PaxState::new(
                            self.global_pax_records.as_ref(),
                            pending_extensions.global_pax,
                            pending_extensions.local_pax,
                        )),
                        ArchiveFormat::Gnu => MemberExtensions::Gnu {
                            long_name: pending_extensions.gnu_long_name,
                            long_link: pending_extensions.gnu_long_link,
                        },
                    };
                    // The payload cursor exposes exactly the effective size.
                    self.payload.remaining = header.effective_size;
                    let header = self.header_storage.update(&header);
                    return Ok(Some(MemberFrame {
                        header,
                        extensions,
                        payload: MemberPayload {
                            reader: &mut self.payload,
                        },
                    }));
                }
                Frame::Data(frame) => {
                    // Data frames seen here carry extension payload; member
                    // payload is read through `MemberPayload` instead.
                    if let Err(error) = self.process_extension_data(frame) {
                        self.clear_extension_state();
                        return Err(error);
                    }
                }
            }
        }
    }

    /// Discards all pending extension metadata and any partially assembled
    /// extension payload.
    fn clear_extension_state(&mut self) {
        self.pending_extensions = PendingExtensions::default();
        self.extension_payload = None;
    }

    /// Feeds one data frame into the extension payload currently being
    /// assembled.
    ///
    /// When the frame completes the extension, the result is moved into
    /// `pending_extensions` (and, for global pax records, also applied to
    /// `global_pax_records`). Otherwise the in-progress state is stored back
    /// so the next data frame can continue it.
    ///
    /// # Errors
    ///
    /// Fails when no extension is in progress, when the frame's owner does not
    /// match the extension in progress, or when a GNU payload length cannot be
    /// converted or exceeds the bytes still expected. The in-progress state has
    /// already been taken at that point and is not restored.
    fn process_extension_data(&mut self, frame: DataFrame) -> Result<(), FrameError> {
        // Take the state out; it is put back below only if more data is needed.
        let Some(payload) = self.extension_payload.take() else {
            return Err(FrameError::unexpected_order(
                frame.position,
                "extension header or ordinary member header",
                "unattached payload data",
            ));
        };
        match payload {
            ExtensionPayload::Pax { position, kind } => {
                if frame.owner != DataOwner::Pax(kind) {
                    return Err(FrameError::unexpected_order(
                        frame.position,
                        "pax extension payload",
                        "different payload data",
                    ));
                }
                // The frame yields parsed records only once the extension is
                // complete.
                if let Some(records) = frame.into_completed_pax_records() {
                    match kind {
                        PaxKind::Global => {
                            // Global records update the reader-wide state and
                            // are also reported on the next member.
                            records.apply_global(&mut self.global_pax_records);
                            self.pending_extensions
                                .global_pax
                                .push(PaxExtension::new(position, kind, records));
                        }
                        PaxKind::Local => {
                            // A later local extension replaces an earlier one.
                            self.pending_extensions.local_pax =
                                Some(PaxExtension::new(position, kind, records));
                        }
                    }
                } else {
                    self.extension_payload = Some(ExtensionPayload::Pax { position, kind });
                }
            }
            ExtensionPayload::Gnu {
                position,
                kind,
                mut remaining,
                mut payload,
            } => {
                if frame.owner != DataOwner::Gnu(kind) {
                    return Err(FrameError::unexpected_order(
                        frame.position,
                        "GNU metadata payload",
                        "different payload data",
                    ));
                }
                let len = u64::try_from(frame.len).map_err(|_| {
                    FrameError::arithmetic_overflow(frame.position, "GNU metadata payload length")
                })?;
                // More meaningful bytes than the header declared is an error.
                remaining = remaining.checked_sub(len).ok_or_else(|| {
                    FrameError::unexpected_order(
                        frame.position,
                        "bounded GNU metadata payload",
                        "oversized GNU metadata payload",
                    )
                })?;
                // Only the meaningful prefix of the block is buffered.
                payload.extend_from_slice(&frame.block[..frame.len]);
                if remaining == 0 {
                    let metadata = GnuMetadata { position, payload };
                    self.pending_extensions.set_gnu(kind, metadata);
                } else {
                    self.extension_payload = Some(ExtensionPayload::Gnu {
                        position,
                        kind,
                        remaining,
                        payload,
                    });
                }
            }
        }
        Ok(())
    }
}
511
512impl<R: AsyncRead + Unpin> PayloadReader<R> {
513    async fn next_payload_block(&mut self) -> Result<Option<PayloadBlock>, FrameError> {
514        if self.remaining == 0 {
515            return Ok(None);
516        }
517        let (position, block, len) = self.stream.read_member_block().await?;
518        let payload_len = u64::try_from(len)
519            .map_err(|_| FrameError::arithmetic_overflow(position, "member payload length"))?;
520        self.remaining = self.remaining.checked_sub(payload_len).ok_or_else(|| {
521            FrameError::unexpected_order(
522                position,
523                "bounded member payload",
524                "oversized member payload",
525            )
526        })?;
527        Ok(Some(PayloadBlock {
528            position,
529            block,
530            len,
531        }))
532    }
533
534    async fn next_payload_chunk(
535        &mut self,
536        buffer: &mut Vec<u8>,
537        target_len: usize,
538    ) -> Result<bool, FrameError> {
539        if self.remaining == 0 {
540            return Ok(false);
541        }
542        let len = self.stream.read_member_chunk(buffer, target_len).await?;
543        let len = u64::try_from(len).map_err(|_| {
544            FrameError::arithmetic_overflow(self.stream.position, "member payload chunk length")
545        })?;
546        self.remaining = self.remaining.checked_sub(len).ok_or_else(|| {
547            FrameError::unexpected_order(
548                self.stream.position,
549                "bounded member payload",
550                "oversized member payload chunk",
551            )
552        })?;
553        Ok(true)
554    }
555
556    async fn drain_payload(&mut self) -> Result<(), FrameError> {
557        let mut buffer = mem::take(&mut self.drain_buffer);
558        let result = loop {
559            match self
560                .next_payload_chunk(&mut buffer, PAYLOAD_DRAIN_CHUNK_BYTES)
561                .await
562            {
563                Ok(true) => {}
564                Ok(false) => break Ok(()),
565                Err(error) => break Err(error),
566            }
567        };
568        self.drain_buffer = buffer;
569        result
570    }
571}
572
573impl<R: AsyncRead + Unpin> MemberPayload<'_, R> {
574    /// Returns the next meaningful payload block, excluding final padding in `len`.
575    pub async fn next_block(&mut self) -> Result<Option<PayloadBlock>, FrameError> {
576        self.reader.next_payload_block().await
577    }
578
579    /// Reads validated payload bytes into a reusable chunk buffer.
580    ///
581    /// When this returns `true`, the buffer's existing contents are replaced.
582    /// When the payload is exhausted, it returns `false` without changing the
583    /// buffer so its initialized storage can be reused. Complete physical blocks
584    /// are read directly into it until the chunk contains at least `target_len`
585    /// bytes or the payload ends. The target is raised to one physical block
586    /// when it is smaller, and final-block padding is removed before this
587    /// returns. This preserves [`Self::next_block`] as the lossless interface
588    /// while allowing higher-level consumers to amortize per-block bookkeeping
589    /// and copies.
590    pub async fn next_chunk(
591        &mut self,
592        buffer: &mut Vec<u8>,
593        target_len: usize,
594    ) -> Result<bool, FrameError> {
595        self.reader.next_payload_chunk(buffer, target_len).await
596    }
597
598    /// Discards and validates all remaining payload bytes using reusable storage.
599    pub async fn skip(self) -> Result<(), FrameError> {
600        self.reader.drain_payload().await
601    }
602}
603
604fn effective_member_path<'a>(
605    header: &Header<'a>,
606    extensions: &'a MemberExtensions<'_>,
607) -> Result<Cow<'a, [u8]>, FrameError> {
608    match extensions {
609        MemberExtensions::Pax(state) => resolve_pax_text(
610            header.position,
611            state,
612            &PaxKeyword::Path,
613            "path",
614            Cow::Borrowed(header.header_path),
615            |record| match record {
616                PaxRecord::Path(value) => Some(value),
617                _ => None,
618            },
619        ),
620        MemberExtensions::Gnu { long_name, .. } => match long_name {
621            Some(metadata) => Ok(Cow::Borrowed(parse_gnu_metadata(
622                metadata,
623                GnuKind::LongName,
624            )?)),
625            None => Ok(Cow::Borrowed(header.header_path)),
626        },
627    }
628}
629
630fn reject_nul(position: u64, field: &'static str, value: &[u8]) -> Result<(), FrameError> {
631    if value.contains(&0) {
632        return Err(FrameError::at(
633            position,
634            FrameErrorInner::NulInMemberName { field },
635        ));
636    }
637    Ok(())
638}
639
640fn resolve_pax_text<'a>(
641    position: u64,
642    state: &'a PaxState<'_>,
643    keyword: &PaxKeyword,
644    field: &'static str,
645    header_value: Cow<'a, [u8]>,
646    select: fn(&PaxRecord) -> Option<&PaxValue<PaxString>>,
647) -> Result<Cow<'a, [u8]>, FrameError> {
648    if let Some(value) = state.effective_record(keyword).and_then(select) {
649        return pax_value(position, field, value);
650    }
651    Ok(header_value)
652}
653
654/// Return the raw bytes of a pax record, erroring if the record is a tombstone
655/// (i.e.) explicitly deleted.
656fn pax_value<'a>(
657    position: u64,
658    keyword: &'static str,
659    value: &'a PaxValue<PaxString>,
660) -> Result<Cow<'a, [u8]>, FrameError> {
661    match value {
662        PaxValue::Value(PaxString::Utf8(value)) => Ok(Cow::Borrowed(value.as_bytes())),
663        PaxValue::Value(PaxString::Binary(value)) => Ok(Cow::Borrowed(value.as_ref())),
664        // A pax value that has been explicitly deleted does *not*
665        // result in a fallthrough to the corresponding ustar header value:
666        //
667        // "If a keyword in an extended header record (or in a -o option-
668        // argument) overrides or deletes a corresponding field in the ustar
669        // header block, pax shall ignore the contents of that header block
670        // field."
671        //
672        // See: pax spec, "pax Extended Header"
673        PaxValue::Deleted => Err(FrameError::deleted_pax_metadata(position, keyword)),
674    }
675}
676
677fn parse_gnu_metadata(metadata: &GnuMetadata, kind: GnuKind) -> Result<&[u8], FrameError> {
678    let terminator = metadata
679        .payload
680        .iter()
681        .position(|byte| *byte == 0)
682        .ok_or_else(|| {
683            FrameError::invalid_gnu_metadata(metadata.position, kind, "value is not NUL-terminated")
684        })?;
685
686    // TODO: Make this configurable through some kind of policy?
687    // Might be overly strict in practice.
688    if metadata.payload[terminator..].iter().any(|byte| *byte != 0) {
689        return Err(FrameError::invalid_gnu_metadata(
690            metadata.position,
691            kind,
692            "non-NUL bytes follow the terminator",
693        ));
694    }
695    Ok(&metadata.payload[..terminator])
696}
697
698#[cfg(test)]
699mod tests {
700    use tokio::io::AsyncRead;
701
702    use super::*;
703    use crate::{
704        BLOCK_SIZE, DEFAULT_MAX_GNU_EXTENSION_SIZE, FrameError, FrameErrorInner, PaxRecord,
705        PaxValue,
706        header::{
707            GID_RANGE, GNAME_RANGE, LINK_NAME_RANGE, MODE_RANGE, MTIME_RANGE, NAME_RANGE,
708            PREFIX_RANGE, TYPEFLAG_OFFSET, UID_RANGE, UNAME_RANGE,
709        },
710        stream::DataOwner,
711        test_support::{
712            ChunkedReader, append_block, append_gnu, append_pax, append_payload, append_terminator,
713            cancel_pending, gnu_header, header, ready, ready_ok, record, set_checksum,
714        },
715    };
716
717    fn set_field(block: &mut Block, range: std::ops::Range<usize>, value: &[u8]) {
718        block[range.clone()].fill(0);
719        block[range.start..range.start + value.len()].copy_from_slice(value);
720    }
721
722    async fn next_member<R: AsyncRead + Unpin>(
723        reader: &mut TarReader<R>,
724    ) -> Result<MemberFrame<'_, R>, FrameError> {
725        let Some(member) = reader.next_frame().await? else {
726            panic!("expected logical member");
727        };
728        Ok(member)
729    }
730
731    fn pax_state<'a, R>(member: &'a MemberFrame<'_, R>) -> Option<&'a PaxState<'a>> {
732        if let MemberExtensions::Pax(state) = &member.extensions {
733            Some(state)
734        } else {
735            None
736        }
737    }
738
739    fn member_followed_by_empty_member(payload: &[u8]) -> (Vec<u8>, u64) {
740        let mut bytes = Vec::new();
741        append_pax(&mut bytes, b'0', payload);
742        let next_position = u64::try_from(bytes.len()).expect("test position should fit u64");
743        append_block(&mut bytes, &header(b'0', 0));
744        append_terminator(&mut bytes);
745        (bytes, next_position)
746    }
747
/// Checks the ordinary-header fields exposed on `member.header`, once for an
/// archive built with `header(..)` and once for one built with
/// `gnu_header(..)`. In each archive the first member has populated fields and
/// the second has its numeric fields zero-filled, which must surface as `None`.
#[test]
fn exposes_ordinary_header_metadata_and_decodes_modes() {
    // First archive: a populated header followed by a blanked one.
    let mut populated = header(b'2', 0);
    set_field(&mut populated, NAME_RANGE, b"file");
    set_field(&mut populated, PREFIX_RANGE, b"dir");
    set_field(&mut populated, LINK_NAME_RANGE, b"target");
    populated[MODE_RANGE].copy_from_slice(b"0100644\0");
    populated[UID_RANGE].copy_from_slice(b"0000001\0");
    populated[GID_RANGE].copy_from_slice(b"0000002\0");
    populated[MTIME_RANGE].copy_from_slice(b"00000000003\0");
    set_field(&mut populated, UNAME_RANGE, b"user");
    set_field(&mut populated, GNAME_RANGE, b"group");
    set_checksum(&mut populated);

    let mut blank = header(b'0', 0);
    let zeroed_fields = [
        MODE_RANGE,
        UID_RANGE,
        GID_RANGE,
        MTIME_RANGE,
        UNAME_RANGE,
        GNAME_RANGE,
    ];
    for field in zeroed_fields {
        blank[field].fill(0);
    }
    // The checksum is written last so it covers the zeroed fields.
    set_checksum(&mut blank);

    let mut first_archive = Vec::new();
    append_block(&mut first_archive, &populated);
    append_block(&mut first_archive, &blank);
    append_terminator(&mut first_archive);

    ready_ok(async {
        let mut reader = TarReader::new(ChunkedReader::new(first_archive, BLOCK_SIZE));
        {
            // Scoped so the borrow of `reader` ends before the next member is read.
            let populated_member = next_member(&mut reader).await?;
            let fields = &populated_member.header;
            assert_eq!(fields.format, ArchiveFormat::Pax);
            assert_eq!(fields.header_path, b"dir/file");
            assert_eq!(fields.link_name, b"target");
            assert_eq!(fields.mode, Some(0o100644));
            assert_eq!(fields.uid, Some(1));
            assert_eq!(fields.gid, Some(2));
            assert_eq!(fields.mtime, Some(3));
            assert_eq!(fields.uname, b"user");
            assert_eq!(fields.gname, b"group");
            assert_eq!(populated_member.effective_path()?.as_ref(), b"dir/file");
            assert_eq!(
                populated_member.effective_link_path()?.as_ref(),
                b"target"
            );
        }
        let blank_member = next_member(&mut reader).await?;
        let fields = &blank_member.header;
        assert_eq!(fields.mode, None);
        assert_eq!(fields.uid, None);
        assert_eq!(fields.gid, None);
        assert_eq!(fields.mtime, None);
        assert!(fields.uname.is_empty());
        assert!(fields.gname.is_empty());
        Ok(())
    });

    // Second archive: the same shape, built from `gnu_header`.
    let mut gnu_populated = gnu_header(b'0', 0);
    set_field(&mut gnu_populated, NAME_RANGE, b"name");
    set_field(&mut gnu_populated, PREFIX_RANGE, b"ignored");
    // Mode field: first byte 0x80, last two bytes 0x81a4 (== 0o100644), rest
    // zero. The assertions below expect this to be read back as 0o100644.
    let mode_tail = MODE_RANGE.end - 2..MODE_RANGE.end;
    gnu_populated[MODE_RANGE].fill(0);
    gnu_populated[MODE_RANGE.start] = 0x80;
    gnu_populated[mode_tail].copy_from_slice(&[0x81, 0xa4]);
    set_checksum(&mut gnu_populated);

    let mut gnu_blank = gnu_header(b'0', 0);
    for field in [MODE_RANGE, UID_RANGE, GID_RANGE, MTIME_RANGE] {
        gnu_blank[field].fill(0);
    }
    set_checksum(&mut gnu_blank);

    let mut gnu_archive = Vec::new();
    append_block(&mut gnu_archive, &gnu_populated);
    append_block(&mut gnu_archive, &gnu_blank);
    append_terminator(&mut gnu_archive);

    ready_ok(async {
        let mut reader = TarReader::new(ChunkedReader::new(gnu_archive, BLOCK_SIZE));
        {
            let populated_member = next_member(&mut reader).await?;
            let fields = &populated_member.header;
            assert_eq!(fields.format, ArchiveFormat::Gnu);
            assert_eq!(fields.header_path, b"name");
            assert_eq!(fields.mode, Some(0o100644));
            assert_eq!(fields.uid, Some(0));
            assert_eq!(fields.gid, Some(0));
            assert_eq!(fields.mtime, Some(0));
        }
        let blank_member = next_member(&mut reader).await?;
        let fields = &blank_member.header;
        assert_eq!(fields.mode, None);
        assert_eq!(fields.uid, None);
        assert_eq!(fields.gid, None);
        assert_eq!(fields.mtime, None);
        Ok(())
    });
}
842
/// A header with an empty name field and prefix `victim` must report
/// `victim/` (trailing separator kept) as both its header path and its
/// effective path.
#[test]
fn preserves_ustar_separator_when_name_is_empty() {
    let mut prefix_only = header(b'5', 0);
    set_field(&mut prefix_only, NAME_RANGE, b"");
    set_field(&mut prefix_only, PREFIX_RANGE, b"victim");
    set_checksum(&mut prefix_only);

    let mut archive = Vec::new();
    append_block(&mut archive, &prefix_only);
    append_terminator(&mut archive);

    ready_ok(async {
        let mut reader = TarReader::new(ChunkedReader::new(archive, BLOCK_SIZE));
        let entry = next_member(&mut reader).await?;
        assert_eq!(entry.header.header_path, b"victim/");
        assert_eq!(entry.effective_path()?.as_ref(), b"victim/");
        Ok(())
    });
}
861
/// Reads one payload block first, then checks that the member's header
/// metadata and effective paths can still be read afterwards.
#[test]
fn keeps_borrowed_header_metadata_available_while_streaming_payload() {
    let mut file_header = header(b'0', 1);
    set_field(&mut file_header, NAME_RANGE, b"file");
    set_field(&mut file_header, PREFIX_RANGE, b"dir");
    set_field(&mut file_header, LINK_NAME_RANGE, b"target");
    file_header[MODE_RANGE].copy_from_slice(b"0000755\0");
    set_checksum(&mut file_header);

    let mut archive = Vec::new();
    append_block(&mut archive, &file_header);
    append_payload(&mut archive, b"x");
    append_terminator(&mut archive);

    ready_ok(async {
        let mut reader = TarReader::new(ChunkedReader::new(archive, BLOCK_SIZE));
        let mut entry = next_member(&mut reader).await?;

        // Consume payload before touching any header metadata.
        let streamed = entry.payload.next_block().await?;
        assert!(streamed.is_some());

        assert_eq!(entry.header.header_path, b"dir/file");
        assert_eq!(entry.header.link_name, b"target");
        assert_eq!(entry.header.mode, Some(0o755));
        assert_eq!(entry.effective_path()?.as_ref(), b"dir/file");
        assert_eq!(entry.effective_link_path()?.as_ref(), b"target");
        Ok(())
    });
}
888
/// Archive layout: a `g` extension (`path=global`, `linkpath=global-link`),
/// an `x` extension (`path=local`, empty `linkpath`), then two `b'2'` headers.
///
/// The first member must resolve `path` to `local` and report its `linkpath`
/// as deleted. The second member must resolve both values to the `g` records.
#[test]
fn resolves_pax_path_precedence_and_deletions() {
    let mut global_records = record("path", "global");
    global_records.extend_from_slice(&record("linkpath", "global-link"));

    let mut local_records = record("path", "local");
    local_records.extend_from_slice(&record("linkpath", ""));

    let mut archive = Vec::new();
    append_pax(&mut archive, b'g', &global_records);
    append_pax(&mut archive, b'x', &local_records);
    for _ in 0..2 {
        append_block(&mut archive, &header(b'2', 0));
    }
    append_terminator(&mut archive);

    ready_ok(async {
        let mut reader = TarReader::new(ChunkedReader::new(archive, BLOCK_SIZE));
        {
            let overridden = next_member(&mut reader).await?;
            assert_eq!(overridden.effective_path()?.as_ref(), b"local");

            // NOTE(review): 2048 would be the first ordinary header if each
            // extension takes one header block plus one payload block; confirm.
            let link = overridden.effective_link_path();
            assert!(matches!(
                link,
                Err(FrameError {
                    position: 2048,
                    inner: FrameErrorInner::DeletedPaxMetadata {
                        keyword: "linkpath"
                    },
                })
            ));
        }
        let inherited = next_member(&mut reader).await?;
        assert_eq!(inherited.effective_path()?.as_ref(), b"global");
        assert_eq!(inherited.effective_link_path()?.as_ref(), b"global-link");
        Ok(())
    });
}
923
/// `effective_path()` must fail with `EmptyMemberPath` in three cases: a
/// `header(..)` member with empty name and prefix, a `gnu_header(..)` member
/// with an empty name, and a GNU `L` extension whose payload is a single NUL.
#[test]
fn rejects_empty_effective_member_paths() {
    let pax_header_archive = {
        let mut member = header(b'0', 0);
        set_field(&mut member, NAME_RANGE, b"");
        set_field(&mut member, PREFIX_RANGE, b"");
        set_checksum(&mut member);

        let mut archive = Vec::new();
        append_block(&mut archive, &member);
        archive
    };

    let gnu_header_archive = {
        let mut member = gnu_header(b'0', 0);
        set_field(&mut member, NAME_RANGE, b"");
        set_checksum(&mut member);

        let mut archive = Vec::new();
        append_block(&mut archive, &member);
        archive
    };

    let gnu_long_name_archive = {
        let mut archive = Vec::new();
        append_gnu(&mut archive, b'L', b"\0");

        // The physical name is non-empty; only the long-name payload is empty.
        let mut member = gnu_header(b'0', 0);
        set_field(&mut member, NAME_RANGE, b"physical");
        set_checksum(&mut member);
        append_block(&mut archive, &member);
        archive
    };

    let cases = [
        ("pax-header", pax_header_archive),
        ("gnu-header", gnu_header_archive),
        ("gnu-long-name", gnu_long_name_archive),
    ];

    for (case, mut archive) in cases {
        append_terminator(&mut archive);
        let result: Result<(), FrameError> = ready(async {
            let mut reader = TarReader::new(ChunkedReader::new(archive, BLOCK_SIZE));
            let member = next_member(&mut reader).await?;
            member.effective_path().map(|_| ())
        });
        let rejected = matches!(
            result,
            Err(FrameError {
                inner: FrameErrorInner::EmptyMemberPath,
                ..
            })
        );
        assert!(rejected, "{case}: {result:?}");
    }
}
972
/// A local pax `path` or `linkpath` value containing a NUL byte must make the
/// matching accessor fail with `NulInMemberName`, whose `field` equals the
/// case label (`"path"` or `"link path"`).
#[test]
fn rejects_nul_in_effective_member_names() {
    let path_archive = {
        let mut archive = Vec::new();
        append_pax(&mut archive, b'x', &record("path", "bad\0name"));
        append_block(&mut archive, &header(b'0', 0));
        archive
    };
    let link_archive = {
        let mut archive = Vec::new();
        append_pax(&mut archive, b'x', &record("linkpath", "bad\0target"));
        append_block(&mut archive, &header(b'2', 0));
        archive
    };

    for (field, mut archive) in [("path", path_archive), ("link path", link_archive)] {
        append_terminator(&mut archive);
        let result: Result<(), FrameError> = ready(async {
            let mut reader = TarReader::new(ChunkedReader::new(archive, BLOCK_SIZE));
            let member = next_member(&mut reader).await?;
            // The case label also selects which accessor is exercised.
            match field {
                "path" => member.effective_path().map(|_| ()),
                _ => member.effective_link_path().map(|_| ()),
            }
        });
        let rejected = matches!(
            result,
            Err(FrameError {
                inner: FrameErrorInner::NulInMemberName { field: found },
                ..
            }) if found == field
        );
        assert!(rejected, "{field}: {result:?}");
    }
}
1011
/// NUL bytes in `g` extension `path`/`linkpath` values must not cause an
/// error when an `x` extension supplies clean values for the same keywords.
#[test]
fn ignores_nul_in_overridden_pax_member_names() {
    let mut tainted_global = record("path", "bad\0name");
    tainted_global.extend_from_slice(&record("linkpath", "bad\0target"));

    let mut clean_local = record("path", "good-name");
    clean_local.extend_from_slice(&record("linkpath", "good-target"));

    let mut archive = Vec::new();
    append_pax(&mut archive, b'g', &tainted_global);
    append_pax(&mut archive, b'x', &clean_local);
    append_block(&mut archive, &header(b'2', 0));
    append_terminator(&mut archive);

    ready_ok(async {
        let mut reader = TarReader::new(ChunkedReader::new(archive, BLOCK_SIZE));
        let entry = next_member(&mut reader).await?;
        assert_eq!(entry.effective_path()?.as_ref(), b"good-name");
        assert_eq!(entry.effective_link_path()?.as_ref(), b"good-target");
        Ok(())
    });
}
1032
/// When the physical header name is empty but an extension supplies a
/// non-empty path (a pax `x` record or a GNU `L` payload), `effective_path()`
/// must return the extension's value.
#[test]
fn accepts_nonempty_extension_paths_over_empty_header_names() {
    let pax_archive = {
        let mut archive = Vec::new();
        append_pax(&mut archive, b'x', &record("path", "pax-name"));

        let mut member = header(b'0', 0);
        set_field(&mut member, NAME_RANGE, b"");
        set_field(&mut member, PREFIX_RANGE, b"");
        set_checksum(&mut member);
        append_block(&mut archive, &member);
        archive
    };

    let gnu_archive = {
        let mut archive = Vec::new();
        append_gnu(&mut archive, b'L', b"gnu-name\0");

        let mut member = gnu_header(b'0', 0);
        set_field(&mut member, NAME_RANGE, b"");
        set_checksum(&mut member);
        append_block(&mut archive, &member);
        archive
    };

    let cases = [
        ("pax", pax_archive, b"pax-name".as_slice()),
        ("gnu", gnu_archive, b"gnu-name".as_slice()),
    ];

    for (case, mut archive, expected) in cases {
        append_terminator(&mut archive);
        ready_ok(async {
            let mut reader = TarReader::new(ChunkedReader::new(archive, BLOCK_SIZE));
            let entry = next_member(&mut reader).await?;
            assert_eq!(entry.effective_path()?.as_ref(), expected, "{case}");
            Ok(())
        });
    }
}
1073
/// A `g` extension with an empty `path` value, arriving after an earlier
/// `path=global`, must make `effective_path()` fail with `DeletedPaxMetadata`
/// rather than fall back to the physical header name. The member's pax state
/// must carry that deletion as its only extension.
#[test]
fn global_path_deletion_suppresses_the_physical_header_path() {
    let mut named_header = header(b'0', 0);
    set_field(&mut named_header, NAME_RANGE, b"physical");
    set_checksum(&mut named_header);

    let mut archive = Vec::new();
    // First member: sees `path=global`.
    append_pax(&mut archive, b'g', &record("path", "global"));
    append_block(&mut archive, &header(b'0', 0));
    // Second member: preceded by the empty-valued `path` record.
    append_pax(&mut archive, b'g', &record("path", ""));
    append_block(&mut archive, &named_header);
    append_terminator(&mut archive);

    ready_ok(async {
        let mut reader = TarReader::new(ChunkedReader::new(archive, BLOCK_SIZE));
        {
            let first = next_member(&mut reader).await?;
            assert_eq!(first.effective_path()?.as_ref(), b"global");
        }

        let second = next_member(&mut reader).await?;
        let resolved = second.effective_path();
        assert!(matches!(
            resolved,
            Err(FrameError {
                inner: FrameErrorInner::DeletedPaxMetadata { keyword: "path" },
                ..
            })
        ));

        let state = pax_state(&second).expect("expected pax member metadata");
        assert_eq!(
            state.effective_record(&PaxKeyword::Path),
            Some(&PaxRecord::Path(PaxValue::Deleted))
        );

        let attached = state.extensions().collect::<Vec<_>>();
        assert_eq!(attached.len(), 1);
        assert!(matches!(
            attached[0].records(),
            [PaxRecord::Path(PaxValue::Deleted)]
        ));
        Ok(())
    });
}
1116
/// GNU `L`/`K` payloads are resolved only when the matching effective-path
/// accessor is called.
///
/// The first part checks that well-formed NUL-terminated payloads resolve to
/// `name` and `link`. The second part feeds a payload with no NUL and one
/// with bytes after the NUL. In both cases `next_member` succeeds, and the
/// error is raised by the accessor as `InvalidGnuMetadata` at position 0 with
/// the matching `kind`.
#[test]
fn resolves_and_validates_gnu_metadata_lazily() {
    let mut bytes = Vec::new();
    append_block(&mut bytes, &gnu_header(b'L', 5));
    append_payload(&mut bytes, b"name\0");
    append_block(&mut bytes, &gnu_header(b'K', 5));
    append_payload(&mut bytes, b"link\0");
    append_block(&mut bytes, &gnu_header(b'2', 0));
    append_terminator(&mut bytes);

    ready_ok(async {
        let mut reader = TarReader::new(ChunkedReader::new(bytes, BLOCK_SIZE));
        let member = next_member(&mut reader).await?;
        assert_eq!(member.effective_path()?.as_ref(), b"name");
        assert_eq!(member.effective_link_path()?.as_ref(), b"link");
        Ok(())
    });

    for (typeflag, payload, kind) in [
        (b'L', b"no-nul".as_slice(), GnuKind::LongName),
        (b'K', b"link\0bad".as_slice(), GnuKind::LongLink),
    ] {
        let mut bytes = Vec::new();
        append_gnu(&mut bytes, typeflag, payload);
        append_block(&mut bytes, &gnu_header(b'2', 0));
        append_terminator(&mut bytes);
        let result: Result<(), FrameError> = ready(async {
            let mut reader = TarReader::new(ChunkedReader::new(bytes, BLOCK_SIZE));
            let member = next_member(&mut reader).await?;
            match kind {
                GnuKind::LongName => member.effective_path().map(|_| ()),
                GnuKind::LongLink => member.effective_link_path().map(|_| ()),
            }
        });
        // Match through a reference so `result` stays intact for the failure
        // message, which names the failing case like the other looped tests.
        assert!(
            matches!(
                &result,
                Err(FrameError {
                    position: 0,
                    inner: FrameErrorInner::InvalidGnuMetadata { kind: found, .. },
                }) if *found == kind
            ),
            "typeflag {:?}: {result:?}",
            char::from(typeflag),
        );
    }
}
1160
/// One `g` extension (two `comment` records) and one `x` extension (`path`,
/// `size=513`) precede a member whose header declares size 1.
///
/// Checks that both extensions are attached to the member in order with their
/// positions and kinds, that the `x` size record sets the effective size, that
/// the last `comment` record is the effective one, and that the payload then
/// streams as one full block plus a one-byte block before the archive ends.
#[test]
fn groups_pax_metadata_and_streams_member_payload() {
    let mut global = record("comment", "first");
    global.extend_from_slice(&record("comment", "last"));
    let mut local = record("path", "renamed");
    local.extend_from_slice(&record("size", "513"));
    let mut bytes = Vec::new();
    append_pax(&mut bytes, b'g', &global);
    append_pax(&mut bytes, b'x', &local);
    append_block(&mut bytes, &header(b'0', 1));
    append_payload(&mut bytes, &[b'a'; BLOCK_SIZE]);
    append_payload(&mut bytes, b"b");
    append_terminator(&mut bytes);

    // Checked conversion, matching how other positions in this module are
    // derived from `usize` lengths.
    let local_extension_position =
        u64::try_from(BLOCK_SIZE * 2).expect("test position should fit u64");

    ready_ok(async {
        // A 17-byte chunk size keeps reads from aligning with block boundaries.
        let mut reader = TarReader::new(ChunkedReader::new(bytes, 17));
        {
            let mut member = next_member(&mut reader).await?;
            assert_eq!(member.header.effective_size, 513);
            let state = pax_state(&member).expect("expected pax member metadata");
            let extensions = state.extensions().collect::<Vec<_>>();
            assert_eq!(extensions.len(), 2);
            assert_eq!(extensions[0].position, 0);
            assert_eq!(extensions[0].kind, PaxKind::Global);
            assert_eq!(
                extensions[0].records(),
                [
                    PaxRecord::Comment(PaxValue::Value("first".into())),
                    PaxRecord::Comment(PaxValue::Value("last".into())),
                ]
            );
            assert_eq!(extensions[1].position, local_extension_position);
            assert_eq!(extensions[1].kind, PaxKind::Local);
            assert_eq!(
                state.effective_record(&PaxKeyword::Size),
                Some(&PaxRecord::Size(PaxValue::Value(513)))
            );
            assert_eq!(
                state.effective_record(&PaxKeyword::Comment),
                Some(&PaxRecord::Comment(PaxValue::Value("last".into())))
            );
            let Some(first) = member.payload.next_block().await? else {
                panic!("expected first member payload block");
            };
            let Some(last) = member.payload.next_block().await? else {
                panic!("expected last member payload block");
            };
            assert_eq!(first.len, BLOCK_SIZE);
            assert_eq!(last.len, 1);
            assert!(member.payload.next_block().await?.is_none());
        }
        assert!(reader.next_frame().await?.is_none());
        Ok(())
    });
}
1216
/// Exercises `set_max_global_pax_extensions_size`.
///
/// Rejected case: with a limit of two payloads, a third `g` header must fail
/// with `GlobalPaxExtensionsTooLarge` at that header's offset, reporting a
/// size of three payloads.
///
/// Accepted case: with a limit of three payloads, two members each preceded
/// by three `g` extensions must both be readable, each with three extensions.
#[test]
fn bounds_cumulative_global_pax_extension_payloads() {
    let payload = record("comment", "metadata");
    let payload_size = u64::try_from(payload.len()).expect("payload size should fit u64");
    let limit = payload_size
        .checked_mul(2)
        .expect("test payload total should fit u64");

    let mut over_limit = Vec::new();
    for _ in 0..2 {
        append_pax(&mut over_limit, b'g', &payload);
    }
    // Only the header of the third extension is appended, with no payload.
    let third_header_offset =
        u64::try_from(over_limit.len()).expect("test position should fit u64");
    append_block(&mut over_limit, &header(b'g', payload_size));

    let outcome: Result<(), FrameError> = ready(async {
        let mut reader = TarReader::new(ChunkedReader::new(over_limit, BLOCK_SIZE));
        reader.set_max_global_pax_extensions_size(limit);
        reader.next_frame().await.map(|_| ())
    });
    assert!(matches!(
        outcome,
        Err(FrameError {
            position: reported_position,
            inner: FrameErrorInner::GlobalPaxExtensionsTooLarge {
                size: reported_size,
                limit: reported_limit,
            },
        }) if reported_position == third_header_offset
            && reported_size == payload_size * 3
            && reported_limit == limit
    ));

    let mut within_limit = Vec::new();
    for _member in 0..2 {
        for _extension in 0..3 {
            append_pax(&mut within_limit, b'g', &payload);
        }
        append_block(&mut within_limit, &header(b'0', 0));
    }
    append_terminator(&mut within_limit);

    ready_ok(async {
        let mut reader = TarReader::new(ChunkedReader::new(within_limit, BLOCK_SIZE));
        reader.set_max_global_pax_extensions_size(payload_size * 3);
        for _member in 0..2 {
            let entry = next_member(&mut reader).await?;
            let state = pax_state(&entry).expect("expected pax member metadata");
            assert_eq!(state.extensions().count(), 3);
        }
        Ok(())
    });
}
1273
/// Cancels a pending `next_frame()` at two offsets, one block into the
/// archive and at the end of the `g` extension. The following read must still
/// deliver the member with that extension attached.
#[test]
fn retains_global_pax_extension_across_cancelled_reads() {
    let mut archive = Vec::new();
    append_pax(&mut archive, b'g', &record("comment", "metadata"));
    let extension_end = archive.len();
    append_block(&mut archive, &header(b'0', 0));
    append_terminator(&mut archive);

    let header_end = BLOCK_SIZE;
    for pending_at in [header_end, extension_end] {
        let source = ChunkedReader::pending_once(archive.clone(), pending_at);
        let mut reader = TarReader::new(source);
        cancel_pending(reader.next_frame());

        ready_ok(async {
            let entry = next_member(&mut reader).await?;
            let state = pax_state(&entry).expect("expected pax member metadata");
            let attached = state.extensions().collect::<Vec<_>>();
            assert_eq!(attached.len(), 1);

            let extension = &attached[0];
            assert_eq!(extension.position, 0);
            assert_eq!(extension.kind, PaxKind::Global);
            assert_eq!(
                extension.records(),
                &[PaxRecord::Comment(PaxValue::Value("metadata".into()))]
            );
            Ok(())
        });
    }
}
1302
/// Cancels a pending `next_frame()` while a GNU `L` extension is being read,
/// then checks the extension still reaches the following member.
///
/// Scenario one uses a long name that spans more than one block and cancels
/// two blocks into the archive. Scenario two uses an empty `L` payload and
/// cancels right after the extension; the member must still carry a
/// `long_name` whose payload is empty.
#[test]
fn retains_gnu_metadata_across_cancelled_reads() {
    // Scenario one: multi-block long name.
    let expected_name = vec![b'n'; BLOCK_SIZE + 10];
    let mut terminated_name = expected_name.clone();
    terminated_name.push(0);

    let mut long_name_archive = Vec::new();
    append_gnu(&mut long_name_archive, b'L', &terminated_name);
    append_block(&mut long_name_archive, &gnu_header(b'0', 0));
    append_terminator(&mut long_name_archive);

    let after_first_payload_block = BLOCK_SIZE * 2;
    let mut long_name_reader = TarReader::new(ChunkedReader::pending_once(
        long_name_archive,
        after_first_payload_block,
    ));
    cancel_pending(long_name_reader.next_frame());

    ready_ok(async {
        let entry = next_member(&mut long_name_reader).await?;
        assert_eq!(entry.effective_path()?.as_ref(), expected_name);
        Ok(())
    });

    // Scenario two: empty long-name payload.
    let mut empty_name_archive = Vec::new();
    append_gnu(&mut empty_name_archive, b'L', &[]);
    let after_extension_header = empty_name_archive.len();
    append_block(&mut empty_name_archive, &gnu_header(b'0', 0));
    append_terminator(&mut empty_name_archive);

    let mut empty_name_reader = TarReader::new(ChunkedReader::pending_once(
        empty_name_archive,
        after_extension_header,
    ));
    cancel_pending(empty_name_reader.next_frame());

    ready_ok(async {
        let entry = next_member(&mut empty_name_reader).await?;
        let has_empty_long_name = matches!(
            &entry.extensions,
            MemberExtensions::Gnu {
                long_name: Some(GnuMetadata { payload, .. }),
                ..
            } if payload.is_empty()
        );
        assert!(has_empty_long_name);
        Ok(())
    });
}
1347
/// Archive layout: two `g` extensions (`comment=first`, `gname=second`), two
/// members, a third `g` extension (`comment=replacement`), one more member.
///
/// Expectations: the first member lists both extensions; the second lists
/// none but still sees `comment=first` as effective; the third lists only the
/// replacement and sees `comment=replacement`.
#[test]
fn applies_global_pax_updates_to_each_borrowed_state() {
    let first = record("comment", "first");
    let second = record("gname", "second");
    let replacement = record("comment", "replacement");
    let mut bytes = Vec::new();
    append_pax(&mut bytes, b'g', &first);
    append_pax(&mut bytes, b'g', &second);
    append_block(&mut bytes, &header(b'0', 0));
    append_block(&mut bytes, &header(b'0', 0));
    append_pax(&mut bytes, b'g', &replacement);
    append_block(&mut bytes, &header(b'0', 0));
    append_terminator(&mut bytes);

    // Checked conversion, matching how other positions in this module are
    // derived from `usize` lengths.
    let second_extension_position =
        u64::try_from(BLOCK_SIZE * 2).expect("test position should fit u64");

    ready_ok(async {
        let mut reader = TarReader::new(ChunkedReader::new(bytes, BLOCK_SIZE));
        {
            let member = next_member(&mut reader).await?;
            let state = pax_state(&member).expect("expected pax member metadata");
            let extensions = state.extensions().collect::<Vec<_>>();
            assert_eq!(extensions.len(), 2);
            assert_eq!(extensions[0].position, 0);
            assert_eq!(extensions[1].position, second_extension_position);
            assert_eq!(
                state.effective_record(&PaxKeyword::Comment),
                Some(&PaxRecord::Comment(PaxValue::Value("first".into())))
            );
        }
        {
            // No new extension precedes this member, yet the earlier global
            // comment is still the effective one.
            let member = next_member(&mut reader).await?;
            let state = pax_state(&member).expect("expected pax member metadata");
            assert_eq!(state.extensions().count(), 0);
            assert_eq!(
                state.effective_record(&PaxKeyword::Comment),
                Some(&PaxRecord::Comment(PaxValue::Value("first".into())))
            );
        }

        let member = next_member(&mut reader).await?;
        let state = pax_state(&member).expect("expected pax member metadata");
        let extensions = state.extensions().collect::<Vec<_>>();
        assert_eq!(extensions.len(), 1);
        assert_eq!(extensions[0].kind, PaxKind::Global);
        assert_eq!(
            state.effective_record(&PaxKeyword::Comment),
            Some(&PaxRecord::Comment(PaxValue::Value("replacement".into())))
        );
        Ok(())
    });
}
1398
/// Streams a payload of three blocks plus seven bytes through `next_chunk`
/// with a caller-provided buffer.
///
/// Checks that the first call fills the buffer with the first two blocks, the
/// second call delivers the rest at the same buffer address, and a third call
/// returns `false` with the buffer contents unchanged.
#[test]
fn streams_member_payload_in_reusable_chunks() {
    let payload = (0..BLOCK_SIZE * 3 + 7)
        // `index % 251` is always below 256, so the conversion cannot fail.
        .map(|index| u8::try_from(index % 251).expect("test byte should fit"))
        .collect::<Vec<_>>();
    let mut bytes = Vec::new();
    append_pax(&mut bytes, b'0', &payload);
    append_terminator(&mut bytes);

    ready_ok(async {
        let mut reader = TarReader::new(ChunkedReader::new(bytes, 17));
        let mut member = next_member(&mut reader).await?;
        // Pre-filled so stale contents would show up if not replaced.
        let mut chunk = vec![b'x'; BLOCK_SIZE * 2];
        assert!(
            member
                .payload
                .next_chunk(&mut chunk, BLOCK_SIZE + 1)
                .await?
        );
        let allocation = chunk.as_ptr();
        assert_eq!(chunk, payload[..BLOCK_SIZE * 2]);
        assert!(
            member
                .payload
                .next_chunk(&mut chunk, BLOCK_SIZE + 1)
                .await?
        );
        // Same pointer: the buffer was reused, not reallocated.
        assert_eq!(chunk.as_ptr(), allocation);
        assert_eq!(chunk, payload[BLOCK_SIZE * 2..]);
        assert!(
            !member
                .payload
                .next_chunk(&mut chunk, BLOCK_SIZE + 1)
                .await?
        );
        assert_eq!(chunk, payload[BLOCK_SIZE * 2..]);
        assert!(reader.next_frame().await?.is_none());
        Ok(())
    });
}
1439
/// Cancels a pending `next_chunk` call, then resumes the payload with
/// `next_block` followed by `next_chunk`.
///
/// The cancelled call must leave its buffer empty. The resumed block plus the
/// resumed chunk must together equal the original payload, and the following
/// member must start at the offset reported by the fixture helper.
#[test]
fn resumes_cancelled_member_payload_chunk_with_either_read_api() {
    let payload = (0..BLOCK_SIZE * 2 + 17)
        .map(|index| u8::try_from(index % 251).expect("test byte should fit"))
        .collect::<Vec<_>>();
    let (archive, trailing_member_position) = member_followed_by_empty_member(&payload);

    ready_ok(async {
        let source = ChunkedReader::pending_once(archive, BLOCK_SIZE + 73);
        let mut reader = TarReader::new(source);
        {
            let mut entry = next_member(&mut reader).await?;

            // Pre-filled so the test can observe that the call cleared it.
            let mut abandoned = vec![b'x'; 17];
            cancel_pending(entry.payload.next_chunk(&mut abandoned, payload.len()));
            assert!(abandoned.is_empty());

            let head = entry
                .payload
                .next_block()
                .await?
                .expect("cancelled chunk should resume as a payload block");
            let mut tail = vec![b'y'; 23];
            assert!(entry.payload.next_chunk(&mut tail, 1).await?);

            let mut reassembled = head.block[..head.len].to_vec();
            reassembled.extend_from_slice(&tail);
            assert_eq!(reassembled, payload);
            assert!(!entry.payload.next_chunk(&mut tail, 1).await?);
        }

        let trailing = next_member(&mut reader).await?;
        assert_eq!(trailing.header.position, trailing_member_position);
        Ok(())
    });
}
1477
/// Cancels a pending `next_block` call and drops the member with payload left
/// unread. The next `next_member` call must still land on the following
/// member at the expected offset, after which the archive ends.
#[test]
fn resumes_cancelled_member_payload_block_during_automatic_drain() {
    let payload = vec![b'x'; BLOCK_SIZE * 2 + 17];
    let (archive, trailing_member_position) = member_followed_by_empty_member(&payload);

    ready_ok(async {
        let source = ChunkedReader::pending_once(archive, BLOCK_SIZE + 73);
        let mut reader = TarReader::new(source);
        {
            let mut abandoned = next_member(&mut reader).await?;
            cancel_pending(abandoned.payload.next_block());
        }

        {
            let trailing = next_member(&mut reader).await?;
            assert_eq!(trailing.header.position, trailing_member_position);
        }
        assert!(reader.next_frame().await?.is_none());
        Ok(())
    });
}
1497
/// Drops a member without reading its payload, then cancels the pending
/// `next_frame()` that follows. A later `next_member` call must still deliver
/// the following member at the expected offset, after which the archive ends.
#[test]
fn resumes_cancelled_automatic_payload_drain() {
    let payload = vec![b'x'; BLOCK_SIZE * 2 + 17];
    let (archive, trailing_member_position) = member_followed_by_empty_member(&payload);

    ready_ok(async {
        let source = ChunkedReader::pending_once(archive, BLOCK_SIZE + 73);
        let mut reader = TarReader::new(source);
        {
            // Payload deliberately left untouched.
            let _unread = next_member(&mut reader).await?;
        }
        cancel_pending(reader.next_frame());

        {
            let trailing = next_member(&mut reader).await?;
            assert_eq!(trailing.header.position, trailing_member_position);
        }
        assert!(reader.next_frame().await?.is_none());
        Ok(())
    });
}
1515
/// After a `next_chunk` future has been handed to `cancel_pending`, a second
/// `next_chunk` call on a short archive must fail at byte offset
/// `BLOCK_SIZE * 2`.
///
/// The archive declares a payload of `BLOCK_SIZE + 1` bytes but supplies only
/// what `append_payload` writes for `b"payload"`. Without a trailing byte the
/// expected error is `TruncatedPayload` with one byte remaining; with one
/// stray trailing byte it is `IncompleteBlock` with one byte read.
#[test]
fn reports_cancelled_chunk_errors_at_physical_block_boundaries() {
    #[derive(Clone, Copy, Debug)]
    enum ExpectedError {
        TruncatedPayload,
        IncompleteBlock,
    }

    let cases = [
        (ExpectedError::TruncatedPayload, None),
        (ExpectedError::IncompleteBlock, Some(b'x')),
    ];
    for (expected, trailing_byte) in cases {
        let mut archive = Vec::new();
        append_block(&mut archive, &header(b'0', (BLOCK_SIZE + 1) as u64));
        append_payload(&mut archive, b"payload");
        // Appends nothing for `None` and the single stray byte for `Some`.
        archive.extend(trailing_byte);

        let error = ready(async {
            let source = ChunkedReader::pending_once(archive, BLOCK_SIZE + 73);
            let mut reader = TarReader::new(source);
            let mut member = match reader.next_frame().await {
                Ok(Some(member)) => member,
                _ => panic!("expected member"),
            };
            let mut buffer = Vec::new();
            cancel_pending(member.payload.next_chunk(&mut buffer, BLOCK_SIZE * 2));
            member.payload.next_chunk(&mut buffer, BLOCK_SIZE * 2).await
        });

        let (position, inner) = match &error {
            Err(FrameError { position, inner }) => (position, inner),
            _ => panic!("{expected:?}: expected error, got {error:?}"),
        };
        let boundary = (BLOCK_SIZE * 2) as u64;
        assert_eq!(*position, boundary, "{expected:?}");

        // Each expectation accepts exactly one error shape.
        let kind_matches = match expected {
            ExpectedError::TruncatedPayload => matches!(
                inner,
                FrameErrorInner::TruncatedPayload {
                    owner: DataOwner::Member,
                    remaining: 1,
                }
            ),
            ExpectedError::IncompleteBlock => {
                matches!(inner, FrameErrorInner::IncompleteBlock { read: 1 })
            }
        };
        assert!(kind_matches, "{expected:?}: {error:?}");
    }
}
1566
/// A GNU `L` header and a GNU `K` header, each with a five-byte payload,
/// followed by a zero-size `2` member must surface as one member carrying
/// both payloads in `MemberExtensions::Gnu`, with no payload blocks of its
/// own.
#[test]
fn groups_gnu_metadata_with_its_member() {
    let mut archive = Vec::new();
    for (typeflag, value) in [(b'L', b"name\0"), (b'K', b"link\0")] {
        append_block(&mut archive, &gnu_header(typeflag, 5));
        append_payload(&mut archive, value);
    }
    append_block(&mut archive, &gnu_header(b'2', 0));
    append_terminator(&mut archive);

    ready_ok(async {
        let mut reader = TarReader::new(ChunkedReader::new(archive, BLOCK_SIZE));
        let mut member = next_member(&mut reader).await?;
        let (name, link) = match &member.extensions {
            MemberExtensions::Gnu {
                long_name: Some(name),
                long_link: Some(link),
            } => (name, link),
            _ => panic!("expected GNU extensions"),
        };
        // The payloads are compared with their trailing NUL included.
        assert_eq!(name.payload, b"name\0");
        assert_eq!(link.payload, b"link\0");
        assert!(member.payload.next_block().await?.is_none());
        Ok(())
    });
}
1593
/// GNU long-name (`L`) and long-link (`K`) headers declaring more bytes than
/// the reader's limit must fail with `ExtensionTooLarge` at position 0.
///
/// Every input here is a lone header block, so no payload bytes are
/// available to the reader. Both a configured limit and
/// `DEFAULT_MAX_GNU_EXTENSION_SIZE` are exercised.
#[test]
fn rejects_oversized_gnu_extensions_before_consuming_payload() {
    // Configured limit: one byte below what the header declares.
    let declared_size = 9;
    let cases = [("long-name", b'L'), ("long-link", b'K')];
    for (case, typeflag) in cases {
        let header_only = gnu_header(typeflag, declared_size).to_vec();
        let mut reader = TarReader::new(ChunkedReader::new(header_only, BLOCK_SIZE));
        reader.set_max_gnu_extension_size(declared_size - 1);
        let rejected = matches!(
            ready(reader.next_frame()),
            Err(FrameError {
                position: 0,
                inner: FrameErrorInner::ExtensionTooLarge {
                    format: ArchiveFormat::Gnu,
                    size,
                    limit,
                },
            }) if size == declared_size && limit == declared_size - 1
        );
        assert!(rejected, "{case}");
    }

    // Default limit: a header declaring one byte more than the default.
    let header_only = gnu_header(b'L', DEFAULT_MAX_GNU_EXTENSION_SIZE + 1).to_vec();
    let mut reader = TarReader::new(ChunkedReader::new(header_only, BLOCK_SIZE));
    assert!(matches!(
        ready(reader.next_frame()),
        Err(FrameError {
            position: 0,
            inner: FrameErrorInner::ExtensionTooLarge {
                format: ArchiveFormat::Gnu,
                size,
                limit: DEFAULT_MAX_GNU_EXTENSION_SIZE,
            },
        }) if size == DEFAULT_MAX_GNU_EXTENSION_SIZE + 1
    ));
}
1635
/// Once `next_frame` has failed with `ExtensionTooLarge`, the next call must
/// return `Ok(None)`, even though the archive still holds an ordinary member
/// and a terminator after the rejected GNU long-name extension.
#[test]
fn logical_reader_is_fused_after_oversized_gnu_extension() {
    let payload = b"renamed\0";
    let payload_size = u64::try_from(payload.len()).expect("payload size should fit u64");

    let mut archive = Vec::new();
    append_gnu(&mut archive, b'L', payload);
    append_block(&mut archive, &gnu_header(b'0', 0));
    append_terminator(&mut archive);

    ready_ok(async {
        let source = ChunkedReader::new(archive, BLOCK_SIZE);
        let mut reader = TarReader::new(source);
        // Set the limit one byte below the long-name payload size.
        reader.set_max_gnu_extension_size(payload_size - 1);

        assert!(matches!(
            reader.next_frame().await,
            Err(FrameError {
                position: 0,
                inner: FrameErrorInner::ExtensionTooLarge {
                    format: ArchiveFormat::Gnu,
                    size,
                    limit,
                },
            }) if size == payload_size && limit == payload_size - 1
        ));
        // After the failure the reader reports no further members.
        assert!(reader.next_frame().await?.is_none());
        Ok(())
    });
}
1663
/// GNU long-name and long-link payloads longer than one block must come back
/// byte-for-byte, each tagged with the position of its own extension header,
/// when the archive is read through a `ChunkedReader` built with a chunk
/// argument of 19.
#[test]
fn preserves_multiblock_gnu_metadata_payloads() {
    // Builds `len` copies of `fill` followed by a single NUL byte.
    let nul_terminated = |fill: u8, len: usize| {
        let mut value = vec![fill; len];
        value.push(0);
        value
    };
    let long_name = nul_terminated(b'n', BLOCK_SIZE * 2 + 37);
    let long_link = nul_terminated(b'l', BLOCK_SIZE + 19);

    let mut archive = Vec::new();
    for (typeflag, value) in [(b'L', &long_name), (b'K', &long_link)] {
        append_gnu(&mut archive, typeflag, value);
    }
    append_block(&mut archive, &gnu_header(b'2', 0));
    append_terminator(&mut archive);

    ready_ok(async {
        let mut reader = TarReader::new(ChunkedReader::new(archive, 19));
        let member = next_member(&mut reader).await?;
        let (name_metadata, link_metadata) = match &member.extensions {
            MemberExtensions::Gnu {
                long_name: Some(name),
                long_link: Some(link),
            } => (name, link),
            _ => panic!("expected GNU extensions"),
        };

        assert_eq!(name_metadata.position, 0);
        assert_eq!(name_metadata.payload, long_name);
        // The long-link header is expected four blocks into the archive.
        let link_header_position = (BLOCK_SIZE * 4) as u64;
        assert_eq!(link_metadata.position, link_header_position);
        assert_eq!(link_metadata.payload, long_link);

        member.payload.skip().await?;
        assert!(reader.next_frame().await?.is_none());
        Ok(())
    });
}
1696
/// Covers archives that contain no ordinary member:
///
/// 1. a terminator only yields no member;
/// 2. a pax `x` extension or a GNU `L` header with nothing after it fails
///    with `UnexpectedEof`;
/// 3. global pax records followed by a terminator yield no member;
/// 4. a malformed global pax record fails with `InvalidPaxRecord` at
///    position 0.
#[test]
fn handles_empty_archives_and_trailing_global_pax() {
    // 1. Terminator only.
    let mut terminator_only = Vec::new();
    append_terminator(&mut terminator_only);
    ready_ok(async {
        let mut reader = TarReader::new(ChunkedReader::new(terminator_only, BLOCK_SIZE));
        assert!(reader.next_frame().await?.is_none());
        Ok(())
    });

    // 2. Extension headers with no member (and no terminator) after them.
    let dangling_headers = [
        header(b'x', record("path", "name").len() as u64),
        gnu_header(b'L', 0),
    ];
    for dangling in dangling_headers {
        let mut bytes = Vec::new();
        append_block(&mut bytes, &dangling);
        // Only the pax header declares a payload, so only it gets one.
        if dangling[TYPEFLAG_OFFSET] == b'x' {
            append_payload(&mut bytes, &record("path", "name"));
        }
        let error: Result<(), FrameError> = ready(async {
            let mut reader = TarReader::new(ChunkedReader::new(bytes, BLOCK_SIZE));
            reader.next_frame().await.map(|_| ())
        });
        assert!(matches!(
            error,
            Err(FrameError {
                inner: FrameErrorInner::UnexpectedEof { .. },
                ..
            })
        ));
    }

    // 3. Well-formed global pax records followed by the terminator.
    let mut globals_only = Vec::new();
    for global_record in [record("comment", "metadata"), record("gname", "group")] {
        append_pax(&mut globals_only, b'g', &global_record);
    }
    append_terminator(&mut globals_only);
    ready_ok(async {
        let mut reader = TarReader::new(ChunkedReader::new(globals_only, BLOCK_SIZE));
        assert!(reader.next_frame().await?.is_none());
        Ok(())
    });

    // 4. A global pax payload that is not a valid record.
    let mut bad_global = Vec::new();
    append_pax(&mut bad_global, b'g', b"invalid");
    append_terminator(&mut bad_global);
    let error: Result<(), FrameError> = ready(async {
        let mut reader = TarReader::new(ChunkedReader::new(bad_global, BLOCK_SIZE));
        reader.next_frame().await.map(|_| ())
    });
    assert!(matches!(
        error,
        Err(FrameError {
            position: 0,
            inner: FrameErrorInner::InvalidPaxRecord { .. },
        })
    ));
}
1754
/// A member's payload must be out of the way before the reader advances,
/// whether the caller calls `skip` or just drops the member.
///
/// The explicit-skip half runs with a payload slightly larger than one block
/// and with one slightly larger than `PAYLOAD_DRAIN_CHUNK_BYTES`. The
/// drop-only half uses a one-byte payload.
#[test]
fn skips_unread_payload_before_advancing() {
    let payload_lengths = [BLOCK_SIZE + 1, PAYLOAD_DRAIN_CHUNK_BYTES + 7];
    for payload_len in payload_lengths {
        let payload = vec![b'a'; payload_len];
        let mut archive = Vec::new();
        append_pax(&mut archive, b'0', &payload);
        append_block(&mut archive, &header(b'0', 0));
        append_terminator(&mut archive);

        ready_ok(async {
            let mut reader = TarReader::new(ChunkedReader::new(archive, BLOCK_SIZE));
            {
                let skipped = next_member(&mut reader).await?;
                skipped.payload.skip().await?;
            }
            {
                // The second member was built with a zero-size header.
                let following = next_member(&mut reader).await?;
                assert_eq!(following.header.effective_size, 0);
            }
            assert!(reader.next_frame().await?.is_none());
            Ok(())
        });
    }

    // Drop-only path: the first member is released without touching its
    // payload, and the reader must still produce another member.
    let mut dropped_archive = Vec::new();
    append_block(&mut dropped_archive, &header(b'0', 1));
    append_payload(&mut dropped_archive, b"a");
    append_block(&mut dropped_archive, &header(b'0', 0));
    append_terminator(&mut dropped_archive);
    ready_ok(async {
        let mut reader = TarReader::new(ChunkedReader::new(dropped_archive, BLOCK_SIZE));
        drop(next_member(&mut reader).await?);
        assert!(reader.next_frame().await?.is_some());
        Ok(())
    });
}
1791
/// An archive made of a single header declaring a one-byte payload, with no
/// payload bytes behind it, must fail with `TruncatedPayload` owned by the
/// member. The same error is expected whether the payload is read, skipped
/// through `skip`, or left unread when the member is dropped before the next
/// `next_frame` call.
#[test]
fn reports_truncated_payload_when_read_or_skipped() {
    #[derive(Clone, Copy, Debug)]
    enum Operation {
        Read,
        ExplicitSkip,
        AutomaticSkip,
    }

    let operations = [
        Operation::Read,
        Operation::ExplicitSkip,
        Operation::AutomaticSkip,
    ];
    for operation in operations {
        let result: Result<(), FrameError> = ready(async {
            let header_only = header(b'0', 1).to_vec();
            let mut reader = TarReader::new(ChunkedReader::new(header_only, BLOCK_SIZE));
            let mut member = match reader.next_frame().await {
                Ok(Some(member)) => member,
                _ => panic!("expected member"),
            };
            match operation {
                Operation::AutomaticSkip => {
                    // Release the member first; the follow-up call is the one
                    // expected to report the missing payload.
                    drop(member);
                    reader.next_frame().await.map(|_| ())
                }
                Operation::ExplicitSkip => member.payload.skip().await,
                Operation::Read => member.payload.next_block().await.map(|_| ()),
            }
        });

        let truncated = matches!(
            result,
            Err(FrameError {
                inner: FrameErrorInner::TruncatedPayload {
                    owner: DataOwner::Member,
                    ..
                },
                ..
            })
        );
        assert!(truncated, "{operation:?}");
    }
}
1836}