use crate::{
archive_reader::ArchiveReader, chunk_dictionary as dict, chunker,
compression::CompressionAlgorithm, header, ChunkIndex, ChunkOffset, CompressedArchiveChunk,
CompressedChunk, Compression, HashSum,
};
use blake2::{Blake2b512, Digest};
use futures_util::{stream::Stream, StreamExt};
use std::collections::BTreeMap;
use std::{
convert::TryInto,
fmt,
task::{ready, Poll},
};
/// Error while reading an archive.
#[derive(Debug)]
pub enum ArchiveError<R> {
    /// The archive header or its contents failed validation.
    InvalidArchive(Box<dyn std::error::Error + Send + Sync>),
    /// The underlying reader failed.
    ReaderError(R),
}
impl<R> ArchiveError<R> {
fn invalid_archive<T: Into<Box<dyn std::error::Error + Send + Sync>>>(err: T) -> Self {
Self::InvalidArchive(err.into())
}
}
impl<R> std::error::Error for ArchiveError<R>
where
R: std::error::Error + 'static,
{
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
ArchiveError::InvalidArchive(err) => Some(err.as_ref()),
ArchiveError::ReaderError(err) => Some(err),
}
}
}
impl<R> fmt::Display for ArchiveError<R>
where
R: std::error::Error,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::InvalidArchive(_) => write!(f, "invalid archive"),
Self::ReaderError(_) => write!(f, "reader error"),
}
}
}
impl<R> From<prost::DecodeError> for ArchiveError<R> {
fn from(err: prost::DecodeError) -> Self {
ArchiveError::InvalidArchive(Box::new(err))
}
}
/// Description of a chunk within an archive.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ChunkDescriptor {
    /// Chunk checksum.
    pub checksum: HashSum,
    /// Size of the chunk data as stored in the archive (possibly compressed).
    pub archive_size: usize,
    /// Byte offset of the chunk within the archive.
    pub archive_offset: u64,
    /// Size of the chunk data in the source (uncompressed).
    pub source_size: u32,
}
impl ChunkDescriptor {
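    /// End offset of the chunk in the archive, that is, its start offset
    /// plus its stored (possibly compressed) size.
    ///
    /// A minimal sketch of the arithmetic; the field values and the crate
    /// import path are illustrative only, hence the example is ignored:
    ///
    /// ```ignore
    /// use bitar::{ChunkDescriptor, HashSum};
    ///
    /// let cd = ChunkDescriptor {
    ///     checksum: HashSum::from(&[0u8; 64][..]),
    ///     archive_size: 100,
    ///     archive_offset: 1024,
    ///     source_size: 100,
    /// };
    /// assert_eq!(cd.archive_end_offset(), 1024 + 100);
    /// ```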
pub fn archive_end_offset(&self) -> u64 {
self.archive_offset + self.archive_size as u64
}
}
/// A readable archive.
pub struct Archive<R> {
    // Reader used for fetching archive data.
    reader: R,
    // Descriptors for every unique chunk in the archive.
    archive_chunks: Vec<ChunkDescriptor>,
    // Indexes into archive_chunks, in the order the chunks appear in source.
    source_order: Vec<usize>,
    total_chunks: usize,
    header_size: usize,
    header_checksum: HashSum,
    chunk_compression: Option<Compression>,
    created_by_app_version: String,
    // Offset in the archive where the chunk data section starts.
    chunk_data_offset: u64,
    source_total_size: u64,
    source_checksum: HashSum,
    chunker_config: chunker::Config,
    chunk_hash_length: usize,
    metadata: BTreeMap<String, Vec<u8>>,
}
impl<R> Archive<R> {
fn verify_pre_header<E>(pre_header: &[u8]) -> Result<(), ArchiveError<E>> {
if pre_header.len() < header::ARCHIVE_MAGIC.len() {
return Err(ArchiveError::invalid_archive("not an archive"));
}
        // Accept both the current magic and the legacy "\0BITA1" magic,
        // kept for backwards compatibility.
        if &pre_header[0..header::ARCHIVE_MAGIC.len()] != header::ARCHIVE_MAGIC
            && &pre_header[0..header::ARCHIVE_MAGIC.len()] != b"\0BITA1"
        {
            return Err(ArchiveError::invalid_archive("not an archive"));
        }
Ok(())
}
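    /// Try to initialize an archive from a reader, reading and verifying
    /// the full archive header.
    ///
    /// A usage sketch. The `IoReader` adapter around a `tokio::fs::File`
    /// and the archive file name are assumptions for illustration, hence
    /// the example is ignored:
    ///
    /// ```ignore
    /// use bitar::{archive_reader::IoReader, Archive};
    ///
    /// let file = tokio::fs::File::open("release.cba").await?;
    /// let archive = Archive::try_init(IoReader::new(file)).await?;
    /// println!(
    ///     "{} chunks ({} unique)",
    ///     archive.total_chunks(),
    ///     archive.unique_chunks()
    /// );
    /// ```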
pub async fn try_init(mut reader: R) -> Result<Self, ArchiveError<R::Error>>
where
R: ArchiveReader,
{
        // Read the pre-header: the archive magic followed by the dictionary
        // size as a little-endian u64.
        let mut header: Vec<u8> = reader
            .read_at(0, header::PRE_HEADER_SIZE)
            .await
            .map_err(ArchiveError::ReaderError)?
            .to_vec();
        Self::verify_pre_header(&header)?;
let dictionary_size = u64::from_le_bytes(
header[header::ARCHIVE_MAGIC.len()..header::PRE_HEADER_SIZE]
.try_into()
.unwrap(),
) as usize;
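        // Full archive header layout, in file order:
        //   [magic][dictionary size: u64 LE][dictionary: protobuf]
        //   [chunk data offset: u64 LE][header checksum: Blake2b-512, 64 bytes]
        // Read the remainder (dictionary + offset + checksum) in one go.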
header.extend_from_slice(
&reader
.read_at(header::PRE_HEADER_SIZE as u64, dictionary_size + 8 + 64)
.await
.map_err(ArchiveError::ReaderError)?,
);
        let header_checksum = {
            // The stored checksum covers everything preceding it in the
            // header; verify it against a fresh Blake2b-512 digest.
            let mut hasher = Blake2b512::new();
let offs = header::PRE_HEADER_SIZE + dictionary_size + 8;
hasher.update(&header[..offs]);
let header_checksum = HashSum::from(&header[offs..(offs + 64)]);
if header_checksum != &hasher.finalize()[..] {
return Err(ArchiveError::invalid_archive("invalid header checksum"));
}
header_checksum
};
let dictionary: dict::ChunkDictionary = {
let offs = header::PRE_HEADER_SIZE;
prost::Message::decode(&header[offs..(offs + dictionary_size)])?
};
let chunk_data_offset = {
let offs = header::PRE_HEADER_SIZE + dictionary_size;
u64::from_le_bytes(header[offs..(offs + 8)].try_into().unwrap())
};
        // Chunk offsets in the dictionary are relative to the start of the
        // chunk data section; store them as absolute archive offsets.
        let archive_chunks = dictionary
.chunk_descriptors
.into_iter()
.map(|dict| ChunkDescriptor {
checksum: dict.checksum.into(),
archive_size: dict.archive_size as usize,
archive_offset: chunk_data_offset + dict.archive_offset,
source_size: dict.source_size,
})
.collect();
let chunker_params = dictionary
.chunker_params
.ok_or_else(|| ArchiveError::invalid_archive("invalid chunker parameters"))?;
let chunk_hash_length = chunker_params.chunk_hash_length as usize;
let source_order: Vec<usize> = dictionary
.rebuild_order
.into_iter()
.map(|v| v as usize)
.collect();
Ok(Self {
reader,
archive_chunks,
header_checksum,
header_size: header.len(),
source_total_size: dictionary.source_total_size,
source_checksum: dictionary.source_checksum.into(),
created_by_app_version: dictionary.application_version.clone(),
chunk_compression: compression_from_dictionary(
dictionary
.chunk_compression
.ok_or_else(|| ArchiveError::invalid_archive("invalid compression"))?,
)?,
total_chunks: source_order.len(),
source_order,
chunk_data_offset,
chunk_hash_length,
chunker_config: chunker_config_from_params(chunker_params)?,
metadata: dictionary.metadata,
})
}
    /// Total number of chunks in the archive (including duplicates).
    pub fn total_chunks(&self) -> usize {
        self.total_chunks
    }
    /// Number of unique chunks in the archive (no duplicates).
    pub fn unique_chunks(&self) -> usize {
        self.archive_chunks.len()
    }
    /// Total size of the chunks as stored in the archive (compressed).
    pub fn compressed_size(&self) -> u64 {
        self.archive_chunks
            .iter()
            .map(|c| c.archive_size as u64)
            .sum()
    }
    /// Offset in the archive where the chunk data starts.
    pub fn chunk_data_offset(&self) -> u64 {
        self.chunk_data_offset
    }
    /// Descriptors of all unique chunks in the archive.
    pub fn chunk_descriptors(&self) -> &[ChunkDescriptor] {
        &self.archive_chunks
    }
    /// Total size of the original source.
    pub fn total_source_size(&self) -> u64 {
        self.source_total_size
    }
    /// Checksum of the original source.
    pub fn source_checksum(&self) -> &HashSum {
        &self.source_checksum
    }
    /// Chunker configuration used when building the archive.
    pub fn chunker_config(&self) -> &chunker::Config {
        &self.chunker_config
    }
    /// Checksum of the archive header.
    pub fn header_checksum(&self) -> &HashSum {
        &self.header_checksum
    }
    /// Size of the archive header in bytes.
    pub fn header_size(&self) -> usize {
        self.header_size
    }
    /// Hash length used for identifying chunks when building the archive.
    pub fn chunk_hash_length(&self) -> usize {
        self.chunk_hash_length
    }
    /// Compression used for chunks in the archive, if any.
    pub fn chunk_compression(&self) -> Option<Compression> {
        self.chunk_compression
    }
    /// Version of the application that built the archive.
    pub fn built_with_version(&self) -> &str {
        &self.created_by_app_version
    }
    /// Iterate over the archive's metadata keys and values.
    pub fn metadata_iter(&self) -> impl Iterator<Item = (&str, &[u8])> {
        self.metadata
            .iter()
            .map(|(k, v)| (k.as_str(), v.as_slice()))
    }
    /// Get a metadata value by key.
    pub fn metadata_value(&self, key: &str) -> Option<&[u8]> {
        self.metadata.get(key).map(|v| v.as_slice())
    }
    /// Iterate chunks in the order they appear in the source, yielding each
    /// chunk's source offset together with its descriptor.
    pub fn iter_source_chunks(&self) -> impl Iterator<Item = (u64, &ChunkDescriptor)> {
        let mut chunk_offset = 0;
        self.source_order.iter().copied().map(move |index| {
            let offset = chunk_offset;
            let cd = &self.archive_chunks[index];
            chunk_offset += cd.source_size as u64;
            (offset, cd)
        })
    }
    /// Build a [`ChunkIndex`] of all chunks in the archive, mapping each
    /// chunk hash to its size and offset(s) in the source.
    pub fn build_source_index(&self) -> ChunkIndex {
        let mut ci = ChunkIndex::new_empty(self.chunk_hash_length);
        self.iter_source_chunks().for_each(|(offset, cd)| {
            ci.add_chunk(cd.checksum.clone(), cd.source_size as usize, &[offset]);
        });
        ci
    }
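    /// Get a stream of the archive chunks whose hashes are present in the
    /// given [`ChunkIndex`]. Chunks are yielded in archive order, still in
    /// their stored (possibly compressed) form, and the stream ends after
    /// the first error.
    ///
    /// A sketch of draining the stream; obtaining `archive` is assumed as
    /// in [`Archive::try_init`], and decompression plus verification of
    /// each chunk are elided, hence the example is ignored:
    ///
    /// ```ignore
    /// use futures_util::StreamExt;
    ///
    /// let index = archive.build_source_index();
    /// // The stream borrows the reader, so `archive` must be mutable.
    /// let mut stream = archive.chunk_stream(&index);
    /// while let Some(result) = stream.next().await {
    ///     let compressed = result?;
    ///     // `compressed.chunk.data` holds the stored bytes and
    ///     // `compressed.expected_hash` the checksum to verify against.
    /// }
    /// ```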
pub fn chunk_stream<'a>(
&'a mut self,
chunks: &ChunkIndex,
) -> impl Stream<Item = Result<CompressedArchiveChunk, R::Error>> + Unpin + Sized + 'a
where
R: ArchiveReader + 'a,
{
        // Select descriptors for the requested chunks. The read offsets are
        // derived in the same order, so descriptors[index] describes the
        // chunk at position index in the resulting stream.
        let descriptors: Vec<&ChunkDescriptor> = self
            .archive_chunks
            .iter()
            .filter(|cd| chunks.contains(&cd.checksum))
            .collect();
        let read_at: Vec<ChunkOffset> = descriptors
            .iter()
            .map(|cd| ChunkOffset::new(cd.archive_offset, cd.archive_size))
            .collect();
let archive_chunk_compression = self.chunk_compression().map(|c| c.algorithm);
let stream = self
.reader
.read_chunks(read_at)
.enumerate()
.map(move |(index, result)| {
match result {
Ok(chunk) => {
let descriptor = descriptors[index];
let source_size: usize = descriptor.source_size.try_into().unwrap();
Ok(CompressedArchiveChunk {
chunk: CompressedChunk {
                            compression: if source_size == chunk.len() {
                                // A stored size equal to the source size
                                // means the chunk was left uncompressed.
                                None
                            } else {
                                archive_chunk_compression
                            },
data: chunk,
source_size,
},
expected_hash: descriptor.checksum.clone(),
})
}
Err(err) => Err(err),
}
});
StreamUntilFirstError::new(stream)
}
}
// Map the chunker parameters stored in the dictionary to a chunker::Config.
fn chunker_config_from_params<R>(
    p: dict::ChunkerParameters,
) -> Result<chunker::Config, ArchiveError<R>> {
use dict::chunker_parameters::ChunkingAlgorithm;
match ChunkingAlgorithm::try_from(p.chunking_algorithm) {
Ok(ChunkingAlgorithm::Buzhash) => Ok(chunker::Config::BuzHash(chunker::FilterConfig {
filter_bits: chunker::FilterBits::from_bits(p.chunk_filter_bits),
min_chunk_size: p.min_chunk_size as usize,
max_chunk_size: p.max_chunk_size as usize,
window_size: p.rolling_hash_window_size as usize,
})),
Ok(ChunkingAlgorithm::Rollsum) => Ok(chunker::Config::RollSum(chunker::FilterConfig {
filter_bits: chunker::FilterBits::from_bits(p.chunk_filter_bits),
min_chunk_size: p.min_chunk_size as usize,
max_chunk_size: p.max_chunk_size as usize,
window_size: p.rolling_hash_window_size as usize,
})),
Ok(ChunkingAlgorithm::FixedSize) => {
Ok(chunker::Config::FixedSize(p.max_chunk_size as usize))
}
Err(_err) => Err(ArchiveError::invalid_archive("unknown chunking algorithm")),
}
}
// Map the compression descriptor stored in the dictionary to a Compression,
// or None when chunks are stored uncompressed.
fn compression_from_dictionary<R>(
    c: dict::ChunkCompression,
) -> Result<Option<Compression>, ArchiveError<R>> {
    use dict::chunk_compression::CompressionType;
    match CompressionType::try_from(c.compression) {
        #[cfg(feature = "lzma-compression")]
        Ok(CompressionType::Lzma) => Ok(Some(Compression {
            algorithm: CompressionAlgorithm::Lzma,
            level: c.compression_level,
        })),
#[cfg(not(feature = "lzma-compression"))]
Ok(CompressionType::Lzma) => Err(ArchiveError::invalid_archive(
"LZMA compression not enabled",
)),
#[cfg(feature = "zstd-compression")]
Ok(CompressionType::Zstd) => Ok(Some(Compression {
algorithm: CompressionAlgorithm::Zstd,
level: c.compression_level,
})),
#[cfg(not(feature = "zstd-compression"))]
Ok(CompressionType::Zstd) => Err(ArchiveError::invalid_archive(
"ZSTD compression not enabled",
)),
Ok(CompressionType::Brotli) => Ok(Some(Compression {
algorithm: CompressionAlgorithm::Brotli,
level: c.compression_level,
})),
Ok(CompressionType::None) => Ok(None),
Err(_err) => Err(ArchiveError::invalid_archive("unknown compression")),
}
}
// Stream adapter which fuses the inner stream after it has yielded its
// first error.
struct StreamUntilFirstError<S> {
    stream: S,
    end: bool,
}
impl<S> StreamUntilFirstError<S> {
fn new(stream: S) -> Self {
Self { stream, end: false }
}
}
impl<S, T, E> Stream for StreamUntilFirstError<S>
where
S: Stream<Item = Result<T, E>> + Unpin,
{
type Item = S::Item;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
        if self.end {
            // An error has already been yielded; the stream stays ended.
            return Poll::Ready(None);
        }
        Poll::Ready(match ready!(self.stream.poll_next_unpin(cx)) {
            Some(Err(r)) => {
                // First error: yield it, then end the stream.
                self.end = true;
                Some(Err(r))
            }
            other => other,
        })
}
}
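
// A small sanity test of the adapter above: after the first error the
// stream must yield None even if the inner stream holds more items.
// Assumes tokio (with the "macros" feature) is available as a
// dev-dependency for the async test attribute.
#[cfg(test)]
mod tests {
    use super::StreamUntilFirstError;
    use futures_util::{stream, StreamExt};

    #[tokio::test]
    async fn ends_after_first_error() {
        let inner = stream::iter(vec![Ok(1u32), Err("boom"), Ok(2u32)]);
        let mut s = StreamUntilFirstError::new(inner);
        assert!(matches!(s.next().await, Some(Ok(1))));
        assert!(matches!(s.next().await, Some(Err("boom"))));
        assert!(s.next().await.is_none());
    }
}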