async_tar/
archive.rs

1use std::{
2    cmp,
3    pin::Pin,
4    sync::{Arc, Mutex},
5};
6
7use async_std::{
8    fs, io,
9    io::prelude::*,
10    path::Path,
11    prelude::*,
12    stream::Stream,
13    task::{Context, Poll},
14};
15use pin_project::pin_project;
16
17use crate::{
18    Entry, GnuExtSparseHeader, GnuSparseHeader, Header,
19    entry::{EntryFields, EntryIo},
20    error::TarError,
21    other,
22    pax::pax_extensions,
23};
24
25/// A top-level representation of an archive file.
26///
27/// This archive can have an entry added to it and it can be iterated over.
28#[derive(Debug)]
29pub struct Archive<R: Read + Unpin> {
30    inner: Arc<Mutex<ArchiveInner<R>>>,
31}
32
33impl<R: Read + Unpin> Clone for Archive<R> {
34    fn clone(&self) -> Self {
35        Archive {
36            inner: self.inner.clone(),
37        }
38    }
39}
40
41#[pin_project]
42#[derive(Debug)]
43pub struct ArchiveInner<R: Read + Unpin> {
44    pos: u64,
45    unpack_xattrs: bool,
46    preserve_permissions: bool,
47    preserve_mtime: bool,
48    ignore_zeros: bool,
49    #[pin]
50    obj: R,
51}
52
53/// Configure the archive.
54pub struct ArchiveBuilder<R: Read + Unpin> {
55    obj: R,
56    unpack_xattrs: bool,
57    preserve_permissions: bool,
58    preserve_mtime: bool,
59    ignore_zeros: bool,
60}
61
62impl<R: Read + Unpin> ArchiveBuilder<R> {
63    /// Create a new builder.
64    pub fn new(obj: R) -> Self {
65        ArchiveBuilder {
66            unpack_xattrs: false,
67            preserve_permissions: false,
68            preserve_mtime: true,
69            ignore_zeros: false,
70            obj,
71        }
72    }
73
74    /// Indicate whether extended file attributes (xattrs on Unix) are preserved
75    /// when unpacking this archive.
76    ///
77    /// This flag is disabled by default and is currently only implemented on
78    /// Unix using xattr support. This may eventually be implemented for
79    /// Windows, however, if other archive implementations are found which do
80    /// this as well.
81    pub fn set_unpack_xattrs(mut self, unpack_xattrs: bool) -> Self {
82        self.unpack_xattrs = unpack_xattrs;
83        self
84    }
85
86    /// Indicate whether extended permissions (like suid on Unix) are preserved
87    /// when unpacking this entry.
88    ///
89    /// This flag is disabled by default and is currently only implemented on
90    /// Unix.
91    pub fn set_preserve_permissions(mut self, preserve: bool) -> Self {
92        self.preserve_permissions = preserve;
93        self
94    }
95
96    /// Indicate whether access time information is preserved when unpacking
97    /// this entry.
98    ///
99    /// This flag is enabled by default.
100    pub fn set_preserve_mtime(mut self, preserve: bool) -> Self {
101        self.preserve_mtime = preserve;
102        self
103    }
104
105    /// Ignore zeroed headers, which would otherwise indicate to the archive that it has no more
106    /// entries.
107    ///
108    /// This can be used in case multiple tar archives have been concatenated together.
109    pub fn set_ignore_zeros(mut self, ignore_zeros: bool) -> Self {
110        self.ignore_zeros = ignore_zeros;
111        self
112    }
113
114    /// Construct the archive, ready to accept inputs.
115    pub fn build(self) -> Archive<R> {
116        let Self {
117            unpack_xattrs,
118            preserve_permissions,
119            preserve_mtime,
120            ignore_zeros,
121            obj,
122        } = self;
123
124        Archive {
125            inner: Arc::new(Mutex::new(ArchiveInner {
126                unpack_xattrs,
127                preserve_permissions,
128                preserve_mtime,
129                ignore_zeros,
130                obj,
131                pos: 0,
132            })),
133        }
134    }
135}
136
137impl<R: Read + Unpin> Archive<R> {
138    /// Create a new archive with the underlying object as the reader.
139    pub fn new(obj: R) -> Archive<R> {
140        Archive {
141            inner: Arc::new(Mutex::new(ArchiveInner {
142                unpack_xattrs: false,
143                preserve_permissions: false,
144                preserve_mtime: true,
145                ignore_zeros: false,
146                obj,
147                pos: 0,
148            })),
149        }
150    }
151
152    /// Unwrap this archive, returning the underlying object.
153    pub fn into_inner(self) -> Result<R, Self> {
154        match Arc::try_unwrap(self.inner) {
155            Ok(inner) => Ok(inner.into_inner().unwrap().obj),
156            Err(inner) => Err(Self { inner }),
157        }
158    }
159
160    /// Construct an stream over the entries in this archive.
161    ///
162    /// Note that care must be taken to consider each entry within an archive in
163    /// sequence. If entries are processed out of sequence (from what the
164    /// stream returns), then the contents read for each entry may be
165    /// corrupted.
166    pub fn entries(self) -> io::Result<Entries<R>> {
167        if self.inner.lock().unwrap().pos != 0 {
168            return Err(other(
169                "cannot call entries unless archive is at \
170                 position 0",
171            ));
172        }
173
174        Ok(Entries {
175            archive: self,
176            current: State::default(),
177            fields: None,
178            gnu_longlink: None,
179            gnu_longname: None,
180            pax_extensions: None,
181        })
182    }
183
184    /// Construct an stream over the raw entries in this archive.
185    ///
186    /// Note that care must be taken to consider each entry within an archive in
187    /// sequence. If entries are processed out of sequence (from what the
188    /// stream returns), then the contents read for each entry may be
189    /// corrupted.
190    pub fn entries_raw(self) -> io::Result<RawEntries<R>> {
191        if self.inner.lock().unwrap().pos != 0 {
192            return Err(other(
193                "cannot call entries_raw unless archive is at \
194                 position 0",
195            ));
196        }
197
198        Ok(RawEntries {
199            archive: self,
200            current: (0, None, 0),
201        })
202    }
203
204    /// Unpacks the contents tarball into the specified `dst`.
205    ///
206    /// This function will iterate over the entire contents of this tarball,
207    /// extracting each file in turn to the location specified by the entry's
208    /// path name.
209    ///
210    /// This operation is relatively sensitive in that it will not write files
211    /// outside of the path specified by `dst`. Files in the archive which have
212    /// a '..' in their path are skipped during the unpacking process.
213    ///
214    /// # Examples
215    ///
216    /// ```no_run
217    /// # fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> { async_std::task::block_on(async {
218    /// #
219    /// use async_std::fs::File;
220    /// use async_tar::Archive;
221    ///
222    /// let mut ar = Archive::new(File::open("foo.tar").await?);
223    /// ar.unpack("foo").await?;
224    /// #
225    /// # Ok(()) }) }
226    /// ```
227    pub async fn unpack<P: AsRef<Path>>(self, dst: P) -> io::Result<()> {
228        let mut entries = self.entries()?;
229        let mut pinned = Pin::new(&mut entries);
230        let dst = dst.as_ref();
231
232        if dst.symlink_metadata().await.is_err() {
233            fs::create_dir_all(&dst)
234                .await
235                .map_err(|e| TarError::new(&format!("failed to create `{}`", dst.display()), e))?;
236        }
237
238        // Canonicalizing the dst directory will prepend the path with '\\?\'
239        // on windows which will allow windows APIs to treat the path as an
240        // extended-length path with a 32,767 character limit. Otherwise all
241        // unpacked paths over 260 characters will fail on creation with a
242        // NotFound exception.
243        let dst = &dst
244            .canonicalize()
245            .await
246            .unwrap_or_else(|_| dst.to_path_buf());
247
248        // Delay any directory entries until the end (they will be created if needed by
249        // descendants), to ensure that directory permissions do not interfer with descendant
250        // extraction.
251        let mut directories = Vec::new();
252        while let Some(entry) = pinned.next().await {
253            let mut file = entry.map_err(|e| TarError::new("failed to iterate over archive", e))?;
254            if file.header().entry_type() == crate::EntryType::Directory {
255                directories.push(file);
256            } else {
257                file.unpack_in(dst).await?;
258            }
259        }
260        for mut dir in directories {
261            dir.unpack_in(dst).await?;
262        }
263
264        Ok(())
265    }
266}
267
268#[derive(Debug, Default)]
269struct State {
270    next: u64,
271    current_header: Option<Header>,
272    current_header_pos: usize,
273    current_ext: Option<GnuExtSparseHeader>,
274    pax_extensions: Option<Vec<u8>>,
275}
276
277/// Stream of `Entry`s.
278#[pin_project]
279#[derive(Debug)]
280pub struct Entries<R: Read + Unpin> {
281    archive: Archive<R>,
282    current: State,
283    fields: Option<EntryFields<Archive<R>>>,
284    gnu_longname: Option<Vec<u8>>,
285    gnu_longlink: Option<Vec<u8>>,
286    pax_extensions: Option<Vec<u8>>,
287}
288
// Unwraps a `Poll<Option<Result<T, E>>>` inside a `poll_next` body: yields the
// `T`, and returns early from the enclosing function on `Pending`, on the end
// of the stream (`None`), or on an error.
macro_rules! ready_opt_err {
    ($poll:expr) => {
        match async_std::task::ready!($poll) {
            None => return Poll::Ready(None),
            Some(Err(e)) => return Poll::Ready(Some(Err(e))),
            Some(Ok(item)) => item,
        }
    };
}
298
// Unwraps a `Poll<Result<T, E>>` inside a `poll_next` body: yields the `T`,
// and returns early from the enclosing function on `Pending` or with the
// error wrapped as a stream item.
macro_rules! ready_err {
    ($poll:expr) => {
        match async_std::task::ready!($poll) {
            Err(e) => return Poll::Ready(Some(Err(e))),
            Ok(value) => value,
        }
    };
}
307
308impl<R: Read + Unpin> Stream for Entries<R> {
309    type Item = io::Result<Entry<Archive<R>>>;
310
311    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
312        let mut this = self.project();
313        loop {
314            let State {
315                next,
316                current_header,
317                current_header_pos,
318                pax_extensions,
319                ..
320            } = &mut this.current;
321
322            let fields = if let Some(fields) = this.fields.as_mut() {
323                fields
324            } else {
325                *this.fields = Some(EntryFields::from(ready_opt_err!(poll_next_raw(
326                    this.archive,
327                    next,
328                    current_header,
329                    current_header_pos,
330                    pax_extensions.as_deref(),
331                    cx
332                ))));
333                continue;
334            };
335
336            let is_recognized_header =
337                fields.header.as_gnu().is_some() || fields.header.as_ustar().is_some();
338            if is_recognized_header && fields.header.entry_type().is_gnu_longname() {
339                if this.gnu_longname.is_some() {
340                    return Poll::Ready(Some(Err(other(
341                        "two long name entries describing \
342                         the same member",
343                    ))));
344                }
345
346                *this.gnu_longname = Some(ready_err!(Pin::new(fields).poll_read_all(cx)));
347                *this.fields = None;
348                continue;
349            }
350
351            if is_recognized_header && fields.header.entry_type().is_gnu_longlink() {
352                if this.gnu_longlink.is_some() {
353                    return Poll::Ready(Some(Err(other(
354                        "two long name entries describing \
355                         the same member",
356                    ))));
357                }
358                *this.gnu_longlink = Some(ready_err!(Pin::new(fields).poll_read_all(cx)));
359                *this.fields = None;
360                continue;
361            }
362
363            if is_recognized_header && fields.header.entry_type().is_pax_local_extensions() {
364                if this.pax_extensions.is_some() {
365                    return Poll::Ready(Some(Err(other(
366                        "two pax extensions entries describing \
367                         the same member",
368                    ))));
369                }
370                *this.pax_extensions = Some(ready_err!(Pin::new(fields).poll_read_all(cx)));
371                this.current.pax_extensions = this.pax_extensions.clone();
372                *this.fields = None;
373                continue;
374            }
375
376            fields.long_pathname = this.gnu_longname.take();
377            fields.long_linkname = this.gnu_longlink.take();
378            fields.pax_extensions = this.pax_extensions.take();
379
380            let State {
381                next,
382                current_header_pos,
383                current_ext,
384                ..
385            } = &mut this.current;
386            ready_err!(poll_parse_sparse_header(
387                this.archive,
388                next,
389                current_ext,
390                current_header_pos,
391                fields,
392                cx
393            ));
394
395            return Poll::Ready(Some(Ok(this.fields.take().unwrap().into_entry())));
396        }
397    }
398}
399
400/// Stream of raw `Entry`s.
401pub struct RawEntries<R: Read + Unpin> {
402    archive: Archive<R>,
403    current: (u64, Option<Header>, usize),
404}
405
406impl<R: Read + Unpin> Stream for RawEntries<R> {
407    type Item = io::Result<Entry<Archive<R>>>;
408
409    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
410        let archive = self.archive.clone();
411        let (next, current_header, current_header_pos) = &mut self.current;
412        poll_next_raw(&archive, next, current_header, current_header_pos, None, cx)
413    }
414}
415
416fn poll_next_raw<R: Read + Unpin>(
417    archive: &Archive<R>,
418    next: &mut u64,
419    current_header: &mut Option<Header>,
420    current_header_pos: &mut usize,
421    pax_extensions_data: Option<&[u8]>,
422    cx: &mut Context<'_>,
423) -> Poll<Option<io::Result<Entry<Archive<R>>>>> {
424    let mut header_pos = *next;
425
426    loop {
427        let archive = archive.clone();
428        // Seek to the start of the next header in the archive
429        if current_header.is_none() {
430            let delta = *next - archive.inner.lock().unwrap().pos;
431            match async_std::task::ready!(poll_skip(archive.clone(), cx, delta)) {
432                Ok(_) => {}
433                Err(err) => return Poll::Ready(Some(Err(err))),
434            }
435
436            *current_header = Some(Header::new_old());
437            *current_header_pos = 0;
438        }
439
440        let header = current_header.as_mut().unwrap();
441
442        // EOF is an indicator that we are at the end of the archive.
443        match async_std::task::ready!(poll_try_read_all(
444            archive.clone(),
445            cx,
446            header.as_mut_bytes(),
447            current_header_pos,
448        )) {
449            Ok(true) => {}
450            Ok(false) => return Poll::Ready(None),
451            Err(err) => return Poll::Ready(Some(Err(err))),
452        }
453
454        // If a header is not all zeros, we have another valid header.
455        // Otherwise, check if we are ignoring zeros and continue, or break as if this is the
456        // end of the archive.
457        if !header.as_bytes().iter().all(|i| *i == 0) {
458            *next += 512;
459            break;
460        }
461
462        if !archive.inner.lock().unwrap().ignore_zeros {
463            return Poll::Ready(None);
464        }
465
466        *next += 512;
467        header_pos = *next;
468    }
469
470    let header = current_header.as_mut().unwrap();
471
472    // Make sure the checksum is ok
473    let sum = header.as_bytes()[..148]
474        .iter()
475        .chain(&header.as_bytes()[156..])
476        .fold(0, |a, b| a + (*b as u32))
477        + 8 * 32;
478    let cksum = header.cksum()?;
479    if sum != cksum {
480        return Poll::Ready(Some(Err(other("archive header checksum mismatch"))));
481    }
482
483    let file_pos = *next;
484
485    let mut header = current_header.take().unwrap();
486
487    // when pax extensions are available, the size should come from there.
488    let mut size = header.entry_size()?;
489
490    // the size above will be overriden by the pax data if it has a size field.
491    // same for uid and gid, which will be overridden in the header itself.
492    if let Some(pax_extensions_data) = pax_extensions_data {
493        let pax = pax_extensions(pax_extensions_data);
494        for extension in pax {
495            let extension = extension.map_err(|_e| other("pax extensions invalid"))?;
496
497            // ignore keys that aren't parsable as a string at this stage.
498            // that isn't relevant to the size/uid/gid processing.
499            let Some(key) = extension.key().ok() else {
500                continue;
501            };
502
503            match key {
504                "size" => {
505                    let size_str = extension
506                        .value()
507                        .map_err(|_e| other("failed to parse pax size as string"))?;
508                    size = size_str
509                        .parse::<u64>()
510                        .map_err(|_e| other("failed to parse pax size"))?;
511                }
512
513                "uid" => {
514                    let uid_str = extension
515                        .value()
516                        .map_err(|_e| other("failed to parse pax uid as string"))?;
517                    header.set_uid(
518                        uid_str
519                            .parse::<u64>()
520                            .map_err(|_e| other("failed to parse pax uid"))?,
521                    );
522                }
523
524                "gid" => {
525                    let gid_str = extension
526                        .value()
527                        .map_err(|_e| other("failed to parse pax gid as string"))?;
528                    header.set_gid(
529                        gid_str
530                            .parse::<u64>()
531                            .map_err(|_e| other("failed to parse pax gid"))?,
532                    );
533                }
534
535                _ => {
536                    continue;
537                }
538            }
539        }
540    }
541
542    let data = EntryIo::Data(archive.clone().take(size));
543
544    let ArchiveInner {
545        unpack_xattrs,
546        preserve_mtime,
547        preserve_permissions,
548        ..
549    } = &*archive.inner.lock().unwrap();
550
551    let ret = EntryFields {
552        size,
553        header_pos,
554        file_pos,
555        data: vec![data],
556        header,
557        long_pathname: None,
558        long_linkname: None,
559        pax_extensions: None,
560        unpack_xattrs: *unpack_xattrs,
561        preserve_permissions: *preserve_permissions,
562        preserve_mtime: *preserve_mtime,
563        read_state: None,
564    };
565
566    // Store where the next entry is, rounding up by 512 bytes (the size of
567    // a header);
568    let size = (size + 511) & !(512 - 1);
569    *next += size;
570
571    Poll::Ready(Some(Ok(ret.into_entry())))
572}
573
574fn poll_parse_sparse_header<R: Read + Unpin>(
575    archive: &Archive<R>,
576    next: &mut u64,
577    current_ext: &mut Option<GnuExtSparseHeader>,
578    current_ext_pos: &mut usize,
579    entry: &mut EntryFields<Archive<R>>,
580    cx: &mut Context<'_>,
581) -> Poll<io::Result<()>> {
582    if !entry.header.entry_type().is_gnu_sparse() {
583        return Poll::Ready(Ok(()));
584    }
585
586    let gnu = match entry.header.as_gnu() {
587        Some(gnu) => gnu,
588        None => return Poll::Ready(Err(other("sparse entry type listed but not GNU header"))),
589    };
590
591    // Sparse files are represented internally as a list of blocks that are
592    // read. Blocks are either a bunch of 0's or they're data from the
593    // underlying archive.
594    //
595    // Blocks of a sparse file are described by the `GnuSparseHeader`
596    // structure, some of which are contained in `GnuHeader` but some of
597    // which may also be contained after the first header in further
598    // headers.
599    //
600    // We read off all the blocks here and use the `add_block` function to
601    // incrementally add them to the list of I/O block (in `entry.data`).
602    // The `add_block` function also validates that each chunk comes after
603    // the previous, we don't overrun the end of the file, and each block is
604    // aligned to a 512-byte boundary in the archive itself.
605    //
606    // At the end we verify that the sparse file size (`Header::size`) is
607    // the same as the current offset (described by the list of blocks) as
608    // well as the amount of data read equals the size of the entry
609    // (`Header::entry_size`).
610    entry.data.truncate(0);
611
612    let mut cur = 0;
613    let mut remaining = entry.size;
614    {
615        let data = &mut entry.data;
616        let reader = archive.clone();
617        let size = entry.size;
618        let mut add_block = |block: &GnuSparseHeader| -> io::Result<_> {
619            if block.is_empty() {
620                return Ok(());
621            }
622            let off = block.offset()?;
623            let len = block.length()?;
624
625            if (size - remaining) % 512 != 0 {
626                return Err(other(
627                    "previous block in sparse file was not \
628                     aligned to 512-byte boundary",
629                ));
630            } else if off < cur {
631                return Err(other(
632                    "out of order or overlapping sparse \
633                     blocks",
634                ));
635            } else if cur < off {
636                let block = io::repeat(0).take(off - cur);
637                data.push(EntryIo::Pad(block));
638            }
639            cur = off
640                .checked_add(len)
641                .ok_or_else(|| other("more bytes listed in sparse file than u64 can hold"))?;
642            remaining = remaining.checked_sub(len).ok_or_else(|| {
643                other(
644                    "sparse file consumed more data than the header \
645                     listed",
646                )
647            })?;
648            data.push(EntryIo::Data(reader.clone().take(len)));
649            Ok(())
650        };
651        for block in &gnu.sparse {
652            add_block(block)?
653        }
654        if gnu.is_extended() {
655            let started_header = current_ext.is_some();
656            if !started_header {
657                let mut ext = GnuExtSparseHeader::new();
658                ext.isextended[0] = 1;
659                *current_ext = Some(ext);
660                *current_ext_pos = 0;
661            }
662
663            let ext = current_ext.as_mut().unwrap();
664            while ext.is_extended() {
665                match async_std::task::ready!(poll_try_read_all(
666                    archive.clone(),
667                    cx,
668                    ext.as_mut_bytes(),
669                    current_ext_pos,
670                )) {
671                    Ok(true) => {}
672                    Ok(false) => return Poll::Ready(Err(other("failed to read extension"))),
673                    Err(err) => return Poll::Ready(Err(err)),
674                }
675
676                *next += 512;
677                for block in &ext.sparse {
678                    add_block(block)?;
679                }
680            }
681        }
682    }
683    if cur != gnu.real_size()? {
684        return Poll::Ready(Err(other(
685            "mismatch in sparse file chunks and \
686             size in header",
687        )));
688    }
689    entry.size = cur;
690    if remaining > 0 {
691        return Poll::Ready(Err(other(
692            "mismatch in sparse file chunks and \
693             entry size in header",
694        )));
695    }
696
697    Poll::Ready(Ok(()))
698}
699
700impl<R: Read + Unpin> Read for Archive<R> {
701    fn poll_read(
702        self: Pin<&mut Self>,
703        cx: &mut Context<'_>,
704        into: &mut [u8],
705    ) -> Poll<io::Result<usize>> {
706        let mut lock = self.inner.lock().unwrap();
707        let mut inner = Pin::new(&mut *lock);
708        let r = Pin::new(&mut inner.obj);
709
710        let res = async_std::task::ready!(r.poll_read(cx, into));
711        match res {
712            Ok(i) => {
713                inner.pos += i as u64;
714                Poll::Ready(Ok(i))
715            }
716            Err(err) => Poll::Ready(Err(err)),
717        }
718    }
719}
720
721/// Try to fill the buffer from the reader.
722///
723/// If the reader reaches its end before filling the buffer at all, returns `false`.
724/// Otherwise returns `true`.
725fn poll_try_read_all<R: Read + Unpin>(
726    mut source: R,
727    cx: &mut Context<'_>,
728    buf: &mut [u8],
729    pos: &mut usize,
730) -> Poll<io::Result<bool>> {
731    while *pos < buf.len() {
732        match async_std::task::ready!(Pin::new(&mut source).poll_read(cx, &mut buf[*pos..])) {
733            Ok(0) => {
734                if *pos == 0 {
735                    return Poll::Ready(Ok(false));
736                }
737
738                return Poll::Ready(Err(other("failed to read entire block")));
739            }
740            Ok(n) => *pos += n,
741            Err(err) => return Poll::Ready(Err(err)),
742        }
743    }
744
745    *pos = 0;
746    Poll::Ready(Ok(true))
747}
748
749/// Skip n bytes on the given source.
750fn poll_skip<R: Read + Unpin>(
751    mut source: R,
752    cx: &mut Context<'_>,
753    mut amt: u64,
754) -> Poll<io::Result<()>> {
755    let mut buf = [0u8; 4096 * 8];
756    while amt > 0 {
757        let n = cmp::min(amt, buf.len() as u64);
758        match async_std::task::ready!(Pin::new(&mut source).poll_read(cx, &mut buf[..n as usize])) {
759            Ok(0) => {
760                return Poll::Ready(Err(other("unexpected EOF during skip")));
761            }
762            Ok(n) => {
763                amt -= n as u64;
764            }
765            Err(err) => return Poll::Ready(Err(err)),
766        }
767    }
768
769    Poll::Ready(Ok(()))
770}
771
#[cfg(test)]
mod tests {
    use super::*;

    // Compile-time checks that the reader used below, and the archive types
    // built on top of it, can be shared and sent across threads.
    assert_impl_all!(async_std::fs::File: Send, Sync);
    assert_impl_all!(Archive<async_std::fs::File>: Send, Sync);
    assert_impl_all!(Entries<async_std::fs::File>: Send, Sync);
    assert_impl_all!(Entry<Archive<async_std::fs::File>>: Send, Sync);
}