async_tar/
archive.rs

1use std::{
2    cmp,
3    pin::Pin,
4    sync::{Arc, Mutex},
5};
6
7use async_std::{
8    fs, io,
9    io::prelude::*,
10    path::Path,
11    prelude::*,
12    stream::Stream,
13    task::{Context, Poll},
14};
15use pin_project::pin_project;
16
17use crate::{
18    entry::{EntryFields, EntryIo},
19    error::TarError,
20    other, Entry, GnuExtSparseHeader, GnuSparseHeader, Header,
21};
22
23/// A top-level representation of an archive file.
24///
25/// This archive can have an entry added to it and it can be iterated over.
26#[derive(Debug)]
27pub struct Archive<R: Read + Unpin> {
28    inner: Arc<Mutex<ArchiveInner<R>>>,
29}
30
31impl<R: Read + Unpin> Clone for Archive<R> {
32    fn clone(&self) -> Self {
33        Archive {
34            inner: self.inner.clone(),
35        }
36    }
37}
38
39#[pin_project]
40#[derive(Debug)]
41pub struct ArchiveInner<R: Read + Unpin> {
42    pos: u64,
43    unpack_xattrs: bool,
44    preserve_permissions: bool,
45    preserve_mtime: bool,
46    ignore_zeros: bool,
47    #[pin]
48    obj: R,
49}
50
51/// Configure the archive.
52pub struct ArchiveBuilder<R: Read + Unpin> {
53    obj: R,
54    unpack_xattrs: bool,
55    preserve_permissions: bool,
56    preserve_mtime: bool,
57    ignore_zeros: bool,
58}
59
60impl<R: Read + Unpin> ArchiveBuilder<R> {
61    /// Create a new builder.
62    pub fn new(obj: R) -> Self {
63        ArchiveBuilder {
64            unpack_xattrs: false,
65            preserve_permissions: false,
66            preserve_mtime: true,
67            ignore_zeros: false,
68            obj,
69        }
70    }
71
72    /// Indicate whether extended file attributes (xattrs on Unix) are preserved
73    /// when unpacking this archive.
74    ///
75    /// This flag is disabled by default and is currently only implemented on
76    /// Unix using xattr support. This may eventually be implemented for
77    /// Windows, however, if other archive implementations are found which do
78    /// this as well.
79    pub fn set_unpack_xattrs(mut self, unpack_xattrs: bool) -> Self {
80        self.unpack_xattrs = unpack_xattrs;
81        self
82    }
83
84    /// Indicate whether extended permissions (like suid on Unix) are preserved
85    /// when unpacking this entry.
86    ///
87    /// This flag is disabled by default and is currently only implemented on
88    /// Unix.
89    pub fn set_preserve_permissions(mut self, preserve: bool) -> Self {
90        self.preserve_permissions = preserve;
91        self
92    }
93
94    /// Indicate whether access time information is preserved when unpacking
95    /// this entry.
96    ///
97    /// This flag is enabled by default.
98    pub fn set_preserve_mtime(mut self, preserve: bool) -> Self {
99        self.preserve_mtime = preserve;
100        self
101    }
102
103    /// Ignore zeroed headers, which would otherwise indicate to the archive that it has no more
104    /// entries.
105    ///
106    /// This can be used in case multiple tar archives have been concatenated together.
107    pub fn set_ignore_zeros(mut self, ignore_zeros: bool) -> Self {
108        self.ignore_zeros = ignore_zeros;
109        self
110    }
111
112    /// Construct the archive, ready to accept inputs.
113    pub fn build(self) -> Archive<R> {
114        let Self {
115            unpack_xattrs,
116            preserve_permissions,
117            preserve_mtime,
118            ignore_zeros,
119            obj,
120        } = self;
121
122        Archive {
123            inner: Arc::new(Mutex::new(ArchiveInner {
124                unpack_xattrs,
125                preserve_permissions,
126                preserve_mtime,
127                ignore_zeros,
128                obj,
129                pos: 0,
130            })),
131        }
132    }
133}
134
135impl<R: Read + Unpin> Archive<R> {
136    /// Create a new archive with the underlying object as the reader.
137    pub fn new(obj: R) -> Archive<R> {
138        Archive {
139            inner: Arc::new(Mutex::new(ArchiveInner {
140                unpack_xattrs: false,
141                preserve_permissions: false,
142                preserve_mtime: true,
143                ignore_zeros: false,
144                obj,
145                pos: 0,
146            })),
147        }
148    }
149
150    /// Unwrap this archive, returning the underlying object.
151    pub fn into_inner(self) -> Result<R, Self> {
152        match Arc::try_unwrap(self.inner) {
153            Ok(inner) => Ok(inner.into_inner().unwrap().obj),
154            Err(inner) => Err(Self { inner }),
155        }
156    }
157
158    /// Construct an stream over the entries in this archive.
159    ///
160    /// Note that care must be taken to consider each entry within an archive in
161    /// sequence. If entries are processed out of sequence (from what the
162    /// stream returns), then the contents read for each entry may be
163    /// corrupted.
164    pub fn entries(self) -> io::Result<Entries<R>> {
165        if self.inner.lock().unwrap().pos != 0 {
166            return Err(other(
167                "cannot call entries unless archive is at \
168                 position 0",
169            ));
170        }
171
172        Ok(Entries {
173            archive: self,
174            current: (0, None, 0, None),
175            fields: None,
176            gnu_longlink: None,
177            gnu_longname: None,
178            pax_extensions: None,
179        })
180    }
181
182    /// Construct an stream over the raw entries in this archive.
183    ///
184    /// Note that care must be taken to consider each entry within an archive in
185    /// sequence. If entries are processed out of sequence (from what the
186    /// stream returns), then the contents read for each entry may be
187    /// corrupted.
188    pub fn entries_raw(self) -> io::Result<RawEntries<R>> {
189        if self.inner.lock().unwrap().pos != 0 {
190            return Err(other(
191                "cannot call entries_raw unless archive is at \
192                 position 0",
193            ));
194        }
195
196        Ok(RawEntries {
197            archive: self,
198            current: (0, None, 0),
199        })
200    }
201
202    /// Unpacks the contents tarball into the specified `dst`.
203    ///
204    /// This function will iterate over the entire contents of this tarball,
205    /// extracting each file in turn to the location specified by the entry's
206    /// path name.
207    ///
208    /// This operation is relatively sensitive in that it will not write files
209    /// outside of the path specified by `dst`. Files in the archive which have
210    /// a '..' in their path are skipped during the unpacking process.
211    ///
212    /// # Examples
213    ///
214    /// ```no_run
215    /// # fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> { async_std::task::block_on(async {
216    /// #
217    /// use async_std::fs::File;
218    /// use async_tar::Archive;
219    ///
220    /// let mut ar = Archive::new(File::open("foo.tar").await?);
221    /// ar.unpack("foo").await?;
222    /// #
223    /// # Ok(()) }) }
224    /// ```
225    pub async fn unpack<P: AsRef<Path>>(self, dst: P) -> io::Result<()> {
226        let mut entries = self.entries()?;
227        let mut pinned = Pin::new(&mut entries);
228        let dst = dst.as_ref();
229
230        if dst.symlink_metadata().await.is_err() {
231            fs::create_dir_all(&dst)
232                .await
233                .map_err(|e| TarError::new(&format!("failed to create `{}`", dst.display()), e))?;
234        }
235
236        // Canonicalizing the dst directory will prepend the path with '\\?\'
237        // on windows which will allow windows APIs to treat the path as an
238        // extended-length path with a 32,767 character limit. Otherwise all
239        // unpacked paths over 260 characters will fail on creation with a
240        // NotFound exception.
241        let dst = &dst
242            .canonicalize()
243            .await
244            .unwrap_or_else(|_| dst.to_path_buf());
245
246        // Delay any directory entries until the end (they will be created if needed by
247        // descendants), to ensure that directory permissions do not interfer with descendant
248        // extraction.
249        let mut directories = Vec::new();
250        while let Some(entry) = pinned.next().await {
251            let mut file = entry.map_err(|e| TarError::new("failed to iterate over archive", e))?;
252            if file.header().entry_type() == crate::EntryType::Directory {
253                directories.push(file);
254            } else {
255                file.unpack_in(dst).await?;
256            }
257        }
258        for mut dir in directories {
259            dir.unpack_in(dst).await?;
260        }
261
262        Ok(())
263    }
264}
265
266/// Stream of `Entry`s.
267#[pin_project]
268#[derive(Debug)]
269pub struct Entries<R: Read + Unpin> {
270    archive: Archive<R>,
271    current: (u64, Option<Header>, usize, Option<GnuExtSparseHeader>),
272    fields: Option<EntryFields<Archive<R>>>,
273    gnu_longname: Option<Vec<u8>>,
274    gnu_longlink: Option<Vec<u8>>,
275    pax_extensions: Option<Vec<u8>>,
276}
277
// Unwraps a `Poll<Option<io::Result<T>>>` inside a `Stream::poll_next` body:
// yields `T`, and returns from the enclosing function on `Pending`, end of
// stream, or error.
macro_rules! ready_opt_err {
    ($poll:expr) => {
        match async_std::task::ready!($poll) {
            None => return Poll::Ready(None),
            Some(Err(e)) => return Poll::Ready(Some(Err(e))),
            Some(Ok(item)) => item,
        }
    };
}
287
// Unwraps a `Poll<io::Result<T>>` inside a `Stream::poll_next` body: yields `T`,
// and returns from the enclosing function on `Pending` or on error (wrapping the
// error as a stream item).
macro_rules! ready_err {
    ($poll:expr) => {
        match async_std::task::ready!($poll) {
            Err(e) => return Poll::Ready(Some(Err(e))),
            Ok(item) => item,
        }
    };
}
296
297impl<R: Read + Unpin> Stream for Entries<R> {
298    type Item = io::Result<Entry<Archive<R>>>;
299
300    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
301        let mut this = self.project();
302        loop {
303            let (next, current_header, current_header_pos, _) = &mut this.current;
304
305            let fields = if let Some(fields) = this.fields.as_mut() {
306                fields
307            } else {
308                *this.fields = Some(EntryFields::from(ready_opt_err!(poll_next_raw(
309                    this.archive,
310                    next,
311                    current_header,
312                    current_header_pos,
313                    cx
314                ))));
315                continue;
316            };
317
318            let is_recognized_header =
319                fields.header.as_gnu().is_some() || fields.header.as_ustar().is_some();
320            if is_recognized_header && fields.header.entry_type().is_gnu_longname() {
321                if this.gnu_longname.is_some() {
322                    return Poll::Ready(Some(Err(other(
323                        "two long name entries describing \
324                         the same member",
325                    ))));
326                }
327
328                *this.gnu_longname = Some(ready_err!(Pin::new(fields).poll_read_all(cx)));
329                *this.fields = None;
330                continue;
331            }
332
333            if is_recognized_header && fields.header.entry_type().is_gnu_longlink() {
334                if this.gnu_longlink.is_some() {
335                    return Poll::Ready(Some(Err(other(
336                        "two long name entries describing \
337                         the same member",
338                    ))));
339                }
340                *this.gnu_longlink = Some(ready_err!(Pin::new(fields).poll_read_all(cx)));
341                *this.fields = None;
342                continue;
343            }
344
345            if is_recognized_header && fields.header.entry_type().is_pax_local_extensions() {
346                if this.pax_extensions.is_some() {
347                    return Poll::Ready(Some(Err(other(
348                        "two pax extensions entries describing \
349                         the same member",
350                    ))));
351                }
352                *this.pax_extensions = Some(ready_err!(Pin::new(fields).poll_read_all(cx)));
353                *this.fields = None;
354                continue;
355            }
356
357            fields.long_pathname = this.gnu_longname.take();
358            fields.long_linkname = this.gnu_longlink.take();
359            fields.pax_extensions = this.pax_extensions.take();
360
361            let (next, _, current_pos, current_ext) = &mut this.current;
362            ready_err!(poll_parse_sparse_header(
363                this.archive,
364                next,
365                current_ext,
366                current_pos,
367                fields,
368                cx
369            ));
370
371            return Poll::Ready(Some(Ok(this.fields.take().unwrap().into_entry())));
372        }
373    }
374}
375
376/// Stream of raw `Entry`s.
377pub struct RawEntries<R: Read + Unpin> {
378    archive: Archive<R>,
379    current: (u64, Option<Header>, usize),
380}
381
382impl<R: Read + Unpin> Stream for RawEntries<R> {
383    type Item = io::Result<Entry<Archive<R>>>;
384
385    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
386        let archive = self.archive.clone();
387        let (next, current_header, current_header_pos) = &mut self.current;
388        poll_next_raw(&archive, next, current_header, current_header_pos, cx)
389    }
390}
391
392fn poll_next_raw<R: Read + Unpin>(
393    archive: &Archive<R>,
394    next: &mut u64,
395    current_header: &mut Option<Header>,
396    current_header_pos: &mut usize,
397    cx: &mut Context<'_>,
398) -> Poll<Option<io::Result<Entry<Archive<R>>>>> {
399    let mut header_pos = *next;
400
401    loop {
402        let archive = archive.clone();
403        // Seek to the start of the next header in the archive
404        if current_header.is_none() {
405            let delta = *next - archive.inner.lock().unwrap().pos;
406            match async_std::task::ready!(poll_skip(archive.clone(), cx, delta)) {
407                Ok(_) => {}
408                Err(err) => return Poll::Ready(Some(Err(err))),
409            }
410
411            *current_header = Some(Header::new_old());
412            *current_header_pos = 0;
413        }
414
415        let header = current_header.as_mut().unwrap();
416
417        // EOF is an indicator that we are at the end of the archive.
418        match async_std::task::ready!(poll_try_read_all(
419            archive.clone(),
420            cx,
421            header.as_mut_bytes(),
422            current_header_pos,
423        )) {
424            Ok(true) => {}
425            Ok(false) => return Poll::Ready(None),
426            Err(err) => return Poll::Ready(Some(Err(err))),
427        }
428
429        // If a header is not all zeros, we have another valid header.
430        // Otherwise, check if we are ignoring zeros and continue, or break as if this is the
431        // end of the archive.
432        if !header.as_bytes().iter().all(|i| *i == 0) {
433            *next += 512;
434            break;
435        }
436
437        if !archive.inner.lock().unwrap().ignore_zeros {
438            return Poll::Ready(None);
439        }
440
441        *next += 512;
442        header_pos = *next;
443    }
444
445    let header = current_header.as_mut().unwrap();
446
447    // Make sure the checksum is ok
448    let sum = header.as_bytes()[..148]
449        .iter()
450        .chain(&header.as_bytes()[156..])
451        .fold(0, |a, b| a + (*b as u32))
452        + 8 * 32;
453    let cksum = header.cksum()?;
454    if sum != cksum {
455        return Poll::Ready(Some(Err(other("archive header checksum mismatch"))));
456    }
457
458    let file_pos = *next;
459    let size = header.entry_size()?;
460
461    let data = EntryIo::Data(archive.clone().take(size));
462
463    let header = current_header.take().unwrap();
464
465    let ArchiveInner {
466        unpack_xattrs,
467        preserve_mtime,
468        preserve_permissions,
469        ..
470    } = &*archive.inner.lock().unwrap();
471
472    let ret = EntryFields {
473        size,
474        header_pos,
475        file_pos,
476        data: vec![data],
477        header,
478        long_pathname: None,
479        long_linkname: None,
480        pax_extensions: None,
481        unpack_xattrs: *unpack_xattrs,
482        preserve_permissions: *preserve_permissions,
483        preserve_mtime: *preserve_mtime,
484        read_state: None,
485    };
486
487    // Store where the next entry is, rounding up by 512 bytes (the size of
488    // a header);
489    let size = (size + 511) & !(512 - 1);
490    *next += size;
491
492    Poll::Ready(Some(Ok(ret.into_entry())))
493}
494
495fn poll_parse_sparse_header<R: Read + Unpin>(
496    archive: &Archive<R>,
497    next: &mut u64,
498    current_ext: &mut Option<GnuExtSparseHeader>,
499    current_ext_pos: &mut usize,
500    entry: &mut EntryFields<Archive<R>>,
501    cx: &mut Context<'_>,
502) -> Poll<io::Result<()>> {
503    if !entry.header.entry_type().is_gnu_sparse() {
504        return Poll::Ready(Ok(()));
505    }
506
507    let gnu = match entry.header.as_gnu() {
508        Some(gnu) => gnu,
509        None => return Poll::Ready(Err(other("sparse entry type listed but not GNU header"))),
510    };
511
512    // Sparse files are represented internally as a list of blocks that are
513    // read. Blocks are either a bunch of 0's or they're data from the
514    // underlying archive.
515    //
516    // Blocks of a sparse file are described by the `GnuSparseHeader`
517    // structure, some of which are contained in `GnuHeader` but some of
518    // which may also be contained after the first header in further
519    // headers.
520    //
521    // We read off all the blocks here and use the `add_block` function to
522    // incrementally add them to the list of I/O block (in `entry.data`).
523    // The `add_block` function also validates that each chunk comes after
524    // the previous, we don't overrun the end of the file, and each block is
525    // aligned to a 512-byte boundary in the archive itself.
526    //
527    // At the end we verify that the sparse file size (`Header::size`) is
528    // the same as the current offset (described by the list of blocks) as
529    // well as the amount of data read equals the size of the entry
530    // (`Header::entry_size`).
531    entry.data.truncate(0);
532
533    let mut cur = 0;
534    let mut remaining = entry.size;
535    {
536        let data = &mut entry.data;
537        let reader = archive.clone();
538        let size = entry.size;
539        let mut add_block = |block: &GnuSparseHeader| -> io::Result<_> {
540            if block.is_empty() {
541                return Ok(());
542            }
543            let off = block.offset()?;
544            let len = block.length()?;
545
546            if (size - remaining) % 512 != 0 {
547                return Err(other(
548                    "previous block in sparse file was not \
549                     aligned to 512-byte boundary",
550                ));
551            } else if off < cur {
552                return Err(other(
553                    "out of order or overlapping sparse \
554                     blocks",
555                ));
556            } else if cur < off {
557                let block = io::repeat(0).take(off - cur);
558                data.push(EntryIo::Pad(block));
559            }
560            cur = off
561                .checked_add(len)
562                .ok_or_else(|| other("more bytes listed in sparse file than u64 can hold"))?;
563            remaining = remaining.checked_sub(len).ok_or_else(|| {
564                other(
565                    "sparse file consumed more data than the header \
566                     listed",
567                )
568            })?;
569            data.push(EntryIo::Data(reader.clone().take(len)));
570            Ok(())
571        };
572        for block in &gnu.sparse {
573            add_block(block)?
574        }
575        if gnu.is_extended() {
576            let started_header = current_ext.is_some();
577            if !started_header {
578                let mut ext = GnuExtSparseHeader::new();
579                ext.isextended[0] = 1;
580                *current_ext = Some(ext);
581                *current_ext_pos = 0;
582            }
583
584            let ext = current_ext.as_mut().unwrap();
585            while ext.is_extended() {
586                match async_std::task::ready!(poll_try_read_all(
587                    archive.clone(),
588                    cx,
589                    ext.as_mut_bytes(),
590                    current_ext_pos,
591                )) {
592                    Ok(true) => {}
593                    Ok(false) => return Poll::Ready(Err(other("failed to read extension"))),
594                    Err(err) => return Poll::Ready(Err(err)),
595                }
596
597                *next += 512;
598                for block in &ext.sparse {
599                    add_block(block)?;
600                }
601            }
602        }
603    }
604    if cur != gnu.real_size()? {
605        return Poll::Ready(Err(other(
606            "mismatch in sparse file chunks and \
607             size in header",
608        )));
609    }
610    entry.size = cur;
611    if remaining > 0 {
612        return Poll::Ready(Err(other(
613            "mismatch in sparse file chunks and \
614             entry size in header",
615        )));
616    }
617
618    Poll::Ready(Ok(()))
619}
620
621impl<R: Read + Unpin> Read for Archive<R> {
622    fn poll_read(
623        self: Pin<&mut Self>,
624        cx: &mut Context<'_>,
625        into: &mut [u8],
626    ) -> Poll<io::Result<usize>> {
627        let mut lock = self.inner.lock().unwrap();
628        let mut inner = Pin::new(&mut *lock);
629        let r = Pin::new(&mut inner.obj);
630
631        let res = async_std::task::ready!(r.poll_read(cx, into));
632        match res {
633            Ok(i) => {
634                inner.pos += i as u64;
635                Poll::Ready(Ok(i))
636            }
637            Err(err) => Poll::Ready(Err(err)),
638        }
639    }
640}
641
642/// Try to fill the buffer from the reader.
643///
644/// If the reader reaches its end before filling the buffer at all, returns `false`.
645/// Otherwise returns `true`.
646fn poll_try_read_all<R: Read + Unpin>(
647    mut source: R,
648    cx: &mut Context<'_>,
649    buf: &mut [u8],
650    pos: &mut usize,
651) -> Poll<io::Result<bool>> {
652    while *pos < buf.len() {
653        match async_std::task::ready!(Pin::new(&mut source).poll_read(cx, &mut buf[*pos..])) {
654            Ok(0) => {
655                if *pos == 0 {
656                    return Poll::Ready(Ok(false));
657                }
658
659                return Poll::Ready(Err(other("failed to read entire block")));
660            }
661            Ok(n) => *pos += n,
662            Err(err) => return Poll::Ready(Err(err)),
663        }
664    }
665
666    *pos = 0;
667    Poll::Ready(Ok(true))
668}
669
670/// Skip n bytes on the given source.
671fn poll_skip<R: Read + Unpin>(
672    mut source: R,
673    cx: &mut Context<'_>,
674    mut amt: u64,
675) -> Poll<io::Result<()>> {
676    let mut buf = [0u8; 4096 * 8];
677    while amt > 0 {
678        let n = cmp::min(amt, buf.len() as u64);
679        match async_std::task::ready!(Pin::new(&mut source).poll_read(cx, &mut buf[..n as usize])) {
680            Ok(0) => {
681                return Poll::Ready(Err(other("unexpected EOF during skip")));
682            }
683            Ok(n) => {
684                amt -= n as u64;
685            }
686            Err(err) => return Poll::Ready(Err(err)),
687        }
688    }
689
690    Poll::Ready(Ok(()))
691}
692
#[cfg(test)]
mod tests {
    use super::*;

    // Compile-time checks that the public types stay usable across threads when
    // the underlying reader is a file.
    assert_impl_all!(async_std::fs::File: Send, Sync);
    assert_impl_all!(Archive<async_std::fs::File>: Send, Sync);
    assert_impl_all!(Entry<Archive<async_std::fs::File>>: Send, Sync);
    assert_impl_all!(Entries<async_std::fs::File>: Send, Sync);
}