use std::cmp::{max, min};
use std::fmt::Debug;
use std::marker::PhantomData;
use crate::{Flags, gcd_utils, huffman_encoding};
use crate::bit_writer::BitWriter;
use crate::chunk_metadata::{ChunkMetadata, PrefixMetadata};
use crate::chunk_spec::ChunkSpec;
use crate::compression_table::CompressionTable;
use crate::constants::*;
use crate::data_types::{NumberLike, UnsignedLike};
use crate::delta_encoding;
use crate::delta_encoding::DeltaMoments;
use crate::errors::{QCompressError, QCompressResult};
use crate::gcd_utils::{GcdOperator, GeneralGcdOp, TrivialGcdOp};
use crate::prefix::{Prefix, PrefixCompressionInfo, WeightedPrefix};
use crate::prefix_optimization;
/// Parameters chosen for a run-length-encoded prefix.
struct JumpstartConfiguration {
  // expected number of runs; used as the prefix's weight instead of its raw
  // count (see `push_pref`)
  weight: usize,
  // initial bit width for the varint that encodes each run's length
  jumpstart: usize,
}
/// User-facing configuration for building a compressor.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct CompressorConfig {
  /// Controls how many prefixes get trained (roughly `2^level`, capped by
  /// input size); must not exceed `MAX_COMPRESSION_LEVEL` or compression
  /// returns an error.
  pub compression_level: usize,
  /// Order of delta encoding applied before prefix training; 0 disables
  /// delta encoding entirely.
  pub delta_encoding_order: usize,
  /// Whether to detect greatest common divisors among each prefix's values
  /// and exploit them when writing offsets.
  pub use_gcds: bool,
}
impl Default for CompressorConfig {
fn default() -> Self {
Self {
compression_level: DEFAULT_COMPRESSION_LEVEL,
delta_encoding_order: 0,
use_gcds: true,
}
}
}
impl CompressorConfig {
  /// Sets the compression level, returning the modified config.
  pub fn with_compression_level(mut self, level: usize) -> Self {
    self.compression_level = level;
    self
  }

  /// Sets the delta encoding order, returning the modified config.
  pub fn with_delta_encoding_order(mut self, order: usize) -> Self {
    self.delta_encoding_order = order;
    self
  }

  /// Sets whether to detect and use GCDs, returning the modified config.
  pub fn with_use_gcds(mut self, use_gcds: bool) -> Self {
    self.use_gcds = use_gcds;
    self
  }
}
/// The subset of `CompressorConfig` the compressor keeps for itself; the
/// remaining settings are carried by `Flags` instead.
#[derive(Clone, Debug)]
struct InternalCompressorConfig {
  pub compression_level: usize,
}
impl From<&CompressorConfig> for InternalCompressorConfig {
fn from(config: &CompressorConfig) -> Self {
InternalCompressorConfig {
compression_level: config.compression_level,
}
}
}
impl Default for InternalCompressorConfig {
fn default() -> Self {
Self::from(&CompressorConfig::default())
}
}
/// Exclusive prefix sum: `res[i]` is the sum of `sizes[..i]`, so each entry
/// is the starting offset of the i-th page.
fn cumulative_sum(sizes: &[usize]) -> Vec<usize> {
  sizes
    .iter()
    .scan(0_usize, |running, &size| {
      let offset = *running;
      *running += size;
      Some(offset)
    })
    .collect()
}
/// Picks the run-length parameters for a prefix matching `count` of the `n`
/// total numbers.
fn choose_run_len_jumpstart(
  count: usize,
  n: usize,
) -> JumpstartConfiguration {
  // probability that a random number belongs to this prefix
  let freq = (count as f64) / (n as f64);
  let non_freq = 1.0 - freq;
  // -log2(1 - freq) is a heuristic for the initial varint bit width
  // (rarer exits => longer runs => more bits up front), capped at
  // MAX_JUMPSTART
  let jumpstart = min((-non_freq.log2()).ceil() as usize, MAX_JUMPSTART);
  // expected number of runs is about n * freq * (1 - freq); since the
  // prefix code is emitted once per run (not once per number), this
  // becomes the prefix's weight
  let expected_n_runs = (freq * non_freq * n as f64).ceil() as usize;
  JumpstartConfiguration {
    weight: expected_n_runs,
    jumpstart,
  }
}
/// Accumulates weighted prefixes while scanning the sorted unsigneds,
/// tracking where the next prefix should ideally end so prefixes cover
/// roughly equal counts.
struct PrefixBuffer<'a, T: NumberLike> {
  pub seq: Vec<WeightedPrefix<T>>,
  // number of prefix "slots" consumed so far; a prefix spanning many values
  // consumes more than one slot (see push_pref)
  prefix_idx: usize,
  // maximum number of prefixes to produce
  max_n_pref: usize,
  n_unsigneds: usize,
  // the full sorted input being partitioned
  sorted: &'a [T::Unsigned],
  use_gcd: bool,
  // index into `sorted` at which the next prefix should ideally end
  pub target_j: usize,
}
impl<'a, T: NumberLike> PrefixBuffer<'a, T> {
  // Recomputes the ideal end index of the next prefix: the (prefix_idx+1)-th
  // of max_n_pref equal shares of the input.
  fn calc_target_j(&mut self) {
    self.target_j = ((self.prefix_idx + 1) * self.n_unsigneds) / self.max_n_pref
  }

  fn new(
    max_n_pref: usize,
    n_unsigneds: usize,
    sorted: &'a [T::Unsigned],
    use_gcd: bool,
  ) -> Self {
    let mut res = Self {
      seq: Vec::with_capacity(max_n_pref),
      prefix_idx: 0,
      max_n_pref,
      n_unsigneds,
      sorted,
      use_gcd,
      target_j: 0
    };
    // initialize target_j from the zeroed fields above
    res.calc_target_j();
    res
  }

  // Turns sorted[i..j] into a weighted prefix and appends it to `seq`,
  // then advances the slot index and recomputes target_j.
  fn push_pref(
    &mut self,
    i: usize,
    j: usize,
  ) {
    let sorted = self.sorted;
    let n_unsigneds = self.n_unsigneds;
    let count = j - i;
    let frequency = count as f64 / self.n_unsigneds as f64;
    // If this prefix covered more than one slot's worth of numbers, skip
    // ahead proportionally so later targets stay evenly spaced.
    let new_prefix_idx = max(self.prefix_idx + 1, (j * self.max_n_pref) / n_unsigneds);
    let lower = T::from_unsigned(sorted[i]);
    let upper = T::from_unsigned(sorted[j - 1]);
    let gcd = if self.use_gcd {
      gcd_utils::gcd(&sorted[i..j])
    } else {
      T::Unsigned::ONE
    };
    // Run-length encoding is only used for sufficiently large inputs and
    // sufficiently frequent prefixes; a prefix covering every number gains
    // nothing from it either.
    let wp = if n_unsigneds < MIN_N_TO_USE_RUN_LEN || frequency < MIN_FREQUENCY_TO_USE_RUN_LEN || count == n_unsigneds {
      WeightedPrefix::new(
        count,
        count,
        lower,
        upper,
        None,
        gcd,
      )
    } else {
      // weight becomes the expected run count instead of the raw count
      let config = choose_run_len_jumpstart(count, n_unsigneds);
      WeightedPrefix::new(
        count,
        config.weight,
        lower,
        upper,
        Some(config.jumpstart),
        gcd,
      )
    };
    self.seq.push(wp);
    self.prefix_idx = new_prefix_idx;
    self.calc_target_j();
  }
}
/// Returns the maximum number of prefixes to train, based on the compression
/// level and how many numbers are available.
fn choose_max_n_prefixes(comp_level: usize, n_unsigneds: usize) -> usize {
  let log_n = (n_unsigneds as f64).log2().floor() as usize;
  // Small inputs can't usefully support high levels: cap the effective
  // level so the prefix budget stays proportional to log2(n).
  let max_comp_level_for_n = min(MAX_COMPRESSION_LEVEL, log_n / 2 + 5);
  let real_comp_level = comp_level.saturating_sub(MAX_COMPRESSION_LEVEL - max_comp_level_for_n);
  // at most 2^level prefixes, and never more prefixes than numbers
  min(1_usize << real_comp_level, n_unsigneds)
}
/// Greedily partitions the sorted unsigneds into up to `max_n_pref`
/// contiguous ranges of roughly equal count; each range becomes one
/// (unoptimized) weighted prefix.
fn choose_unoptimized_prefixes<T: NumberLike>(
  sorted: &[T::Unsigned],
  internal_config: &InternalCompressorConfig,
  flags: &Flags,
) -> Vec<WeightedPrefix<T>> {
  let n_unsigneds = sorted.len();
  let max_n_pref = choose_max_n_prefixes(internal_config.compression_level, n_unsigneds);
  let use_gcd = flags.use_gcds;
  // start index of the prefix currently being built
  let mut i = 0;
  // most recent index where sorted[j] != sorted[j - 1], i.e. the last
  // position where a cut between prefixes is allowed
  let mut backup_j = 0_usize;
  let mut prefix_buffer = PrefixBuffer::<T>::new(
    max_n_pref,
    n_unsigneds,
    sorted,
    use_gcd,
  );
  for j in 1..n_unsigneds {
    let target_j = prefix_buffer.target_j;
    if sorted[j] == sorted[j - 1] {
      // Inside a run of equal values we may only cut at a value boundary.
      // If the run has overshot the target by at least as much as cutting
      // back at backup_j would undershoot it, cut at backup_j instead.
      if j >= target_j && j - target_j >= target_j - backup_j && backup_j > i {
        prefix_buffer.push_pref(i, backup_j);
        i = backup_j;
      }
    } else {
      backup_j = j;
      if j >= target_j {
        prefix_buffer.push_pref(i, j);
        i = j;
      }
    }
  }
  // final prefix covers whatever remains
  prefix_buffer.push_pref(i, n_unsigneds);
  prefix_buffer.seq
}
/// Sorts `unsigneds`, chooses prefix ranges over them, optimizes those
/// ranges, and assigns Huffman codes, producing the final prefixes for a
/// chunk of `n` numbers.
///
/// Returns an error when the compression level exceeds
/// `MAX_COMPRESSION_LEVEL` or `n` exceeds `MAX_ENTRIES`.
fn train_prefixes<T: NumberLike>(
  unsigneds: Vec<T::Unsigned>,
  internal_config: &InternalCompressorConfig,
  flags: &Flags,
  n: usize,
) -> QCompressResult<Vec<Prefix<T>>> {
  if unsigneds.is_empty() {
    return Ok(Vec::new());
  }

  let comp_level = internal_config.compression_level;
  if comp_level > MAX_COMPRESSION_LEVEL {
    return Err(QCompressError::invalid_argument(format!(
      "compression level may not exceed {} (was {})",
      MAX_COMPRESSION_LEVEL,
      comp_level,
    )));
  }
  if n > MAX_ENTRIES {
    return Err(QCompressError::invalid_argument(format!(
      "count may not exceed {} per chunk (was {})",
      MAX_ENTRIES,
      n,
    )));
  }

  // prefix choice operates on the sorted unsigned representations
  let mut sorted = unsigneds;
  sorted.sort_unstable();
  let unoptimized = choose_unoptimized_prefixes(&sorted, internal_config, flags);

  let mut weighted = prefix_optimization::optimize_prefixes(unoptimized, flags, n);
  huffman_encoding::make_huffman_code(&mut weighted);

  Ok(weighted.iter().map(|wp| wp.prefix.clone()).collect())
}
/// Pairs a compression table with a compile-time choice of GCD arithmetic
/// so the no-GCD path avoids any per-value GCD work.
#[derive(Clone)]
struct TrainedBodyCompressor<'a, U: UnsignedLike, GcdOp: GcdOperator<U>> {
  pub table: &'a CompressionTable<U>,
  // zero-sized marker selecting the GCD strategy at compile time
  op: PhantomData<GcdOp>,
}
/// Compresses `unsigneds` into `writer` using `table`, dispatching at the
/// type level to a GCD-aware or trivial implementation depending on
/// `use_gcd`.
fn trained_compress_body<U: UnsignedLike>(
  table: &CompressionTable<U>,
  use_gcd: bool,
  unsigneds: &[U],
  writer: &mut BitWriter,
) -> QCompressResult<()> {
  if use_gcd {
    let compressor = TrainedBodyCompressor::<U, GeneralGcdOp> {
      table,
      op: PhantomData,
    };
    compressor.compress_data_page(unsigneds, writer)
  } else {
    let compressor = TrainedBodyCompressor::<U, TrivialGcdOp> {
      table,
      op: PhantomData,
    };
    compressor.compress_data_page(unsigneds, writer)
  }
}
impl<'a, U, GcdOp> TrainedBodyCompressor<'a, U, GcdOp> where U: UnsignedLike, GcdOp: GcdOperator<U> {
  // Writes each unsigned as its prefix's Huffman code followed by offset
  // bits; run-length-encoded prefixes emit the code once per run plus a
  // varint repetition count.
  fn compress_data_page(
    &self,
    unsigneds: &[U],
    writer: &mut BitWriter,
  ) -> QCompressResult<()> {
    let mut i = 0;
    while i < unsigneds.len() {
      let unsigned = unsigneds[i];
      let p = self.table.search(unsigned)?;
      writer.write_usize(p.code, p.code_len);
      match p.run_len_jumpstart {
        None => {
          Self::compress_offset_bits_w_prefix(unsigned, p, writer);
          i += 1;
        }
        Some(jumpstart) => {
          // count how many consecutive values fall in this same prefix
          let mut reps = 1;
          for &other in unsigneds.iter().skip(i + 1) {
            if p.contains(other) {
              reps += 1;
            } else {
              break;
            }
          }
          // the first rep is implied by the prefix code itself, so only
          // reps - 1 is written
          writer.write_varint(reps - 1, jumpstart);
          for &unsigned in unsigneds.iter().skip(i).take(reps) {
            Self::compress_offset_bits_w_prefix(unsigned, p, writer);
          }
          i += reps;
        }
      }
    }
    // pad to a byte boundary so the next section starts aligned
    writer.finish_byte();
    Ok(())
  }

  // Writes the offset of `unsigned` within prefix `p`, scaled down by the
  // prefix's GCD via GcdOp. The offset takes p.k bits, plus one extra bit
  // (bit k) when it falls outside [only_k_bits_lower, only_k_bits_upper].
  fn compress_offset_bits_w_prefix(
    unsigned: U,
    p: &PrefixCompressionInfo<U>,
    writer: &mut BitWriter,
  ) {
    let off = GcdOp::get_offset(unsigned - p.lower, p.gcd);
    writer.write_diff(off, p.k);
    if off < p.only_k_bits_lower || off > p.only_k_bits_upper {
      writer.write_one((off & (U::ONE << p.k)) > U::ZERO);
    }
  }
}
/// Everything needed to write the remaining data pages of a chunk whose
/// metadata has already been written.
#[derive(Clone, Debug)]
pub struct MidChunkInfo<T: NumberLike> {
  // unsigned representations of the chunk's numbers (or of their deltas
  // when delta encoding is on)
  unsigneds: Vec<T::Unsigned>,
  use_gcd: bool,
  // lookup structure mapping each unsigned to its prefix
  table: CompressionTable<T::Unsigned>,
  // per-page delta moments (default-valued when delta encoding is off)
  delta_momentss: Vec<DeltaMoments<T::Signed>>,
  page_sizes: Vec<usize>,
  // index into `unsigneds` where the next page begins
  idx: usize,
  // index of the next page to write
  page_idx: usize,
}
impl<T: NumberLike> MidChunkInfo<T> {
  // number of values assigned to the current data page
  fn data_page_n(&self) -> usize {
    self.page_sizes[self.page_idx]
  }

  // delta moments belonging to the current data page
  fn data_page_moments(&self) -> &DeltaMoments<T::Signed> {
    &self.delta_momentss[self.page_idx]
  }

  fn n_pages(&self) -> usize {
    self.page_sizes.len()
  }
}
/// Tracks what the compressor has written so far, enforcing the
/// header -> chunks -> footer ordering of operations.
#[derive(Clone, Debug)]
pub enum State<T: NumberLike> {
  // nothing written yet; only `header` is valid
  PreHeader,
  // header (or previous chunk) complete; chunk metadata may be written
  StartOfChunk,
  // chunk metadata written; data pages remain
  MidChunk(MidChunkInfo<T>),
  // footer written; no further writes allowed
  Terminated,
}
impl<T: NumberLike> Default for State<T> {
  /// A fresh compressor starts before the header has been written.
  fn default() -> Self {
    Self::PreHeader
  }
}
impl<T: NumberLike> State<T> {
  /// Builds the error returned when `description` (e.g. "data page") is
  /// attempted in the wrong state.
  pub fn wrong_step_err(&self, description: &str) -> QCompressError {
    let step_str = match self {
      Self::PreHeader => "has not yet written header",
      Self::StartOfChunk => "is at the start of a chunk",
      Self::MidChunk(_) => "is mid-chunk",
      Self::Terminated => "has already written the footer",
    };
    let msg = format!(
      "attempted to write {} when compressor {}",
      description, step_str,
    );
    QCompressError::invalid_argument(msg)
  }
}
/// Low-level compressor state shared by standalone and wrapped-mode
/// compressors.
#[derive(Clone, Debug)]
pub struct BaseCompressor<T> where T: NumberLike {
  internal_config: InternalCompressorConfig,
  pub flags: Flags,
  // destination bit stream for all writes
  pub writer: BitWriter,
  // where we are in the header -> chunk -> data page lifecycle
  pub state: State<T>,
}
impl<T> BaseCompressor<T> where T: NumberLike {
  pub fn from_config(config: CompressorConfig, use_wrapped_mode: bool) -> Self {
    Self {
      internal_config: InternalCompressorConfig::from(&config),
      flags: Flags::from_config(&config, use_wrapped_mode),
      writer: BitWriter::default(),
      state: State::default(),
    }
  }

  /// Writes the file header: magic bytes, the data type's header byte, and
  /// the flags. Valid only in the PreHeader state.
  pub fn header(&mut self) -> QCompressResult<()> {
    if !matches!(self.state, State::PreHeader) {
      return Err(self.state.wrong_step_err("header"));
    }
    self.writer.write_aligned_bytes(&MAGIC_HEADER)?;
    self.writer.write_aligned_byte(T::HEADER_BYTE)?;
    self.flags.write(&mut self.writer)?;
    self.state = State::StartOfChunk;
    Ok(())
  }

  /// Trains prefixes over `nums` (or over their deltas when delta encoding
  /// is on), writes the chunk metadata, and transitions to MidChunk,
  /// retaining everything the data page writes will need.
  pub fn chunk_metadata_internal(
    &mut self,
    nums: &[T],
    spec: &ChunkSpec,
  ) -> QCompressResult<ChunkMetadata<T>> {
    if !matches!(self.state, State::StartOfChunk) {
      return Err(self.state.wrong_step_err("chunk metadata"));
    }
    if nums.is_empty() {
      return Err(QCompressError::invalid_argument(
        "cannot compress empty chunk"
      ));
    }
    let n = nums.len();
    let page_sizes = spec.page_sizes(nums.len())?;
    let n_pages = page_sizes.len();
    // standalone mode marks each chunk with a magic byte; wrapped mode
    // leaves framing to the outer format
    if !self.flags.use_wrapped_mode {
      self.writer.write_aligned_byte(MAGIC_CHUNK_BYTE)?;
    }
    let order = self.flags.delta_encoding_order;
    let (
      unsigneds,
      prefix_meta,
      use_gcd,
      table,
      delta_momentss,
    ) = if order == 0 {
      // no delta encoding: train directly on the numbers' unsigned form
      let unsigneds = nums.iter()
        .map(|x| x.to_unsigned())
        .collect::<Vec<_>>();
      let prefixes = train_prefixes(
        unsigneds.clone(),
        &self.internal_config,
        &self.flags,
        n,
      )?;
      let use_gcd = gcd_utils::use_gcd_arithmetic(&prefixes);
      let table = CompressionTable::from(prefixes.as_slice());
      let prefix_metadata = PrefixMetadata::Simple {
        prefixes,
      };
      // default (trivial) moments per page since no deltas are involved
      (unsigneds, prefix_metadata, use_gcd, table, vec![DeltaMoments::default(); n_pages])
    } else {
      // delta encoding: train on order-th differences, keeping the per-page
      // moments needed to reconstruct the original values
      let page_idxs = cumulative_sum(&page_sizes);
      let (deltas, momentss) = delta_encoding::nth_order_deltas(
        nums,
        order,
        &page_idxs,
      );
      let unsigneds = deltas.iter()
        .map(|x| x.to_unsigned())
        .collect::<Vec<_>>();
      let prefixes = train_prefixes(
        unsigneds.clone(),
        &self.internal_config,
        &self.flags,
        n,
      )?;
      let use_gcd = gcd_utils::use_gcd_arithmetic(&prefixes);
      let table = CompressionTable::from(prefixes.as_slice());
      let prefix_metadata = PrefixMetadata::Delta {
        prefixes,
      };
      (unsigneds, prefix_metadata, use_gcd, table, momentss)
    };
    // the chunk metadata carries the first page's moments
    let chunk_meta_moments = delta_momentss[0].clone();
    let meta = ChunkMetadata::new(n, prefix_meta, chunk_meta_moments);
    meta.write_to(&mut self.writer, &self.flags);
    self.state = State::MidChunk(MidChunkInfo {
      unsigneds,
      use_gcd,
      table,
      delta_momentss,
      page_sizes,
      idx: 0,
      page_idx: 0,
    });
    Ok(meta)
  }

  /// Writes the next data page of the current chunk; after the last page,
  /// transitions back to StartOfChunk.
  pub fn data_page_internal(&mut self) -> QCompressResult<()> {
    let has_pages_remaining = {
      let info = match &mut self.state {
        State::MidChunk(info) => Ok(info),
        other => Err(other.wrong_step_err("data page")),
      }?;
      let start = info.idx;
      let data_page_n = info.data_page_n();
      // NOTE(review): the page body appears to hold `delta_encoding_order`
      // fewer deltas than the page size, presumably because the page's
      // leading values are recovered from its delta moments — confirm
      // against the decompressor
      let end = start + data_page_n.saturating_sub(self.flags.delta_encoding_order);
      // in wrapped mode each page carries its own moments header
      if self.flags.use_wrapped_mode {
        info.data_page_moments().write_to(&mut self.writer);
      }
      // end >= start always holds by construction (saturating_sub >= 0), so
      // the else arm is purely defensive
      let slice = if end > start { &info.unsigneds[start..end] } else { &[] };
      trained_compress_body(
        &info.table,
        info.use_gcd,
        slice,
        &mut self.writer,
      )?;
      info.idx += data_page_n;
      info.page_idx += 1;
      info.page_idx < info.n_pages()
    };
    if !has_pages_remaining {
      self.state = State::StartOfChunk;
    }
    Ok(())
  }
}
#[cfg(test)]
mod tests {
  use super::choose_max_n_prefixes;

  // Pins down how the prefix budget scales with compression level and
  // input size: low levels yield few prefixes regardless of n, and for
  // level 12 the budget doubles roughly every 4x increase in n until it
  // saturates at 2^12.
  #[test]
  fn test_choose_max_n_prefixes() {
    assert_eq!(choose_max_n_prefixes(0, 100), 1);
    assert_eq!(choose_max_n_prefixes(12, 100), 100);
    assert_eq!(choose_max_n_prefixes(12, 1 << 10), 1 << 10);
    assert_eq!(choose_max_n_prefixes(8, 1 << 10), 1 << 6);
    assert_eq!(choose_max_n_prefixes(1, 1 << 10), 1);
    assert_eq!(choose_max_n_prefixes(12, (1 << 12) - 1), 1 << 10);
    assert_eq!(choose_max_n_prefixes(12, 1 << 12), 1 << 11);
    assert_eq!(choose_max_n_prefixes(12, (1 << 14) - 1), 1 << 11);
    assert_eq!(choose_max_n_prefixes(12, 1 << 14), 1 << 12);
    assert_eq!(choose_max_n_prefixes(12, 1 << 20), 1 << 12);
  }
}
}