use std::cell::{Cell, RefCell};
use std::cmp;
use std::pin::Pin;
use async_std::io;
use async_std::io::prelude::*;
use async_std::path::Path;
use async_std::prelude::*;
use async_std::stream::Stream;
use async_std::sync::Arc;
use async_std::task::{Context, Poll};
use pin_project::pin_project;
use crate::pin_cell::PinCell;
use crate::entry::{EntryFields, EntryIo};
use crate::error::TarError;
use crate::other;
use crate::{Entry, GnuExtSparseHeader, GnuSparseHeader, Header};
/// A handle to a tar archive that is read from the underlying reader `R`.
///
/// All state lives in an `ArchiveInner` held behind an `Arc`, so every clone
/// of an `Archive` refers to the same reader, read position and options.
#[derive(Debug)]
pub struct Archive<R: Read + Unpin> {
// Shared reader, byte position and unpack options.
inner: Arc<ArchiveInner<R>>,
}
impl<R: Read + Unpin> Clone for Archive<R> {
fn clone(&self) -> Self {
Archive {
inner: self.inner.clone(),
}
}
}
/// Shared state behind an `Archive`: the wrapped reader, the number of bytes
/// read from it so far, and the options applied while iterating/unpacking.
#[pin_project]
#[derive(Debug)]
pub struct ArchiveInner<R> {
// Running count of bytes read through `Archive::poll_read`.
pos: Cell<u64>,
// The next three flags are copied into every `EntryFields` produced by
// `poll_next_raw`.
unpack_xattrs: bool,
preserve_permissions: bool,
preserve_mtime: bool,
// When false, an all-zero header block ends iteration; when true such
// blocks are skipped and iteration continues.
ignore_zeros: bool,
// The underlying reader, wrapped so it can be mutably borrowed through a
// shared reference.
#[pin]
obj: PinCell<R>,
}
/// Builder that collects the options for an `Archive` before constructing it.
///
/// Create one with `ArchiveBuilder::new`, adjust it with the `set_*` methods
/// and finish with `build`.
pub struct ArchiveBuilder<R: Read + Unpin> {
// The reader the archive will be read from.
obj: PinCell<R>,
// Option flags; moved unchanged into the `ArchiveInner` by `build`.
unpack_xattrs: bool,
preserve_permissions: bool,
preserve_mtime: bool,
ignore_zeros: bool,
}
impl<R: Read + Unpin> ArchiveBuilder<R> {
    /// Starts a builder for an archive read from `obj`.
    ///
    /// Defaults: `unpack_xattrs = false`, `preserve_permissions = false`,
    /// `preserve_mtime = true`, `ignore_zeros = false`.
    pub fn new(obj: R) -> Self {
        Self {
            obj: PinCell::new(obj),
            unpack_xattrs: false,
            preserve_permissions: false,
            preserve_mtime: true,
            ignore_zeros: false,
        }
    }

    /// Sets the `unpack_xattrs` flag that is handed to every entry the
    /// archive produces, and returns the builder for chaining.
    pub fn set_unpack_xattrs(self, unpack_xattrs: bool) -> Self {
        Self {
            unpack_xattrs,
            ..self
        }
    }

    /// Sets the `preserve_permissions` flag that is handed to every entry the
    /// archive produces, and returns the builder for chaining.
    pub fn set_preserve_permissions(self, preserve: bool) -> Self {
        Self {
            preserve_permissions: preserve,
            ..self
        }
    }

    /// Sets the `preserve_mtime` flag that is handed to every entry the
    /// archive produces, and returns the builder for chaining.
    pub fn set_preserve_mtime(self, preserve: bool) -> Self {
        Self {
            preserve_mtime: preserve,
            ..self
        }
    }

    /// Chooses whether all-zero header blocks are skipped (`true`) or end
    /// iteration (`false`), and returns the builder for chaining.
    pub fn set_ignore_zeros(self, ignore_zeros: bool) -> Self {
        Self {
            ignore_zeros,
            ..self
        }
    }

    /// Consumes the builder and produces an `Archive` positioned at byte 0.
    pub fn build(self) -> Archive<R> {
        let state = ArchiveInner {
            pos: Cell::new(0),
            unpack_xattrs: self.unpack_xattrs,
            preserve_permissions: self.preserve_permissions,
            preserve_mtime: self.preserve_mtime,
            ignore_zeros: self.ignore_zeros,
            obj: self.obj,
        };
        Archive {
            inner: Arc::new(state),
        }
    }
}
impl<R: Read + Unpin + Sync + Send> Archive<R> {
pub fn new(obj: R) -> Archive<R> {
Archive {
inner: Arc::new(ArchiveInner {
unpack_xattrs: false,
preserve_permissions: false,
preserve_mtime: true,
ignore_zeros: false,
obj: PinCell::new(obj),
pos: Cell::new(0),
}),
}
}
pub fn into_inner(self) -> Result<R, Self> {
let Self { inner } = self;
match Arc::try_unwrap(inner) {
Ok(inner) => {
let c: RefCell<R> = inner.obj.into();
Ok(c.into_inner())
}
Err(inner) => Err(Self { inner }),
}
}
pub fn entries(&mut self) -> io::Result<Entries<R>> {
if self.inner.pos.get() != 0 {
return Err(other(
"cannot call entries unless archive is at \
position 0",
));
}
Ok(Entries {
archive: self.clone(),
next: 0,
gnu_longlink: None,
gnu_longname: None,
pax_extensions: None,
})
}
pub fn entries_raw(&mut self) -> io::Result<RawEntries<R>> {
if self.inner.pos.get() != 0 {
return Err(other(
"cannot call entries_raw unless archive is at \
position 0",
));
}
Ok(RawEntries {
archive: self.clone(),
next: 0,
})
}
pub async fn unpack<P: AsRef<Path>>(&mut self, dst: P) -> io::Result<()> {
let mut entries = self.entries()?;
let mut pinned = Pin::new(&mut entries);
while let Some(entry) = pinned.next().await {
let mut file = entry.map_err(|e| TarError::new("failed to iterate over archive", e))?;
file.unpack_in(dst.as_ref()).await?;
}
Ok(())
}
}
/// Stream over the entries of an archive, as returned by `Archive::entries`.
///
/// GNU long-name, GNU long-link and pax local extension records are consumed
/// by the stream and attached to the entry that follows them.
pub struct Entries<R: Read + Unpin> {
// Handle to the archive being read.
archive: Archive<R>,
// Byte offset at which the next header is expected.
next: u64,
// Payload of a pending GNU long-name record, applied to the next entry.
gnu_longname: Option<Vec<u8>>,
// Payload of a pending GNU long-link record, applied to the next entry.
gnu_longlink: Option<Vec<u8>>,
// Payload of a pending pax extensions record, applied to the next entry.
pax_extensions: Option<Vec<u8>>,
}
/// Unwraps a `Poll<Option<Result<T, E>>>` inside a `poll_next` body.
///
/// Evaluates to the `T` on `Ready(Some(Ok(_)))`. In every other case it
/// returns from the enclosing function: `Pending` is propagated by `ready!`,
/// an error becomes `Poll::Ready(Some(Err(_)))`, and `None` becomes
/// `Poll::Ready(None)`.
macro_rules! ready_opt_err {
($val:expr) => {
match async_std::task::ready!($val) {
Some(Ok(val)) => val,
Some(Err(err)) => return Poll::Ready(Some(Err(err))),
None => return Poll::Ready(None),
}
};
}
/// Unwraps a `Poll<Result<T, E>>` inside a `poll_next` body.
///
/// Evaluates to the `T` on `Ready(Ok(_))`. `Pending` is propagated by
/// `ready!`, and an error returns `Poll::Ready(Some(Err(_)))` from the
/// enclosing function.
macro_rules! ready_err {
($val:expr) => {
match async_std::task::ready!($val) {
Ok(val) => val,
Err(err) => return Poll::Ready(Some(Err(err))),
}
};
}
/// Yields the entries of the archive, folding GNU long-name, GNU long-link
/// and pax local extension records into the entry that follows them.
impl<R: Read + Unpin> Stream for Entries<R> {
    type Item = io::Result<Entry<Archive<R>>>;

    /// Reads raw entries until one that is not an extension record is found.
    ///
    /// Extension records are read in full and stashed on `self`; the first
    /// ordinary entry takes the stashed values, has its sparse header (if any)
    /// parsed, and is returned. Two extension records of the same kind in a
    /// row are reported as an error.
    ///
    /// NOTE(review): if reading an extension payload or a sparse header
    /// returns `Poll::Pending`, the entry parsed during this call is dropped
    /// and the next poll starts again from `poll_next_raw`. Confirm this is
    /// acceptable for the readers this stream is used with.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            let entry = ready_opt_err!(poll_next_raw(self.archive.clone(), &mut self.next, cx));
            let is_gnu = entry.header().as_gnu().is_some();

            // GNU long-name record: remember its payload for the next entry.
            if is_gnu && entry.header().entry_type().is_gnu_longname() {
                if self.gnu_longname.is_some() {
                    return Poll::Ready(Some(Err(other(
                        "two long name entries describing \
                         the same member",
                    ))));
                }
                let mut ef = EntryFields::from(entry);
                let val = ready_err!(Pin::new(&mut ef).poll_read_all(cx));
                self.gnu_longname = Some(val);
                continue;
            }

            // GNU long-link record: remember its payload for the next entry.
            if is_gnu && entry.header().entry_type().is_gnu_longlink() {
                if self.gnu_longlink.is_some() {
                    // Previously this reused the long-name message, which
                    // pointed at the wrong record kind.
                    return Poll::Ready(Some(Err(other(
                        "two long link entries describing \
                         the same member",
                    ))));
                }
                let mut ef = EntryFields::from(entry);
                let val = ready_err!(Pin::new(&mut ef).poll_read_all(cx));
                self.gnu_longlink = Some(val);
                continue;
            }

            // pax local extensions record: remember it for the next entry.
            if entry.header().as_ustar().is_some()
                && entry.header().entry_type().is_pax_local_extensions()
            {
                if self.pax_extensions.is_some() {
                    return Poll::Ready(Some(Err(other(
                        "two pax extensions entries describing \
                         the same member",
                    ))));
                }
                let mut ef = EntryFields::from(entry);
                let val = ready_err!(Pin::new(&mut ef).poll_read_all(cx));
                self.pax_extensions = Some(val);
                continue;
            }

            // Ordinary entry: attach whatever extension data was collected.
            let mut fields = EntryFields::from(entry);
            fields.long_pathname = self.gnu_longname.take();
            fields.long_linkname = self.gnu_longlink.take();
            fields.pax_extensions = self.pax_extensions.take();
            ready_err!(poll_parse_sparse_header(
                self.archive.clone(),
                &mut self.next,
                &mut fields,
                cx
            ));
            return Poll::Ready(Some(Ok(fields.into_entry())));
        }
    }
}
/// Stream over the raw entries of an archive, as returned by
/// `Archive::entries_raw`. Each header is yielded as-is, with no handling of
/// extension records.
pub struct RawEntries<R: Read + Unpin> {
// Handle to the archive being read.
archive: Archive<R>,
// Byte offset at which the next header is expected.
next: u64,
}
impl<R: Read + Unpin> Stream for RawEntries<R> {
    type Item = io::Result<Entry<Archive<R>>>;

    /// Yields the next raw entry by delegating to `poll_next_raw`.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // `RawEntries` is `Unpin`, so the pin can be dropped to split borrows.
        let this = self.get_mut();
        let archive = this.archive.clone();
        poll_next_raw(archive, &mut this.next, cx)
    }
}
/// Reads the next header block from `archive` and builds a raw entry for it.
///
/// `next` is the byte offset at which the next header is expected; on success
/// it is advanced past the header and the entry's data, rounded up to a whole
/// 512-byte block.
///
/// Returns `Ready(None)` at end of input, or at an all-zero header block when
/// `ignore_zeros` is off. With `ignore_zeros` on, all-zero blocks are skipped.
///
/// # Errors
///
/// Yields an error if skipping or reading fails, if the header checksum does
/// not match, if the header fields cannot be parsed, or if the offsets derived
/// from the header do not fit in a `u64`.
///
/// NOTE(review): the header buffer is local to this call, so a
/// `Poll::Pending` in the middle of reading a header discards the bytes read
/// so far. Confirm the readers used here never return `Pending` mid-block.
fn poll_next_raw<R: Read + Unpin>(
    mut archive: Archive<R>,
    next: &mut u64,
    cx: &mut Context<'_>,
) -> Poll<Option<io::Result<Entry<Archive<R>>>>> {
    let mut header = Header::new_old();
    let mut header_pos = *next;
    loop {
        // Skip any unread data between the reader position and the next
        // header. A reader that is already past `next` is reported as an
        // error instead of underflowing.
        let delta = (*next)
            .checked_sub(archive.inner.pos.get())
            .ok_or_else(|| other("archive reader is positioned past the next entry header"))?;
        match async_std::task::ready!(poll_skip(&mut archive, cx, delta)) {
            Ok(_) => {}
            Err(err) => return Poll::Ready(Some(Err(err))),
        }

        // EOF exactly at a block boundary is a normal end of archive.
        match async_std::task::ready!(poll_try_read_all(&mut archive, cx, header.as_mut_bytes())) {
            Ok(true) => {}
            Ok(false) => return Poll::Ready(None),
            Err(err) => return Poll::Ready(Some(Err(err))),
        }

        // A block that is not all zeros is a real header.
        if !header.as_bytes().iter().all(|i| *i == 0) {
            *next += 512;
            break;
        }

        if !archive.inner.ignore_zeros {
            return Poll::Ready(None);
        }
        *next += 512;
        header_pos = *next;
    }

    // Sum every header byte, counting the 8-byte checksum field (bytes
    // 148..156) as ASCII spaces.
    let sum = header.as_bytes()[..148]
        .iter()
        .chain(&header.as_bytes()[156..])
        .fold(0, |a, b| a + (*b as u32))
        + 8 * 32;
    let cksum = header.cksum()?;
    if sum != cksum {
        return Poll::Ready(Some(Err(other("archive header checksum mismatch"))));
    }

    let file_pos = *next;
    let size = header.entry_size()?;

    // The size comes from the archive and may be arbitrarily large: round it
    // up to a whole block and locate the following header with checked
    // arithmetic so a hostile header cannot overflow the offsets.
    let padded_size = size
        .checked_add(511)
        .map(|s| s & !(512 - 1))
        .ok_or_else(|| other("size overflow"))?;
    let next_header = file_pos
        .checked_add(padded_size)
        .ok_or_else(|| other("size overflow"))?;

    let data = EntryIo::Data(archive.clone().take(size));
    let ret = EntryFields {
        size,
        header_pos,
        file_pos,
        data: vec![data],
        header,
        long_pathname: None,
        long_linkname: None,
        pax_extensions: None,
        unpack_xattrs: archive.inner.unpack_xattrs,
        preserve_permissions: archive.inner.preserve_permissions,
        preserve_mtime: archive.inner.preserve_mtime,
        read_state: None,
    };

    *next = next_header;
    Poll::Ready(Some(Ok(ret.into_entry())))
}
/// Rewrites `entry.data` for a GNU sparse entry.
///
/// Entries that are not GNU sparse are left untouched. For sparse entries the
/// data list is rebuilt from the sparse block table: zero-filled padding for
/// every hole and a bounded reader over the archive for every data block.
/// Extended sparse headers are read from `archive`, advancing `next` by 512
/// bytes for each one. On success `entry.size` becomes the logical file size.
///
/// # Errors
///
/// Fails if the header is not a GNU header, if the blocks are misaligned, out
/// of order or overlapping, if an extended header cannot be read, or if the
/// block table disagrees with the sizes recorded in the header.
fn poll_parse_sparse_header<R: Read + Unpin>(
    mut archive: Archive<R>,
    next: &mut u64,
    entry: &mut EntryFields<Archive<R>>,
    cx: &mut Context<'_>,
) -> Poll<io::Result<()>> {
    if !entry.header.entry_type().is_gnu_sparse() {
        return Poll::Ready(Ok(()));
    }
    let gnu = if let Some(gnu) = entry.header.as_gnu() {
        gnu
    } else {
        return Poll::Ready(Err(other("sparse entry type listed but not GNU header")));
    };

    // Start over: the single data reader is replaced by per-block readers.
    entry.data.clear();

    let declared_size = entry.size;
    // End offset, in the logical file, of the last block seen so far.
    let mut logical_end = 0;
    // Bytes of archive data not yet claimed by a block.
    let mut unclaimed = declared_size;
    {
        let chunks = &mut entry.data;
        let source = archive.clone();
        let mut push_block = |block: &GnuSparseHeader| -> io::Result<()> {
            if block.is_empty() {
                return Ok(());
            }
            let off = block.offset()?;
            let len = block.length()?;

            if (declared_size - unclaimed) % 512 != 0 {
                return Err(other(
                    "previous block in sparse file was not aligned to 512-byte boundary",
                ));
            }
            if off < logical_end {
                return Err(other("out of order or overlapping sparse blocks"));
            }
            if logical_end < off {
                // Hole before this block: serve zeros for its length.
                let hole = io::repeat(0).take(off - logical_end);
                chunks.push(EntryIo::Pad(hole));
            }

            logical_end = off
                .checked_add(len)
                .ok_or_else(|| other("more bytes listed in sparse file than u64 can hold"))?;
            unclaimed = unclaimed.checked_sub(len).ok_or_else(|| {
                other("sparse file consumed more data than the header listed")
            })?;
            chunks.push(EntryIo::Data(source.clone().take(len)));
            Ok(())
        };

        for block in gnu.sparse.iter() {
            push_block(block)?;
        }

        if gnu.is_extended() {
            let mut ext = GnuExtSparseHeader::new();
            // Mark as extended so the loop below runs at least once.
            ext.isextended[0] = 1;
            while ext.is_extended() {
                let got_block = async_std::task::ready!(poll_try_read_all(
                    &mut archive,
                    cx,
                    ext.as_mut_bytes()
                ))?;
                if !got_block {
                    return Poll::Ready(Err(other("failed to read extension")));
                }
                *next += 512;
                for block in ext.sparse.iter() {
                    push_block(block)?;
                }
            }
        }
    }

    if logical_end != gnu.real_size()? {
        return Poll::Ready(Err(other(
            "mismatch in sparse file chunks and size in header",
        )));
    }
    entry.size = logical_end;
    if unclaimed > 0 {
        return Poll::Ready(Err(other(
            "mismatch in sparse file chunks and entry size in header",
        )));
    }
    Poll::Ready(Ok(()))
}
/// Reads from the shared underlying reader and keeps the archive's byte
/// position in step with what has been read.
impl<R: Read + Unpin> Read for Archive<R> {
/// Polls the wrapped reader for data into `into`.
///
/// On a successful read of `i` bytes, `i` is added to the shared `pos`
/// counter before the result is returned. Errors and `Pending` are passed
/// through unchanged and leave `pos` untouched.
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
into: &mut [u8],
) -> Poll<io::Result<usize>> {
// Obtain a mutable borrow of the reader through the shared `PinCell`.
// NOTE(review): presumably `borrow_mut` panics if the reader is already
// borrowed, as `RefCell` does; confirm against `PinCell`.
let mut r = Pin::new(&Pin::new(&mut &*self.inner).obj).borrow_mut();
let res =
async_std::task::ready!(crate::pin_cell::PinMut::as_mut(&mut r).poll_read(cx, into));
match res {
Ok(i) => {
// Track how far into the archive the reader has advanced.
self.inner.pos.set(self.inner.pos.get() + i as u64);
Poll::Ready(Ok(i))
}
Err(err) => Poll::Ready(Err(err)),
}
}
}
/// Tries to fill `buf` completely from `source`.
///
/// Returns `Ok(true)` once the buffer is full and `Ok(false)` if the source
/// is at EOF before any byte was read.
///
/// # Errors
///
/// Fails if EOF is reached after only part of the buffer was filled, or if
/// the underlying read fails.
///
/// NOTE(review): the fill count is local to the call, so progress is not
/// carried over if a read returns `Poll::Pending` after a partial fill.
fn poll_try_read_all<R: Read + Unpin>(
    mut source: R,
    cx: &mut Context<'_>,
    buf: &mut [u8],
) -> Poll<io::Result<bool>> {
    let mut filled = 0;
    while filled < buf.len() {
        let got =
            async_std::task::ready!(Pin::new(&mut source).poll_read(cx, &mut buf[filled..]))?;
        if got == 0 {
            let outcome = if filled == 0 {
                Ok(false)
            } else {
                Err(other("failed to read entire block"))
            };
            return Poll::Ready(outcome);
        }
        filled += got;
    }
    Poll::Ready(Ok(true))
}
/// Discards `amt` bytes from `source` by reading them into a scratch buffer.
///
/// # Errors
///
/// Fails if the source reaches EOF before `amt` bytes were consumed, or if
/// the underlying read fails.
fn poll_skip<R: Read + Unpin>(
    mut source: R,
    cx: &mut Context<'_>,
    mut amt: u64,
) -> Poll<io::Result<()>> {
    let mut scratch = [0u8; 4096 * 8];
    while amt > 0 {
        // Never ask for more than is left to skip or than the buffer holds.
        let want = cmp::min(amt, scratch.len() as u64) as usize;
        let got =
            async_std::task::ready!(Pin::new(&mut source).poll_read(cx, &mut scratch[..want]))?;
        if got == 0 {
            return Poll::Ready(Err(other("unexpected EOF during skip")));
        }
        amt -= got as u64;
    }
    Poll::Ready(Ok(()))
}