use std::collections::{BTreeMap, HashMap};
use std::default::Default;
use std::error;
use std::fmt;
use std::path::PathBuf;
use blake2::{Blake2b512, Digest};
use futures_util::{future, StreamExt};
use tokio::fs;
use tokio::io;
use tokio::io::AsyncSeekExt;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
use tokio::task::JoinError;
use crate::chunk_dictionary;
use crate::chunker;
use crate::Compression;
use crate::CompressionAlgorithm;
/// Version of this crate, taken from `CARGO_PKG_VERSION` at compile time.
/// It is recorded as the application version in every archive header written by this module.
pub const PKG_VERSION: &str = env!("CARGO_PKG_VERSION");
/// Settings controlling how [`create_archive`] chunks, hashes, compresses and stores its input.
#[derive(Debug, Clone)]
pub struct CreateArchiveOptions {
    /// Chunking algorithm and its parameters; also recorded in the archive header.
    pub chunker_config: chunker::Config,
    /// Maximum number of chunk tasks in flight at once in each processing stage
    /// (hashing and compression).
    pub num_chunk_buffers: usize,
    /// Length each chunk hash is truncated to before it is stored in the archive;
    /// also recorded in the archive header.
    pub chunk_hash_length: usize,
    /// Path of the scratch file used to stage chunk data (created, or truncated if it exists).
    /// When `None`, an anonymous temporary file is used instead.
    pub temporary_file_override: Option<PathBuf>,
    /// Compression applied to each unique chunk; recorded in the archive header.
    /// NOTE(review): `None` presumably means "store uncompressed" — confirm against `Chunk::compress`.
    pub compression: Option<Compression>,
    /// Arbitrary key/value metadata copied verbatim into the archive header.
    pub metadata: BTreeMap<String, Vec<u8>>,
}
impl Default for CreateArchiveOptions {
    /// Rolling-sum chunking with the default filter settings, a `chunk_hash_length` of 64,
    /// Brotli compression at level 6, no temp-file override and no metadata.
    ///
    /// The number of chunk buffers is twice the CPU count reported by `num_cpus::get()`,
    /// or a single buffer when at most one CPU is reported.
    fn default() -> Self {
        let cpu_count = num_cpus::get();
        let num_chunk_buffers = if cpu_count > 1 { cpu_count * 2 } else { 1 };
        Self {
            chunker_config: chunker::Config::RollSum(chunker::FilterConfig::default()),
            num_chunk_buffers,
            chunk_hash_length: 64,
            temporary_file_override: None,
            compression: Some(Compression {
                algorithm: CompressionAlgorithm::Brotli,
                level: 6,
            }),
            metadata: BTreeMap::new(),
        }
    }
}
/// Summary of a successfully created archive, as returned by [`create_archive`].
#[derive(Default)]
pub struct CreateArchiveResult {
    /// Sum of the lengths of all chunks produced from the source, i.e. the source size.
    pub source_length: usize,
    /// Blake2b-512 digest of the complete source data, as computed by [`create_archive`].
    pub source_hash: Vec<u8>,
    /// The chunk dictionary that was written as the archive header.
    pub header: chunk_dictionary::ChunkDictionary,
}
/// Errors that can be returned by [`create_archive`].
#[derive(Debug)]
pub enum CreateArchiveError {
    /// Creating, writing or rewinding the temporary chunk staging file failed.
    TempFileError(io::Error),
    /// A background chunk-processing task did not complete successfully
    /// (for example because it panicked).
    ChunkerError(JoinError),
    /// Writing the archive to the output failed, including copying the staged chunk data into it.
    OutputWriteError(io::Error),
}
impl fmt::Display for CreateArchiveError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
CreateArchiveError::TempFileError(_) => {
write!(f, "Error occurred while operating on the temp file")
}
CreateArchiveError::ChunkerError(_) => write!(f, "Error chunking the input file"),
CreateArchiveError::OutputWriteError(_) => write!(f, "Error writing to output file"),
}
}
}
impl error::Error for CreateArchiveError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
CreateArchiveError::TempFileError(e) => Some(e),
CreateArchiveError::ChunkerError(e) => Some(e),
CreateArchiveError::OutputWriteError(e) => Some(e),
}
}
}
pub async fn create_archive<R: AsyncRead + Unpin + Send, W: AsyncWrite + Unpin>(
mut input: R,
mut output: W,
options: &CreateArchiveOptions,
) -> Result<CreateArchiveResult, CreateArchiveError> {
let mut source_hasher = Blake2b512::new();
let mut source_length: usize = 0;
let mut chunk_order = Vec::new();
let mut unique_chunks = HashMap::new();
let mut unique_chunk_index: usize = 0;
let chunker = options.chunker_config.new_chunker(&mut input);
let mut chunk_stream = chunker
.map(|result| {
let (offset, chunk) = result.expect("Error chunking");
source_hasher.update(chunk.data());
source_length += chunk.len();
tokio::task::spawn_blocking(move || (offset, chunk.verify()))
})
.buffered(options.num_chunk_buffers)
.filter_map(|result| {
let (offset, verified) = result.expect("error while hashing chunk");
let (unique, chunk_index) = if unique_chunks.contains_key(verified.hash()) {
(false, *unique_chunks.get(verified.hash()).unwrap())
} else {
let chunk_index = unique_chunk_index;
unique_chunks.insert(verified.hash().clone(), chunk_index);
unique_chunk_index += 1;
(true, chunk_index)
};
chunk_order.push(chunk_index);
future::ready(if unique {
Some((chunk_index, offset, verified))
} else {
None
})
})
.map(|(chunk_index, offset, verified)| {
let _opt = options.clone();
tokio::task::spawn_blocking(move || {
let compressed = verified
.chunk()
.clone()
.compress(_opt.compression)
.expect("compress chunk");
let (_compression, bytes) = compressed.into_inner();
(chunk_index, offset, verified, bytes)
})
})
.buffered(options.num_chunk_buffers);
let mut archive_offset: u64 = 0;
let mut archive_chunks = Vec::new();
let mut temp_file = if let Some(p) = &options.temporary_file_override {
fs::File::create(p).await
} else {
tempfile::tempfile().map(tokio::fs::File::from_std)
}
.map_err(CreateArchiveError::TempFileError)?;
while let Some(result) = chunk_stream.next().await {
let (_chunk_index, _offset, verified, compressed_bytes) =
result.map_err(CreateArchiveError::ChunkerError)?;
let use_data = {
if compressed_bytes.len() < verified.chunk().len() {
&compressed_bytes
} else {
verified.chunk().data()
}
};
let mut hash = verified.hash().clone();
hash.truncate(options.chunk_hash_length);
temp_file
.write_all(use_data)
.await
.map_err(CreateArchiveError::TempFileError)?;
archive_chunks.push(chunk_dictionary::ChunkDescriptor {
checksum: hash.to_vec(),
source_size: verified.len() as u32,
archive_offset,
archive_size: use_data.len() as u32,
});
archive_offset += use_data.len() as u64;
}
let chunker_params = match &options.chunker_config {
chunker::Config::BuzHash(hash_config) => chunk_dictionary::ChunkerParameters {
chunk_filter_bits: hash_config.filter_bits.bits(),
min_chunk_size: hash_config.min_chunk_size as u32,
max_chunk_size: hash_config.max_chunk_size as u32,
rolling_hash_window_size: hash_config.window_size as u32,
chunk_hash_length: options.chunk_hash_length as u32,
chunking_algorithm: chunk_dictionary::chunker_parameters::ChunkingAlgorithm::Buzhash
as i32,
},
chunker::Config::RollSum(hash_config) => chunk_dictionary::ChunkerParameters {
chunk_filter_bits: hash_config.filter_bits.bits(),
min_chunk_size: hash_config.min_chunk_size as u32,
max_chunk_size: hash_config.max_chunk_size as u32,
rolling_hash_window_size: hash_config.window_size as u32,
chunk_hash_length: options.chunk_hash_length as u32,
chunking_algorithm: chunk_dictionary::chunker_parameters::ChunkingAlgorithm::Rollsum
as i32,
},
chunker::Config::FixedSize(chunk_size) => chunk_dictionary::ChunkerParameters {
min_chunk_size: 0,
chunk_filter_bits: 0,
rolling_hash_window_size: 0,
max_chunk_size: *chunk_size as u32,
chunk_hash_length: options.chunk_hash_length as u32,
chunking_algorithm: chunk_dictionary::chunker_parameters::ChunkingAlgorithm::FixedSize
as i32,
},
};
drop(chunk_stream);
let source_hash = source_hasher.finalize().to_vec();
let file_header = chunk_dictionary::ChunkDictionary {
rebuild_order: chunk_order.iter().map(|&index| index as u32).collect(),
application_version: PKG_VERSION.to_string(),
chunk_descriptors: archive_chunks,
source_checksum: source_hash.clone(),
source_total_size: source_length as u64,
chunker_params: Some(chunker_params),
chunk_compression: Some(options.compression.into()),
metadata: options.metadata.clone(),
};
let header_buf = crate::header::build(&file_header, None).expect("Failed to create header");
output
.write_all(&header_buf)
.await
.map_err(CreateArchiveError::OutputWriteError)?;
temp_file
.rewind()
.await
.map_err(CreateArchiveError::TempFileError)?;
io::copy(&mut temp_file, &mut output)
.await
.map_err(CreateArchiveError::OutputWriteError)?;
Ok(CreateArchiveResult {
source_length,
source_hash,
header: file_header.clone(),
})
}