1#[cfg(feature = "runtime-tokio")]
2use std::path::Path;
3use std::{
4 cmp,
5 pin::Pin,
6 sync::{Arc, Mutex},
7 task::{Context, Poll},
8};
9
10#[cfg(feature = "runtime-async-std")]
11use async_std::{
12 fs, io,
13 io::prelude::*,
14 path::Path,
15 stream::{Stream, StreamExt},
16};
17use futures_core::ready;
18#[cfg(feature = "runtime-tokio")]
19use tokio::{
20 fs,
21 io::{self, AsyncRead as Read, AsyncReadExt},
22};
23#[cfg(feature = "runtime-tokio")]
24use tokio_stream::{Stream, StreamExt};
25
26use crate::{
27 Entry, GnuExtSparseHeader, GnuSparseHeader, Header,
28 entry::{EntryFields, EntryIo},
29 error::TarError,
30 fs_canonicalize, other,
31 pax::pax_extensions,
32 symlink_metadata,
33};
34
35#[derive(Debug)]
39pub struct Archive<R: Read + Unpin> {
40 inner: Arc<Mutex<ArchiveInner<R>>>,
41}
42
43impl<R: Read + Unpin> Clone for Archive<R> {
44 fn clone(&self) -> Self {
45 Archive {
46 inner: self.inner.clone(),
47 }
48 }
49}
50
51#[derive(Debug)]
52pub struct ArchiveInner<R: Read + Unpin> {
53 pos: u64,
54 unpack_xattrs: bool,
55 preserve_permissions: bool,
56 preserve_mtime: bool,
57 ignore_zeros: bool,
58 obj: R,
59}
60
61pub struct ArchiveBuilder<R: Read + Unpin> {
63 obj: R,
64 unpack_xattrs: bool,
65 preserve_permissions: bool,
66 preserve_mtime: bool,
67 ignore_zeros: bool,
68}
69
70impl<R: Read + Unpin> ArchiveBuilder<R> {
71 pub fn new(obj: R) -> Self {
73 ArchiveBuilder {
74 unpack_xattrs: false,
75 preserve_permissions: false,
76 preserve_mtime: true,
77 ignore_zeros: false,
78 obj,
79 }
80 }
81
82 pub fn set_unpack_xattrs(mut self, unpack_xattrs: bool) -> Self {
90 self.unpack_xattrs = unpack_xattrs;
91 self
92 }
93
94 pub fn set_preserve_permissions(mut self, preserve: bool) -> Self {
100 self.preserve_permissions = preserve;
101 self
102 }
103
104 pub fn set_preserve_mtime(mut self, preserve: bool) -> Self {
109 self.preserve_mtime = preserve;
110 self
111 }
112
113 pub fn set_ignore_zeros(mut self, ignore_zeros: bool) -> Self {
118 self.ignore_zeros = ignore_zeros;
119 self
120 }
121
122 pub fn build(self) -> Archive<R> {
124 let Self {
125 unpack_xattrs,
126 preserve_permissions,
127 preserve_mtime,
128 ignore_zeros,
129 obj,
130 } = self;
131
132 Archive {
133 inner: Arc::new(Mutex::new(ArchiveInner {
134 unpack_xattrs,
135 preserve_permissions,
136 preserve_mtime,
137 ignore_zeros,
138 obj,
139 pos: 0,
140 })),
141 }
142 }
143}
144
145impl<R: Read + Unpin> Archive<R> {
146 pub fn new(obj: R) -> Archive<R> {
148 Archive {
149 inner: Arc::new(Mutex::new(ArchiveInner {
150 unpack_xattrs: false,
151 preserve_permissions: false,
152 preserve_mtime: true,
153 ignore_zeros: false,
154 obj,
155 pos: 0,
156 })),
157 }
158 }
159
160 pub fn into_inner(self) -> Result<R, Self> {
162 match Arc::try_unwrap(self.inner) {
163 Ok(inner) => Ok(inner.into_inner().unwrap().obj),
164 Err(inner) => Err(Self { inner }),
165 }
166 }
167
168 pub fn entries(self) -> io::Result<Entries<R>> {
175 if self.inner.lock().unwrap().pos != 0 {
176 return Err(other(
177 "cannot call entries unless archive is at \
178 position 0",
179 ));
180 }
181
182 Ok(Entries {
183 archive: self,
184 current: State::default(),
185 fields: None,
186 gnu_longlink: None,
187 gnu_longname: None,
188 pax_extensions: None,
189 })
190 }
191
192 pub fn entries_raw(self) -> io::Result<RawEntries<R>> {
199 if self.inner.lock().unwrap().pos != 0 {
200 return Err(other(
201 "cannot call entries_raw unless archive is at \
202 position 0",
203 ));
204 }
205
206 Ok(RawEntries {
207 archive: self,
208 current: (0, None, 0),
209 })
210 }
211
212 #[cfg_attr(feature = "runtime-async-std", doc = "```no_run")]
225 #[cfg_attr(feature = "runtime-tokio", doc = "```ignore")]
226 pub async fn unpack<P: AsRef<Path>>(self, dst: P) -> io::Result<()> {
237 let mut entries = self.entries()?;
238 let mut pinned = Pin::new(&mut entries);
239 let dst = dst.as_ref();
240
241 if symlink_metadata(dst).await.is_err() {
242 fs::create_dir_all(&dst)
243 .await
244 .map_err(|e| TarError::new(&format!("failed to create `{}`", dst.display()), e))?;
245 }
246
247 let dst = &fs_canonicalize(dst)
254 .await
255 .unwrap_or_else(|_| dst.to_path_buf());
256
257 let mut directories = Vec::new();
261 while let Some(entry) = pinned.next().await {
262 let mut file = entry.map_err(|e| TarError::new("failed to iterate over archive", e))?;
263 if file.header().entry_type() == crate::EntryType::Directory {
264 directories.push(file);
265 } else {
266 file.unpack_in(dst).await?;
267 }
268 }
269 for mut dir in directories {
270 dir.unpack_in(dst).await?;
271 }
272
273 Ok(())
274 }
275}
276
277#[derive(Debug, Default)]
278struct State {
279 next: u64,
280 current_header: Option<Header>,
281 current_header_pos: usize,
282 current_ext: Option<GnuExtSparseHeader>,
283 pax_extensions: Option<Vec<u8>>,
284}
285
286#[derive(Debug)]
288pub struct Entries<R: Read + Unpin> {
289 archive: Archive<R>,
290 current: State,
291 fields: Option<EntryFields<Archive<R>>>,
292 gnu_longname: Option<Vec<u8>>,
293 gnu_longlink: Option<Vec<u8>>,
294 pax_extensions: Option<Vec<u8>>,
295}
296
297macro_rules! ready_opt_err {
298 ($val:expr) => {
299 match ready!($val) {
300 Some(Ok(val)) => val,
301 Some(Err(err)) => return Poll::Ready(Some(Err(err))),
302 None => return Poll::Ready(None),
303 }
304 };
305}
306
307macro_rules! ready_err {
308 ($val:expr) => {
309 match ready!($val) {
310 Ok(val) => val,
311 Err(err) => return Poll::Ready(Some(Err(err))),
312 }
313 };
314}
315
316impl<R: Read + Unpin> Stream for Entries<R> {
317 type Item = io::Result<Entry<Archive<R>>>;
318
319 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
320 loop {
321 let Self {
322 current,
323 fields,
324 gnu_longname,
325 gnu_longlink,
326 archive,
327 pax_extensions,
328 ..
329 } = &mut *self;
330 let State {
331 next,
332 current_header,
333 current_header_pos,
334 pax_extensions: current_pax_extensions,
335 ..
336 } = current;
337
338 let new_fields = if let Some(fields) = fields.as_mut() {
339 fields
340 } else {
341 *fields = Some(EntryFields::from(ready_opt_err!(poll_next_raw(
342 archive,
343 next,
344 current_header,
345 current_header_pos,
346 current_pax_extensions.as_deref(),
347 cx
348 ))));
349 continue;
350 };
351
352 let is_recognized_header =
353 new_fields.header.as_gnu().is_some() || new_fields.header.as_ustar().is_some();
354 if is_recognized_header && new_fields.header.entry_type().is_gnu_longname() {
355 if gnu_longname.is_some() {
356 return Poll::Ready(Some(Err(other(
357 "two long name entries describing \
358 the same member",
359 ))));
360 }
361
362 *gnu_longname = Some(ready_err!(Pin::new(new_fields).poll_read_all(cx)));
363 *fields = None;
364 continue;
365 }
366
367 if is_recognized_header && new_fields.header.entry_type().is_gnu_longlink() {
368 if gnu_longlink.is_some() {
369 return Poll::Ready(Some(Err(other(
370 "two long name entries describing \
371 the same member",
372 ))));
373 }
374 *gnu_longlink = Some(ready_err!(Pin::new(new_fields).poll_read_all(cx)));
375 *fields = None;
376 continue;
377 }
378
379 if is_recognized_header && new_fields.header.entry_type().is_pax_local_extensions() {
380 if pax_extensions.is_some() {
381 return Poll::Ready(Some(Err(other(
382 "two pax extensions entries describing \
383 the same member",
384 ))));
385 }
386 *pax_extensions = Some(ready_err!(Pin::new(new_fields).poll_read_all(cx)));
387 *current_pax_extensions = pax_extensions.clone();
388 *fields = None;
389 continue;
390 }
391
392 new_fields.long_pathname = gnu_longname.take();
393 new_fields.long_linkname = gnu_longlink.take();
394 new_fields.pax_extensions = pax_extensions.take();
395
396 let State {
397 next,
398 current_header_pos,
399 current_ext,
400 ..
401 } = current;
402 ready_err!(poll_parse_sparse_header(
403 archive,
404 next,
405 current_ext,
406 current_header_pos,
407 new_fields,
408 cx
409 ));
410
411 return Poll::Ready(Some(Ok(fields.take().unwrap().into_entry())));
412 }
413 }
414}
415
416pub struct RawEntries<R: Read + Unpin> {
418 archive: Archive<R>,
419 current: (u64, Option<Header>, usize),
420}
421
422impl<R: Read + Unpin> Stream for RawEntries<R> {
423 type Item = io::Result<Entry<Archive<R>>>;
424
425 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
426 let archive = self.archive.clone();
427 let (next, current_header, current_header_pos) = &mut self.current;
428 poll_next_raw(&archive, next, current_header, current_header_pos, None, cx)
429 }
430}
431
432fn poll_next_raw<R: Read + Unpin>(
433 archive: &Archive<R>,
434 next: &mut u64,
435 current_header: &mut Option<Header>,
436 current_header_pos: &mut usize,
437 pax_extensions_data: Option<&[u8]>,
438 cx: &mut Context<'_>,
439) -> Poll<Option<io::Result<Entry<Archive<R>>>>> {
440 let mut header_pos = *next;
441
442 loop {
443 let archive = archive.clone();
444 if current_header.is_none() {
446 let delta = *next - archive.inner.lock().unwrap().pos;
447 match ready!(poll_skip(archive.clone(), cx, delta)) {
448 Ok(_) => {}
449 Err(err) => return Poll::Ready(Some(Err(err))),
450 }
451
452 *current_header = Some(Header::new_old());
453 *current_header_pos = 0;
454 }
455
456 let header = current_header.as_mut().unwrap();
457
458 match ready!(poll_try_read_all(
460 archive.clone(),
461 cx,
462 header.as_mut_bytes(),
463 current_header_pos,
464 )) {
465 Ok(true) => {}
466 Ok(false) => return Poll::Ready(None),
467 Err(err) => return Poll::Ready(Some(Err(err))),
468 }
469
470 if !header.as_bytes().iter().all(|i| *i == 0) {
474 *next += 512;
475 break;
476 }
477
478 if !archive.inner.lock().unwrap().ignore_zeros {
479 return Poll::Ready(None);
480 }
481
482 *next += 512;
483 header_pos = *next;
484 }
485
486 let header = current_header.as_mut().unwrap();
487
488 let sum = header.as_bytes()[..148]
490 .iter()
491 .chain(&header.as_bytes()[156..])
492 .fold(0, |a, b| a + (*b as u32))
493 + 8 * 32;
494 let cksum = header.cksum()?;
495 if sum != cksum {
496 return Poll::Ready(Some(Err(other("archive header checksum mismatch"))));
497 }
498
499 let file_pos = *next;
500
501 let mut header = current_header.take().unwrap();
502
503 let mut size = header.entry_size()?;
505
506 if let Some(pax_extensions_data) = pax_extensions_data {
509 let pax = pax_extensions(pax_extensions_data);
510 for extension in pax {
511 let extension = extension.map_err(|_e| other("pax extensions invalid"))?;
512
513 let Some(key) = extension.key().ok() else {
516 continue;
517 };
518
519 match key {
520 "size" => {
521 let size_str = extension
522 .value()
523 .map_err(|_e| other("failed to parse pax size as string"))?;
524 size = size_str
525 .parse::<u64>()
526 .map_err(|_e| other("failed to parse pax size"))?;
527 }
528
529 "uid" => {
530 let uid_str = extension
531 .value()
532 .map_err(|_e| other("failed to parse pax uid as string"))?;
533 header.set_uid(
534 uid_str
535 .parse::<u64>()
536 .map_err(|_e| other("failed to parse pax uid"))?,
537 );
538 }
539
540 "gid" => {
541 let gid_str = extension
542 .value()
543 .map_err(|_e| other("failed to parse pax gid as string"))?;
544 header.set_gid(
545 gid_str
546 .parse::<u64>()
547 .map_err(|_e| other("failed to parse pax gid"))?,
548 );
549 }
550
551 _ => {
552 continue;
553 }
554 }
555 }
556 }
557
558 let data = EntryIo::Data(archive.clone().take(size));
559
560 let ArchiveInner {
561 unpack_xattrs,
562 preserve_mtime,
563 preserve_permissions,
564 ..
565 } = &*archive.inner.lock().unwrap();
566
567 let ret = EntryFields {
568 size,
569 header_pos,
570 file_pos,
571 data: vec![data],
572 header,
573 long_pathname: None,
574 long_linkname: None,
575 pax_extensions: None,
576 unpack_xattrs: *unpack_xattrs,
577 preserve_permissions: *preserve_permissions,
578 preserve_mtime: *preserve_mtime,
579 read_state: None,
580 };
581
582 let size = (size + 511) & !(512 - 1);
585 *next += size;
586
587 Poll::Ready(Some(Ok(ret.into_entry())))
588}
589
590fn poll_parse_sparse_header<R: Read + Unpin>(
591 archive: &Archive<R>,
592 next: &mut u64,
593 current_ext: &mut Option<GnuExtSparseHeader>,
594 current_ext_pos: &mut usize,
595 entry: &mut EntryFields<Archive<R>>,
596 cx: &mut Context<'_>,
597) -> Poll<io::Result<()>> {
598 if !entry.header.entry_type().is_gnu_sparse() {
599 return Poll::Ready(Ok(()));
600 }
601
602 let gnu = match entry.header.as_gnu() {
603 Some(gnu) => gnu,
604 None => return Poll::Ready(Err(other("sparse entry type listed but not GNU header"))),
605 };
606
607 entry.data.truncate(0);
627
628 let mut cur = 0;
629 let mut remaining = entry.size;
630 {
631 let data = &mut entry.data;
632 let reader = archive.clone();
633 let size = entry.size;
634 let mut add_block = |block: &GnuSparseHeader| -> io::Result<_> {
635 if block.is_empty() {
636 return Ok(());
637 }
638 let off = block.offset()?;
639 let len = block.length()?;
640
641 if (size - remaining) % 512 != 0 {
642 return Err(other(
643 "previous block in sparse file was not \
644 aligned to 512-byte boundary",
645 ));
646 } else if off < cur {
647 return Err(other(
648 "out of order or overlapping sparse \
649 blocks",
650 ));
651 } else if cur < off {
652 let block = io::repeat(0).take((off - cur) as _);
653 data.push(EntryIo::Pad(block));
654 }
655 cur = off
656 .checked_add(len)
657 .ok_or_else(|| other("more bytes listed in sparse file than u64 can hold"))?;
658 remaining = remaining.checked_sub(len).ok_or_else(|| {
659 other(
660 "sparse file consumed more data than the header \
661 listed",
662 )
663 })?;
664 data.push(EntryIo::Data(reader.clone().take(len)));
665 Ok(())
666 };
667 for block in &gnu.sparse {
668 add_block(block)?
669 }
670 if gnu.is_extended() {
671 let started_header = current_ext.is_some();
672 if !started_header {
673 let mut ext = GnuExtSparseHeader::new();
674 ext.isextended[0] = 1;
675 *current_ext = Some(ext);
676 *current_ext_pos = 0;
677 }
678
679 let ext = current_ext.as_mut().unwrap();
680 while ext.is_extended() {
681 match ready!(poll_try_read_all(
682 archive.clone(),
683 cx,
684 ext.as_mut_bytes(),
685 current_ext_pos,
686 )) {
687 Ok(true) => {}
688 Ok(false) => return Poll::Ready(Err(other("failed to read extension"))),
689 Err(err) => return Poll::Ready(Err(err)),
690 }
691
692 *next += 512;
693 for block in &ext.sparse {
694 add_block(block)?;
695 }
696 }
697 }
698 }
699 if cur != gnu.real_size()? {
700 return Poll::Ready(Err(other(
701 "mismatch in sparse file chunks and \
702 size in header",
703 )));
704 }
705 entry.size = cur;
706 if remaining > 0 {
707 return Poll::Ready(Err(other(
708 "mismatch in sparse file chunks and \
709 entry size in header",
710 )));
711 }
712
713 Poll::Ready(Ok(()))
714}
715
716#[cfg(feature = "runtime-async-std")]
717impl<R: Read + Unpin> Read for Archive<R> {
718 fn poll_read(
719 self: Pin<&mut Self>,
720 cx: &mut Context<'_>,
721 into: &mut [u8],
722 ) -> Poll<io::Result<usize>> {
723 let mut lock = self.inner.lock().unwrap();
724 let mut inner = Pin::new(&mut *lock);
725 let r = Pin::new(&mut inner.obj);
726
727 let res = ready!(r.poll_read(cx, into));
728 match res {
729 Ok(i) => {
730 inner.pos += i as u64;
731 Poll::Ready(Ok(i))
732 }
733 Err(err) => Poll::Ready(Err(err)),
734 }
735 }
736}
737
738#[cfg(feature = "runtime-tokio")]
739impl<R: Read + Unpin> Read for Archive<R> {
740 fn poll_read(
741 self: Pin<&mut Self>,
742 cx: &mut Context<'_>,
743 into: &mut tokio::io::ReadBuf,
744 ) -> Poll<io::Result<()>> {
745 let mut lock = self.inner.lock().unwrap();
746 let mut inner = Pin::new(&mut *lock);
747 let r = Pin::new(&mut inner.obj);
748
749 let start = into.filled().len();
750 match ready!(r.poll_read(cx, into)) {
751 Ok(()) => {
752 let diff = into.filled().len() - start;
753 inner.pos += diff as u64;
754 Poll::Ready(Ok(()))
755 }
756 Err(err) => Poll::Ready(Err(err)),
757 }
758 }
759}
760
761#[cfg(feature = "runtime-async-std")]
766fn poll_try_read_all<R: Read + Unpin>(
767 mut source: R,
768 cx: &mut Context<'_>,
769 buf: &mut [u8],
770 pos: &mut usize,
771) -> Poll<io::Result<bool>> {
772 while *pos < buf.len() {
773 match ready!(Pin::new(&mut source).poll_read(cx, &mut buf[*pos..])) {
774 Ok(0) => {
775 if *pos == 0 {
776 return Poll::Ready(Ok(false));
777 }
778
779 return Poll::Ready(Err(other("failed to read entire block")));
780 }
781 Ok(n) => *pos += n,
782 Err(err) => return Poll::Ready(Err(err)),
783 }
784 }
785
786 *pos = 0;
787 Poll::Ready(Ok(true))
788}
789
790#[cfg(feature = "runtime-tokio")]
791fn poll_try_read_all<R: Read + Unpin>(
792 mut source: R,
793 cx: &mut Context<'_>,
794 buf: &mut [u8],
795 pos: &mut usize,
796) -> Poll<io::Result<bool>> {
797 while *pos < buf.len() {
798 let mut read_buf = io::ReadBuf::new(&mut buf[*pos..]);
799 let start = read_buf.filled().len();
800 match ready!(Pin::new(&mut source).poll_read(cx, &mut read_buf)) {
801 Ok(()) => {
802 let diff = read_buf.filled().len() - start;
803 if diff == 0 {
804 if *pos == 0 {
805 return Poll::Ready(Ok(false));
806 }
807
808 return Poll::Ready(Err(other("failed to read entire block")));
809 } else {
810 *pos += diff;
811 }
812 }
813 Err(err) => return Poll::Ready(Err(err)),
814 }
815 }
816
817 *pos = 0;
818 Poll::Ready(Ok(true))
819}
820
821#[cfg(feature = "runtime-async-std")]
823fn poll_skip<R: Read + Unpin>(
824 mut source: R,
825 cx: &mut Context<'_>,
826 mut amt: u64,
827) -> Poll<io::Result<()>> {
828 let mut buf = [0u8; 4096 * 8];
829 while amt > 0 {
830 let n = cmp::min(amt, buf.len() as u64);
831 match ready!(Pin::new(&mut source).poll_read(cx, &mut buf[..n as usize])) {
832 Ok(0) => {
833 return Poll::Ready(Err(other("unexpected EOF during skip")));
834 }
835 Ok(n) => {
836 amt -= n as u64;
837 }
838 Err(err) => return Poll::Ready(Err(err)),
839 }
840 }
841
842 Poll::Ready(Ok(()))
843}
844
845#[cfg(feature = "runtime-tokio")]
847fn poll_skip<R: Read + Unpin>(
848 mut source: R,
849 cx: &mut Context<'_>,
850 mut amt: u64,
851) -> Poll<io::Result<()>> {
852 let mut buf = [0u8; 4096 * 8];
853 while amt > 0 {
854 let n = cmp::min(amt, buf.len() as u64);
855 let mut read_buf = io::ReadBuf::new(&mut buf[..n as usize]);
856 let start = read_buf.filled().len();
857 match ready!(Pin::new(&mut source).poll_read(cx, &mut read_buf)) {
858 Ok(()) => {
859 let diff = read_buf.filled().len() - start;
860 if diff == 0 {
861 return Poll::Ready(Err(other("unexpected EOF during skip")));
862 } else {
863 amt -= diff as u64;
864 }
865 }
866 Err(err) => return Poll::Ready(Err(err)),
867 }
868 }
869
870 Poll::Ready(Ok(()))
871}
872
873#[cfg(test)]
874mod tests {
875 use super::*;
876
877 assert_impl_all!(fs::File: Send, Sync);
878 assert_impl_all!(Entries<fs::File>: Send, Sync);
879 assert_impl_all!(Archive<fs::File>: Send, Sync);
880 assert_impl_all!(Entry<Archive<fs::File>>: Send, Sync);
881}