#![allow(clippy::missing_panics_doc, clippy::missing_errors_doc)]
use std::fmt::Debug;
use std::io::{self, Write};
use std::marker::PhantomData;
use byteorder::{ByteOrder, LittleEndian};
use bytes::Bytes;
pub use flate2::Compression;
use flate2::DecompressError;
use flume::{unbounded, Receiver, Sender};
use log::warn;
use thiserror::Error;
use crate::check::Check;
use crate::par::compress::ParCompressBuilder;
use crate::syncz::{SyncZ, SyncZBuilder};
pub use crate::bgzf::{BgzfSyncReader, BgzfSyncWriter};
pub use crate::mgzip::{MgzipSyncReader, MgzipSyncWriter};
pub mod bgzf;
pub mod check;
#[cfg(feature = "deflate")]
pub mod deflate;
pub mod mgzip;
pub mod par;
#[cfg(feature = "snappy")]
pub mod snap;
pub mod syncz;
/// Default buffer size in bytes (128 KiB); used as the default for `FormatSpec::DEFAULT_BUFSIZE`.
pub const BUFSIZE: usize = 128 * 1024;
/// 32 KiB. NOTE(review): presumably the DEFLATE window/dictionary size carried between
/// blocks — confirm against its users.
pub const DICT_SIZE: usize = 32 * 1024;
/// Result of compressing one chunk: on success, a check value of type `C` paired with the
/// compressed bytes. NOTE(review): presumably the check is computed over the chunk's
/// uncompressed input — confirm with the compressor implementation.
pub type CompressResult<C> = Result<(C, Vec<u8>), GzpError>;
/// Errors produced by this crate while configuring, compressing, or decompressing data.
#[derive(Error, Debug)]
pub enum GzpError {
    /// A buffer size (first field) was smaller than the required minimum (second field).
    #[error("Invalid buffer size ({0}), must be >= {1}")]
    BufferSize(usize, usize),
    /// A compressed block's size (first field) exceeded the maximum allowed (second field).
    #[error("Compressed block size ({0}) exceeds max allowed: ({1}), try increasing compression.")]
    BlockSizeExceeded(usize, usize),
    /// Sending a value over a channel failed.
    #[error("Failed to send over channel.")]
    ChannelSend,
    /// Receiving from a `flume` channel failed.
    #[error(transparent)]
    ChannelReceive(#[from] flume::RecvError),
    /// Decompression with `flate2` failed.
    #[error(transparent)]
    DecompressError(#[from] DecompressError),
    /// Compression with `flate2` failed.
    #[error(transparent)]
    DeflateCompress(#[from] flate2::CompressError),
    /// A block size was invalid; the payload describes why.
    #[error("Invalid block size: {0}")]
    InvalidBlockSize(&'static str),
    /// A computed checksum (`found`) did not match the stored one (`expected`).
    #[error("Invalid checksum, found {found}, expected {expected}")]
    InvalidCheck { found: u32, expected: u32 },
    /// A block header was malformed; the payload describes why.
    #[error("Invalid block header: {0}")]
    InvalidHeader(&'static str),
    /// An underlying I/O operation failed.
    #[error(transparent)]
    Io(#[from] io::Error),
    /// `libdeflater` failed to compress.
    #[cfg(feature = "libdeflate")]
    #[error("LibDeflater compression error: {0:?}")]
    LibDeflaterCompress(libdeflater::CompressionError),
    /// `libdeflater` rejected the requested compression level.
    #[cfg(feature = "libdeflate")]
    #[error("LibDeflater compression level error: {0:?}")]
    LibDeflaterCompressionLvl(libdeflater::CompressionLvlError),
    /// `libdeflater` failed to decompress.
    // The variant name's misspelling ("Delfater") is public API and is kept for compatibility.
    #[cfg(feature = "libdeflate")]
    #[error(transparent)]
    LibDelfaterDecompress(#[from] libdeflater::DecompressionError),
    /// An invalid number of threads (the payload) was requested.
    #[error("Invalid number of threads ({0}) selected.")]
    NumThreads(usize),
    /// An error with no more specific classification.
    #[error("Unknown")]
    Unknown,
}
/// A compressing [`Write`]r that wraps an inner writer `W`.
pub trait ZWriter<W>: Write {
    /// Finish the compressed stream and hand back the inner writer.
    ///
    /// NOTE(review): presumably this flushes any buffered data and writes trailing format
    /// bytes before returning — confirm per implementation.
    fn finish(&mut self) -> Result<W, GzpError>;
}
/// A format that can build a single-threaded compressing writer around an inner writer `W`.
pub trait SyncWriter<W: Write>: Send {
    /// The compressing writer type produced by [`SyncWriter::sync_writer`].
    type OutputWriter: Write;
    /// Wrap `writer` in this format's compressing writer at `compression_level`.
    fn sync_writer(writer: W, compression_level: Compression) -> Self::OutputWriter;
}
/// Builder that picks a parallel or single-threaded compressing writer for format `F`
/// wrapping an inner writer `W`.
pub struct ZBuilder<F, W>
where
    F: FormatSpec + SyncWriter<W>,
    W: Write + Send + 'static,
{
    /// Number of threads; more than one selects the parallel writer.
    num_threads: usize,
    /// Thread pinning request forwarded to the parallel writer (`None` = no pinning).
    pin_threads: Option<usize>,
    /// Compression level passed to whichever writer is built.
    compression_level: Compression,
    /// Buffer size in bytes, forwarded to the parallel writer only.
    buffer_size: usize,
    /// Marker tying the builder to the inner writer type without storing one.
    writer: PhantomData<W>,
    /// Marker tying the builder to the format type without storing one.
    format: PhantomData<F>,
}
impl<F, W> ZBuilder<F, W>
where
    F: FormatSpec + SyncWriter<W>,
    W: Write + Send + 'static,
{
    /// Create a builder with defaults: `num_cpus::get()` threads, no thread pinning,
    /// compression level 3, and `F::DEFAULT_BUFSIZE` as the buffer size.
    pub fn new() -> Self {
        Self {
            num_threads: num_cpus::get(),
            pin_threads: None,
            compression_level: Compression::new(3),
            buffer_size: F::DEFAULT_BUFSIZE,
            writer: PhantomData,
            format: PhantomData,
        }
    }

    /// Set the compression level used by the writer built in [`ZBuilder::from_writer`].
    pub fn compression_level(mut self, compression_level: Compression) -> Self {
        self.compression_level = compression_level;
        self
    }

    /// Set the number of threads.
    ///
    /// Values greater than 1 make [`ZBuilder::from_writer`] build the parallel writer; 0 or 1
    /// selects the single-threaded writer.
    pub fn num_threads(mut self, num_threads: usize) -> Self {
        self.num_threads = num_threads;
        self
    }

    /// Request thread pinning for the parallel writer; `None` disables pinning.
    ///
    /// The value is forwarded to `ParCompressBuilder::pin_threads`. NOTE(review): presumably
    /// it is the first core index to pin to — confirm there.
    ///
    /// If pinning is requested on a platform where `core_affinity` cannot list core IDs, a
    /// warning is logged and pinning is disabled instead of failing.
    pub fn pin_threads(mut self, pin_threads: Option<usize>) -> Self {
        self.pin_threads = match pin_threads {
            // Only probe the platform (and warn) when pinning was actually requested.
            Some(_) if core_affinity::get_core_ids().is_none() => {
                warn!("Pinning threads is not supported on your platform. Please see core_affinity_rs. No threads will be pinned, but everything will work.");
                None
            }
            requested => requested,
        };
        self
    }

    /// Set the buffer size in bytes. Only the parallel writer uses it.
    pub fn buffer_size(mut self, buffer_size: usize) -> Self {
        self.buffer_size = buffer_size;
        self
    }

    /// Build a compressing writer around `writer`.
    ///
    /// With more than one thread configured, this returns the parallel writer from
    /// `ParCompressBuilder`. Otherwise it returns the single-threaded writer from
    /// `SyncZBuilder`, which does not use `buffer_size` or `pin_threads`.
    ///
    /// # Panics
    ///
    /// Panics if `ParCompressBuilder` rejects the configured thread count or buffer size.
    /// NOTE(review): for example, a buffer size below its minimum — confirm limits there.
    pub fn from_writer(self, writer: W) -> Box<dyn ZWriter<W>>
    where
        SyncZ<<F as SyncWriter<W>>::OutputWriter>: ZWriter<W> + Send,
    {
        if self.num_threads > 1 {
            Box::new(
                ParCompressBuilder::<F>::new()
                    .compression_level(self.compression_level)
                    .num_threads(self.num_threads)
                    .expect("ParCompressBuilder rejected the configured num_threads")
                    .buffer_size(self.buffer_size)
                    .expect("ParCompressBuilder rejected the configured buffer_size")
                    .pin_threads(self.pin_threads)
                    .from_writer(writer),
            )
        } else {
            Box::new(
                SyncZBuilder::<F, W>::new()
                    .compression_level(self.compression_level)
                    .from_writer(writer),
            )
        }
    }
}
impl<F, W> Default for ZBuilder<F, W>
where
F: FormatSpec + SyncWriter<W>,
W: Write + Send + 'static,
{
fn default() -> Self {
Self::new()
}
}
/// A unit of work sent to a compression worker, carrying its own reply channel.
#[derive(Debug)]
pub(crate) struct Message<C>
where
    C: Check + Send,
{
    /// The bytes to process.
    buffer: Bytes,
    /// Sender on which the `CompressResult` for this message is delivered.
    /// NOTE(review): created fresh per message in `new_parts`, so presumably used once.
    oneshot: Sender<CompressResult<C>>,
    /// Optional dictionary for this chunk (see `FormatSpec::needs_dict`).
    dictionary: Option<Bytes>,
    /// Whether this is the final chunk; `new_parts` initializes it to `false`.
    is_last: bool,
}
impl<C> Message<C>
where
    C: Check + Send,
{
    /// Build a message for `buffer` (with an optional `dictionary`) and return it together
    /// with the receiving end of its reply channel. The message starts with `is_last = false`.
    pub(crate) fn new_parts(
        buffer: Bytes,
        dictionary: Option<Bytes>,
    ) -> (Self, Receiver<CompressResult<C>>) {
        let (oneshot, reply) = unbounded();
        let msg = Self {
            is_last: false,
            dictionary,
            oneshot,
            buffer,
        };
        (msg, reply)
    }
}
/// An integer to serialize with a given byte width and byte order, as consumed by
/// `FormatSpec::to_bytes`.
#[derive(Debug)]
pub struct Pair {
    /// Byte width and order: positive writes the lowest `num_bytes` bytes of `value`
    /// least-significant first; negative writes the lowest `|num_bytes|` bytes
    /// most-significant first.
    num_bytes: isize,
    /// The value to serialize.
    value: usize,
}
/// Describes a compression format: its check type, compressor, chunk encoding, and the
/// header and footer bytes around the compressed data.
pub trait FormatSpec: Clone + Copy + Debug + Send + Sync + 'static {
    /// Check type (e.g. a checksum) tracked by this format.
    type C: Check + Send + 'static;
    /// Compressor state passed mutably to [`FormatSpec::encode`].
    type Compressor;
    /// Default buffer size in bytes; `ZBuilder::new` uses it as its initial buffer size.
    const DEFAULT_BUFSIZE: usize = BUFSIZE;

    /// Construct the format spec.
    fn new() -> Self;

    /// Create a fresh check value via `Check::new`.
    #[inline]
    fn create_check() -> Self::C {
        Self::C::new()
    }

    /// Whether [`FormatSpec::encode`] expects a dictionary.
    /// NOTE(review): presumably the tail of the preceding chunk — confirm with implementors.
    fn needs_dict(&self) -> bool;

    /// Create a compressor for `compression_level`.
    fn create_compressor(
        &self,
        compression_level: Compression,
    ) -> Result<Self::Compressor, GzpError>;

    /// Compress `input` with `encoder` and return the encoded bytes.
    ///
    /// `dict` is an optional dictionary and `is_last` marks the final chunk.
    fn encode(
        &self,
        input: &[u8],
        encoder: &mut Self::Compressor,
        compression_level: Compression,
        dict: Option<&Bytes>,
        is_last: bool,
    ) -> Result<Vec<u8>, GzpError>;

    /// Header bytes for this format at `compression_level`.
    /// NOTE(review): presumably written before the first compressed chunk.
    fn header(&self, compression_level: Compression) -> Vec<u8>;

    /// Footer bytes for this format, derived from `check`.
    /// NOTE(review): presumably written after the last compressed chunk.
    fn footer(&self, check: &Self::C) -> Vec<u8>;

    /// Serialize `pairs` in order into a single byte vector.
    ///
    /// For each [`Pair`], a positive `num_bytes` writes the lowest `num_bytes` bytes of
    /// `value` least-significant first (little-endian). A negative `num_bytes` writes the
    /// lowest `|num_bytes|` bytes most-significant first (big-endian). A zero width writes
    /// nothing. Widths larger than `usize` are padded with zero bytes.
    fn to_bytes(&self, pairs: &[Pair]) -> Vec<u8> {
        // `unsigned_abs` cannot overflow, unlike `isize::abs(isize::MIN)`.
        let bytes_to_write = pairs
            .iter()
            .map(|p| p.num_bytes.unsigned_abs())
            .sum::<usize>();
        let mut buffer = Vec::with_capacity(bytes_to_write);
        for pair in pairs {
            let width = pair.num_bytes.unsigned_abs();
            let value = pair.value;
            // Byte `i` of `value`, counting from the least-significant byte. Positions past
            // the width of `usize` yield 0 instead of performing an overflowing shift.
            let byte_at = |i: usize| -> u8 {
                if i < std::mem::size_of::<usize>() {
                    // Truncation to the low 8 bits is intended.
                    (value >> (i * 8)) as u8
                } else {
                    0
                }
            };
            if pair.num_bytes < 0 {
                // Big-endian: most-significant selected byte first.
                buffer.extend((0..width).rev().map(byte_at));
            } else {
                // Little-endian: least-significant byte first.
                buffer.extend((0..width).map(byte_at));
            }
        }
        buffer
    }
}
/// The two little-endian `u32` values read from the last 8 bytes of a block by
/// `BlockFormatSpec::get_footer_values`.
#[derive(Debug, Copy, Clone)]
pub struct FooterValues {
    /// Read from bytes `[len - 8, len - 4)`. NOTE(review): presumably the block checksum.
    pub sum: u32,
    /// Read from bytes `[len - 4, len)`. NOTE(review): presumably the uncompressed length.
    pub amount: u32,
}
/// A [`FormatSpec`] whose output is a sequence of independently decodable blocks.
pub trait BlockFormatSpec: FormatSpec {
    /// A second check type for this format.
    /// NOTE(review): presumably used when verifying blocks on decode — confirm.
    type B: Check + Send + 'static;
    /// Decompressor state passed mutably to [`BlockFormatSpec::decode_block`].
    type Decompressor;
    /// Size in bytes of a block header.
    const HEADER_SIZE: usize;

    /// Create a decompressor.
    fn create_decompressor(&self) -> Self::Decompressor;

    /// Decode one block from `input` with `decoder`.
    /// NOTE(review): `orig_size` is presumably the expected decompressed size.
    fn decode_block(
        &self,
        decoder: &mut Self::Decompressor,
        input: &[u8],
        orig_size: usize,
    ) -> Result<Vec<u8>, GzpError>;

    /// Validate a block header, returning an error if it is not valid for this format.
    fn check_header(&self, _bytes: &[u8]) -> Result<(), GzpError>;

    /// Extract the block size from header bytes.
    fn get_block_size(&self, _bytes: &[u8]) -> Result<usize, GzpError>;

    /// Read the footer from the last 8 bytes of `input`. The result holds two little-endian
    /// `u32`s: `sum` from bytes `[len - 8, len - 4)` and `amount` from `[len - 4, len)`.
    ///
    /// # Panics
    ///
    /// Panics if `input` is shorter than 8 bytes.
    #[inline]
    fn get_footer_values(&self, input: &[u8]) -> FooterValues {
        // Explicit precondition: otherwise `len - 8` underflows with an opaque panic.
        assert!(
            input.len() >= 8,
            "block footer requires at least 8 bytes, got {}",
            input.len()
        );
        let footer = &input[input.len() - 8..];
        FooterValues {
            sum: LittleEndian::read_u32(&footer[..4]),
            amount: LittleEndian::read_u32(&footer[4..]),
        }
    }
}