use std::io::Write;
use std::path::Path;
use super::OffsetsWriter;
use super::bvcomp::{CompStats, Compressor};
use crate::prelude::*;
use crate::utils::RaggedArray;
use common_traits::Sequence;
use lender::prelude::*;
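/// Entry of the dynamic-programming table used to enforce the maximum length of
/// reference chains: the best cost saved in the subtree of referencing nodes and
/// whether the node keeps its reference in that solution.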
#[derive(Default, Clone)]
struct ReferenceTableEntry {
saved_cost: f32,
chosen: bool,
}
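/// A BvGraph compressor that buffers successor lists in chunks and optimizes the
/// choice of references over each chunk.
///
/// For every node the cost of encoding it with each admissible reference is
/// estimated; when `chunk_size` nodes have been accumulated, references are pruned
/// so that no chain of references is longer than `max_ref_count`, and the whole
/// chunk is then written to the encoder.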
#[derive(Debug)]
pub struct BvCompZ<E, W: Write> {
    /// The successor lists of the nodes buffered in the current chunk.
    backrefs: RaggedArray<usize>,
    /// The reference chosen for each node of the chunk (backward offset, 0 = none).
    references: Vec<usize>,
    /// Estimated bits needed to encode each node of the chunk with each reference offset.
    reference_costs: Matrix<u64>,
    /// For each node of the chunk, the bits saved by its best reference.
    saved_costs: Vec<f32>,
    /// The number of nodes buffered before references are optimized and written.
    chunk_size: usize,
    /// The encoder the graph is written to.
    encoder: E,
    /// The writer for the offsets (fed the number of bits written for each node).
    offsets_writer: OffsetsWriter<W>,
    /// One compressor per possible reference offset, reused across nodes.
    compressors: Vec<Compressor>,
    /// The maximum backward offset of a reference.
    compression_window: usize,
    /// The maximum length of a chain of references.
    max_ref_count: usize,
    /// The minimum length of an interval to be encoded as such.
    min_interval_length: usize,
    /// The node whose successors will be pushed next.
    curr_node: usize,
    /// The first node of the current chunk.
    start_chunk_node: usize,
    /// Compression statistics.
    stats: CompStats,
}
impl BvCompZ<(), std::io::Sink> {
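    /// Returns a [`BvCompConfig`] with the given basename, set up for this
    /// compressor and with a default compression window of 16.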
pub fn with_basename(basename: impl AsRef<Path>) -> BvCompConfig {
BvCompConfig::new(basename)
.with_bvgraphz()
.with_comp_flags(CompFlags {
compression_window: 16,
..Default::default()
})
}
}
impl<E: EncodeAndEstimate, W: Write> BvCompZ<E, W> {
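    /// Special value for `min_interval_length` disabling intervalization.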
pub const NO_INTERVALS: usize = Compressor::NO_INTERVALS;
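    /// Creates a new compressor with the given encoder, offsets writer, and
    /// compression parameters; `start_node` is the first node that will be pushed.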
pub fn new(
encoder: E,
offsets_writer: OffsetsWriter<W>,
compression_window: usize,
chunk_size: usize,
max_ref_count: usize,
min_interval_length: usize,
start_node: usize,
) -> Self {
BvCompZ {
backrefs: RaggedArray::new(),
reference_costs: Matrix::new(chunk_size + 1, compression_window + 1),
references: Vec::with_capacity(chunk_size + 1),
saved_costs: Vec::with_capacity(chunk_size + 1),
chunk_size,
encoder,
offsets_writer,
min_interval_length,
compression_window,
max_ref_count,
start_chunk_node: start_node,
curr_node: start_node,
            compressors: (0..=compression_window)
.map(|_| Compressor::new())
.collect(),
stats: CompStats::default(),
}
}
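    /// Pushes the successor list of the next node.
    ///
    /// The list is buffered and the cost of every admissible reference is estimated;
    /// once `chunk_size` nodes have been accumulated, the references of the whole
    /// chunk are optimized and the chunk is written to the encoder. When
    /// `compression_window` is zero the node is written immediately.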
pub fn push<I: IntoIterator<Item = usize>>(&mut self, succ_iter: I) -> anyhow::Result<()> {
self.backrefs.push(succ_iter);
let offset_in_chunk = self.curr_node - self.start_chunk_node;
let curr_list = &self.backrefs[offset_in_chunk];
self.stats.num_nodes += 1;
self.stats.num_arcs += curr_list.len() as u64;
        // compressors[0] holds the compression of the current list with no reference.
        let compressor = &mut self.compressors[0];
        compressor.compress(curr_list, None, self.min_interval_length)?;
        // With no compression window, encode and write the node immediately.
        if self.compression_window == 0 {
let written_bits = compressor.write(
&mut self.encoder,
self.curr_node,
None,
self.min_interval_length,
)?;
self.curr_node += 1;
self.stats.offsets_written_bits += self.offsets_writer.push(written_bits)? as u64;
self.stats.written_bits += written_bits;
return Ok(());
}
        let mut ref_delta = 0;
        // Estimate the cost in bits of encoding the node without a reference.
        let cost = {
            let mut estimator = self.encoder.estimator();
            compressor.write(
                &mut estimator,
                self.curr_node,
                Some(0),
                self.min_interval_length,
            )?
        };
        let mut saved_cost = 0;
        self.reference_costs[(offset_in_chunk, 0)] = cost;
        let mut min_bits = cost;
        // Estimate the cost of every admissible reference and remember the best one.
        let deltas = 1 + self.compression_window.min(offset_in_chunk);
for delta in 1..deltas {
let ref_list = &self.backrefs[offset_in_chunk - delta];
if ref_list.is_empty() {
continue;
}
            // compressors[delta] compresses the current list against the list delta nodes back.
            let compressor = &mut self.compressors[delta];
            compressor.compress(curr_list, Some(ref_list), self.min_interval_length)?;
let bits = {
let mut estimator = self.encoder.estimator();
compressor.write(
&mut estimator,
self.curr_node,
Some(delta),
self.min_interval_length,
)?
};
self.reference_costs[(offset_in_chunk, delta)] = bits;
if bits < min_bits {
saved_cost = cost - bits;
min_bits = bits;
ref_delta = delta;
}
}
assert_eq!(
self.references.len(),
self.curr_node - self.start_chunk_node
);
self.saved_costs.push(saved_cost as f32);
self.references.push(ref_delta);
self.curr_node += 1;
        // Once a full chunk has been buffered, optimize the references and write it out.
        if self.references.len() >= self.chunk_size {
self.comp_refs()?;
}
Ok(())
}
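    /// Writes any node still buffered in the current chunk, flushes the encoder and
    /// the offsets writer, and returns the compression statistics.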
pub fn flush(mut self) -> anyhow::Result<CompStats> {
if self.compression_window > 0 {
self.comp_refs()?;
}
self.encoder.flush()?;
self.offsets_writer.flush()?;
Ok(self.stats)
}
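    /// Pushes the successor lists of all the nodes returned by the given lender.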
pub fn extend<L>(&mut self, iter_nodes: L) -> anyhow::Result<()>
where
L: IntoLender,
L::Lender: for<'next> NodeLabelsLender<'next, Label = usize>,
{
for_! ( (_, succ) in iter_nodes {
self.push(succ.into_iter())?;
});
Ok(())
}
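    /// Finalizes the references of the nodes buffered in the current chunk, enforcing
    /// the maximum reference-chain length, and writes the chunk to the encoder.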
fn comp_refs(&mut self) -> anyhow::Result<()> {
        // The chain-length constraint only needs to be enforced for a finite max_ref_count.
        if self.max_ref_count != usize::MAX {
let nodes_in_chunk = self.references.len();
self.update_references_for_max_length();
assert_eq!(nodes_in_chunk, self.curr_node - self.start_chunk_node);
self.find_additional_references_greedily();
}
self.write_and_clear_current_chunk()?;
Ok(())
}
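    /// Drops references so that no chain of references in the current chunk is longer
    /// than `max_ref_count`, choosing which references to keep with a dynamic program
    /// that maximizes the total estimated saved cost.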
fn update_references_for_max_length(&mut self) {
let n = self.references.len();
debug_assert!(self.saved_costs.len() == n);
for i in 0..n {
debug_assert!(self.references[i] <= i);
debug_assert!(self.saved_costs[i] >= 0.0);
if self.references[i] == 0 {
debug_assert!(self.saved_costs[i] == 0.0);
}
}
        // For each node, collect the nodes of the chunk that reference it.
        let mut out_edges: Vec<Vec<usize>> = vec![Vec::new(); n];
        for (i, &reference) in self.references.iter().enumerate() {
            if reference != 0 {
                out_edges[i - reference].push(i);
            }
        }
        let max_available_references = self.max_ref_count.min(n);
        // Dynamic program over the forest induced by the references: for every node and
        // every residual chain budget, either the node keeps its reference (spending one
        // unit of budget for the nodes that reference it) or drops it (resetting their
        // budget), so as to maximize the total saved cost.
        let mut dyn_table: Matrix<ReferenceTableEntry> =
            Matrix::new(n, max_available_references + 1);
        // Nodes are processed backwards so that referencing nodes are already solved.
        for i in (0..n).rev() {
            let mut child_sum_full_chain = 0.0;
            for &child in out_edges[i].iter() {
                child_sum_full_chain += dyn_table[(child, max_available_references)].saved_cost;
            }
dyn_table[(i, 0)] = ReferenceTableEntry {
saved_cost: child_sum_full_chain,
chosen: false,
};
            for links_to_use in 1..=max_available_references {
                // Keeping i's reference leaves one unit less of budget to the nodes referencing it.
                let mut child_sum = self.saved_costs[i];
                for &child in out_edges[i].iter() {
                    child_sum += dyn_table[(child, links_to_use - 1)].saved_cost;
                }
dyn_table[(i, links_to_use)] = if child_sum > child_sum_full_chain {
ReferenceTableEntry {
saved_cost: child_sum,
chosen: true,
}
} else {
ReferenceTableEntry {
saved_cost: child_sum_full_chain,
chosen: false,
}
};
}
}
        // Walk the table forwards to materialize the choices: dropped references are reset
        // to 0, kept references lower the budget of the nodes that reference them.
        let mut available_length = vec![max_available_references; n];
        for i in 0..self.references.len() {
            if dyn_table[(i, available_length[i])].chosen {
                for &child in out_edges[i].iter() {
                    available_length[child] = available_length[i] - 1;
                }
            } else {
                self.references[i] = 0;
            }
        }
}
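    /// After the pruning pass, greedily tries to improve the reference of each node of
    /// the chunk: among the references whose estimated cost beats encoding the node
    /// without a reference and whose adoption keeps every chain within `max_ref_count`,
    /// the cheapest one is chosen.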
fn find_additional_references_greedily(&mut self) {
        let n = self.references.len();
        // chain_length[i]: number of references to follow from node i towards the start of
        // the chunk; forward_chain_length[i]: longest chain of nodes referencing node i.
        let mut chain_length = vec![0usize; self.chunk_size];
        for i in 0..n {
            if self.references[i] != 0 {
                let parent = i - self.references[i];
                chain_length[i] = chain_length[parent] + 1;
            }
        }
        let mut forward_chain_length = vec![0usize; self.chunk_size];
        for i in (0..n).rev() {
            if self.references[i] != 0 {
                let parent = i - self.references[i];
                forward_chain_length[parent] =
                    forward_chain_length[parent].max(forward_chain_length[i] + 1);
            }
        }
        for relative_index_in_chunk in 0..n {
            // Refresh the backward chain length: earlier nodes may have changed reference.
            if self.references[relative_index_in_chunk] != 0 {
                let parent = relative_index_in_chunk - self.references[relative_index_in_chunk];
                chain_length[relative_index_in_chunk] = chain_length[parent] + 1;
            }
            let mut min_bits = self.reference_costs[(relative_index_in_chunk, 0)];
            let deltas = 1 + self.compression_window.min(relative_index_in_chunk);
            for delta in 1..deltas {
                // Skip references that would make some chain longer than max_ref_count.
                if chain_length[relative_index_in_chunk - delta]
                    + forward_chain_length[relative_index_in_chunk]
                    + 1
                    > self.max_ref_count
                {
                    continue;
                }
let reference_index = relative_index_in_chunk - delta;
let ref_list = &self.backrefs[reference_index];
if ref_list.is_empty() {
continue;
}
let bits = self.reference_costs[(relative_index_in_chunk, delta)];
if bits < min_bits {
min_bits = bits;
self.references[relative_index_in_chunk] = delta;
}
}
            // Update the chain length with the (possibly new) reference for later nodes.
            if self.references[relative_index_in_chunk] != 0 {
                let parent = relative_index_in_chunk - self.references[relative_index_in_chunk];
                chain_length[relative_index_in_chunk] = chain_length[parent] + 1;
            }
}
}
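    /// Compresses and writes every node of the current chunk using the chosen
    /// references, updates statistics and offsets, and resets the per-chunk state.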
fn write_and_clear_current_chunk(&mut self) -> anyhow::Result<()> {
let n = self.references.len();
let compressor = self
.compressors
.first_mut()
.expect("at least one compressor available");
for i in 0..n {
let node_index = self.curr_node - n + i;
let curr_list = &self.backrefs[node_index - self.start_chunk_node];
            let reference = self.references[i];
            // Resolve the chosen reference list, ignoring it if the referenced list is empty.
            let ref_list = if reference == 0 {
                None
            } else {
                let reference_index = node_index - reference - self.start_chunk_node;
                Some(&self.backrefs[reference_index]).filter(|list| !list.is_empty())
            };
compressor.clear();
compressor.compress(curr_list, ref_list, self.min_interval_length)?;
let bits = compressor.write(
&mut self.encoder,
node_index,
Some(reference),
self.min_interval_length,
)?;
self.stats.written_bits += bits;
self.stats.offsets_written_bits += self.offsets_writer.push(bits)? as u64;
}
        // Reset the per-chunk state for the next chunk.
        self.start_chunk_node = self.curr_node;
        self.references.clear();
        self.saved_costs.clear();
        // If the successor buffer is mostly unused, release part of its memory.
        if self.backrefs.num_values() < self.backrefs.values_capacity() / 4 {
            self.backrefs
                .shrink_values_to(self.backrefs.values_capacity() / 2);
        }
        self.backrefs.clear();
Ok(())
}
}
#[cfg(test)]
mod test {
use super::*;
use dsi_bitstream::prelude::*;
use tempfile::Builder;
#[test]
fn test_writer_window_zero() -> anyhow::Result<()> {
test_compression(0, 0)?;
test_compression(0, 1)?;
test_compression(0, 2)?;
Ok(())
}
#[test]
fn test_writer_window_one() -> anyhow::Result<()> {
test_compression(1, 0)?;
test_compression(1, 1)?;
test_compression(1, 2)?;
Ok(())
}
#[test]
fn test_writer_window_two() -> anyhow::Result<()> {
test_compression(2, 0)?;
test_compression(2, 1)?;
test_compression(2, 2)?;
Ok(())
}
#[test]
fn test_writer_cnr() -> anyhow::Result<()> {
let cnr_2000 = BvGraphSeq::with_basename("../data/cnr-2000")
.endianness::<BE>()
.load()?;
let tmp_dir = Builder::new().prefix("bvcomp_test").tempdir()?;
let basename = tmp_dir.path().join("cnr-2000");
BvCompZ::with_basename(&basename).comp_graph::<BE>(&cnr_2000)?;
let seq_graph = BvGraphSeq::with_basename(&basename).load()?;
labels::eq_sorted(&cnr_2000, &seq_graph)?;
BvCompZ::with_basename(&basename).par_comp_graph::<BE>(&cnr_2000)?;
let seq_graph = BvGraphSeq::with_basename(&basename).load()?;
labels::eq_sorted(&cnr_2000, &seq_graph)?;
Ok(())
}
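    /// Compresses the cnr-2000 test graph with the given parameters and checks that
    /// the result, when read back, is equal to the original graph.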
fn test_compression(
compression_window: usize,
min_interval_length: usize,
) -> anyhow::Result<()> {
let cnr_2000 = BvGraphSeq::with_basename("../data/cnr-2000").load()?;
let tmp_dir = Builder::new().prefix("bvcomp_test").tempdir()?;
let basename = tmp_dir.path().join("cnr-2000");
BvCompZ::with_basename(&basename)
.with_comp_flags(CompFlags {
compression_window,
min_interval_length,
..Default::default()
})
.comp_graph::<BE>(&cnr_2000)?;
let seq_graph = BvGraphSeq::with_basename(&basename).load()?;
labels::eq_sorted(&cnr_2000, &seq_graph)?;
Ok(())
}
}