1use std::{
2 cmp,
3 pin::Pin,
4 sync::{Arc, Mutex},
5};
6
7use async_std::{
8 fs, io,
9 io::prelude::*,
10 path::Path,
11 prelude::*,
12 stream::Stream,
13 task::{Context, Poll},
14};
15use pin_project::pin_project;
16
17use crate::{
18 Entry, GnuExtSparseHeader, GnuSparseHeader, Header,
19 entry::{EntryFields, EntryIo},
20 error::TarError,
21 other,
22 pax::pax_extensions,
23};
24
25#[derive(Debug)]
29pub struct Archive<R: Read + Unpin> {
30 inner: Arc<Mutex<ArchiveInner<R>>>,
31}
32
33impl<R: Read + Unpin> Clone for Archive<R> {
34 fn clone(&self) -> Self {
35 Archive {
36 inner: self.inner.clone(),
37 }
38 }
39}
40
41#[pin_project]
42#[derive(Debug)]
43pub struct ArchiveInner<R: Read + Unpin> {
44 pos: u64,
45 unpack_xattrs: bool,
46 preserve_permissions: bool,
47 preserve_mtime: bool,
48 ignore_zeros: bool,
49 #[pin]
50 obj: R,
51}
52
53pub struct ArchiveBuilder<R: Read + Unpin> {
55 obj: R,
56 unpack_xattrs: bool,
57 preserve_permissions: bool,
58 preserve_mtime: bool,
59 ignore_zeros: bool,
60}
61
62impl<R: Read + Unpin> ArchiveBuilder<R> {
63 pub fn new(obj: R) -> Self {
65 ArchiveBuilder {
66 unpack_xattrs: false,
67 preserve_permissions: false,
68 preserve_mtime: true,
69 ignore_zeros: false,
70 obj,
71 }
72 }
73
74 pub fn set_unpack_xattrs(mut self, unpack_xattrs: bool) -> Self {
82 self.unpack_xattrs = unpack_xattrs;
83 self
84 }
85
86 pub fn set_preserve_permissions(mut self, preserve: bool) -> Self {
92 self.preserve_permissions = preserve;
93 self
94 }
95
96 pub fn set_preserve_mtime(mut self, preserve: bool) -> Self {
101 self.preserve_mtime = preserve;
102 self
103 }
104
105 pub fn set_ignore_zeros(mut self, ignore_zeros: bool) -> Self {
110 self.ignore_zeros = ignore_zeros;
111 self
112 }
113
114 pub fn build(self) -> Archive<R> {
116 let Self {
117 unpack_xattrs,
118 preserve_permissions,
119 preserve_mtime,
120 ignore_zeros,
121 obj,
122 } = self;
123
124 Archive {
125 inner: Arc::new(Mutex::new(ArchiveInner {
126 unpack_xattrs,
127 preserve_permissions,
128 preserve_mtime,
129 ignore_zeros,
130 obj,
131 pos: 0,
132 })),
133 }
134 }
135}
136
137impl<R: Read + Unpin> Archive<R> {
138 pub fn new(obj: R) -> Archive<R> {
140 Archive {
141 inner: Arc::new(Mutex::new(ArchiveInner {
142 unpack_xattrs: false,
143 preserve_permissions: false,
144 preserve_mtime: true,
145 ignore_zeros: false,
146 obj,
147 pos: 0,
148 })),
149 }
150 }
151
152 pub fn into_inner(self) -> Result<R, Self> {
154 match Arc::try_unwrap(self.inner) {
155 Ok(inner) => Ok(inner.into_inner().unwrap().obj),
156 Err(inner) => Err(Self { inner }),
157 }
158 }
159
160 pub fn entries(self) -> io::Result<Entries<R>> {
167 if self.inner.lock().unwrap().pos != 0 {
168 return Err(other(
169 "cannot call entries unless archive is at \
170 position 0",
171 ));
172 }
173
174 Ok(Entries {
175 archive: self,
176 current: State::default(),
177 fields: None,
178 gnu_longlink: None,
179 gnu_longname: None,
180 pax_extensions: None,
181 })
182 }
183
184 pub fn entries_raw(self) -> io::Result<RawEntries<R>> {
191 if self.inner.lock().unwrap().pos != 0 {
192 return Err(other(
193 "cannot call entries_raw unless archive is at \
194 position 0",
195 ));
196 }
197
198 Ok(RawEntries {
199 archive: self,
200 current: (0, None, 0),
201 })
202 }
203
204 pub async fn unpack<P: AsRef<Path>>(self, dst: P) -> io::Result<()> {
228 let mut entries = self.entries()?;
229 let mut pinned = Pin::new(&mut entries);
230 let dst = dst.as_ref();
231
232 if dst.symlink_metadata().await.is_err() {
233 fs::create_dir_all(&dst)
234 .await
235 .map_err(|e| TarError::new(&format!("failed to create `{}`", dst.display()), e))?;
236 }
237
238 let dst = &dst
244 .canonicalize()
245 .await
246 .unwrap_or_else(|_| dst.to_path_buf());
247
248 let mut directories = Vec::new();
252 while let Some(entry) = pinned.next().await {
253 let mut file = entry.map_err(|e| TarError::new("failed to iterate over archive", e))?;
254 if file.header().entry_type() == crate::EntryType::Directory {
255 directories.push(file);
256 } else {
257 file.unpack_in(dst).await?;
258 }
259 }
260 for mut dir in directories {
261 dir.unpack_in(dst).await?;
262 }
263
264 Ok(())
265 }
266}
267
268#[derive(Debug, Default)]
269struct State {
270 next: u64,
271 current_header: Option<Header>,
272 current_header_pos: usize,
273 current_ext: Option<GnuExtSparseHeader>,
274 pax_extensions: Option<Vec<u8>>,
275}
276
277#[pin_project]
279#[derive(Debug)]
280pub struct Entries<R: Read + Unpin> {
281 archive: Archive<R>,
282 current: State,
283 fields: Option<EntryFields<Archive<R>>>,
284 gnu_longname: Option<Vec<u8>>,
285 gnu_longlink: Option<Vec<u8>>,
286 pax_extensions: Option<Vec<u8>>,
287}
288
289macro_rules! ready_opt_err {
290 ($val:expr) => {
291 match async_std::task::ready!($val) {
292 Some(Ok(val)) => val,
293 Some(Err(err)) => return Poll::Ready(Some(Err(err))),
294 None => return Poll::Ready(None),
295 }
296 };
297}
298
299macro_rules! ready_err {
300 ($val:expr) => {
301 match async_std::task::ready!($val) {
302 Ok(val) => val,
303 Err(err) => return Poll::Ready(Some(Err(err))),
304 }
305 };
306}
307
308impl<R: Read + Unpin> Stream for Entries<R> {
309 type Item = io::Result<Entry<Archive<R>>>;
310
311 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
312 let mut this = self.project();
313 loop {
314 let State {
315 next,
316 current_header,
317 current_header_pos,
318 pax_extensions,
319 ..
320 } = &mut this.current;
321
322 let fields = if let Some(fields) = this.fields.as_mut() {
323 fields
324 } else {
325 *this.fields = Some(EntryFields::from(ready_opt_err!(poll_next_raw(
326 this.archive,
327 next,
328 current_header,
329 current_header_pos,
330 pax_extensions.as_deref(),
331 cx
332 ))));
333 continue;
334 };
335
336 let is_recognized_header =
337 fields.header.as_gnu().is_some() || fields.header.as_ustar().is_some();
338 if is_recognized_header && fields.header.entry_type().is_gnu_longname() {
339 if this.gnu_longname.is_some() {
340 return Poll::Ready(Some(Err(other(
341 "two long name entries describing \
342 the same member",
343 ))));
344 }
345
346 *this.gnu_longname = Some(ready_err!(Pin::new(fields).poll_read_all(cx)));
347 *this.fields = None;
348 continue;
349 }
350
351 if is_recognized_header && fields.header.entry_type().is_gnu_longlink() {
352 if this.gnu_longlink.is_some() {
353 return Poll::Ready(Some(Err(other(
354 "two long name entries describing \
355 the same member",
356 ))));
357 }
358 *this.gnu_longlink = Some(ready_err!(Pin::new(fields).poll_read_all(cx)));
359 *this.fields = None;
360 continue;
361 }
362
363 if is_recognized_header && fields.header.entry_type().is_pax_local_extensions() {
364 if this.pax_extensions.is_some() {
365 return Poll::Ready(Some(Err(other(
366 "two pax extensions entries describing \
367 the same member",
368 ))));
369 }
370 *this.pax_extensions = Some(ready_err!(Pin::new(fields).poll_read_all(cx)));
371 this.current.pax_extensions = this.pax_extensions.clone();
372 *this.fields = None;
373 continue;
374 }
375
376 fields.long_pathname = this.gnu_longname.take();
377 fields.long_linkname = this.gnu_longlink.take();
378 fields.pax_extensions = this.pax_extensions.take();
379
380 let State {
381 next,
382 current_header_pos,
383 current_ext,
384 ..
385 } = &mut this.current;
386 ready_err!(poll_parse_sparse_header(
387 this.archive,
388 next,
389 current_ext,
390 current_header_pos,
391 fields,
392 cx
393 ));
394
395 return Poll::Ready(Some(Ok(this.fields.take().unwrap().into_entry())));
396 }
397 }
398}
399
400pub struct RawEntries<R: Read + Unpin> {
402 archive: Archive<R>,
403 current: (u64, Option<Header>, usize),
404}
405
406impl<R: Read + Unpin> Stream for RawEntries<R> {
407 type Item = io::Result<Entry<Archive<R>>>;
408
409 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
410 let archive = self.archive.clone();
411 let (next, current_header, current_header_pos) = &mut self.current;
412 poll_next_raw(&archive, next, current_header, current_header_pos, None, cx)
413 }
414}
415
416fn poll_next_raw<R: Read + Unpin>(
417 archive: &Archive<R>,
418 next: &mut u64,
419 current_header: &mut Option<Header>,
420 current_header_pos: &mut usize,
421 pax_extensions_data: Option<&[u8]>,
422 cx: &mut Context<'_>,
423) -> Poll<Option<io::Result<Entry<Archive<R>>>>> {
424 let mut header_pos = *next;
425
426 loop {
427 let archive = archive.clone();
428 if current_header.is_none() {
430 let delta = *next - archive.inner.lock().unwrap().pos;
431 match async_std::task::ready!(poll_skip(archive.clone(), cx, delta)) {
432 Ok(_) => {}
433 Err(err) => return Poll::Ready(Some(Err(err))),
434 }
435
436 *current_header = Some(Header::new_old());
437 *current_header_pos = 0;
438 }
439
440 let header = current_header.as_mut().unwrap();
441
442 match async_std::task::ready!(poll_try_read_all(
444 archive.clone(),
445 cx,
446 header.as_mut_bytes(),
447 current_header_pos,
448 )) {
449 Ok(true) => {}
450 Ok(false) => return Poll::Ready(None),
451 Err(err) => return Poll::Ready(Some(Err(err))),
452 }
453
454 if !header.as_bytes().iter().all(|i| *i == 0) {
458 *next += 512;
459 break;
460 }
461
462 if !archive.inner.lock().unwrap().ignore_zeros {
463 return Poll::Ready(None);
464 }
465
466 *next += 512;
467 header_pos = *next;
468 }
469
470 let header = current_header.as_mut().unwrap();
471
472 let sum = header.as_bytes()[..148]
474 .iter()
475 .chain(&header.as_bytes()[156..])
476 .fold(0, |a, b| a + (*b as u32))
477 + 8 * 32;
478 let cksum = header.cksum()?;
479 if sum != cksum {
480 return Poll::Ready(Some(Err(other("archive header checksum mismatch"))));
481 }
482
483 let file_pos = *next;
484
485 let mut header = current_header.take().unwrap();
486
487 let mut size = header.entry_size()?;
489
490 if let Some(pax_extensions_data) = pax_extensions_data {
493 let pax = pax_extensions(pax_extensions_data);
494 for extension in pax {
495 let extension = extension.map_err(|_e| other("pax extensions invalid"))?;
496
497 let Some(key) = extension.key().ok() else {
500 continue;
501 };
502
503 match key {
504 "size" => {
505 let size_str = extension
506 .value()
507 .map_err(|_e| other("failed to parse pax size as string"))?;
508 size = size_str
509 .parse::<u64>()
510 .map_err(|_e| other("failed to parse pax size"))?;
511 }
512
513 "uid" => {
514 let uid_str = extension
515 .value()
516 .map_err(|_e| other("failed to parse pax uid as string"))?;
517 header.set_uid(
518 uid_str
519 .parse::<u64>()
520 .map_err(|_e| other("failed to parse pax uid"))?,
521 );
522 }
523
524 "gid" => {
525 let gid_str = extension
526 .value()
527 .map_err(|_e| other("failed to parse pax gid as string"))?;
528 header.set_gid(
529 gid_str
530 .parse::<u64>()
531 .map_err(|_e| other("failed to parse pax gid"))?,
532 );
533 }
534
535 _ => {
536 continue;
537 }
538 }
539 }
540 }
541
542 let data = EntryIo::Data(archive.clone().take(size));
543
544 let ArchiveInner {
545 unpack_xattrs,
546 preserve_mtime,
547 preserve_permissions,
548 ..
549 } = &*archive.inner.lock().unwrap();
550
551 let ret = EntryFields {
552 size,
553 header_pos,
554 file_pos,
555 data: vec![data],
556 header,
557 long_pathname: None,
558 long_linkname: None,
559 pax_extensions: None,
560 unpack_xattrs: *unpack_xattrs,
561 preserve_permissions: *preserve_permissions,
562 preserve_mtime: *preserve_mtime,
563 read_state: None,
564 };
565
566 let size = (size + 511) & !(512 - 1);
569 *next += size;
570
571 Poll::Ready(Some(Ok(ret.into_entry())))
572}
573
574fn poll_parse_sparse_header<R: Read + Unpin>(
575 archive: &Archive<R>,
576 next: &mut u64,
577 current_ext: &mut Option<GnuExtSparseHeader>,
578 current_ext_pos: &mut usize,
579 entry: &mut EntryFields<Archive<R>>,
580 cx: &mut Context<'_>,
581) -> Poll<io::Result<()>> {
582 if !entry.header.entry_type().is_gnu_sparse() {
583 return Poll::Ready(Ok(()));
584 }
585
586 let gnu = match entry.header.as_gnu() {
587 Some(gnu) => gnu,
588 None => return Poll::Ready(Err(other("sparse entry type listed but not GNU header"))),
589 };
590
591 entry.data.truncate(0);
611
612 let mut cur = 0;
613 let mut remaining = entry.size;
614 {
615 let data = &mut entry.data;
616 let reader = archive.clone();
617 let size = entry.size;
618 let mut add_block = |block: &GnuSparseHeader| -> io::Result<_> {
619 if block.is_empty() {
620 return Ok(());
621 }
622 let off = block.offset()?;
623 let len = block.length()?;
624
625 if (size - remaining) % 512 != 0 {
626 return Err(other(
627 "previous block in sparse file was not \
628 aligned to 512-byte boundary",
629 ));
630 } else if off < cur {
631 return Err(other(
632 "out of order or overlapping sparse \
633 blocks",
634 ));
635 } else if cur < off {
636 let block = io::repeat(0).take(off - cur);
637 data.push(EntryIo::Pad(block));
638 }
639 cur = off
640 .checked_add(len)
641 .ok_or_else(|| other("more bytes listed in sparse file than u64 can hold"))?;
642 remaining = remaining.checked_sub(len).ok_or_else(|| {
643 other(
644 "sparse file consumed more data than the header \
645 listed",
646 )
647 })?;
648 data.push(EntryIo::Data(reader.clone().take(len)));
649 Ok(())
650 };
651 for block in &gnu.sparse {
652 add_block(block)?
653 }
654 if gnu.is_extended() {
655 let started_header = current_ext.is_some();
656 if !started_header {
657 let mut ext = GnuExtSparseHeader::new();
658 ext.isextended[0] = 1;
659 *current_ext = Some(ext);
660 *current_ext_pos = 0;
661 }
662
663 let ext = current_ext.as_mut().unwrap();
664 while ext.is_extended() {
665 match async_std::task::ready!(poll_try_read_all(
666 archive.clone(),
667 cx,
668 ext.as_mut_bytes(),
669 current_ext_pos,
670 )) {
671 Ok(true) => {}
672 Ok(false) => return Poll::Ready(Err(other("failed to read extension"))),
673 Err(err) => return Poll::Ready(Err(err)),
674 }
675
676 *next += 512;
677 for block in &ext.sparse {
678 add_block(block)?;
679 }
680 }
681 }
682 }
683 if cur != gnu.real_size()? {
684 return Poll::Ready(Err(other(
685 "mismatch in sparse file chunks and \
686 size in header",
687 )));
688 }
689 entry.size = cur;
690 if remaining > 0 {
691 return Poll::Ready(Err(other(
692 "mismatch in sparse file chunks and \
693 entry size in header",
694 )));
695 }
696
697 Poll::Ready(Ok(()))
698}
699
700impl<R: Read + Unpin> Read for Archive<R> {
701 fn poll_read(
702 self: Pin<&mut Self>,
703 cx: &mut Context<'_>,
704 into: &mut [u8],
705 ) -> Poll<io::Result<usize>> {
706 let mut lock = self.inner.lock().unwrap();
707 let mut inner = Pin::new(&mut *lock);
708 let r = Pin::new(&mut inner.obj);
709
710 let res = async_std::task::ready!(r.poll_read(cx, into));
711 match res {
712 Ok(i) => {
713 inner.pos += i as u64;
714 Poll::Ready(Ok(i))
715 }
716 Err(err) => Poll::Ready(Err(err)),
717 }
718 }
719}
720
721fn poll_try_read_all<R: Read + Unpin>(
726 mut source: R,
727 cx: &mut Context<'_>,
728 buf: &mut [u8],
729 pos: &mut usize,
730) -> Poll<io::Result<bool>> {
731 while *pos < buf.len() {
732 match async_std::task::ready!(Pin::new(&mut source).poll_read(cx, &mut buf[*pos..])) {
733 Ok(0) => {
734 if *pos == 0 {
735 return Poll::Ready(Ok(false));
736 }
737
738 return Poll::Ready(Err(other("failed to read entire block")));
739 }
740 Ok(n) => *pos += n,
741 Err(err) => return Poll::Ready(Err(err)),
742 }
743 }
744
745 *pos = 0;
746 Poll::Ready(Ok(true))
747}
748
749fn poll_skip<R: Read + Unpin>(
751 mut source: R,
752 cx: &mut Context<'_>,
753 mut amt: u64,
754) -> Poll<io::Result<()>> {
755 let mut buf = [0u8; 4096 * 8];
756 while amt > 0 {
757 let n = cmp::min(amt, buf.len() as u64);
758 match async_std::task::ready!(Pin::new(&mut source).poll_read(cx, &mut buf[..n as usize])) {
759 Ok(0) => {
760 return Poll::Ready(Err(other("unexpected EOF during skip")));
761 }
762 Ok(n) => {
763 amt -= n as u64;
764 }
765 Err(err) => return Poll::Ready(Err(err)),
766 }
767 }
768
769 Poll::Ready(Ok(()))
770}
771
772#[cfg(test)]
773mod tests {
774 use super::*;
775
776 assert_impl_all!(async_std::fs::File: Send, Sync);
777 assert_impl_all!(Entries<async_std::fs::File>: Send, Sync);
778 assert_impl_all!(Archive<async_std::fs::File>: Send, Sync);
779 assert_impl_all!(Entry<Archive<async_std::fs::File>>: Send, Sync);
780}