Skip to main content

async_tar/
archive.rs

1#[cfg(feature = "runtime-tokio")]
2use std::path::Path;
3use std::{
4    cmp,
5    pin::Pin,
6    sync::{Arc, Mutex},
7    task::{Context, Poll},
8};
9
10#[cfg(feature = "runtime-async-std")]
11use async_std::{
12    fs, io,
13    io::prelude::*,
14    path::Path,
15    stream::{Stream, StreamExt},
16};
17use futures_core::ready;
18#[cfg(feature = "runtime-tokio")]
19use tokio::{
20    fs,
21    io::{self, AsyncRead as Read, AsyncReadExt},
22};
23#[cfg(feature = "runtime-tokio")]
24use tokio_stream::{Stream, StreamExt};
25
26use crate::{
27    Entry, GnuExtSparseHeader, GnuSparseHeader, Header,
28    entry::{EntryFields, EntryIo},
29    error::TarError,
30    fs_canonicalize, other,
31    pax::pax_extensions,
32    symlink_metadata,
33};
34
35/// A top-level representation of an archive file.
36///
37/// This archive can have an entry added to it and it can be iterated over.
38#[derive(Debug)]
39pub struct Archive<R: Read + Unpin> {
40    inner: Arc<Mutex<ArchiveInner<R>>>,
41}
42
43impl<R: Read + Unpin> Clone for Archive<R> {
44    fn clone(&self) -> Self {
45        Archive {
46            inner: self.inner.clone(),
47        }
48    }
49}
50
51#[derive(Debug)]
52pub struct ArchiveInner<R: Read + Unpin> {
53    pos: u64,
54    unpack_xattrs: bool,
55    preserve_permissions: bool,
56    preserve_mtime: bool,
57    ignore_zeros: bool,
58    obj: R,
59}
60
61/// Configure the archive.
62pub struct ArchiveBuilder<R: Read + Unpin> {
63    obj: R,
64    unpack_xattrs: bool,
65    preserve_permissions: bool,
66    preserve_mtime: bool,
67    ignore_zeros: bool,
68}
69
70impl<R: Read + Unpin> ArchiveBuilder<R> {
71    /// Create a new builder.
72    pub fn new(obj: R) -> Self {
73        ArchiveBuilder {
74            unpack_xattrs: false,
75            preserve_permissions: false,
76            preserve_mtime: true,
77            ignore_zeros: false,
78            obj,
79        }
80    }
81
82    /// Indicate whether extended file attributes (xattrs on Unix) are preserved
83    /// when unpacking this archive.
84    ///
85    /// This flag is disabled by default and is currently only implemented on
86    /// Unix using xattr support. This may eventually be implemented for
87    /// Windows, however, if other archive implementations are found which do
88    /// this as well.
89    pub fn set_unpack_xattrs(mut self, unpack_xattrs: bool) -> Self {
90        self.unpack_xattrs = unpack_xattrs;
91        self
92    }
93
94    /// Indicate whether extended permissions (like suid on Unix) are preserved
95    /// when unpacking this entry.
96    ///
97    /// This flag is disabled by default and is currently only implemented on
98    /// Unix.
99    pub fn set_preserve_permissions(mut self, preserve: bool) -> Self {
100        self.preserve_permissions = preserve;
101        self
102    }
103
104    /// Indicate whether access time information is preserved when unpacking
105    /// this entry.
106    ///
107    /// This flag is enabled by default.
108    pub fn set_preserve_mtime(mut self, preserve: bool) -> Self {
109        self.preserve_mtime = preserve;
110        self
111    }
112
113    /// Ignore zeroed headers, which would otherwise indicate to the archive that it has no more
114    /// entries.
115    ///
116    /// This can be used in case multiple tar archives have been concatenated together.
117    pub fn set_ignore_zeros(mut self, ignore_zeros: bool) -> Self {
118        self.ignore_zeros = ignore_zeros;
119        self
120    }
121
122    /// Construct the archive, ready to accept inputs.
123    pub fn build(self) -> Archive<R> {
124        let Self {
125            unpack_xattrs,
126            preserve_permissions,
127            preserve_mtime,
128            ignore_zeros,
129            obj,
130        } = self;
131
132        Archive {
133            inner: Arc::new(Mutex::new(ArchiveInner {
134                unpack_xattrs,
135                preserve_permissions,
136                preserve_mtime,
137                ignore_zeros,
138                obj,
139                pos: 0,
140            })),
141        }
142    }
143}
144
145impl<R: Read + Unpin> Archive<R> {
146    /// Create a new archive with the underlying object as the reader.
147    pub fn new(obj: R) -> Archive<R> {
148        Archive {
149            inner: Arc::new(Mutex::new(ArchiveInner {
150                unpack_xattrs: false,
151                preserve_permissions: false,
152                preserve_mtime: true,
153                ignore_zeros: false,
154                obj,
155                pos: 0,
156            })),
157        }
158    }
159
160    /// Unwrap this archive, returning the underlying object.
161    pub fn into_inner(self) -> Result<R, Self> {
162        match Arc::try_unwrap(self.inner) {
163            Ok(inner) => Ok(inner.into_inner().unwrap().obj),
164            Err(inner) => Err(Self { inner }),
165        }
166    }
167
168    /// Construct an stream over the entries in this archive.
169    ///
170    /// Note that care must be taken to consider each entry within an archive in
171    /// sequence. If entries are processed out of sequence (from what the
172    /// stream returns), then the contents read for each entry may be
173    /// corrupted.
174    pub fn entries(self) -> io::Result<Entries<R>> {
175        if self.inner.lock().unwrap().pos != 0 {
176            return Err(other(
177                "cannot call entries unless archive is at \
178                 position 0",
179            ));
180        }
181
182        Ok(Entries {
183            archive: self,
184            current: State::default(),
185            fields: None,
186            gnu_longlink: None,
187            gnu_longname: None,
188            pax_extensions: None,
189        })
190    }
191
192    /// Construct an stream over the raw entries in this archive.
193    ///
194    /// Note that care must be taken to consider each entry within an archive in
195    /// sequence. If entries are processed out of sequence (from what the
196    /// stream returns), then the contents read for each entry may be
197    /// corrupted.
198    pub fn entries_raw(self) -> io::Result<RawEntries<R>> {
199        if self.inner.lock().unwrap().pos != 0 {
200            return Err(other(
201                "cannot call entries_raw unless archive is at \
202                 position 0",
203            ));
204        }
205
206        Ok(RawEntries {
207            archive: self,
208            current: (0, None, 0),
209        })
210    }
211
212    /// Unpacks the contents tarball into the specified `dst`.
213    ///
214    /// This function will iterate over the entire contents of this tarball,
215    /// extracting each file in turn to the location specified by the entry's
216    /// path name.
217    ///
218    /// This operation is relatively sensitive in that it will not write files
219    /// outside of the path specified by `dst`. Files in the archive which have
220    /// a '..' in their path are skipped during the unpacking process.
221    ///
222    /// # Examples
223    ///
224    #[cfg_attr(feature = "runtime-async-std", doc = "```no_run")]
225    #[cfg_attr(feature = "runtime-tokio", doc = "```ignore")]
226    /// # fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> { async_std::task::block_on(async {
227    /// #
228    /// use async_std::fs::File;
229    /// use async_tar::Archive;
230    ///
231    /// let mut ar = Archive::new(File::open("foo.tar").await?);
232    /// ar.unpack("foo").await?;
233    /// #
234    /// # Ok(()) }) }
235    /// ```
236    pub async fn unpack<P: AsRef<Path>>(self, dst: P) -> io::Result<()> {
237        let mut entries = self.entries()?;
238        let mut pinned = Pin::new(&mut entries);
239        let dst = dst.as_ref();
240
241        if symlink_metadata(dst).await.is_err() {
242            fs::create_dir_all(&dst)
243                .await
244                .map_err(|e| TarError::new(&format!("failed to create `{}`", dst.display()), e))?;
245        }
246
247        // Canonicalizing the dst directory will prepend the path with '\\?\'
248        // on windows which will allow windows APIs to treat the path as an
249        // extended-length path with a 32,767 character limit. Otherwise all
250        // unpacked paths over 260 characters will fail on creation with a
251        // NotFound exception.
252
253        let dst = &fs_canonicalize(dst)
254            .await
255            .unwrap_or_else(|_| dst.to_path_buf());
256
257        // Delay any directory entries until the end (they will be created if needed by
258        // descendants), to ensure that directory permissions do not interfer with descendant
259        // extraction.
260        let mut directories = Vec::new();
261        while let Some(entry) = pinned.next().await {
262            let mut file = entry.map_err(|e| TarError::new("failed to iterate over archive", e))?;
263            if file.header().entry_type() == crate::EntryType::Directory {
264                directories.push(file);
265            } else {
266                file.unpack_in(dst).await?;
267            }
268        }
269        for mut dir in directories {
270            dir.unpack_in(dst).await?;
271        }
272
273        Ok(())
274    }
275}
276
277#[derive(Debug, Default)]
278struct State {
279    next: u64,
280    current_header: Option<Header>,
281    current_header_pos: usize,
282    current_ext: Option<GnuExtSparseHeader>,
283    pax_extensions: Option<Vec<u8>>,
284}
285
286/// Stream of `Entry`s.
287#[derive(Debug)]
288pub struct Entries<R: Read + Unpin> {
289    archive: Archive<R>,
290    current: State,
291    fields: Option<EntryFields<Archive<R>>>,
292    gnu_longname: Option<Vec<u8>>,
293    gnu_longlink: Option<Vec<u8>>,
294    pax_extensions: Option<Vec<u8>>,
295}
296
/// Unwraps a `Poll<Option<Result<T, E>>>` inside a `Stream::poll_next` body.
///
/// Evaluates to the `T`. Pending is propagated by `ready!`; an error is
/// returned as the next stream item and `None` ends the stream.
macro_rules! ready_opt_err {
    ($poll:expr) => {
        match ready!($poll) {
            None => return Poll::Ready(None),
            Some(Err(error)) => return Poll::Ready(Some(Err(error))),
            Some(Ok(item)) => item,
        }
    };
}
306
/// Unwraps a `Poll<Result<T, E>>` inside a `Stream::poll_next` body.
///
/// Evaluates to the `T`. Pending is propagated by `ready!`; an error is
/// returned as the next stream item.
macro_rules! ready_err {
    ($poll:expr) => {
        match ready!($poll) {
            Err(error) => return Poll::Ready(Some(Err(error))),
            Ok(value) => value,
        }
    };
}
315
316impl<R: Read + Unpin> Stream for Entries<R> {
317    type Item = io::Result<Entry<Archive<R>>>;
318
319    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
320        loop {
321            let Self {
322                current,
323                fields,
324                gnu_longname,
325                gnu_longlink,
326                archive,
327                pax_extensions,
328                ..
329            } = &mut *self;
330            let State {
331                next,
332                current_header,
333                current_header_pos,
334                pax_extensions: current_pax_extensions,
335                ..
336            } = current;
337
338            let new_fields = if let Some(fields) = fields.as_mut() {
339                fields
340            } else {
341                *fields = Some(EntryFields::from(ready_opt_err!(poll_next_raw(
342                    archive,
343                    next,
344                    current_header,
345                    current_header_pos,
346                    current_pax_extensions.as_deref(),
347                    cx
348                ))));
349                continue;
350            };
351
352            let is_recognized_header =
353                new_fields.header.as_gnu().is_some() || new_fields.header.as_ustar().is_some();
354            if is_recognized_header && new_fields.header.entry_type().is_gnu_longname() {
355                if gnu_longname.is_some() {
356                    return Poll::Ready(Some(Err(other(
357                        "two long name entries describing \
358                         the same member",
359                    ))));
360                }
361
362                *gnu_longname = Some(ready_err!(Pin::new(new_fields).poll_read_all(cx)));
363                *fields = None;
364                continue;
365            }
366
367            if is_recognized_header && new_fields.header.entry_type().is_gnu_longlink() {
368                if gnu_longlink.is_some() {
369                    return Poll::Ready(Some(Err(other(
370                        "two long name entries describing \
371                         the same member",
372                    ))));
373                }
374                *gnu_longlink = Some(ready_err!(Pin::new(new_fields).poll_read_all(cx)));
375                *fields = None;
376                continue;
377            }
378
379            if is_recognized_header && new_fields.header.entry_type().is_pax_local_extensions() {
380                if pax_extensions.is_some() {
381                    return Poll::Ready(Some(Err(other(
382                        "two pax extensions entries describing \
383                         the same member",
384                    ))));
385                }
386                *pax_extensions = Some(ready_err!(Pin::new(new_fields).poll_read_all(cx)));
387                *current_pax_extensions = pax_extensions.clone();
388                *fields = None;
389                continue;
390            }
391
392            new_fields.long_pathname = gnu_longname.take();
393            new_fields.long_linkname = gnu_longlink.take();
394            new_fields.pax_extensions = pax_extensions.take();
395            // these pax records apply to this entry only; clear the sizing copy
396            // so it doesn't leak onto the next entry
397            *current_pax_extensions = None;
398
399            let State {
400                next,
401                current_header_pos,
402                current_ext,
403                ..
404            } = current;
405            ready_err!(poll_parse_sparse_header(
406                archive,
407                next,
408                current_ext,
409                current_header_pos,
410                new_fields,
411                cx
412            ));
413
414            return Poll::Ready(Some(Ok(fields.take().unwrap().into_entry())));
415        }
416    }
417}
418
419/// Stream of raw `Entry`s.
420pub struct RawEntries<R: Read + Unpin> {
421    archive: Archive<R>,
422    current: (u64, Option<Header>, usize),
423}
424
425impl<R: Read + Unpin> Stream for RawEntries<R> {
426    type Item = io::Result<Entry<Archive<R>>>;
427
428    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
429        let archive = self.archive.clone();
430        let (next, current_header, current_header_pos) = &mut self.current;
431        poll_next_raw(&archive, next, current_header, current_header_pos, None, cx)
432    }
433}
434
435fn poll_next_raw<R: Read + Unpin>(
436    archive: &Archive<R>,
437    next: &mut u64,
438    current_header: &mut Option<Header>,
439    current_header_pos: &mut usize,
440    pax_extensions_data: Option<&[u8]>,
441    cx: &mut Context<'_>,
442) -> Poll<Option<io::Result<Entry<Archive<R>>>>> {
443    let mut header_pos = *next;
444
445    loop {
446        let archive = archive.clone();
447        // Seek to the start of the next header in the archive
448        if current_header.is_none() {
449            let delta = *next - archive.inner.lock().unwrap().pos;
450            match ready!(poll_skip(archive.clone(), cx, delta)) {
451                Ok(_) => {}
452                Err(err) => return Poll::Ready(Some(Err(err))),
453            }
454
455            *current_header = Some(Header::new_old());
456            *current_header_pos = 0;
457        }
458
459        let header = current_header.as_mut().unwrap();
460
461        // EOF is an indicator that we are at the end of the archive.
462        match ready!(poll_try_read_all(
463            archive.clone(),
464            cx,
465            header.as_mut_bytes(),
466            current_header_pos,
467        )) {
468            Ok(true) => {}
469            Ok(false) => return Poll::Ready(None),
470            Err(err) => return Poll::Ready(Some(Err(err))),
471        }
472
473        // If a header is not all zeros, we have another valid header.
474        // Otherwise, check if we are ignoring zeros and continue, or break as if this is the
475        // end of the archive.
476        if !header.as_bytes().iter().all(|i| *i == 0) {
477            *next += 512;
478            break;
479        }
480
481        if !archive.inner.lock().unwrap().ignore_zeros {
482            return Poll::Ready(None);
483        }
484
485        *next += 512;
486        header_pos = *next;
487    }
488
489    let header = current_header.as_mut().unwrap();
490
491    // Make sure the checksum is ok
492    let sum = header.as_bytes()[..148]
493        .iter()
494        .chain(&header.as_bytes()[156..])
495        .fold(0, |a, b| a + (*b as u32))
496        + 8 * 32;
497    let cksum = header.cksum()?;
498    if sum != cksum {
499        return Poll::Ready(Some(Err(other("archive header checksum mismatch"))));
500    }
501
502    let file_pos = *next;
503
504    let mut header = current_header.take().unwrap();
505
506    // when pax extensions are available, the size should come from there.
507    let mut size = header.entry_size()?;
508
509    // PAX extension records describe the next *file entry*, not an intermediary
510    // extension header. Applying a buffered PAX `size` (or uid/gid) to a GNU
511    // longname/longlink (`L`/`K`) or a PAX local/global (`x`/`g`) header would
512    // advance the stream cursor by the wrong amount and desync the parse, since
513    // those headers' body length must come from their own declared size.
514    let entry_type = header.entry_type();
515    let is_extension_header = entry_type.is_gnu_longname()
516        || entry_type.is_gnu_longlink()
517        || entry_type.is_pax_local_extensions()
518        || entry_type.is_pax_global_extensions();
519
520    // the size above will be overriden by the pax data if it has a size field.
521    // same for uid and gid, which will be overridden in the header itself.
522    if let Some(pax_extensions_data) = pax_extensions_data.filter(|_| !is_extension_header) {
523        let pax = pax_extensions(pax_extensions_data);
524        for extension in pax {
525            let extension = extension.map_err(|_e| other("pax extensions invalid"))?;
526
527            // ignore keys that aren't parsable as a string at this stage.
528            // that isn't relevant to the size/uid/gid processing.
529            let Some(key) = extension.key().ok() else {
530                continue;
531            };
532
533            match key {
534                "size" => {
535                    let size_str = extension
536                        .value()
537                        .map_err(|_e| other("failed to parse pax size as string"))?;
538                    size = size_str
539                        .parse::<u64>()
540                        .map_err(|_e| other("failed to parse pax size"))?;
541                }
542
543                "uid" => {
544                    let uid_str = extension
545                        .value()
546                        .map_err(|_e| other("failed to parse pax uid as string"))?;
547                    header.set_uid(
548                        uid_str
549                            .parse::<u64>()
550                            .map_err(|_e| other("failed to parse pax uid"))?,
551                    );
552                }
553
554                "gid" => {
555                    let gid_str = extension
556                        .value()
557                        .map_err(|_e| other("failed to parse pax gid as string"))?;
558                    header.set_gid(
559                        gid_str
560                            .parse::<u64>()
561                            .map_err(|_e| other("failed to parse pax gid"))?,
562                    );
563                }
564
565                _ => {
566                    continue;
567                }
568            }
569        }
570    }
571
572    let data = EntryIo::Data(archive.clone().take(size));
573
574    let ArchiveInner {
575        unpack_xattrs,
576        preserve_mtime,
577        preserve_permissions,
578        ..
579    } = &*archive.inner.lock().unwrap();
580
581    let ret = EntryFields {
582        size,
583        header_pos,
584        file_pos,
585        data: vec![data],
586        header,
587        long_pathname: None,
588        long_linkname: None,
589        pax_extensions: None,
590        unpack_xattrs: *unpack_xattrs,
591        preserve_permissions: *preserve_permissions,
592        preserve_mtime: *preserve_mtime,
593        read_state: None,
594    };
595
596    // Store where the next entry is, rounding up by 512 bytes (the size of
597    // a header);
598    let size = (size + 511) & !(512 - 1);
599    *next += size;
600
601    Poll::Ready(Some(Ok(ret.into_entry())))
602}
603
604fn poll_parse_sparse_header<R: Read + Unpin>(
605    archive: &Archive<R>,
606    next: &mut u64,
607    current_ext: &mut Option<GnuExtSparseHeader>,
608    current_ext_pos: &mut usize,
609    entry: &mut EntryFields<Archive<R>>,
610    cx: &mut Context<'_>,
611) -> Poll<io::Result<()>> {
612    if !entry.header.entry_type().is_gnu_sparse() {
613        return Poll::Ready(Ok(()));
614    }
615
616    let gnu = match entry.header.as_gnu() {
617        Some(gnu) => gnu,
618        None => return Poll::Ready(Err(other("sparse entry type listed but not GNU header"))),
619    };
620
621    // Sparse files are represented internally as a list of blocks that are
622    // read. Blocks are either a bunch of 0's or they're data from the
623    // underlying archive.
624    //
625    // Blocks of a sparse file are described by the `GnuSparseHeader`
626    // structure, some of which are contained in `GnuHeader` but some of
627    // which may also be contained after the first header in further
628    // headers.
629    //
630    // We read off all the blocks here and use the `add_block` function to
631    // incrementally add them to the list of I/O block (in `entry.data`).
632    // The `add_block` function also validates that each chunk comes after
633    // the previous, we don't overrun the end of the file, and each block is
634    // aligned to a 512-byte boundary in the archive itself.
635    //
636    // At the end we verify that the sparse file size (`Header::size`) is
637    // the same as the current offset (described by the list of blocks) as
638    // well as the amount of data read equals the size of the entry
639    // (`Header::entry_size`).
640    entry.data.truncate(0);
641
642    let mut cur = 0;
643    let mut remaining = entry.size;
644    {
645        let data = &mut entry.data;
646        let reader = archive.clone();
647        let size = entry.size;
648        let mut add_block = |block: &GnuSparseHeader| -> io::Result<_> {
649            if block.is_empty() {
650                return Ok(());
651            }
652            let off = block.offset()?;
653            let len = block.length()?;
654
655            if (size - remaining) % 512 != 0 {
656                return Err(other(
657                    "previous block in sparse file was not \
658                     aligned to 512-byte boundary",
659                ));
660            } else if off < cur {
661                return Err(other(
662                    "out of order or overlapping sparse \
663                     blocks",
664                ));
665            } else if cur < off {
666                let block = io::repeat(0).take((off - cur) as _);
667                data.push(EntryIo::Pad(block));
668            }
669            cur = off
670                .checked_add(len)
671                .ok_or_else(|| other("more bytes listed in sparse file than u64 can hold"))?;
672            remaining = remaining.checked_sub(len).ok_or_else(|| {
673                other(
674                    "sparse file consumed more data than the header \
675                     listed",
676                )
677            })?;
678            data.push(EntryIo::Data(reader.clone().take(len)));
679            Ok(())
680        };
681        for block in &gnu.sparse {
682            add_block(block)?
683        }
684        if gnu.is_extended() {
685            let started_header = current_ext.is_some();
686            if !started_header {
687                let mut ext = GnuExtSparseHeader::new();
688                ext.isextended[0] = 1;
689                *current_ext = Some(ext);
690                *current_ext_pos = 0;
691            }
692
693            let ext = current_ext.as_mut().unwrap();
694            while ext.is_extended() {
695                match ready!(poll_try_read_all(
696                    archive.clone(),
697                    cx,
698                    ext.as_mut_bytes(),
699                    current_ext_pos,
700                )) {
701                    Ok(true) => {}
702                    Ok(false) => return Poll::Ready(Err(other("failed to read extension"))),
703                    Err(err) => return Poll::Ready(Err(err)),
704                }
705
706                *next += 512;
707                for block in &ext.sparse {
708                    add_block(block)?;
709                }
710            }
711        }
712    }
713    if cur != gnu.real_size()? {
714        return Poll::Ready(Err(other(
715            "mismatch in sparse file chunks and \
716             size in header",
717        )));
718    }
719    entry.size = cur;
720    if remaining > 0 {
721        return Poll::Ready(Err(other(
722            "mismatch in sparse file chunks and \
723             entry size in header",
724        )));
725    }
726
727    Poll::Ready(Ok(()))
728}
729
#[cfg(feature = "runtime-async-std")]
impl<R: Read + Unpin> Read for Archive<R> {
    /// Reads from the wrapped reader while holding the archive lock and
    /// advances the shared position by the number of bytes read.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        into: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        let mut guard = self.inner.lock().unwrap();
        let state = &mut *guard;

        let read = ready!(Pin::new(&mut state.obj).poll_read(cx, into))?;
        state.pos += read as u64;
        Poll::Ready(Ok(read))
    }
}
751
#[cfg(feature = "runtime-tokio")]
impl<R: Read + Unpin> Read for Archive<R> {
    /// Reads from the wrapped reader while holding the archive lock and
    /// advances the shared position by the number of bytes added to `into`.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        into: &mut tokio::io::ReadBuf,
    ) -> Poll<io::Result<()>> {
        let mut guard = self.inner.lock().unwrap();
        let state = &mut *guard;

        // `ReadBuf` reports no byte count, so measure growth of the filled part.
        let before = into.filled().len();
        ready!(Pin::new(&mut state.obj).poll_read(cx, into))?;
        let read = into.filled().len() - before;
        state.pos += read as u64;
        Poll::Ready(Ok(()))
    }
}
774
/// Try to fill the buffer from the reader.
///
/// `pos` is the number of bytes of `buf` already filled; it is advanced as
/// data arrives, so a `Pending` read resumes where it stopped, and it is reset
/// to 0 once the buffer is full.
///
/// If the reader reaches its end before filling the buffer at all, returns `false`.
/// Otherwise returns `true`. Reaching the end part-way through the buffer is
/// an error.
#[cfg(feature = "runtime-async-std")]
fn poll_try_read_all<R: Read + Unpin>(
    mut source: R,
    cx: &mut Context<'_>,
    buf: &mut [u8],
    pos: &mut usize,
) -> Poll<io::Result<bool>> {
    while *pos < buf.len() {
        let read = ready!(Pin::new(&mut source).poll_read(cx, &mut buf[*pos..]))?;
        if read == 0 {
            let outcome = if *pos == 0 {
                Ok(false)
            } else {
                Err(other("failed to read entire block"))
            };
            return Poll::Ready(outcome);
        }
        *pos += read;
    }

    *pos = 0;
    Poll::Ready(Ok(true))
}
803
/// Try to fill the buffer from the reader.
///
/// `pos` is the number of bytes of `buf` already filled; it is advanced as
/// data arrives, so a `Pending` read resumes where it stopped, and it is reset
/// to 0 once the buffer is full.
///
/// Returns `false` if the reader ends before any byte of the buffer was
/// filled and `true` once the buffer is full. Reaching the end part-way
/// through the buffer is an error.
#[cfg(feature = "runtime-tokio")]
fn poll_try_read_all<R: Read + Unpin>(
    mut source: R,
    cx: &mut Context<'_>,
    buf: &mut [u8],
    pos: &mut usize,
) -> Poll<io::Result<bool>> {
    while *pos < buf.len() {
        let mut window = io::ReadBuf::new(&mut buf[*pos..]);
        let before = window.filled().len();
        ready!(Pin::new(&mut source).poll_read(cx, &mut window))?;
        let read = window.filled().len() - before;

        if read == 0 {
            let outcome = if *pos == 0 {
                Ok(false)
            } else {
                Err(other("failed to read entire block"))
            };
            return Poll::Ready(outcome);
        }
        *pos += read;
    }

    *pos = 0;
    Poll::Ready(Ok(true))
}
834
/// Skip n bytes on the given source.
///
/// The bytes are read into a scratch buffer and discarded. Reaching the end
/// of the source before `amt` bytes were consumed is an error.
#[cfg(feature = "runtime-async-std")]
fn poll_skip<R: Read + Unpin>(
    mut source: R,
    cx: &mut Context<'_>,
    mut amt: u64,
) -> Poll<io::Result<()>> {
    let mut scratch = [0u8; 4096 * 8];
    while amt > 0 {
        // Never ask for more than is left to skip.
        let want = cmp::min(amt, scratch.len() as u64) as usize;
        let read = ready!(Pin::new(&mut source).poll_read(cx, &mut scratch[..want]))?;
        if read == 0 {
            return Poll::Ready(Err(other("unexpected EOF during skip")));
        }
        amt -= read as u64;
    }

    Poll::Ready(Ok(()))
}
858
/// Skip n bytes on the given source.
///
/// The bytes are read into a scratch buffer and discarded. Reaching the end
/// of the source before `amt` bytes were consumed is an error.
#[cfg(feature = "runtime-tokio")]
fn poll_skip<R: Read + Unpin>(
    mut source: R,
    cx: &mut Context<'_>,
    mut amt: u64,
) -> Poll<io::Result<()>> {
    let mut scratch = [0u8; 4096 * 8];
    while amt > 0 {
        // Never ask for more than is left to skip.
        let want = cmp::min(amt, scratch.len() as u64) as usize;
        let mut window = io::ReadBuf::new(&mut scratch[..want]);
        let before = window.filled().len();
        ready!(Pin::new(&mut source).poll_read(cx, &mut window))?;
        let read = window.filled().len() - before;

        if read == 0 {
            return Poll::Ready(Err(other("unexpected EOF during skip")));
        }
        amt -= read as u64;
    }

    Poll::Ready(Ok(()))
}
886
#[cfg(test)]
mod tests {
    use super::*;

    // Compile-time checks that the public types stay `Send + Sync` when
    // wrapping a file; the file type itself is checked first as a baseline.
    assert_impl_all!(fs::File: Send, Sync);
    assert_impl_all!(Archive<fs::File>: Send, Sync);
    assert_impl_all!(Entries<fs::File>: Send, Sync);
    assert_impl_all!(Entry<Archive<fs::File>>: Send, Sync);
}