1use std::{
2 cmp,
3 pin::Pin,
4 sync::{Arc, Mutex},
5};
6
7#[cfg(feature = "fs")]
8use async_std::{fs, path::Path, prelude::*};
9use async_std::{
10 io,
11 io::prelude::*,
12 stream::Stream,
13 task::{Context, Poll},
14};
15use pin_project::pin_project;
16
17use crate::{
18 entry::{EntryFields, EntryIo},
19 other, Entry, GnuExtSparseHeader, GnuSparseHeader, Header,
20};
21
22#[cfg(feature = "fs")]
23use crate::error::TarError;
24
25#[derive(Debug)]
29pub struct Archive<R: Read + Unpin> {
30 inner: Arc<Mutex<ArchiveInner<R>>>,
31}
32
33impl<R: Read + Unpin> Clone for Archive<R> {
34 fn clone(&self) -> Self {
35 Archive {
36 inner: self.inner.clone(),
37 }
38 }
39}
40
41#[pin_project]
42#[derive(Debug)]
43pub struct ArchiveInner<R: Read + Unpin> {
44 pos: u64,
45 unpack_xattrs: bool,
46 preserve_permissions: bool,
47 preserve_mtime: bool,
48 ignore_zeros: bool,
49 #[pin]
50 obj: R,
51}
52
53pub struct ArchiveBuilder<R: Read + Unpin> {
55 obj: R,
56 unpack_xattrs: bool,
57 preserve_permissions: bool,
58 preserve_mtime: bool,
59 ignore_zeros: bool,
60}
61
62impl<R: Read + Unpin> ArchiveBuilder<R> {
63 pub fn new(obj: R) -> Self {
65 ArchiveBuilder {
66 unpack_xattrs: false,
67 preserve_permissions: false,
68 preserve_mtime: true,
69 ignore_zeros: false,
70 obj,
71 }
72 }
73
74 pub fn set_unpack_xattrs(mut self, unpack_xattrs: bool) -> Self {
82 self.unpack_xattrs = unpack_xattrs;
83 self
84 }
85
86 pub fn set_preserve_permissions(mut self, preserve: bool) -> Self {
92 self.preserve_permissions = preserve;
93 self
94 }
95
96 pub fn set_preserve_mtime(mut self, preserve: bool) -> Self {
101 self.preserve_mtime = preserve;
102 self
103 }
104
105 pub fn set_ignore_zeros(mut self, ignore_zeros: bool) -> Self {
110 self.ignore_zeros = ignore_zeros;
111 self
112 }
113
114 pub fn build(self) -> Archive<R> {
116 let Self {
117 unpack_xattrs,
118 preserve_permissions,
119 preserve_mtime,
120 ignore_zeros,
121 obj,
122 } = self;
123
124 Archive {
125 inner: Arc::new(Mutex::new(ArchiveInner {
126 unpack_xattrs,
127 preserve_permissions,
128 preserve_mtime,
129 ignore_zeros,
130 obj,
131 pos: 0,
132 })),
133 }
134 }
135}
136
137impl<R: Read + Unpin> Archive<R> {
138 pub fn new(obj: R) -> Archive<R> {
140 Archive {
141 inner: Arc::new(Mutex::new(ArchiveInner {
142 unpack_xattrs: false,
143 preserve_permissions: false,
144 preserve_mtime: true,
145 ignore_zeros: false,
146 obj,
147 pos: 0,
148 })),
149 }
150 }
151
152 pub fn into_inner(self) -> Result<R, Self> {
154 match Arc::try_unwrap(self.inner) {
155 Ok(inner) => Ok(inner.into_inner().unwrap().obj),
156 Err(inner) => Err(Self { inner }),
157 }
158 }
159
160 pub fn entries(self) -> io::Result<Entries<R>> {
167 if self.inner.lock().unwrap().pos != 0 {
168 return Err(other(
169 "cannot call entries unless archive is at \
170 position 0",
171 ));
172 }
173
174 Ok(Entries {
175 archive: self,
176 current: (0, None, 0, None),
177 fields: None,
178 gnu_longlink: None,
179 gnu_longname: None,
180 pax_extensions: None,
181 })
182 }
183
184 pub fn entries_raw(self) -> io::Result<RawEntries<R>> {
191 if self.inner.lock().unwrap().pos != 0 {
192 return Err(other(
193 "cannot call entries_raw unless archive is at \
194 position 0",
195 ));
196 }
197
198 Ok(RawEntries {
199 archive: self,
200 current: (0, None, 0),
201 })
202 }
203
204 #[cfg(feature = "fs")]
228 pub async fn unpack<P: AsRef<Path>>(self, dst: P) -> io::Result<()> {
229 let mut entries = self.entries()?;
230 let mut pinned = Pin::new(&mut entries);
231 let dst = dst.as_ref();
232
233 if dst.symlink_metadata().await.is_err() {
234 fs::create_dir_all(&dst)
235 .await
236 .map_err(|e| TarError::new(&format!("failed to create `{}`", dst.display()), e))?;
237 }
238
239 let dst = &dst
245 .canonicalize()
246 .await
247 .unwrap_or_else(|_| dst.to_path_buf());
248
249 let mut directories = Vec::new();
253 while let Some(entry) = pinned.next().await {
254 let mut file = entry.map_err(|e| TarError::new("failed to iterate over archive", e))?;
255 if file.header().entry_type() == crate::EntryType::Directory {
256 directories.push(file);
257 } else {
258 file.unpack_in(dst).await?;
259 }
260 }
261 for mut dir in directories {
262 dir.unpack_in(dst).await?;
263 }
264
265 Ok(())
266 }
267}
268
269#[pin_project]
271#[derive(Debug)]
272pub struct Entries<R: Read + Unpin> {
273 archive: Archive<R>,
274 current: (u64, Option<Header>, usize, Option<GnuExtSparseHeader>),
275 fields: Option<EntryFields<Archive<R>>>,
276 gnu_longname: Option<Vec<u8>>,
277 gnu_longlink: Option<Vec<u8>>,
278 pax_extensions: Option<Vec<u8>>,
279}
280
/// Unwraps a `Poll<Option<Result<T, E>>>` down to `T`.
///
/// Returns early from the enclosing function on `Pending` (through
/// `async_std::task::ready!`), on end of stream (`Poll::Ready(None)`) and
/// on error (`Poll::Ready(Some(Err(_)))`).
macro_rules! ready_opt_err {
    ($poll:expr) => {
        match async_std::task::ready!($poll) {
            None => return Poll::Ready(None),
            Some(Err(error)) => return Poll::Ready(Some(Err(error))),
            Some(Ok(value)) => value,
        }
    };
}
290
/// Unwraps a `Poll<Result<T, E>>` down to `T` inside a function that
/// returns `Poll<Option<Result<_, E>>>`.
///
/// Returns early on `Pending` (through `async_std::task::ready!`) and turns
/// an error into `Poll::Ready(Some(Err(_)))`.
macro_rules! ready_err {
    ($poll:expr) => {
        match async_std::task::ready!($poll) {
            Err(error) => return Poll::Ready(Some(Err(error))),
            Ok(value) => value,
        }
    };
}
299
300impl<R: Read + Unpin> Stream for Entries<R> {
301 type Item = io::Result<Entry<Archive<R>>>;
302
303 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
304 let mut this = self.project();
305 loop {
306 let (next, current_header, current_header_pos, _) = &mut this.current;
307
308 let fields = if let Some(fields) = this.fields.as_mut() {
309 fields
310 } else {
311 *this.fields = Some(EntryFields::from(ready_opt_err!(poll_next_raw(
312 this.archive,
313 next,
314 current_header,
315 current_header_pos,
316 cx
317 ))));
318 continue;
319 };
320
321 let is_recognized_header =
322 fields.header.as_gnu().is_some() || fields.header.as_ustar().is_some();
323 if is_recognized_header && fields.header.entry_type().is_gnu_longname() {
324 if this.gnu_longname.is_some() {
325 return Poll::Ready(Some(Err(other(
326 "two long name entries describing \
327 the same member",
328 ))));
329 }
330
331 *this.gnu_longname = Some(ready_err!(Pin::new(fields).poll_read_all(cx)));
332 *this.fields = None;
333 continue;
334 }
335
336 if is_recognized_header && fields.header.entry_type().is_gnu_longlink() {
337 if this.gnu_longlink.is_some() {
338 return Poll::Ready(Some(Err(other(
339 "two long name entries describing \
340 the same member",
341 ))));
342 }
343 *this.gnu_longlink = Some(ready_err!(Pin::new(fields).poll_read_all(cx)));
344 *this.fields = None;
345 continue;
346 }
347
348 if is_recognized_header && fields.header.entry_type().is_pax_local_extensions() {
349 if this.pax_extensions.is_some() {
350 return Poll::Ready(Some(Err(other(
351 "two pax extensions entries describing \
352 the same member",
353 ))));
354 }
355 *this.pax_extensions = Some(ready_err!(Pin::new(fields).poll_read_all(cx)));
356 *this.fields = None;
357 continue;
358 }
359
360 fields.long_pathname = this.gnu_longname.take();
361 fields.long_linkname = this.gnu_longlink.take();
362 fields.pax_extensions = this.pax_extensions.take();
363
364 let (next, _, current_pos, current_ext) = &mut this.current;
365 ready_err!(poll_parse_sparse_header(
366 this.archive,
367 next,
368 current_ext,
369 current_pos,
370 fields,
371 cx
372 ));
373
374 return Poll::Ready(Some(Ok(this.fields.take().unwrap().into_entry())));
375 }
376 }
377}
378
379pub struct RawEntries<R: Read + Unpin> {
381 archive: Archive<R>,
382 current: (u64, Option<Header>, usize),
383}
384
385impl<R: Read + Unpin> Stream for RawEntries<R> {
386 type Item = io::Result<Entry<Archive<R>>>;
387
388 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
389 let archive = self.archive.clone();
390 let (next, current_header, current_header_pos) = &mut self.current;
391 poll_next_raw(&archive, next, current_header, current_header_pos, cx)
392 }
393}
394
395fn poll_next_raw<R: Read + Unpin>(
396 archive: &Archive<R>,
397 next: &mut u64,
398 current_header: &mut Option<Header>,
399 current_header_pos: &mut usize,
400 cx: &mut Context<'_>,
401) -> Poll<Option<io::Result<Entry<Archive<R>>>>> {
402 let mut header_pos = *next;
403
404 loop {
405 let archive = archive.clone();
406 if current_header.is_none() {
408 let delta = *next - archive.inner.lock().unwrap().pos;
409 match async_std::task::ready!(poll_skip(archive.clone(), cx, delta)) {
410 Ok(_) => {}
411 Err(err) => return Poll::Ready(Some(Err(err))),
412 }
413
414 *current_header = Some(Header::new_old());
415 *current_header_pos = 0;
416 }
417
418 let header = current_header.as_mut().unwrap();
419
420 match async_std::task::ready!(poll_try_read_all(
422 archive.clone(),
423 cx,
424 header.as_mut_bytes(),
425 current_header_pos,
426 )) {
427 Ok(true) => {}
428 Ok(false) => return Poll::Ready(None),
429 Err(err) => return Poll::Ready(Some(Err(err))),
430 }
431
432 if !header.as_bytes().iter().all(|i| *i == 0) {
436 *next += 512;
437 break;
438 }
439
440 if !archive.inner.lock().unwrap().ignore_zeros {
441 return Poll::Ready(None);
442 }
443
444 *next += 512;
445 header_pos = *next;
446 }
447
448 let header = current_header.as_mut().unwrap();
449
450 let sum = header.as_bytes()[..148]
452 .iter()
453 .chain(&header.as_bytes()[156..])
454 .fold(0, |a, b| a + (*b as u32))
455 + 8 * 32;
456 let cksum = header.cksum()?;
457 if sum != cksum {
458 return Poll::Ready(Some(Err(other("archive header checksum mismatch"))));
459 }
460
461 let file_pos = *next;
462 let size = header.entry_size()?;
463
464 let data = EntryIo::Data(archive.clone().take(size));
465
466 let header = current_header.take().unwrap();
467
468 let ArchiveInner {
469 unpack_xattrs,
470 preserve_mtime,
471 preserve_permissions,
472 ..
473 } = &*archive.inner.lock().unwrap();
474
475 let ret = EntryFields {
476 size,
477 header_pos,
478 file_pos,
479 data: vec![data],
480 header,
481 long_pathname: None,
482 long_linkname: None,
483 pax_extensions: None,
484 unpack_xattrs: *unpack_xattrs,
485 preserve_permissions: *preserve_permissions,
486 preserve_mtime: *preserve_mtime,
487 read_state: None,
488 };
489
490 let size = (size + 511) & !(512 - 1);
493 *next += size;
494
495 Poll::Ready(Some(Ok(ret.into_entry())))
496}
497
498fn poll_parse_sparse_header<R: Read + Unpin>(
499 archive: &Archive<R>,
500 next: &mut u64,
501 current_ext: &mut Option<GnuExtSparseHeader>,
502 current_ext_pos: &mut usize,
503 entry: &mut EntryFields<Archive<R>>,
504 cx: &mut Context<'_>,
505) -> Poll<io::Result<()>> {
506 if !entry.header.entry_type().is_gnu_sparse() {
507 return Poll::Ready(Ok(()));
508 }
509
510 let gnu = match entry.header.as_gnu() {
511 Some(gnu) => gnu,
512 None => return Poll::Ready(Err(other("sparse entry type listed but not GNU header"))),
513 };
514
515 entry.data.truncate(0);
535
536 let mut cur = 0;
537 let mut remaining = entry.size;
538 {
539 let data = &mut entry.data;
540 let reader = archive.clone();
541 let size = entry.size;
542 let mut add_block = |block: &GnuSparseHeader| -> io::Result<_> {
543 if block.is_empty() {
544 return Ok(());
545 }
546 let off = block.offset()?;
547 let len = block.length()?;
548
549 if (size - remaining) % 512 != 0 {
550 return Err(other(
551 "previous block in sparse file was not \
552 aligned to 512-byte boundary",
553 ));
554 } else if off < cur {
555 return Err(other(
556 "out of order or overlapping sparse \
557 blocks",
558 ));
559 } else if cur < off {
560 let block = io::repeat(0).take(off - cur);
561 data.push(EntryIo::Pad(block));
562 }
563 cur = off
564 .checked_add(len)
565 .ok_or_else(|| other("more bytes listed in sparse file than u64 can hold"))?;
566 remaining = remaining.checked_sub(len).ok_or_else(|| {
567 other(
568 "sparse file consumed more data than the header \
569 listed",
570 )
571 })?;
572 data.push(EntryIo::Data(reader.clone().take(len)));
573 Ok(())
574 };
575 for block in &gnu.sparse {
576 add_block(block)?
577 }
578 if gnu.is_extended() {
579 let started_header = current_ext.is_some();
580 if !started_header {
581 let mut ext = GnuExtSparseHeader::new();
582 ext.isextended[0] = 1;
583 *current_ext = Some(ext);
584 *current_ext_pos = 0;
585 }
586
587 let ext = current_ext.as_mut().unwrap();
588 while ext.is_extended() {
589 match async_std::task::ready!(poll_try_read_all(
590 archive.clone(),
591 cx,
592 ext.as_mut_bytes(),
593 current_ext_pos,
594 )) {
595 Ok(true) => {}
596 Ok(false) => return Poll::Ready(Err(other("failed to read extension"))),
597 Err(err) => return Poll::Ready(Err(err)),
598 }
599
600 *next += 512;
601 for block in &ext.sparse {
602 add_block(block)?;
603 }
604 }
605 }
606 }
607 if cur != gnu.real_size()? {
608 return Poll::Ready(Err(other(
609 "mismatch in sparse file chunks and \
610 size in header",
611 )));
612 }
613 entry.size = cur;
614 if remaining > 0 {
615 return Poll::Ready(Err(other(
616 "mismatch in sparse file chunks and \
617 entry size in header",
618 )));
619 }
620
621 Poll::Ready(Ok(()))
622}
623
624impl<R: Read + Unpin> Read for Archive<R> {
625 fn poll_read(
626 self: Pin<&mut Self>,
627 cx: &mut Context<'_>,
628 into: &mut [u8],
629 ) -> Poll<io::Result<usize>> {
630 let mut lock = self.inner.lock().unwrap();
631 let mut inner = Pin::new(&mut *lock);
632 let r = Pin::new(&mut inner.obj);
633
634 let res = async_std::task::ready!(r.poll_read(cx, into));
635 match res {
636 Ok(i) => {
637 inner.pos += i as u64;
638 Poll::Ready(Ok(i))
639 }
640 Err(err) => Poll::Ready(Err(err)),
641 }
642 }
643}
644
645fn poll_try_read_all<R: Read + Unpin>(
650 mut source: R,
651 cx: &mut Context<'_>,
652 buf: &mut [u8],
653 pos: &mut usize,
654) -> Poll<io::Result<bool>> {
655 while *pos < buf.len() {
656 match async_std::task::ready!(Pin::new(&mut source).poll_read(cx, &mut buf[*pos..])) {
657 Ok(0) => {
658 if *pos == 0 {
659 return Poll::Ready(Ok(false));
660 }
661
662 return Poll::Ready(Err(other("failed to read entire block")));
663 }
664 Ok(n) => *pos += n,
665 Err(err) => return Poll::Ready(Err(err)),
666 }
667 }
668
669 *pos = 0;
670 Poll::Ready(Ok(true))
671}
672
673fn poll_skip<R: Read + Unpin>(
675 mut source: R,
676 cx: &mut Context<'_>,
677 mut amt: u64,
678) -> Poll<io::Result<()>> {
679 let mut buf = [0u8; 4096 * 8];
680 while amt > 0 {
681 let n = cmp::min(amt, buf.len() as u64);
682 match async_std::task::ready!(Pin::new(&mut source).poll_read(cx, &mut buf[..n as usize])) {
683 Ok(n) if n == 0 => {
684 return Poll::Ready(Err(other("unexpected EOF during skip")));
685 }
686 Ok(n) => {
687 amt -= n as u64;
688 }
689 Err(err) => return Poll::Ready(Err(err)),
690 }
691 }
692
693 Poll::Ready(Ok(()))
694}
695
#[cfg(test)]
mod tests {
    use super::*;

    // Compile-time checks that the archive types are `Send + Sync` when
    // backed by a file. `assert_impl_all!` is not defined in this file;
    // presumably it is the `static_assertions` macro made available at the
    // crate root (confirm).
    assert_impl_all!(async_std::fs::File: Send, Sync);
    assert_impl_all!(Archive<async_std::fs::File>: Send, Sync);
    assert_impl_all!(Entries<async_std::fs::File>: Send, Sync);
    assert_impl_all!(Entry<Archive<async_std::fs::File>>: Send, Sync);
}