1use std::{
2 cmp,
3 pin::Pin,
4 sync::{Arc, Mutex},
5};
6
7use async_std::{
8 fs, io,
9 io::prelude::*,
10 path::Path,
11 prelude::*,
12 stream::Stream,
13 task::{Context, Poll},
14};
15use pin_project::pin_project;
16
17use crate::{
18 entry::{EntryFields, EntryIo},
19 error::TarError,
20 other, Entry, GnuExtSparseHeader, GnuSparseHeader, Header,
21};
22
23#[derive(Debug)]
27pub struct Archive<R: Read + Unpin> {
28 inner: Arc<Mutex<ArchiveInner<R>>>,
29}
30
31impl<R: Read + Unpin> Clone for Archive<R> {
32 fn clone(&self) -> Self {
33 Archive {
34 inner: self.inner.clone(),
35 }
36 }
37}
38
39#[pin_project]
40#[derive(Debug)]
41pub struct ArchiveInner<R: Read + Unpin> {
42 pos: u64,
43 unpack_xattrs: bool,
44 preserve_permissions: bool,
45 preserve_mtime: bool,
46 ignore_zeros: bool,
47 #[pin]
48 obj: R,
49}
50
51pub struct ArchiveBuilder<R: Read + Unpin> {
53 obj: R,
54 unpack_xattrs: bool,
55 preserve_permissions: bool,
56 preserve_mtime: bool,
57 ignore_zeros: bool,
58}
59
60impl<R: Read + Unpin> ArchiveBuilder<R> {
61 pub fn new(obj: R) -> Self {
63 ArchiveBuilder {
64 unpack_xattrs: false,
65 preserve_permissions: false,
66 preserve_mtime: true,
67 ignore_zeros: false,
68 obj,
69 }
70 }
71
72 pub fn set_unpack_xattrs(mut self, unpack_xattrs: bool) -> Self {
80 self.unpack_xattrs = unpack_xattrs;
81 self
82 }
83
84 pub fn set_preserve_permissions(mut self, preserve: bool) -> Self {
90 self.preserve_permissions = preserve;
91 self
92 }
93
94 pub fn set_preserve_mtime(mut self, preserve: bool) -> Self {
99 self.preserve_mtime = preserve;
100 self
101 }
102
103 pub fn set_ignore_zeros(mut self, ignore_zeros: bool) -> Self {
108 self.ignore_zeros = ignore_zeros;
109 self
110 }
111
112 pub fn build(self) -> Archive<R> {
114 let Self {
115 unpack_xattrs,
116 preserve_permissions,
117 preserve_mtime,
118 ignore_zeros,
119 obj,
120 } = self;
121
122 Archive {
123 inner: Arc::new(Mutex::new(ArchiveInner {
124 unpack_xattrs,
125 preserve_permissions,
126 preserve_mtime,
127 ignore_zeros,
128 obj,
129 pos: 0,
130 })),
131 }
132 }
133}
134
135impl<R: Read + Unpin> Archive<R> {
136 pub fn new(obj: R) -> Archive<R> {
138 Archive {
139 inner: Arc::new(Mutex::new(ArchiveInner {
140 unpack_xattrs: false,
141 preserve_permissions: false,
142 preserve_mtime: true,
143 ignore_zeros: false,
144 obj,
145 pos: 0,
146 })),
147 }
148 }
149
150 pub fn into_inner(self) -> Result<R, Self> {
152 match Arc::try_unwrap(self.inner) {
153 Ok(inner) => Ok(inner.into_inner().unwrap().obj),
154 Err(inner) => Err(Self { inner }),
155 }
156 }
157
158 pub fn entries(self) -> io::Result<Entries<R>> {
165 if self.inner.lock().unwrap().pos != 0 {
166 return Err(other(
167 "cannot call entries unless archive is at \
168 position 0",
169 ));
170 }
171
172 Ok(Entries {
173 archive: self,
174 current: (0, None, 0, None),
175 fields: None,
176 gnu_longlink: None,
177 gnu_longname: None,
178 pax_extensions: None,
179 })
180 }
181
182 pub fn entries_raw(self) -> io::Result<RawEntries<R>> {
189 if self.inner.lock().unwrap().pos != 0 {
190 return Err(other(
191 "cannot call entries_raw unless archive is at \
192 position 0",
193 ));
194 }
195
196 Ok(RawEntries {
197 archive: self,
198 current: (0, None, 0),
199 })
200 }
201
202 pub async fn unpack<P: AsRef<Path>>(self, dst: P) -> io::Result<()> {
226 let mut entries = self.entries()?;
227 let mut pinned = Pin::new(&mut entries);
228 let dst = dst.as_ref();
229
230 if dst.symlink_metadata().await.is_err() {
231 fs::create_dir_all(&dst)
232 .await
233 .map_err(|e| TarError::new(&format!("failed to create `{}`", dst.display()), e))?;
234 }
235
236 let dst = &dst
242 .canonicalize()
243 .await
244 .unwrap_or_else(|_| dst.to_path_buf());
245
246 let mut directories = Vec::new();
250 while let Some(entry) = pinned.next().await {
251 let mut file = entry.map_err(|e| TarError::new("failed to iterate over archive", e))?;
252 if file.header().entry_type() == crate::EntryType::Directory {
253 directories.push(file);
254 } else {
255 file.unpack_in(dst).await?;
256 }
257 }
258 for mut dir in directories {
259 dir.unpack_in(dst).await?;
260 }
261
262 Ok(())
263 }
264}
265
266#[pin_project]
268#[derive(Debug)]
269pub struct Entries<R: Read + Unpin> {
270 archive: Archive<R>,
271 current: (u64, Option<Header>, usize, Option<GnuExtSparseHeader>),
272 fields: Option<EntryFields<Archive<R>>>,
273 gnu_longname: Option<Vec<u8>>,
274 gnu_longlink: Option<Vec<u8>>,
275 pax_extensions: Option<Vec<u8>>,
276}
277
// Unwraps a `Poll<Option<Result<T, E>>>` to `T`, returning early from the
// enclosing function with `Pending`, `Ready(None)` or `Ready(Some(Err(..)))`
// for every other outcome.
macro_rules! ready_opt_err {
    ($val:expr) => {
        match $val {
            Poll::Ready(Some(Ok(item))) => item,
            Poll::Ready(Some(Err(err))) => return Poll::Ready(Some(Err(err))),
            Poll::Ready(None) => return Poll::Ready(None),
            Poll::Pending => return Poll::Pending,
        }
    };
}
287
// Unwraps a `Poll<Result<T, E>>` to `T`, returning early from the enclosing
// function with `Pending` or, on failure, `Ready(Some(Err(..)))`.
macro_rules! ready_err {
    ($val:expr) => {
        match $val {
            Poll::Ready(Ok(item)) => item,
            Poll::Ready(Err(err)) => return Poll::Ready(Some(Err(err))),
            Poll::Pending => return Poll::Pending,
        }
    };
}
296
297impl<R: Read + Unpin> Stream for Entries<R> {
298 type Item = io::Result<Entry<Archive<R>>>;
299
300 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
301 let mut this = self.project();
302 loop {
303 let (next, current_header, current_header_pos, _) = &mut this.current;
304
305 let fields = if let Some(fields) = this.fields.as_mut() {
306 fields
307 } else {
308 *this.fields = Some(EntryFields::from(ready_opt_err!(poll_next_raw(
309 this.archive,
310 next,
311 current_header,
312 current_header_pos,
313 cx
314 ))));
315 continue;
316 };
317
318 let is_recognized_header =
319 fields.header.as_gnu().is_some() || fields.header.as_ustar().is_some();
320 if is_recognized_header && fields.header.entry_type().is_gnu_longname() {
321 if this.gnu_longname.is_some() {
322 return Poll::Ready(Some(Err(other(
323 "two long name entries describing \
324 the same member",
325 ))));
326 }
327
328 *this.gnu_longname = Some(ready_err!(Pin::new(fields).poll_read_all(cx)));
329 *this.fields = None;
330 continue;
331 }
332
333 if is_recognized_header && fields.header.entry_type().is_gnu_longlink() {
334 if this.gnu_longlink.is_some() {
335 return Poll::Ready(Some(Err(other(
336 "two long name entries describing \
337 the same member",
338 ))));
339 }
340 *this.gnu_longlink = Some(ready_err!(Pin::new(fields).poll_read_all(cx)));
341 *this.fields = None;
342 continue;
343 }
344
345 if is_recognized_header && fields.header.entry_type().is_pax_local_extensions() {
346 if this.pax_extensions.is_some() {
347 return Poll::Ready(Some(Err(other(
348 "two pax extensions entries describing \
349 the same member",
350 ))));
351 }
352 *this.pax_extensions = Some(ready_err!(Pin::new(fields).poll_read_all(cx)));
353 *this.fields = None;
354 continue;
355 }
356
357 fields.long_pathname = this.gnu_longname.take();
358 fields.long_linkname = this.gnu_longlink.take();
359 fields.pax_extensions = this.pax_extensions.take();
360
361 let (next, _, current_pos, current_ext) = &mut this.current;
362 ready_err!(poll_parse_sparse_header(
363 this.archive,
364 next,
365 current_ext,
366 current_pos,
367 fields,
368 cx
369 ));
370
371 return Poll::Ready(Some(Ok(this.fields.take().unwrap().into_entry())));
372 }
373 }
374}
375
376pub struct RawEntries<R: Read + Unpin> {
378 archive: Archive<R>,
379 current: (u64, Option<Header>, usize),
380}
381
382impl<R: Read + Unpin> Stream for RawEntries<R> {
383 type Item = io::Result<Entry<Archive<R>>>;
384
385 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
386 let archive = self.archive.clone();
387 let (next, current_header, current_header_pos) = &mut self.current;
388 poll_next_raw(&archive, next, current_header, current_header_pos, cx)
389 }
390}
391
392fn poll_next_raw<R: Read + Unpin>(
393 archive: &Archive<R>,
394 next: &mut u64,
395 current_header: &mut Option<Header>,
396 current_header_pos: &mut usize,
397 cx: &mut Context<'_>,
398) -> Poll<Option<io::Result<Entry<Archive<R>>>>> {
399 let mut header_pos = *next;
400
401 loop {
402 let archive = archive.clone();
403 if current_header.is_none() {
405 let delta = *next - archive.inner.lock().unwrap().pos;
406 match async_std::task::ready!(poll_skip(archive.clone(), cx, delta)) {
407 Ok(_) => {}
408 Err(err) => return Poll::Ready(Some(Err(err))),
409 }
410
411 *current_header = Some(Header::new_old());
412 *current_header_pos = 0;
413 }
414
415 let header = current_header.as_mut().unwrap();
416
417 match async_std::task::ready!(poll_try_read_all(
419 archive.clone(),
420 cx,
421 header.as_mut_bytes(),
422 current_header_pos,
423 )) {
424 Ok(true) => {}
425 Ok(false) => return Poll::Ready(None),
426 Err(err) => return Poll::Ready(Some(Err(err))),
427 }
428
429 if !header.as_bytes().iter().all(|i| *i == 0) {
433 *next += 512;
434 break;
435 }
436
437 if !archive.inner.lock().unwrap().ignore_zeros {
438 return Poll::Ready(None);
439 }
440
441 *next += 512;
442 header_pos = *next;
443 }
444
445 let header = current_header.as_mut().unwrap();
446
447 let sum = header.as_bytes()[..148]
449 .iter()
450 .chain(&header.as_bytes()[156..])
451 .fold(0, |a, b| a + (*b as u32))
452 + 8 * 32;
453 let cksum = header.cksum()?;
454 if sum != cksum {
455 return Poll::Ready(Some(Err(other("archive header checksum mismatch"))));
456 }
457
458 let file_pos = *next;
459 let size = header.entry_size()?;
460
461 let data = EntryIo::Data(archive.clone().take(size));
462
463 let header = current_header.take().unwrap();
464
465 let ArchiveInner {
466 unpack_xattrs,
467 preserve_mtime,
468 preserve_permissions,
469 ..
470 } = &*archive.inner.lock().unwrap();
471
472 let ret = EntryFields {
473 size,
474 header_pos,
475 file_pos,
476 data: vec![data],
477 header,
478 long_pathname: None,
479 long_linkname: None,
480 pax_extensions: None,
481 unpack_xattrs: *unpack_xattrs,
482 preserve_permissions: *preserve_permissions,
483 preserve_mtime: *preserve_mtime,
484 read_state: None,
485 };
486
487 let size = (size + 511) & !(512 - 1);
490 *next += size;
491
492 Poll::Ready(Some(Ok(ret.into_entry())))
493}
494
495fn poll_parse_sparse_header<R: Read + Unpin>(
496 archive: &Archive<R>,
497 next: &mut u64,
498 current_ext: &mut Option<GnuExtSparseHeader>,
499 current_ext_pos: &mut usize,
500 entry: &mut EntryFields<Archive<R>>,
501 cx: &mut Context<'_>,
502) -> Poll<io::Result<()>> {
503 if !entry.header.entry_type().is_gnu_sparse() {
504 return Poll::Ready(Ok(()));
505 }
506
507 let gnu = match entry.header.as_gnu() {
508 Some(gnu) => gnu,
509 None => return Poll::Ready(Err(other("sparse entry type listed but not GNU header"))),
510 };
511
512 entry.data.truncate(0);
532
533 let mut cur = 0;
534 let mut remaining = entry.size;
535 {
536 let data = &mut entry.data;
537 let reader = archive.clone();
538 let size = entry.size;
539 let mut add_block = |block: &GnuSparseHeader| -> io::Result<_> {
540 if block.is_empty() {
541 return Ok(());
542 }
543 let off = block.offset()?;
544 let len = block.length()?;
545
546 if (size - remaining) % 512 != 0 {
547 return Err(other(
548 "previous block in sparse file was not \
549 aligned to 512-byte boundary",
550 ));
551 } else if off < cur {
552 return Err(other(
553 "out of order or overlapping sparse \
554 blocks",
555 ));
556 } else if cur < off {
557 let block = io::repeat(0).take(off - cur);
558 data.push(EntryIo::Pad(block));
559 }
560 cur = off
561 .checked_add(len)
562 .ok_or_else(|| other("more bytes listed in sparse file than u64 can hold"))?;
563 remaining = remaining.checked_sub(len).ok_or_else(|| {
564 other(
565 "sparse file consumed more data than the header \
566 listed",
567 )
568 })?;
569 data.push(EntryIo::Data(reader.clone().take(len)));
570 Ok(())
571 };
572 for block in &gnu.sparse {
573 add_block(block)?
574 }
575 if gnu.is_extended() {
576 let started_header = current_ext.is_some();
577 if !started_header {
578 let mut ext = GnuExtSparseHeader::new();
579 ext.isextended[0] = 1;
580 *current_ext = Some(ext);
581 *current_ext_pos = 0;
582 }
583
584 let ext = current_ext.as_mut().unwrap();
585 while ext.is_extended() {
586 match async_std::task::ready!(poll_try_read_all(
587 archive.clone(),
588 cx,
589 ext.as_mut_bytes(),
590 current_ext_pos,
591 )) {
592 Ok(true) => {}
593 Ok(false) => return Poll::Ready(Err(other("failed to read extension"))),
594 Err(err) => return Poll::Ready(Err(err)),
595 }
596
597 *next += 512;
598 for block in &ext.sparse {
599 add_block(block)?;
600 }
601 }
602 }
603 }
604 if cur != gnu.real_size()? {
605 return Poll::Ready(Err(other(
606 "mismatch in sparse file chunks and \
607 size in header",
608 )));
609 }
610 entry.size = cur;
611 if remaining > 0 {
612 return Poll::Ready(Err(other(
613 "mismatch in sparse file chunks and \
614 entry size in header",
615 )));
616 }
617
618 Poll::Ready(Ok(()))
619}
620
621impl<R: Read + Unpin> Read for Archive<R> {
622 fn poll_read(
623 self: Pin<&mut Self>,
624 cx: &mut Context<'_>,
625 into: &mut [u8],
626 ) -> Poll<io::Result<usize>> {
627 let mut lock = self.inner.lock().unwrap();
628 let mut inner = Pin::new(&mut *lock);
629 let r = Pin::new(&mut inner.obj);
630
631 let res = async_std::task::ready!(r.poll_read(cx, into));
632 match res {
633 Ok(i) => {
634 inner.pos += i as u64;
635 Poll::Ready(Ok(i))
636 }
637 Err(err) => Poll::Ready(Err(err)),
638 }
639 }
640}
641
642fn poll_try_read_all<R: Read + Unpin>(
647 mut source: R,
648 cx: &mut Context<'_>,
649 buf: &mut [u8],
650 pos: &mut usize,
651) -> Poll<io::Result<bool>> {
652 while *pos < buf.len() {
653 match async_std::task::ready!(Pin::new(&mut source).poll_read(cx, &mut buf[*pos..])) {
654 Ok(0) => {
655 if *pos == 0 {
656 return Poll::Ready(Ok(false));
657 }
658
659 return Poll::Ready(Err(other("failed to read entire block")));
660 }
661 Ok(n) => *pos += n,
662 Err(err) => return Poll::Ready(Err(err)),
663 }
664 }
665
666 *pos = 0;
667 Poll::Ready(Ok(true))
668}
669
670fn poll_skip<R: Read + Unpin>(
672 mut source: R,
673 cx: &mut Context<'_>,
674 mut amt: u64,
675) -> Poll<io::Result<()>> {
676 let mut buf = [0u8; 4096 * 8];
677 while amt > 0 {
678 let n = cmp::min(amt, buf.len() as u64);
679 match async_std::task::ready!(Pin::new(&mut source).poll_read(cx, &mut buf[..n as usize])) {
680 Ok(0) => {
681 return Poll::Ready(Err(other("unexpected EOF during skip")));
682 }
683 Ok(n) => {
684 amt -= n as u64;
685 }
686 Err(err) => return Poll::Ready(Err(err)),
687 }
688 }
689
690 Poll::Ready(Ok(()))
691}
692
#[cfg(test)]
mod tests {
    use super::*;

    // Reader type used to instantiate the generic archive types below.
    type File = async_std::fs::File;

    // Compile-time checks that the public types can be shared across threads
    // when backed by a file.
    assert_impl_all!(File: Send, Sync);
    assert_impl_all!(Archive<File>: Send, Sync);
    assert_impl_all!(Entries<File>: Send, Sync);
    assert_impl_all!(Entry<Archive<File>>: Send, Sync);
}