use super::cluster::ClusterCreator;
use super::clusterwriter::ClusterWriterProxy;
use super::{CompHint, ContentAdder, Progress};
use crate::bases::*;
use crate::common::{
CheckInfo, CheckKind, ContentAddress, ContentInfo, ContentPackHeader, PackHeader,
PackHeaderInfo, PackKind,
};
use crate::creator::{Compression, InputReader, NamedFile, PackData, PackRecipient};
use std::cell::Cell;
use std::io::{Read, Seek, SeekFrom, Write};
use std::path::Path;
use std::sync::Arc;
use log::info;
fn shannon_entropy(data: &[u8]) -> Result<f32> {
let mut entropy = 0.0;
let mut counts = [0; 256];
for byte in data {
counts[*byte as usize] += 1;
}
for &count in &counts {
if count == 0 {
continue;
}
let p: f32 = (count as f32) / (data.len() as f32);
entropy -= p * p.log(2.0);
}
Ok(entropy)
}
pub struct ContentPackCreator<O: PackRecipient + ?Sized> {
app_vendor_id: VendorId,
pack_id: PackId,
free_data: PackFreeData,
content_infos: Vec<ContentInfo>,
raw_open_cluster: Option<ClusterCreator>,
comp_open_cluster: Option<ClusterCreator>,
next_cluster_id: Cell<u32>,
cluster_writer: ClusterWriterProxy<Box<O>>,
progress: Arc<dyn Progress>,
compression: Compression,
}
macro_rules! open_cluster_ref {
($self:expr, $comp: expr) => {
if $comp {
&$self.comp_open_cluster
} else {
&$self.raw_open_cluster
}
};
(mut $self:expr, $comp: expr) => {
if $comp {
&mut $self.comp_open_cluster
} else {
&mut $self.raw_open_cluster
}
};
}
impl ContentPackCreator<NamedFile> {
pub fn new<P: AsRef<Path>>(
path: P,
pack_id: PackId,
app_vendor_id: VendorId,
free_data: PackFreeData,
compression: Compression,
) -> Result<Self> {
Self::new_with_progress(
path,
pack_id,
app_vendor_id,
free_data,
compression,
Arc::new(()),
)
}
pub fn new_with_progress<P: AsRef<Path>>(
path: P,
pack_id: PackId,
app_vendor_id: VendorId,
free_data: PackFreeData,
compression: Compression,
progress: Arc<dyn Progress>,
) -> Result<Self> {
let file = NamedFile::new(path)?;
Self::new_from_output_with_progress(
file,
pack_id,
app_vendor_id,
free_data,
compression,
progress,
)
}
}
impl<O: PackRecipient + 'static + ?Sized> ContentPackCreator<O> {
pub fn new_from_output(
file: Box<O>,
pack_id: PackId,
app_vendor_id: VendorId,
free_data: PackFreeData,
compression: Compression,
) -> Result<Self> {
Self::new_from_output_with_progress(
file,
pack_id,
app_vendor_id,
free_data,
compression,
Arc::new(()),
)
}
pub fn new_from_output_with_progress(
mut file: Box<O>,
pack_id: PackId,
app_vendor_id: VendorId,
free_data: PackFreeData,
compression: Compression,
progress: Arc<dyn Progress>,
) -> Result<Self> {
file.seek(SeekFrom::Start(
(PackHeader::BLOCK_SIZE + ContentPackHeader::BLOCK_SIZE) as u64,
))?;
let nb_threads = std::cmp::max(
std::thread::available_parallelism()
.unwrap_or(8.try_into().unwrap())
.get(),
2,
) - 1;
let cluster_writer =
ClusterWriterProxy::new(file, compression, nb_threads, Arc::clone(&progress));
Ok(Self {
app_vendor_id,
pack_id,
free_data,
content_infos: vec![],
raw_open_cluster: None,
comp_open_cluster: None,
next_cluster_id: Cell::new(0),
cluster_writer,
progress,
compression,
})
}
fn open_cluster(&self, compressed: bool) -> ClusterCreator {
let cluster_id = self.next_cluster_id.replace(self.next_cluster_id.get() + 1);
self.progress.new_cluster(cluster_id, compressed);
ClusterCreator::new(cluster_id.into(), compressed)
}
fn get_open_cluster(
&mut self,
compressed: bool,
content_size: Size,
) -> Result<&mut ClusterCreator> {
if let Some(cluster) = self.setup_slot_and_get_to_close(content_size, compressed) {
self.cluster_writer.write_cluster(cluster, compressed)?;
}
Ok(open_cluster_ref!(mut self, compressed).as_mut().unwrap())
}
fn setup_slot_and_get_to_close(
&mut self,
size: Size,
compressed: bool,
) -> Option<ClusterCreator> {
if let Some(cluster) = open_cluster_ref!(self, compressed).as_ref() {
if cluster.is_full(size) {
let new_cluster = self.open_cluster(compressed);
open_cluster_ref!(mut self, compressed).replace(new_cluster)
} else {
None
}
} else {
let new_cluster = self.open_cluster(compressed);
open_cluster_ref!(mut self, compressed).replace(new_cluster)
}
}
fn detect_compression(
&self,
content: &mut dyn InputReader,
comp_hint: CompHint,
) -> Result<bool> {
if let Compression::None = self.compression {
return Ok(false);
}
match comp_hint {
CompHint::Yes => Ok(true),
CompHint::No => Ok(false),
CompHint::Detect => {
let mut head = Vec::with_capacity(4 * 1024);
{
content.take(4 * 1024).read_to_end(&mut head)?;
}
let entropy = shannon_entropy(&head)?;
content.seek(SeekFrom::Start(0))?;
Ok(entropy <= 6.0)
}
}
}
pub fn add_content(
&mut self,
mut content: Box<dyn InputReader>,
comp_hint: CompHint,
) -> Result<ContentAddress> {
let content_size = content.size();
self.progress.content_added(content_size);
let should_compress = self.detect_compression(content.as_mut(), comp_hint)?;
let cluster = self.get_open_cluster(should_compress, content_size)?;
let content_info = cluster.add_content(content)?;
self.content_infos.push(content_info);
let content_id = ((self.content_infos.len() - 1) as u32).into();
Ok(ContentAddress::new(self.pack_id, content_id))
}
}
impl<O: PackRecipient + 'static + ?Sized> ContentAdder for ContentPackCreator<O> {
fn add_content(
&mut self,
content: Box<dyn InputReader>,
comp_hint: CompHint,
) -> Result<ContentAddress> {
self.add_content(content, comp_hint)
}
}
impl<O: PackRecipient + 'static + ?Sized> ContentPackCreator<O> {
pub fn finalize(mut self) -> Result<(Box<O>, PackData)> {
info!("======= Finalize creation =======");
if let Some(cluster) = self.raw_open_cluster.take() {
if !cluster.is_empty() {
self.cluster_writer.write_cluster(cluster, false)?;
}
}
if let Some(cluster) = self.comp_open_cluster.take() {
if !cluster.is_empty() {
self.cluster_writer.write_cluster(cluster, true)?;
}
}
info!("----- Finalize cluster_writer -----");
let (mut file, cluster_addresses) = self.cluster_writer.finalize()?;
let clusters_offset = file.tell();
let mut buffered = std::io::BufWriter::new(file);
info!("----- Write cluster addresses -----");
let nb_clusters = cluster_addresses.len();
buffered.ser_callable(&|ser| {
for address in &cluster_addresses {
address.get().serialize(ser)?;
}
Ok(())
})?;
info!("----- Write content info -----");
let content_infos_offset = buffered.tell();
buffered.ser_callable(&|ser| {
for content_info in &self.content_infos {
content_info.serialize(ser)?;
}
Ok(())
})?;
let check_offset = buffered.tell();
let pack_size: Size =
(check_offset + CheckKind::Blake3.block_size() + PackHeader::BLOCK_SIZE).into();
buffered.rewind()?;
info!("----- Write pack header -----");
let pack_header = PackHeader::new(
PackKind::Content,
PackHeaderInfo::new(self.app_vendor_id, pack_size, check_offset),
);
buffered.ser_write(&pack_header)?;
info!("----- Write content pack header -----");
let header = ContentPackHeader::new(
self.free_data,
clusters_offset,
(nb_clusters as u32).into(),
content_infos_offset,
(self.content_infos.len() as u32).into(),
);
buffered.ser_write(&header)?;
buffered.flush()?;
let mut file = buffered.into_inner().unwrap();
info!("----- Compute checksum -----");
file.rewind()?;
let check_info = CheckInfo::new_blake3(&mut file)?;
file.ser_write(&check_info)?;
file.rewind()?;
let mut tail_buffer = [0u8; PackHeader::BLOCK_SIZE];
file.read_exact(&mut tail_buffer)?;
tail_buffer.reverse();
file.seek(SeekFrom::End(0))?;
file.write_all(&tail_buffer)?;
Ok((
file,
PackData {
uuid: pack_header.uuid,
pack_id: self.pack_id,
pack_kind: PackKind::Content,
free_data: Default::default(),
pack_size,
check_info,
},
))
}
}