async_tar/
archive.rs

1#[cfg(feature = "runtime-tokio")]
2use std::path::Path;
3use std::{
4    cmp,
5    pin::Pin,
6    sync::{Arc, Mutex},
7    task::{Context, Poll},
8};
9
10#[cfg(feature = "runtime-async-std")]
11use async_std::{
12    fs, io,
13    io::prelude::*,
14    path::Path,
15    stream::{Stream, StreamExt},
16};
17use futures_core::ready;
18#[cfg(feature = "runtime-tokio")]
19use tokio::{
20    fs,
21    io::{self, AsyncRead as Read, AsyncReadExt},
22};
23#[cfg(feature = "runtime-tokio")]
24use tokio_stream::{Stream, StreamExt};
25
26use crate::{
27    Entry, GnuExtSparseHeader, GnuSparseHeader, Header,
28    entry::{EntryFields, EntryIo},
29    error::TarError,
30    fs_canonicalize, other,
31    pax::pax_extensions,
32    symlink_metadata,
33};
34
35/// A top-level representation of an archive file.
36///
37/// This archive can have an entry added to it and it can be iterated over.
38#[derive(Debug)]
39pub struct Archive<R: Read + Unpin> {
40    inner: Arc<Mutex<ArchiveInner<R>>>,
41}
42
43impl<R: Read + Unpin> Clone for Archive<R> {
44    fn clone(&self) -> Self {
45        Archive {
46            inner: self.inner.clone(),
47        }
48    }
49}
50
51#[derive(Debug)]
52pub struct ArchiveInner<R: Read + Unpin> {
53    pos: u64,
54    unpack_xattrs: bool,
55    preserve_permissions: bool,
56    preserve_mtime: bool,
57    ignore_zeros: bool,
58    obj: R,
59}
60
61/// Configure the archive.
62pub struct ArchiveBuilder<R: Read + Unpin> {
63    obj: R,
64    unpack_xattrs: bool,
65    preserve_permissions: bool,
66    preserve_mtime: bool,
67    ignore_zeros: bool,
68}
69
70impl<R: Read + Unpin> ArchiveBuilder<R> {
71    /// Create a new builder.
72    pub fn new(obj: R) -> Self {
73        ArchiveBuilder {
74            unpack_xattrs: false,
75            preserve_permissions: false,
76            preserve_mtime: true,
77            ignore_zeros: false,
78            obj,
79        }
80    }
81
82    /// Indicate whether extended file attributes (xattrs on Unix) are preserved
83    /// when unpacking this archive.
84    ///
85    /// This flag is disabled by default and is currently only implemented on
86    /// Unix using xattr support. This may eventually be implemented for
87    /// Windows, however, if other archive implementations are found which do
88    /// this as well.
89    pub fn set_unpack_xattrs(mut self, unpack_xattrs: bool) -> Self {
90        self.unpack_xattrs = unpack_xattrs;
91        self
92    }
93
94    /// Indicate whether extended permissions (like suid on Unix) are preserved
95    /// when unpacking this entry.
96    ///
97    /// This flag is disabled by default and is currently only implemented on
98    /// Unix.
99    pub fn set_preserve_permissions(mut self, preserve: bool) -> Self {
100        self.preserve_permissions = preserve;
101        self
102    }
103
104    /// Indicate whether access time information is preserved when unpacking
105    /// this entry.
106    ///
107    /// This flag is enabled by default.
108    pub fn set_preserve_mtime(mut self, preserve: bool) -> Self {
109        self.preserve_mtime = preserve;
110        self
111    }
112
113    /// Ignore zeroed headers, which would otherwise indicate to the archive that it has no more
114    /// entries.
115    ///
116    /// This can be used in case multiple tar archives have been concatenated together.
117    pub fn set_ignore_zeros(mut self, ignore_zeros: bool) -> Self {
118        self.ignore_zeros = ignore_zeros;
119        self
120    }
121
122    /// Construct the archive, ready to accept inputs.
123    pub fn build(self) -> Archive<R> {
124        let Self {
125            unpack_xattrs,
126            preserve_permissions,
127            preserve_mtime,
128            ignore_zeros,
129            obj,
130        } = self;
131
132        Archive {
133            inner: Arc::new(Mutex::new(ArchiveInner {
134                unpack_xattrs,
135                preserve_permissions,
136                preserve_mtime,
137                ignore_zeros,
138                obj,
139                pos: 0,
140            })),
141        }
142    }
143}
144
145impl<R: Read + Unpin> Archive<R> {
146    /// Create a new archive with the underlying object as the reader.
147    pub fn new(obj: R) -> Archive<R> {
148        Archive {
149            inner: Arc::new(Mutex::new(ArchiveInner {
150                unpack_xattrs: false,
151                preserve_permissions: false,
152                preserve_mtime: true,
153                ignore_zeros: false,
154                obj,
155                pos: 0,
156            })),
157        }
158    }
159
160    /// Unwrap this archive, returning the underlying object.
161    pub fn into_inner(self) -> Result<R, Self> {
162        match Arc::try_unwrap(self.inner) {
163            Ok(inner) => Ok(inner.into_inner().unwrap().obj),
164            Err(inner) => Err(Self { inner }),
165        }
166    }
167
168    /// Construct an stream over the entries in this archive.
169    ///
170    /// Note that care must be taken to consider each entry within an archive in
171    /// sequence. If entries are processed out of sequence (from what the
172    /// stream returns), then the contents read for each entry may be
173    /// corrupted.
174    pub fn entries(self) -> io::Result<Entries<R>> {
175        if self.inner.lock().unwrap().pos != 0 {
176            return Err(other(
177                "cannot call entries unless archive is at \
178                 position 0",
179            ));
180        }
181
182        Ok(Entries {
183            archive: self,
184            current: State::default(),
185            fields: None,
186            gnu_longlink: None,
187            gnu_longname: None,
188            pax_extensions: None,
189        })
190    }
191
192    /// Construct an stream over the raw entries in this archive.
193    ///
194    /// Note that care must be taken to consider each entry within an archive in
195    /// sequence. If entries are processed out of sequence (from what the
196    /// stream returns), then the contents read for each entry may be
197    /// corrupted.
198    pub fn entries_raw(self) -> io::Result<RawEntries<R>> {
199        if self.inner.lock().unwrap().pos != 0 {
200            return Err(other(
201                "cannot call entries_raw unless archive is at \
202                 position 0",
203            ));
204        }
205
206        Ok(RawEntries {
207            archive: self,
208            current: (0, None, 0),
209        })
210    }
211
212    /// Unpacks the contents tarball into the specified `dst`.
213    ///
214    /// This function will iterate over the entire contents of this tarball,
215    /// extracting each file in turn to the location specified by the entry's
216    /// path name.
217    ///
218    /// This operation is relatively sensitive in that it will not write files
219    /// outside of the path specified by `dst`. Files in the archive which have
220    /// a '..' in their path are skipped during the unpacking process.
221    ///
222    /// # Examples
223    ///
224    #[cfg_attr(feature = "runtime-async-std", doc = "```no_run")]
225    #[cfg_attr(feature = "runtime-tokio", doc = "```ignore")]
226    /// # fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> { async_std::task::block_on(async {
227    /// #
228    /// use async_std::fs::File;
229    /// use async_tar::Archive;
230    ///
231    /// let mut ar = Archive::new(File::open("foo.tar").await?);
232    /// ar.unpack("foo").await?;
233    /// #
234    /// # Ok(()) }) }
235    /// ```
236    pub async fn unpack<P: AsRef<Path>>(self, dst: P) -> io::Result<()> {
237        let mut entries = self.entries()?;
238        let mut pinned = Pin::new(&mut entries);
239        let dst = dst.as_ref();
240
241        if symlink_metadata(dst).await.is_err() {
242            fs::create_dir_all(&dst)
243                .await
244                .map_err(|e| TarError::new(&format!("failed to create `{}`", dst.display()), e))?;
245        }
246
247        // Canonicalizing the dst directory will prepend the path with '\\?\'
248        // on windows which will allow windows APIs to treat the path as an
249        // extended-length path with a 32,767 character limit. Otherwise all
250        // unpacked paths over 260 characters will fail on creation with a
251        // NotFound exception.
252
253        let dst = &fs_canonicalize(dst)
254            .await
255            .unwrap_or_else(|_| dst.to_path_buf());
256
257        // Delay any directory entries until the end (they will be created if needed by
258        // descendants), to ensure that directory permissions do not interfer with descendant
259        // extraction.
260        let mut directories = Vec::new();
261        while let Some(entry) = pinned.next().await {
262            let mut file = entry.map_err(|e| TarError::new("failed to iterate over archive", e))?;
263            if file.header().entry_type() == crate::EntryType::Directory {
264                directories.push(file);
265            } else {
266                file.unpack_in(dst).await?;
267            }
268        }
269        for mut dir in directories {
270            dir.unpack_in(dst).await?;
271        }
272
273        Ok(())
274    }
275}
276
277#[derive(Debug, Default)]
278struct State {
279    next: u64,
280    current_header: Option<Header>,
281    current_header_pos: usize,
282    current_ext: Option<GnuExtSparseHeader>,
283    pax_extensions: Option<Vec<u8>>,
284}
285
286/// Stream of `Entry`s.
287#[derive(Debug)]
288pub struct Entries<R: Read + Unpin> {
289    archive: Archive<R>,
290    current: State,
291    fields: Option<EntryFields<Archive<R>>>,
292    gnu_longname: Option<Vec<u8>>,
293    gnu_longlink: Option<Vec<u8>>,
294    pax_extensions: Option<Vec<u8>>,
295}
296
// Unwraps a `Poll<Option<Result<T, E>>>` inside a function that returns
// `Poll<Option<Result<_, E>>>`: yields `T`, and returns early from the
// enclosing function on `Pending`, end-of-stream, or error.
macro_rules! ready_opt_err {
    ($val:expr) => {
        match $val {
            Poll::Ready(Some(Ok(item))) => item,
            Poll::Ready(Some(Err(err))) => return Poll::Ready(Some(Err(err))),
            Poll::Ready(None) => return Poll::Ready(None),
            Poll::Pending => return Poll::Pending,
        }
    };
}
306
// Unwraps a `Poll<Result<T, E>>` inside a function that returns
// `Poll<Option<Result<_, E>>>`: yields `T`, and returns early from the
// enclosing function on `Pending` or error.
macro_rules! ready_err {
    ($val:expr) => {
        match $val {
            Poll::Ready(Ok(item)) => item,
            Poll::Ready(Err(err)) => return Poll::Ready(Some(Err(err))),
            Poll::Pending => return Poll::Pending,
        }
    };
}
315
316impl<R: Read + Unpin> Stream for Entries<R> {
317    type Item = io::Result<Entry<Archive<R>>>;
318
319    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
320        loop {
321            let Self {
322                current,
323                fields,
324                gnu_longname,
325                gnu_longlink,
326                archive,
327                pax_extensions,
328                ..
329            } = &mut *self;
330            let State {
331                next,
332                current_header,
333                current_header_pos,
334                pax_extensions: current_pax_extensions,
335                ..
336            } = current;
337
338            let new_fields = if let Some(fields) = fields.as_mut() {
339                fields
340            } else {
341                *fields = Some(EntryFields::from(ready_opt_err!(poll_next_raw(
342                    archive,
343                    next,
344                    current_header,
345                    current_header_pos,
346                    current_pax_extensions.as_deref(),
347                    cx
348                ))));
349                continue;
350            };
351
352            let is_recognized_header =
353                new_fields.header.as_gnu().is_some() || new_fields.header.as_ustar().is_some();
354            if is_recognized_header && new_fields.header.entry_type().is_gnu_longname() {
355                if gnu_longname.is_some() {
356                    return Poll::Ready(Some(Err(other(
357                        "two long name entries describing \
358                         the same member",
359                    ))));
360                }
361
362                *gnu_longname = Some(ready_err!(Pin::new(new_fields).poll_read_all(cx)));
363                *fields = None;
364                continue;
365            }
366
367            if is_recognized_header && new_fields.header.entry_type().is_gnu_longlink() {
368                if gnu_longlink.is_some() {
369                    return Poll::Ready(Some(Err(other(
370                        "two long name entries describing \
371                         the same member",
372                    ))));
373                }
374                *gnu_longlink = Some(ready_err!(Pin::new(new_fields).poll_read_all(cx)));
375                *fields = None;
376                continue;
377            }
378
379            if is_recognized_header && new_fields.header.entry_type().is_pax_local_extensions() {
380                if pax_extensions.is_some() {
381                    return Poll::Ready(Some(Err(other(
382                        "two pax extensions entries describing \
383                         the same member",
384                    ))));
385                }
386                *pax_extensions = Some(ready_err!(Pin::new(new_fields).poll_read_all(cx)));
387                *current_pax_extensions = pax_extensions.clone();
388                *fields = None;
389                continue;
390            }
391
392            new_fields.long_pathname = gnu_longname.take();
393            new_fields.long_linkname = gnu_longlink.take();
394            new_fields.pax_extensions = pax_extensions.take();
395
396            let State {
397                next,
398                current_header_pos,
399                current_ext,
400                ..
401            } = current;
402            ready_err!(poll_parse_sparse_header(
403                archive,
404                next,
405                current_ext,
406                current_header_pos,
407                new_fields,
408                cx
409            ));
410
411            return Poll::Ready(Some(Ok(fields.take().unwrap().into_entry())));
412        }
413    }
414}
415
416/// Stream of raw `Entry`s.
417pub struct RawEntries<R: Read + Unpin> {
418    archive: Archive<R>,
419    current: (u64, Option<Header>, usize),
420}
421
422impl<R: Read + Unpin> Stream for RawEntries<R> {
423    type Item = io::Result<Entry<Archive<R>>>;
424
425    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
426        let archive = self.archive.clone();
427        let (next, current_header, current_header_pos) = &mut self.current;
428        poll_next_raw(&archive, next, current_header, current_header_pos, None, cx)
429    }
430}
431
432fn poll_next_raw<R: Read + Unpin>(
433    archive: &Archive<R>,
434    next: &mut u64,
435    current_header: &mut Option<Header>,
436    current_header_pos: &mut usize,
437    pax_extensions_data: Option<&[u8]>,
438    cx: &mut Context<'_>,
439) -> Poll<Option<io::Result<Entry<Archive<R>>>>> {
440    let mut header_pos = *next;
441
442    loop {
443        let archive = archive.clone();
444        // Seek to the start of the next header in the archive
445        if current_header.is_none() {
446            let delta = *next - archive.inner.lock().unwrap().pos;
447            match ready!(poll_skip(archive.clone(), cx, delta)) {
448                Ok(_) => {}
449                Err(err) => return Poll::Ready(Some(Err(err))),
450            }
451
452            *current_header = Some(Header::new_old());
453            *current_header_pos = 0;
454        }
455
456        let header = current_header.as_mut().unwrap();
457
458        // EOF is an indicator that we are at the end of the archive.
459        match ready!(poll_try_read_all(
460            archive.clone(),
461            cx,
462            header.as_mut_bytes(),
463            current_header_pos,
464        )) {
465            Ok(true) => {}
466            Ok(false) => return Poll::Ready(None),
467            Err(err) => return Poll::Ready(Some(Err(err))),
468        }
469
470        // If a header is not all zeros, we have another valid header.
471        // Otherwise, check if we are ignoring zeros and continue, or break as if this is the
472        // end of the archive.
473        if !header.as_bytes().iter().all(|i| *i == 0) {
474            *next += 512;
475            break;
476        }
477
478        if !archive.inner.lock().unwrap().ignore_zeros {
479            return Poll::Ready(None);
480        }
481
482        *next += 512;
483        header_pos = *next;
484    }
485
486    let header = current_header.as_mut().unwrap();
487
488    // Make sure the checksum is ok
489    let sum = header.as_bytes()[..148]
490        .iter()
491        .chain(&header.as_bytes()[156..])
492        .fold(0, |a, b| a + (*b as u32))
493        + 8 * 32;
494    let cksum = header.cksum()?;
495    if sum != cksum {
496        return Poll::Ready(Some(Err(other("archive header checksum mismatch"))));
497    }
498
499    let file_pos = *next;
500
501    let mut header = current_header.take().unwrap();
502
503    // when pax extensions are available, the size should come from there.
504    let mut size = header.entry_size()?;
505
506    // the size above will be overriden by the pax data if it has a size field.
507    // same for uid and gid, which will be overridden in the header itself.
508    if let Some(pax_extensions_data) = pax_extensions_data {
509        let pax = pax_extensions(pax_extensions_data);
510        for extension in pax {
511            let extension = extension.map_err(|_e| other("pax extensions invalid"))?;
512
513            // ignore keys that aren't parsable as a string at this stage.
514            // that isn't relevant to the size/uid/gid processing.
515            let Some(key) = extension.key().ok() else {
516                continue;
517            };
518
519            match key {
520                "size" => {
521                    let size_str = extension
522                        .value()
523                        .map_err(|_e| other("failed to parse pax size as string"))?;
524                    size = size_str
525                        .parse::<u64>()
526                        .map_err(|_e| other("failed to parse pax size"))?;
527                }
528
529                "uid" => {
530                    let uid_str = extension
531                        .value()
532                        .map_err(|_e| other("failed to parse pax uid as string"))?;
533                    header.set_uid(
534                        uid_str
535                            .parse::<u64>()
536                            .map_err(|_e| other("failed to parse pax uid"))?,
537                    );
538                }
539
540                "gid" => {
541                    let gid_str = extension
542                        .value()
543                        .map_err(|_e| other("failed to parse pax gid as string"))?;
544                    header.set_gid(
545                        gid_str
546                            .parse::<u64>()
547                            .map_err(|_e| other("failed to parse pax gid"))?,
548                    );
549                }
550
551                _ => {
552                    continue;
553                }
554            }
555        }
556    }
557
558    let data = EntryIo::Data(archive.clone().take(size));
559
560    let ArchiveInner {
561        unpack_xattrs,
562        preserve_mtime,
563        preserve_permissions,
564        ..
565    } = &*archive.inner.lock().unwrap();
566
567    let ret = EntryFields {
568        size,
569        header_pos,
570        file_pos,
571        data: vec![data],
572        header,
573        long_pathname: None,
574        long_linkname: None,
575        pax_extensions: None,
576        unpack_xattrs: *unpack_xattrs,
577        preserve_permissions: *preserve_permissions,
578        preserve_mtime: *preserve_mtime,
579        read_state: None,
580    };
581
582    // Store where the next entry is, rounding up by 512 bytes (the size of
583    // a header);
584    let size = (size + 511) & !(512 - 1);
585    *next += size;
586
587    Poll::Ready(Some(Ok(ret.into_entry())))
588}
589
590fn poll_parse_sparse_header<R: Read + Unpin>(
591    archive: &Archive<R>,
592    next: &mut u64,
593    current_ext: &mut Option<GnuExtSparseHeader>,
594    current_ext_pos: &mut usize,
595    entry: &mut EntryFields<Archive<R>>,
596    cx: &mut Context<'_>,
597) -> Poll<io::Result<()>> {
598    if !entry.header.entry_type().is_gnu_sparse() {
599        return Poll::Ready(Ok(()));
600    }
601
602    let gnu = match entry.header.as_gnu() {
603        Some(gnu) => gnu,
604        None => return Poll::Ready(Err(other("sparse entry type listed but not GNU header"))),
605    };
606
607    // Sparse files are represented internally as a list of blocks that are
608    // read. Blocks are either a bunch of 0's or they're data from the
609    // underlying archive.
610    //
611    // Blocks of a sparse file are described by the `GnuSparseHeader`
612    // structure, some of which are contained in `GnuHeader` but some of
613    // which may also be contained after the first header in further
614    // headers.
615    //
616    // We read off all the blocks here and use the `add_block` function to
617    // incrementally add them to the list of I/O block (in `entry.data`).
618    // The `add_block` function also validates that each chunk comes after
619    // the previous, we don't overrun the end of the file, and each block is
620    // aligned to a 512-byte boundary in the archive itself.
621    //
622    // At the end we verify that the sparse file size (`Header::size`) is
623    // the same as the current offset (described by the list of blocks) as
624    // well as the amount of data read equals the size of the entry
625    // (`Header::entry_size`).
626    entry.data.truncate(0);
627
628    let mut cur = 0;
629    let mut remaining = entry.size;
630    {
631        let data = &mut entry.data;
632        let reader = archive.clone();
633        let size = entry.size;
634        let mut add_block = |block: &GnuSparseHeader| -> io::Result<_> {
635            if block.is_empty() {
636                return Ok(());
637            }
638            let off = block.offset()?;
639            let len = block.length()?;
640
641            if (size - remaining) % 512 != 0 {
642                return Err(other(
643                    "previous block in sparse file was not \
644                     aligned to 512-byte boundary",
645                ));
646            } else if off < cur {
647                return Err(other(
648                    "out of order or overlapping sparse \
649                     blocks",
650                ));
651            } else if cur < off {
652                let block = io::repeat(0).take((off - cur) as _);
653                data.push(EntryIo::Pad(block));
654            }
655            cur = off
656                .checked_add(len)
657                .ok_or_else(|| other("more bytes listed in sparse file than u64 can hold"))?;
658            remaining = remaining.checked_sub(len).ok_or_else(|| {
659                other(
660                    "sparse file consumed more data than the header \
661                     listed",
662                )
663            })?;
664            data.push(EntryIo::Data(reader.clone().take(len)));
665            Ok(())
666        };
667        for block in &gnu.sparse {
668            add_block(block)?
669        }
670        if gnu.is_extended() {
671            let started_header = current_ext.is_some();
672            if !started_header {
673                let mut ext = GnuExtSparseHeader::new();
674                ext.isextended[0] = 1;
675                *current_ext = Some(ext);
676                *current_ext_pos = 0;
677            }
678
679            let ext = current_ext.as_mut().unwrap();
680            while ext.is_extended() {
681                match ready!(poll_try_read_all(
682                    archive.clone(),
683                    cx,
684                    ext.as_mut_bytes(),
685                    current_ext_pos,
686                )) {
687                    Ok(true) => {}
688                    Ok(false) => return Poll::Ready(Err(other("failed to read extension"))),
689                    Err(err) => return Poll::Ready(Err(err)),
690                }
691
692                *next += 512;
693                for block in &ext.sparse {
694                    add_block(block)?;
695                }
696            }
697        }
698    }
699    if cur != gnu.real_size()? {
700        return Poll::Ready(Err(other(
701            "mismatch in sparse file chunks and \
702             size in header",
703        )));
704    }
705    entry.size = cur;
706    if remaining > 0 {
707        return Poll::Ready(Err(other(
708            "mismatch in sparse file chunks and \
709             entry size in header",
710        )));
711    }
712
713    Poll::Ready(Ok(()))
714}
715
#[cfg(feature = "runtime-async-std")]
impl<R: Read + Unpin> Read for Archive<R> {
    /// Read from the underlying reader and add the number of bytes obtained
    /// to the shared archive position.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        into: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        let mut guard = self.inner.lock().unwrap();
        let state = &mut *guard;

        let count = ready!(Pin::new(&mut state.obj).poll_read(cx, into))?;
        state.pos += count as u64;
        Poll::Ready(Ok(count))
    }
}
737
#[cfg(feature = "runtime-tokio")]
impl<R: Read + Unpin> Read for Archive<R> {
    /// Read from the underlying reader and add the number of bytes obtained
    /// (the growth of the filled part of `into`) to the shared archive
    /// position.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        into: &mut tokio::io::ReadBuf,
    ) -> Poll<io::Result<()>> {
        let mut guard = self.inner.lock().unwrap();
        let state = &mut *guard;

        let before = into.filled().len();
        ready!(Pin::new(&mut state.obj).poll_read(cx, into))?;
        let count = into.filled().len() - before;
        state.pos += count as u64;
        Poll::Ready(Ok(()))
    }
}
760
/// Try to fill `buf` completely from `source`, resuming at offset `*pos`.
///
/// Returns `Ok(false)` if the reader is at its end before any byte of the
/// buffer was filled, and `Ok(true)` once the buffer is full (in which case
/// `*pos` is reset to 0). Reaching the end with the buffer partly filled is an
/// error. On `Pending`, `*pos` records the progress made so far.
#[cfg(feature = "runtime-async-std")]
fn poll_try_read_all<R: Read + Unpin>(
    mut source: R,
    cx: &mut Context<'_>,
    buf: &mut [u8],
    pos: &mut usize,
) -> Poll<io::Result<bool>> {
    while *pos < buf.len() {
        let count = ready!(Pin::new(&mut source).poll_read(cx, &mut buf[*pos..]))?;
        if count == 0 {
            let outcome = if *pos == 0 {
                Ok(false)
            } else {
                Err(other("failed to read entire block"))
            };
            return Poll::Ready(outcome);
        }
        *pos += count;
    }

    *pos = 0;
    Poll::Ready(Ok(true))
}
789
/// Try to fill `buf` completely from `source`, resuming at offset `*pos`.
///
/// Returns `Ok(false)` if the reader is at its end before any byte of the
/// buffer was filled, and `Ok(true)` once the buffer is full (in which case
/// `*pos` is reset to 0). Reaching the end with the buffer partly filled is an
/// error. On `Pending`, `*pos` records the progress made so far.
#[cfg(feature = "runtime-tokio")]
fn poll_try_read_all<R: Read + Unpin>(
    mut source: R,
    cx: &mut Context<'_>,
    buf: &mut [u8],
    pos: &mut usize,
) -> Poll<io::Result<bool>> {
    while *pos < buf.len() {
        let mut window = io::ReadBuf::new(&mut buf[*pos..]);
        let before = window.filled().len();
        ready!(Pin::new(&mut source).poll_read(cx, &mut window))?;

        match window.filled().len() - before {
            0 if *pos == 0 => return Poll::Ready(Ok(false)),
            0 => return Poll::Ready(Err(other("failed to read entire block"))),
            count => *pos += count,
        }
    }

    *pos = 0;
    Poll::Ready(Ok(true))
}
820
/// Discard `amt` bytes from `source` by reading them into a scratch buffer.
///
/// Fails with an error if the source ends before `amt` bytes were read.
#[cfg(feature = "runtime-async-std")]
fn poll_skip<R: Read + Unpin>(
    mut source: R,
    cx: &mut Context<'_>,
    mut amt: u64,
) -> Poll<io::Result<()>> {
    let mut scratch = [0u8; 4096 * 8];
    while amt > 0 {
        let want = cmp::min(amt, scratch.len() as u64) as usize;
        let got = ready!(Pin::new(&mut source).poll_read(cx, &mut scratch[..want]))?;
        if got == 0 {
            return Poll::Ready(Err(other("unexpected EOF during skip")));
        }
        amt -= got as u64;
    }

    Poll::Ready(Ok(()))
}
844
/// Discard `amt` bytes from `source` by reading them into a scratch buffer.
///
/// Fails with an error if the source ends before `amt` bytes were read.
#[cfg(feature = "runtime-tokio")]
fn poll_skip<R: Read + Unpin>(
    mut source: R,
    cx: &mut Context<'_>,
    mut amt: u64,
) -> Poll<io::Result<()>> {
    let mut scratch = [0u8; 4096 * 8];
    while amt > 0 {
        let want = cmp::min(amt, scratch.len() as u64) as usize;
        let mut window = io::ReadBuf::new(&mut scratch[..want]);
        let before = window.filled().len();
        ready!(Pin::new(&mut source).poll_read(cx, &mut window))?;

        let got = window.filled().len() - before;
        if got == 0 {
            return Poll::Ready(Err(other("unexpected EOF during skip")));
        }
        amt -= got as u64;
    }

    Poll::Ready(Ok(()))
}
872
#[cfg(test)]
mod tests {
    use super::*;

    // Compile-time checks that the public types can be shared and sent across
    // threads when backed by a file. The reader itself is checked first, then
    // the types layered on top of it.
    assert_impl_all!(fs::File: Send, Sync);

    assert_impl_all!(Archive<fs::File>: Send, Sync);
    assert_impl_all!(Entries<fs::File>: Send, Sync);
    assert_impl_all!(Entry<Archive<fs::File>>: Send, Sync);
}