async_tar_wasm/
archive.rs

1use std::{
2    cmp,
3    pin::Pin,
4    sync::{Arc, Mutex},
5};
6
7#[cfg(feature = "fs")]
8use async_std::{fs, path::Path, prelude::*};
9use async_std::{
10    io,
11    io::prelude::*,
12    stream::Stream,
13    task::{Context, Poll},
14};
15use pin_project::pin_project;
16
17use crate::{
18    entry::{EntryFields, EntryIo},
19    other, Entry, GnuExtSparseHeader, GnuSparseHeader, Header,
20};
21
22#[cfg(feature = "fs")]
23use crate::error::TarError;
24
/// A top-level representation of an archive file.
///
/// The archive can be iterated over with [`Archive::entries`] or
/// [`Archive::entries_raw`], or extracted with `unpack` (behind the `fs`
/// feature).
///
/// The wrapped reader and its read position live behind an `Arc<Mutex<_>>`,
/// so every clone of an `Archive` shares the same reader and position.
#[derive(Debug)]
pub struct Archive<R: Read + Unpin> {
    inner: Arc<Mutex<ArchiveInner<R>>>,
}
32
33impl<R: Read + Unpin> Clone for Archive<R> {
34    fn clone(&self) -> Self {
35        Archive {
36            inner: self.inner.clone(),
37        }
38    }
39}
40
/// Shared state behind an [`Archive`]: the wrapped reader, how far it has been
/// read, and the options configured through [`ArchiveBuilder`].
#[pin_project]
#[derive(Debug)]
pub struct ArchiveInner<R: Read + Unpin> {
    /// Total number of bytes read from `obj` so far; advanced by the `Read`
    /// impl of `Archive`.
    pos: u64,
    /// Copied into each entry: whether extended attributes are unpacked.
    unpack_xattrs: bool,
    /// Copied into each entry: whether extended permission bits are preserved.
    preserve_permissions: bool,
    /// Copied into each entry: whether modification times are preserved.
    preserve_mtime: bool,
    /// Whether all-zero header blocks are skipped instead of ending the archive.
    ignore_zeros: bool,
    /// The underlying reader.
    #[pin]
    obj: R,
}
52
53/// Configure the archive.
54pub struct ArchiveBuilder<R: Read + Unpin> {
55    obj: R,
56    unpack_xattrs: bool,
57    preserve_permissions: bool,
58    preserve_mtime: bool,
59    ignore_zeros: bool,
60}
61
62impl<R: Read + Unpin> ArchiveBuilder<R> {
63    /// Create a new builder.
64    pub fn new(obj: R) -> Self {
65        ArchiveBuilder {
66            unpack_xattrs: false,
67            preserve_permissions: false,
68            preserve_mtime: true,
69            ignore_zeros: false,
70            obj,
71        }
72    }
73
74    /// Indicate whether extended file attributes (xattrs on Unix) are preserved
75    /// when unpacking this archive.
76    ///
77    /// This flag is disabled by default and is currently only implemented on
78    /// Unix using xattr support. This may eventually be implemented for
79    /// Windows, however, if other archive implementations are found which do
80    /// this as well.
81    pub fn set_unpack_xattrs(mut self, unpack_xattrs: bool) -> Self {
82        self.unpack_xattrs = unpack_xattrs;
83        self
84    }
85
86    /// Indicate whether extended permissions (like suid on Unix) are preserved
87    /// when unpacking this entry.
88    ///
89    /// This flag is disabled by default and is currently only implemented on
90    /// Unix.
91    pub fn set_preserve_permissions(mut self, preserve: bool) -> Self {
92        self.preserve_permissions = preserve;
93        self
94    }
95
96    /// Indicate whether access time information is preserved when unpacking
97    /// this entry.
98    ///
99    /// This flag is enabled by default.
100    pub fn set_preserve_mtime(mut self, preserve: bool) -> Self {
101        self.preserve_mtime = preserve;
102        self
103    }
104
105    /// Ignore zeroed headers, which would otherwise indicate to the archive that it has no more
106    /// entries.
107    ///
108    /// This can be used in case multiple tar archives have been concatenated together.
109    pub fn set_ignore_zeros(mut self, ignore_zeros: bool) -> Self {
110        self.ignore_zeros = ignore_zeros;
111        self
112    }
113
114    /// Construct the archive, ready to accept inputs.
115    pub fn build(self) -> Archive<R> {
116        let Self {
117            unpack_xattrs,
118            preserve_permissions,
119            preserve_mtime,
120            ignore_zeros,
121            obj,
122        } = self;
123
124        Archive {
125            inner: Arc::new(Mutex::new(ArchiveInner {
126                unpack_xattrs,
127                preserve_permissions,
128                preserve_mtime,
129                ignore_zeros,
130                obj,
131                pos: 0,
132            })),
133        }
134    }
135}
136
137impl<R: Read + Unpin> Archive<R> {
138    /// Create a new archive with the underlying object as the reader.
139    pub fn new(obj: R) -> Archive<R> {
140        Archive {
141            inner: Arc::new(Mutex::new(ArchiveInner {
142                unpack_xattrs: false,
143                preserve_permissions: false,
144                preserve_mtime: true,
145                ignore_zeros: false,
146                obj,
147                pos: 0,
148            })),
149        }
150    }
151
152    /// Unwrap this archive, returning the underlying object.
153    pub fn into_inner(self) -> Result<R, Self> {
154        match Arc::try_unwrap(self.inner) {
155            Ok(inner) => Ok(inner.into_inner().unwrap().obj),
156            Err(inner) => Err(Self { inner }),
157        }
158    }
159
160    /// Construct an stream over the entries in this archive.
161    ///
162    /// Note that care must be taken to consider each entry within an archive in
163    /// sequence. If entries are processed out of sequence (from what the
164    /// stream returns), then the contents read for each entry may be
165    /// corrupted.
166    pub fn entries(self) -> io::Result<Entries<R>> {
167        if self.inner.lock().unwrap().pos != 0 {
168            return Err(other(
169                "cannot call entries unless archive is at \
170                 position 0",
171            ));
172        }
173
174        Ok(Entries {
175            archive: self,
176            current: (0, None, 0, None),
177            fields: None,
178            gnu_longlink: None,
179            gnu_longname: None,
180            pax_extensions: None,
181        })
182    }
183
184    /// Construct an stream over the raw entries in this archive.
185    ///
186    /// Note that care must be taken to consider each entry within an archive in
187    /// sequence. If entries are processed out of sequence (from what the
188    /// stream returns), then the contents read for each entry may be
189    /// corrupted.
190    pub fn entries_raw(self) -> io::Result<RawEntries<R>> {
191        if self.inner.lock().unwrap().pos != 0 {
192            return Err(other(
193                "cannot call entries_raw unless archive is at \
194                 position 0",
195            ));
196        }
197
198        Ok(RawEntries {
199            archive: self,
200            current: (0, None, 0),
201        })
202    }
203
204    /// Unpacks the contents tarball into the specified `dst`.
205    ///
206    /// This function will iterate over the entire contents of this tarball,
207    /// extracting each file in turn to the location specified by the entry's
208    /// path name.
209    ///
210    /// This operation is relatively sensitive in that it will not write files
211    /// outside of the path specified by `dst`. Files in the archive which have
212    /// a '..' in their path are skipped during the unpacking process.
213    ///
214    /// # Examples
215    ///
216    /// ```no_run
217    /// # fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> { async_std::task::block_on(async {
218    /// #
219    /// use async_std::fs::File;
220    /// use async_tar::Archive;
221    ///
222    /// let mut ar = Archive::new(File::open("foo.tar").await?);
223    /// ar.unpack("foo").await?;
224    /// #
225    /// # Ok(()) }) }
226    /// ```
227    #[cfg(feature = "fs")]
228    pub async fn unpack<P: AsRef<Path>>(self, dst: P) -> io::Result<()> {
229        let mut entries = self.entries()?;
230        let mut pinned = Pin::new(&mut entries);
231        let dst = dst.as_ref();
232
233        if dst.symlink_metadata().await.is_err() {
234            fs::create_dir_all(&dst)
235                .await
236                .map_err(|e| TarError::new(&format!("failed to create `{}`", dst.display()), e))?;
237        }
238
239        // Canonicalizing the dst directory will prepend the path with '\\?\'
240        // on windows which will allow windows APIs to treat the path as an
241        // extended-length path with a 32,767 character limit. Otherwise all
242        // unpacked paths over 260 characters will fail on creation with a
243        // NotFound exception.
244        let dst = &dst
245            .canonicalize()
246            .await
247            .unwrap_or_else(|_| dst.to_path_buf());
248
249        // Delay any directory entries until the end (they will be created if needed by
250        // descendants), to ensure that directory permissions do not interfer with descendant
251        // extraction.
252        let mut directories = Vec::new();
253        while let Some(entry) = pinned.next().await {
254            let mut file = entry.map_err(|e| TarError::new("failed to iterate over archive", e))?;
255            if file.header().entry_type() == crate::EntryType::Directory {
256                directories.push(file);
257            } else {
258                file.unpack_in(dst).await?;
259            }
260        }
261        for mut dir in directories {
262            dir.unpack_in(dst).await?;
263        }
264
265        Ok(())
266    }
267}
268
/// Stream of `Entry`s.
///
/// Created by [`Archive::entries`]. Unlike [`RawEntries`], GNU long-name,
/// GNU long-link and PAX local-extension entries are not yielded on their own
/// but attached to the entry that follows them, and GNU sparse entries have
/// their block map parsed before being yielded.
#[pin_project]
#[derive(Debug)]
pub struct Entries<R: Read + Unpin> {
    archive: Archive<R>,
    // Resumable parse state, in order: archive offset of the next header; the
    // header currently being read (if any); how many bytes of the 512-byte
    // block currently being read have been filled (shared between the header
    // read and the GNU sparse extension-header read, which never overlap);
    // and the GNU sparse extension header currently being read (if any).
    current: (u64, Option<Header>, usize, Option<GnuExtSparseHeader>),
    // Entry whose header has been read but which has not been yielded yet.
    fields: Option<EntryFields<Archive<R>>>,
    // Body of a preceding GNU long-name entry, attached to the next entry.
    gnu_longname: Option<Vec<u8>>,
    // Body of a preceding GNU long-link entry, attached to the next entry.
    gnu_longlink: Option<Vec<u8>>,
    // Body of a preceding PAX local-extensions entry, attached to the next entry.
    pax_extensions: Option<Vec<u8>>,
}
280
/// Unwraps a `Poll<Option<io::Result<T>>>` inside a `poll_next`-style function.
///
/// Evaluates to the `T` of `Ready(Some(Ok(_)))`; every other state returns
/// early from the enclosing function: `Pending` as `Pending`, end of stream as
/// `Ready(None)`, and errors as `Ready(Some(Err(_)))`.
macro_rules! ready_opt_err {
    ($val:expr) => {
        match $val {
            Poll::Pending => return Poll::Pending,
            Poll::Ready(None) => return Poll::Ready(None),
            Poll::Ready(Some(Err(err))) => return Poll::Ready(Some(Err(err))),
            Poll::Ready(Some(Ok(val))) => val,
        }
    };
}
290
/// Unwraps a `Poll<io::Result<T>>` inside a `poll_next`-style function.
///
/// Evaluates to the `T` of `Ready(Ok(_))`; `Pending` returns `Pending` and an
/// error returns `Ready(Some(Err(_)))` from the enclosing function.
macro_rules! ready_err {
    ($val:expr) => {
        match $val {
            Poll::Pending => return Poll::Pending,
            Poll::Ready(Err(err)) => return Poll::Ready(Some(Err(err))),
            Poll::Ready(Ok(val)) => val,
        }
    };
}
299
300impl<R: Read + Unpin> Stream for Entries<R> {
301    type Item = io::Result<Entry<Archive<R>>>;
302
303    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
304        let mut this = self.project();
305        loop {
306            let (next, current_header, current_header_pos, _) = &mut this.current;
307
308            let fields = if let Some(fields) = this.fields.as_mut() {
309                fields
310            } else {
311                *this.fields = Some(EntryFields::from(ready_opt_err!(poll_next_raw(
312                    this.archive,
313                    next,
314                    current_header,
315                    current_header_pos,
316                    cx
317                ))));
318                continue;
319            };
320
321            let is_recognized_header =
322                fields.header.as_gnu().is_some() || fields.header.as_ustar().is_some();
323            if is_recognized_header && fields.header.entry_type().is_gnu_longname() {
324                if this.gnu_longname.is_some() {
325                    return Poll::Ready(Some(Err(other(
326                        "two long name entries describing \
327                         the same member",
328                    ))));
329                }
330
331                *this.gnu_longname = Some(ready_err!(Pin::new(fields).poll_read_all(cx)));
332                *this.fields = None;
333                continue;
334            }
335
336            if is_recognized_header && fields.header.entry_type().is_gnu_longlink() {
337                if this.gnu_longlink.is_some() {
338                    return Poll::Ready(Some(Err(other(
339                        "two long name entries describing \
340                         the same member",
341                    ))));
342                }
343                *this.gnu_longlink = Some(ready_err!(Pin::new(fields).poll_read_all(cx)));
344                *this.fields = None;
345                continue;
346            }
347
348            if is_recognized_header && fields.header.entry_type().is_pax_local_extensions() {
349                if this.pax_extensions.is_some() {
350                    return Poll::Ready(Some(Err(other(
351                        "two pax extensions entries describing \
352                         the same member",
353                    ))));
354                }
355                *this.pax_extensions = Some(ready_err!(Pin::new(fields).poll_read_all(cx)));
356                *this.fields = None;
357                continue;
358            }
359
360            fields.long_pathname = this.gnu_longname.take();
361            fields.long_linkname = this.gnu_longlink.take();
362            fields.pax_extensions = this.pax_extensions.take();
363
364            let (next, _, current_pos, current_ext) = &mut this.current;
365            ready_err!(poll_parse_sparse_header(
366                this.archive,
367                next,
368                current_ext,
369                current_pos,
370                fields,
371                cx
372            ));
373
374            return Poll::Ready(Some(Ok(this.fields.take().unwrap().into_entry())));
375        }
376    }
377}
378
379/// Stream of raw `Entry`s.
380pub struct RawEntries<R: Read + Unpin> {
381    archive: Archive<R>,
382    current: (u64, Option<Header>, usize),
383}
384
385impl<R: Read + Unpin> Stream for RawEntries<R> {
386    type Item = io::Result<Entry<Archive<R>>>;
387
388    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
389        let archive = self.archive.clone();
390        let (next, current_header, current_header_pos) = &mut self.current;
391        poll_next_raw(&archive, next, current_header, current_header_pos, cx)
392    }
393}
394
395fn poll_next_raw<R: Read + Unpin>(
396    archive: &Archive<R>,
397    next: &mut u64,
398    current_header: &mut Option<Header>,
399    current_header_pos: &mut usize,
400    cx: &mut Context<'_>,
401) -> Poll<Option<io::Result<Entry<Archive<R>>>>> {
402    let mut header_pos = *next;
403
404    loop {
405        let archive = archive.clone();
406        // Seek to the start of the next header in the archive
407        if current_header.is_none() {
408            let delta = *next - archive.inner.lock().unwrap().pos;
409            match async_std::task::ready!(poll_skip(archive.clone(), cx, delta)) {
410                Ok(_) => {}
411                Err(err) => return Poll::Ready(Some(Err(err))),
412            }
413
414            *current_header = Some(Header::new_old());
415            *current_header_pos = 0;
416        }
417
418        let header = current_header.as_mut().unwrap();
419
420        // EOF is an indicator that we are at the end of the archive.
421        match async_std::task::ready!(poll_try_read_all(
422            archive.clone(),
423            cx,
424            header.as_mut_bytes(),
425            current_header_pos,
426        )) {
427            Ok(true) => {}
428            Ok(false) => return Poll::Ready(None),
429            Err(err) => return Poll::Ready(Some(Err(err))),
430        }
431
432        // If a header is not all zeros, we have another valid header.
433        // Otherwise, check if we are ignoring zeros and continue, or break as if this is the
434        // end of the archive.
435        if !header.as_bytes().iter().all(|i| *i == 0) {
436            *next += 512;
437            break;
438        }
439
440        if !archive.inner.lock().unwrap().ignore_zeros {
441            return Poll::Ready(None);
442        }
443
444        *next += 512;
445        header_pos = *next;
446    }
447
448    let header = current_header.as_mut().unwrap();
449
450    // Make sure the checksum is ok
451    let sum = header.as_bytes()[..148]
452        .iter()
453        .chain(&header.as_bytes()[156..])
454        .fold(0, |a, b| a + (*b as u32))
455        + 8 * 32;
456    let cksum = header.cksum()?;
457    if sum != cksum {
458        return Poll::Ready(Some(Err(other("archive header checksum mismatch"))));
459    }
460
461    let file_pos = *next;
462    let size = header.entry_size()?;
463
464    let data = EntryIo::Data(archive.clone().take(size));
465
466    let header = current_header.take().unwrap();
467
468    let ArchiveInner {
469        unpack_xattrs,
470        preserve_mtime,
471        preserve_permissions,
472        ..
473    } = &*archive.inner.lock().unwrap();
474
475    let ret = EntryFields {
476        size,
477        header_pos,
478        file_pos,
479        data: vec![data],
480        header,
481        long_pathname: None,
482        long_linkname: None,
483        pax_extensions: None,
484        unpack_xattrs: *unpack_xattrs,
485        preserve_permissions: *preserve_permissions,
486        preserve_mtime: *preserve_mtime,
487        read_state: None,
488    };
489
490    // Store where the next entry is, rounding up by 512 bytes (the size of
491    // a header);
492    let size = (size + 511) & !(512 - 1);
493    *next += size;
494
495    Poll::Ready(Some(Ok(ret.into_entry())))
496}
497
498fn poll_parse_sparse_header<R: Read + Unpin>(
499    archive: &Archive<R>,
500    next: &mut u64,
501    current_ext: &mut Option<GnuExtSparseHeader>,
502    current_ext_pos: &mut usize,
503    entry: &mut EntryFields<Archive<R>>,
504    cx: &mut Context<'_>,
505) -> Poll<io::Result<()>> {
506    if !entry.header.entry_type().is_gnu_sparse() {
507        return Poll::Ready(Ok(()));
508    }
509
510    let gnu = match entry.header.as_gnu() {
511        Some(gnu) => gnu,
512        None => return Poll::Ready(Err(other("sparse entry type listed but not GNU header"))),
513    };
514
515    // Sparse files are represented internally as a list of blocks that are
516    // read. Blocks are either a bunch of 0's or they're data from the
517    // underlying archive.
518    //
519    // Blocks of a sparse file are described by the `GnuSparseHeader`
520    // structure, some of which are contained in `GnuHeader` but some of
521    // which may also be contained after the first header in further
522    // headers.
523    //
524    // We read off all the blocks here and use the `add_block` function to
525    // incrementally add them to the list of I/O block (in `entry.data`).
526    // The `add_block` function also validates that each chunk comes after
527    // the previous, we don't overrun the end of the file, and each block is
528    // aligned to a 512-byte boundary in the archive itself.
529    //
530    // At the end we verify that the sparse file size (`Header::size`) is
531    // the same as the current offset (described by the list of blocks) as
532    // well as the amount of data read equals the size of the entry
533    // (`Header::entry_size`).
534    entry.data.truncate(0);
535
536    let mut cur = 0;
537    let mut remaining = entry.size;
538    {
539        let data = &mut entry.data;
540        let reader = archive.clone();
541        let size = entry.size;
542        let mut add_block = |block: &GnuSparseHeader| -> io::Result<_> {
543            if block.is_empty() {
544                return Ok(());
545            }
546            let off = block.offset()?;
547            let len = block.length()?;
548
549            if (size - remaining) % 512 != 0 {
550                return Err(other(
551                    "previous block in sparse file was not \
552                     aligned to 512-byte boundary",
553                ));
554            } else if off < cur {
555                return Err(other(
556                    "out of order or overlapping sparse \
557                     blocks",
558                ));
559            } else if cur < off {
560                let block = io::repeat(0).take(off - cur);
561                data.push(EntryIo::Pad(block));
562            }
563            cur = off
564                .checked_add(len)
565                .ok_or_else(|| other("more bytes listed in sparse file than u64 can hold"))?;
566            remaining = remaining.checked_sub(len).ok_or_else(|| {
567                other(
568                    "sparse file consumed more data than the header \
569                     listed",
570                )
571            })?;
572            data.push(EntryIo::Data(reader.clone().take(len)));
573            Ok(())
574        };
575        for block in &gnu.sparse {
576            add_block(block)?
577        }
578        if gnu.is_extended() {
579            let started_header = current_ext.is_some();
580            if !started_header {
581                let mut ext = GnuExtSparseHeader::new();
582                ext.isextended[0] = 1;
583                *current_ext = Some(ext);
584                *current_ext_pos = 0;
585            }
586
587            let ext = current_ext.as_mut().unwrap();
588            while ext.is_extended() {
589                match async_std::task::ready!(poll_try_read_all(
590                    archive.clone(),
591                    cx,
592                    ext.as_mut_bytes(),
593                    current_ext_pos,
594                )) {
595                    Ok(true) => {}
596                    Ok(false) => return Poll::Ready(Err(other("failed to read extension"))),
597                    Err(err) => return Poll::Ready(Err(err)),
598                }
599
600                *next += 512;
601                for block in &ext.sparse {
602                    add_block(block)?;
603                }
604            }
605        }
606    }
607    if cur != gnu.real_size()? {
608        return Poll::Ready(Err(other(
609            "mismatch in sparse file chunks and \
610             size in header",
611        )));
612    }
613    entry.size = cur;
614    if remaining > 0 {
615        return Poll::Ready(Err(other(
616            "mismatch in sparse file chunks and \
617             entry size in header",
618        )));
619    }
620
621    Poll::Ready(Ok(()))
622}
623
624impl<R: Read + Unpin> Read for Archive<R> {
625    fn poll_read(
626        self: Pin<&mut Self>,
627        cx: &mut Context<'_>,
628        into: &mut [u8],
629    ) -> Poll<io::Result<usize>> {
630        let mut lock = self.inner.lock().unwrap();
631        let mut inner = Pin::new(&mut *lock);
632        let r = Pin::new(&mut inner.obj);
633
634        let res = async_std::task::ready!(r.poll_read(cx, into));
635        match res {
636            Ok(i) => {
637                inner.pos += i as u64;
638                Poll::Ready(Ok(i))
639            }
640            Err(err) => Poll::Ready(Err(err)),
641        }
642    }
643}
644
645/// Try to fill the buffer from the reader.
646///
647/// If the reader reaches its end before filling the buffer at all, returns `false`.
648/// Otherwise returns `true`.
649fn poll_try_read_all<R: Read + Unpin>(
650    mut source: R,
651    cx: &mut Context<'_>,
652    buf: &mut [u8],
653    pos: &mut usize,
654) -> Poll<io::Result<bool>> {
655    while *pos < buf.len() {
656        match async_std::task::ready!(Pin::new(&mut source).poll_read(cx, &mut buf[*pos..])) {
657            Ok(0) => {
658                if *pos == 0 {
659                    return Poll::Ready(Ok(false));
660                }
661
662                return Poll::Ready(Err(other("failed to read entire block")));
663            }
664            Ok(n) => *pos += n,
665            Err(err) => return Poll::Ready(Err(err)),
666        }
667    }
668
669    *pos = 0;
670    Poll::Ready(Ok(true))
671}
672
673/// Skip n bytes on the given source.
674fn poll_skip<R: Read + Unpin>(
675    mut source: R,
676    cx: &mut Context<'_>,
677    mut amt: u64,
678) -> Poll<io::Result<()>> {
679    let mut buf = [0u8; 4096 * 8];
680    while amt > 0 {
681        let n = cmp::min(amt, buf.len() as u64);
682        match async_std::task::ready!(Pin::new(&mut source).poll_read(cx, &mut buf[..n as usize])) {
683            Ok(n) if n == 0 => {
684                return Poll::Ready(Err(other("unexpected EOF during skip")));
685            }
686            Ok(n) => {
687                amt -= n as u64;
688            }
689            Err(err) => return Poll::Ready(Err(err)),
690        }
691    }
692
693    Poll::Ready(Ok(()))
694}
695
#[cfg(test)]
mod tests {
    use super::*;

    // Compile-time checks that the archive types, and the `async_std` file
    // reader they are instantiated with here, implement `Send + Sync`.
    // NOTE(review): `assert_impl_all!` is not imported in this file; it is
    // presumably provided at the crate root (e.g. by `static_assertions`) —
    // confirm.
    assert_impl_all!(async_std::fs::File: Send, Sync);
    assert_impl_all!(Entries<async_std::fs::File>: Send, Sync);
    assert_impl_all!(Archive<async_std::fs::File>: Send, Sync);
    assert_impl_all!(Entry<Archive<async_std::fs::File>>: Send, Sync);
}