mod slice;
use crate::{
archive::{Archive, ArchiveHeader, PNA_HEADER},
chunk::{Chunk, ChunkReader, ChunkType, RawChunk, read_chunk},
entry::{Entry, NormalEntry, RawEntry, ReadEntry},
};
#[cfg(feature = "unstable-async")]
use futures_util::AsyncReadExt;
pub(crate) use slice::read_header_from_slice;
use std::{
io::{self, Read, Seek, SeekFrom},
mem::swap,
};
/// Consumes the leading magic bytes from `reader` and checks them against [`PNA_HEADER`].
///
/// Exactly `PNA_HEADER.len()` bytes are read from `reader`.
///
/// # Errors
///
/// Propagates any error from `read_exact` (for instance when the input is shorter than
/// the magic) and returns an [`io::ErrorKind::InvalidData`] error when the bytes read
/// differ from [`PNA_HEADER`].
pub(crate) fn read_pna_header<R: Read>(mut reader: R) -> io::Result<()> {
    let mut magic = [0u8; PNA_HEADER.len()];
    reader.read_exact(&mut magic)?;
    if &magic == PNA_HEADER {
        Ok(())
    } else {
        Err(io::Error::new(io::ErrorKind::InvalidData, "It's not PNA"))
    }
}
/// Asynchronously consumes the leading magic bytes from `reader` and checks them against
/// [`PNA_HEADER`].
///
/// # Errors
///
/// Propagates any error from `read_exact` and returns an [`io::ErrorKind::InvalidData`]
/// error when the bytes read differ from [`PNA_HEADER`].
#[cfg(feature = "unstable-async")]
async fn read_pna_header_async<R: futures_io::AsyncRead + Unpin>(mut reader: R) -> io::Result<()> {
    let mut magic = [0u8; PNA_HEADER.len()];
    reader.read_exact(&mut magic).await?;
    if &magic == PNA_HEADER {
        Ok(())
    } else {
        Err(io::Error::new(io::ErrorKind::InvalidData, "It's not PNA"))
    }
}
impl<R: Read> Archive<R> {
    /// Reads the PNA magic and the archive header chunk from `reader`.
    ///
    /// The returned archive starts with an empty chunk buffer.
    ///
    /// # Errors
    ///
    /// Fails when the magic does not match, when the first chunk is not `AHED`, when the
    /// header bytes cannot be parsed, or when the underlying reader reports an error.
    #[inline]
    pub fn read_header(reader: R) -> io::Result<Self> {
        Self::read_header_with_buffer(reader, Default::default())
    }

    /// Reads the magic and the `AHED` chunk from `reader`, then builds the archive with
    /// `buf` as its initial chunk buffer.
    ///
    /// The header chunk itself is read without a maximum chunk size (`None`).
    ///
    /// # Errors
    ///
    /// Returns [`io::ErrorKind::InvalidData`] with the message ``Unexpected Chunk `…` ``
    /// when the first chunk is not `AHED`; other errors are propagated from the magic
    /// check, the chunk reader and the header parser.
    fn read_header_with_buffer(mut reader: R, buf: Vec<RawChunk>) -> io::Result<Self> {
        read_pna_header(&mut reader)?;
        let mut chunk_reader = ChunkReader::new(&mut reader, None);
        let chunk = chunk_reader.read_chunk()?;
        if chunk.ty != ChunkType::AHED {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("Unexpected Chunk `{}`", chunk.ty),
            ));
        }
        let header = ArchiveHeader::try_from_bytes(chunk.data())?;
        Ok(Self::with_buffer(reader, header, buf))
    }

    /// Reads chunks until one entry is complete and returns them as a [`RawEntry`].
    ///
    /// * `FEND` / `SEND` terminate the entry; the terminating chunk is included.
    /// * `ANXT` is not stored; it only sets the `next_archive` flag.
    /// * `AEND` ends this archive: the chunks collected so far are put back into the
    ///   internal buffer and `Ok(None)` is returned.
    ///
    /// Chunks already held in the internal buffer are placed in front of the newly read
    /// ones.
    fn next_raw_item(&mut self) -> io::Result<Option<RawEntry>> {
        // Take ownership of the buffered chunks, leaving an empty buffer behind.
        let mut chunks = Vec::new();
        swap(&mut self.buf, &mut chunks);
        loop {
            let chunk = read_chunk(&mut self.inner, self.max_chunk_size)?;
            match chunk.ty {
                ChunkType::FEND | ChunkType::SEND => {
                    chunks.push(chunk);
                    break;
                }
                ChunkType::ANXT => self.next_archive = true,
                ChunkType::AEND => {
                    // Keep the partial chunk list so that it can be handed to the next
                    // archive part by `read_next_archive`.
                    self.buf = chunks;
                    return Ok(None);
                }
                _ => chunks.push(chunk),
            }
        }
        Ok(Some(RawEntry(chunks)))
    }

    /// Reads the next raw entry and converts it into a [`ReadEntry`].
    ///
    /// Returns `Ok(None)` once the `AEND` chunk has been reached.
    fn read_entry(&mut self) -> io::Result<Option<ReadEntry>> {
        self.next_raw_item()?.map(TryInto::try_into).transpose()
    }

    /// Returns an iterator over the entries of this archive in their raw chunk form.
    #[inline]
    pub fn raw_entries(&mut self) -> impl Iterator<Item = io::Result<impl Entry + Sized>> + '_ {
        RawEntries(self)
    }

    /// Returns an iterator over [`NormalEntry`] items, expanding solid entries with the
    /// given `password`.
    ///
    /// This is shorthand for `self.entries().extract_solid_entries(password)`.
    #[inline]
    pub fn entries_with_password<'a>(
        &'a mut self,
        password: Option<&'a [u8]>,
    ) -> impl Iterator<Item = io::Result<NormalEntry>> + 'a {
        self.entries().extract_solid_entries(password)
    }

    /// Consumes this archive part and opens the following part from `reader`.
    ///
    /// The chunks still held in the internal buffer and the configured maximum chunk size
    /// are carried over to the returned archive.
    ///
    /// # Errors
    ///
    /// Returns [`io::ErrorKind::InvalidData`] when the archive number of the next part is
    /// not exactly one greater than the current one, and propagates every error raised
    /// while reading the next header.
    #[inline]
    pub fn read_next_archive<OR: Read>(self, reader: OR) -> io::Result<Archive<OR>> {
        let current_header = self.header;
        let mut next = Archive::<OR>::read_header_with_buffer(reader, self.buf)?;
        next.max_chunk_size = self.max_chunk_size;
        // `checked_add` instead of `+ 1`: the archive number comes from the input, and a
        // plain addition would overflow (panic in debug builds, wrap around in release
        // builds) when the current number is already the maximum value. With the checked
        // form such an archive simply has no valid successor.
        if current_header.archive_number.checked_add(1) != Some(next.header.archive_number) {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!(
                    "Next archive number must be +1 (current: {}, detected: {})",
                    current_header.archive_number, next.header.archive_number
                ),
            ));
        }
        Ok(next)
    }
}
impl<R> Archive<R> {
/// Returns an [`Entries`] value that mutably borrows this archive.
///
/// This block places no bound on `R`; the iterator and stream implementations of
/// [`Entries`] add their own reader bounds.
#[inline]
pub const fn entries(&mut self) -> Entries<'_, R> {
Entries::new(self)
}
}
#[cfg(feature = "unstable-async")]
impl<R: futures_io::AsyncRead + Unpin> Archive<R> {
    /// Asynchronously reads the PNA magic and the archive header chunk from `reader`.
    ///
    /// The returned archive starts with an empty chunk buffer.
    #[inline]
    pub async fn read_header_async(reader: R) -> io::Result<Self> {
        Self::read_header_with_buffer_async(reader, Default::default()).await
    }

    /// Asynchronously reads the magic and the `AHED` chunk, then builds the archive with
    /// `buf` as its initial chunk buffer.
    ///
    /// Returns [`io::ErrorKind::InvalidData`] when the first chunk is not `AHED`.
    async fn read_header_with_buffer_async(mut reader: R, buf: Vec<RawChunk>) -> io::Result<Self> {
        read_pna_header_async(&mut reader).await?;
        let mut header_reader = ChunkReader::new(&mut reader, None);
        let first = header_reader.read_chunk_async().await?;
        if first.ty == ChunkType::AHED {
            let header = ArchiveHeader::try_from_bytes(first.data())?;
            Ok(Self::with_buffer(reader, header, buf))
        } else {
            Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("Unexpected Chunk `{}`", first.ty),
            ))
        }
    }

    /// Asynchronously reads chunks until one entry is complete.
    ///
    /// `FEND` / `SEND` finish the entry (the terminator is included), `ANXT` only sets the
    /// `next_archive` flag, and `AEND` stores the collected chunks back into the internal
    /// buffer and yields `Ok(None)`.
    async fn next_raw_item_async(&mut self) -> io::Result<Option<RawEntry>> {
        // Start from whatever was left in the internal buffer, leaving it empty.
        let mut collected = Vec::new();
        swap(&mut self.buf, &mut collected);
        let mut chunk_reader = ChunkReader::new(&mut self.inner, self.max_chunk_size);
        loop {
            let chunk = chunk_reader.read_chunk_async().await?;
            match chunk.ty {
                ChunkType::ANXT => self.next_archive = true,
                ChunkType::AEND => {
                    self.buf = collected;
                    return Ok(None);
                }
                ChunkType::FEND | ChunkType::SEND => {
                    collected.push(chunk);
                    return Ok(Some(RawEntry(collected)));
                }
                _ => collected.push(chunk),
            }
        }
    }

    /// Asynchronously reads the next entry and converts it into a [`ReadEntry`].
    ///
    /// Returns `Ok(None)` once the `AEND` chunk has been reached.
    #[inline]
    pub async fn read_entry_async(&mut self) -> io::Result<Option<ReadEntry>> {
        match self.next_raw_item_async().await? {
            Some(raw) => raw.try_into().map(Some),
            None => Ok(None),
        }
    }
}
/// Adapter over a mutably borrowed [`Archive`] that yields entries as [`RawEntry`] values.
pub(crate) struct RawEntries<'r, R>(&'r mut Archive<R>);
impl<R: Read> Iterator for RawEntries<'_, R> {
    type Item = io::Result<RawEntry>;

    /// Yields the next raw entry, `None` at the end of the archive, or the read error.
    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        match self.0.next_raw_item() {
            Ok(Some(raw)) => Some(Ok(raw)),
            Ok(None) => None,
            Err(err) => Some(Err(err)),
        }
    }
}
#[cfg(feature = "unstable-async")]
impl<R: futures_io::AsyncRead + Unpin> futures_util::Stream for RawEntries<'_, R> {
    type Item = io::Result<RawEntry>;

    /// Polls a freshly created `next_raw_item_async` future exactly once.
    ///
    /// NOTE(review): the future lives only for the duration of this call, so when it
    /// returns `Pending` it is dropped and a new one is created on the next poll. Whether
    /// partially read data can be lost that way depends on the chunk reader — confirm.
    #[inline]
    fn poll_next(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        use futures_util::Future;
        use std::task::Poll;

        let this = self.get_mut();
        let mut fut = std::pin::pin!(this.0.next_raw_item_async());
        match fut.as_mut().poll(cx) {
            Poll::Ready(outcome) => Poll::Ready(outcome.transpose()),
            Poll::Pending => Poll::Pending,
        }
    }
}
/// Adapter over a mutably borrowed [`Archive`] that yields [`ReadEntry`] items.
pub struct Entries<'r, R> {
// Archive the entries are read from.
reader: &'r mut Archive<R>,
}
impl<'r, R> Entries<'r, R> {
/// Wraps the given mutable borrow of an archive.
#[inline]
pub(crate) const fn new(reader: &'r mut Archive<R>) -> Self {
Self { reader }
}
/// Converts this value into [`NormalEntries`], handing over the archive borrow together
/// with the optional `password`.
#[inline]
pub fn extract_solid_entries(self, password: Option<&'r [u8]>) -> NormalEntries<'r, R> {
NormalEntries::new(self.reader, password)
}
}
impl<'r, R: Read> Entries<'r, R> {
    /// Returns an iterator that yields only the [`NormalEntry`] items of the archive.
    ///
    /// Solid entries are dropped without being expanded; read errors are passed through.
    #[inline]
    pub fn skip_solid(self) -> impl Iterator<Item = io::Result<NormalEntry>> + 'r {
        self.filter_map(|item| match item {
            Ok(ReadEntry::Normal(entry)) => Some(Ok(entry)),
            Ok(ReadEntry::Solid(_)) => None,
            Err(err) => Some(Err(err)),
        })
    }
}
impl<R: Read> Iterator for Entries<'_, R> {
    type Item = io::Result<ReadEntry>;

    /// Yields the next entry, `None` at the end of the archive, or the read error.
    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        match self.reader.read_entry() {
            Ok(Some(entry)) => Some(Ok(entry)),
            Ok(None) => None,
            Err(err) => Some(Err(err)),
        }
    }
}
#[cfg(feature = "unstable-async")]
impl<R: futures_io::AsyncRead + Unpin> futures_util::Stream for Entries<'_, R> {
    type Item = io::Result<ReadEntry>;

    /// Polls a freshly created `read_entry_async` future exactly once.
    ///
    /// NOTE(review): the future lives only for the duration of this call, so when it
    /// returns `Pending` it is dropped and a new one is created on the next poll. Whether
    /// partially read data can be lost that way depends on the chunk reader — confirm.
    #[inline]
    fn poll_next(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        use futures_util::Future;
        use std::task::Poll;

        let this = self.get_mut();
        let mut fut = std::pin::pin!(this.reader.read_entry_async());
        match fut.as_mut().poll(cx) {
            Poll::Ready(outcome) => Poll::Ready(outcome.transpose()),
            Poll::Pending => Poll::Pending,
        }
    }
}
/// Iterator state that yields [`NormalEntry`] items, expanding solid entries on the way.
pub struct NormalEntries<'r, R> {
// Archive the entries are read from.
reader: &'r mut Archive<R>,
// Password passed to `into_entries` when a solid entry is expanded.
password: Option<&'r [u8]>,
// Iterator over the solid entry currently being expanded, if any.
solid_iter: Option<crate::entry::SolidIntoEntries>,
}
impl<'r, R> NormalEntries<'r, R> {
/// Creates the iterator state for `reader` with the given `password`; no solid entry
/// is being expanded initially.
#[inline]
pub(crate) fn new(reader: &'r mut Archive<R>, password: Option<&'r [u8]>) -> Self {
Self {
reader,
password,
solid_iter: None,
}
}
}
impl<R: Read> Iterator for NormalEntries<'_, R> {
    type Item = io::Result<NormalEntry>;

    /// Yields the next normal entry.
    ///
    /// Items of a solid entry that is currently being expanded are drained first. After
    /// that the next entry is read from the archive: a normal entry is returned directly,
    /// a solid entry is opened with the stored password and drained on the following
    /// loop iterations.
    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            // Drain the solid entry in progress before touching the archive again.
            if let Some(item) = self.solid_iter.as_mut().and_then(|iter| iter.next()) {
                return Some(item);
            }
            self.solid_iter = None;

            let solid = match self.reader.read_entry() {
                Err(err) => return Some(Err(err)),
                Ok(None) => return None,
                Ok(Some(ReadEntry::Normal(entry))) => return Some(Ok(entry)),
                Ok(Some(ReadEntry::Solid(solid))) => solid,
            };
            match solid.into_entries(self.password) {
                Ok(iter) => self.solid_iter = Some(iter),
                Err(err) => return Some(Err(err)),
            }
        }
    }
}
impl<R: Read + Seek> Archive<R> {
    /// Skips chunks until the `AEND` chunk has been passed, then seeks backwards by the
    /// byte length that `skip_chunk` reported for it.
    ///
    /// An `ANXT` chunk met on the way sets the `next_archive` flag.
    ///
    /// NOTE(review): presumably the reported length covers the whole `AEND` chunk, which
    /// would leave the stream positioned at its start — confirm against `skip_chunk`.
    ///
    /// # Errors
    ///
    /// Propagates errors from skipping chunks and from the final seek.
    #[inline]
    pub fn seek_to_end(&mut self) -> io::Result<()> {
        let mut chunk_reader = ChunkReader::new(&mut self.inner, self.max_chunk_size);
        let aend_len = loop {
            match chunk_reader.skip_chunk()? {
                (ChunkType::AEND, len) => break len,
                (ChunkType::ANXT, _) => self.next_archive = true,
                _ => {}
            }
        };
        let rewind = -(aend_len as i64);
        self.inner.seek(SeekFrom::Current(rewind))?;
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    #[cfg(all(target_family = "wasm", target_os = "unknown"))]
    use wasm_bindgen_test::wasm_bindgen_test as test;

    /// An empty archive yields no entries.
    #[test]
    fn decode() {
        let bytes = include_bytes!("../../../resources/test/empty.pna");
        let mut archive = Archive::read_header(&bytes[..]).unwrap();
        assert!(archive.entries().next().is_none());
    }

    /// The zstd fixture yields nine entries and then reports the end of the archive.
    #[cfg(feature = "unstable-async")]
    #[tokio::test]
    async fn decode_async() {
        use tokio_util::compat::TokioAsyncReadCompatExt;

        let bytes = include_bytes!("../../../resources/test/zstd.pna");
        let source = io::Cursor::new(bytes).compat();
        let mut archive = Archive::read_header_async(source).await.unwrap();
        for _ in 0..9 {
            assert!(archive.read_entry_async().await.unwrap().is_some());
        }
        assert!(archive.read_entry_async().await.unwrap().is_none());
    }

    /// Every entry of the zstd fixture can be read to the end, whether solid or normal.
    #[cfg(feature = "unstable-async")]
    #[tokio::test]
    async fn extract_async() -> io::Result<()> {
        use crate::ReadOptions;
        use tokio_util::compat::{FuturesAsyncReadCompatExt, TokioAsyncReadCompatExt};

        let bytes = include_bytes!("../../../resources/test/zstd.pna");
        let source = io::Cursor::new(bytes).compat();
        let mut archive = Archive::read_header_async(source).await?;
        while let Some(item) = archive.read_entry_async().await? {
            match item {
                ReadEntry::Normal(normal) => {
                    let mut sink = io::Cursor::new(Vec::new());
                    let mut data = normal.reader(ReadOptions::builder().build())?.compat();
                    tokio::io::copy(&mut data, &mut sink).await?;
                }
                ReadEntry::Solid(solid) => {
                    for inner in solid.entries(None)? {
                        let inner = inner?;
                        let mut sink = io::Cursor::new(Vec::new());
                        let mut data = inner.reader(ReadOptions::builder().build())?.compat();
                        tokio::io::copy(&mut data, &mut sink).await?;
                    }
                }
            }
        }
        Ok(())
    }
}