use super::OffsetsWriter;
use crate::prelude::*;
use core::cmp::Ordering;
use dsi_bitstream::codes::ToNat;
use lender::prelude::*;
use std::{io::Write, path::Path};
/// Aggregate statistics accumulated while compressing a graph.
#[derive(Debug, Clone, Copy, Default)]
pub struct CompStats {
    /// Number of nodes compressed so far.
    pub num_nodes: usize,
    /// Number of arcs (successors) compressed so far.
    pub num_arcs: u64,
    /// Total number of bits written to the graph bitstream.
    pub written_bits: u64,
    /// Total number of bits written by the offsets writer.
    pub offsets_written_bits: u64,
}
/// A BV-format graph compressor: successor lists are pushed one node at a
/// time and each list is encoded either on its own or differentially
/// against one of the previous lists inside a sliding window.
#[derive(Debug)]
pub struct BvComp<E, W: Write> {
    /// Circular buffer holding the successor lists of the most recent nodes,
    /// used as candidate reference lists.
    backrefs: CircularBuffer<Vec<usize>>,
    /// Circular buffer tracking, for each recent node, the length of the
    /// reference chain ending at that node (bounded by `max_ref_count`).
    ref_counts: CircularBuffer<usize>,
    /// The encoder the compressed representation is written to.
    encoder: E,
    /// Writer receiving the bit length of each node record.
    pub offsets_writer: OffsetsWriter<W>,
    /// One scratch [`Compressor`] per candidate reference delta
    /// (index 0 is the no-reference decomposition).
    compressors: Vec<Compressor>,
    /// Number of previous nodes considered as candidate references.
    compression_window: usize,
    /// Maximum allowed length of a reference chain.
    max_ref_count: usize,
    /// Minimum run length turned into an interval
    /// ([`Compressor::NO_INTERVALS`] disables intervalization).
    min_interval_length: usize,
    /// The node that will be compressed by the next call to `push`.
    curr_node: usize,
    /// The first node compressed by this instance.
    start_node: usize,
    /// Running compression statistics.
    stats: CompStats,
}
impl BvComp<(), std::io::Sink> {
    /// Convenience entry point: creates a [`BvCompConfig`] for a graph with
    /// the given basename.
    ///
    /// The `((), std::io::Sink)` type parameters are placeholders so this
    /// can be called without naming concrete encoder/writer types.
    pub fn with_basename(basename: impl AsRef<Path>) -> BvCompConfig {
        BvCompConfig::new(basename)
    }
}
/// Scratch state holding the decomposed representation of a single
/// successor list (copy blocks, intervals, residuals) before it is
/// written to an encoder.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct Compressor {
    /// Outdegree of the node being compressed.
    outdegree: usize,
    /// Lengths of the alternating copy/skip runs over the reference list
    /// (filled by `diff_comp`; the first entry is stored incremented by one).
    blocks: Vec<usize>,
    /// Successors not covered by copy blocks.
    extra_nodes: Vec<usize>,
    /// Left extremes of the detected intervals.
    left_interval: Vec<usize>,
    /// Lengths of the detected intervals (parallel to `left_interval`).
    len_interval: Vec<usize>,
    /// Remaining successors, written as gap-coded residuals.
    residuals: Vec<usize>,
}
impl Compressor {
    /// Value of `min_interval_length` that disables intervalization entirely:
    /// all extra nodes are written as residuals.
    pub(crate) const NO_INTERVALS: usize = 0;

    /// Creates a compressor with scratch buffers preallocated to 1024 entries
    /// to avoid repeated growth while compressing.
    pub(crate) fn new() -> Self {
        Compressor {
            outdegree: 0,
            blocks: Vec::with_capacity(1024),
            extra_nodes: Vec::with_capacity(1024),
            left_interval: Vec::with_capacity(1024),
            len_interval: Vec::with_capacity(1024),
            residuals: Vec::with_capacity(1024),
        }
    }

    /// Writes the current decomposition of `curr_node`'s successor list to
    /// `writer`, returning the total number of bits written.
    ///
    /// `reference_offset` is `Some(delta)` when the list was compressed
    /// against the list of node `curr_node - delta`; `Some(0)` still emits
    /// the reference-offset field (meaning "no reference"), while `None`
    /// skips the reference fields altogether (used when the compression
    /// window is zero).
    pub(crate) fn write<E: Encode>(
        &self,
        writer: &mut E,
        curr_node: usize,
        reference_offset: Option<usize>,
        min_interval_length: usize,
    ) -> Result<u64, E::Error> {
        let mut written_bits: u64 = 0;
        written_bits += writer.start_node(curr_node)? as u64;
        written_bits += writer.write_outdegree(self.outdegree as u64)? as u64;
        // Reference and copy-block fields are present only for non-empty lists.
        if self.outdegree != 0 {
            if let Some(reference_offset) = reference_offset {
                written_bits += writer.write_reference_offset(reference_offset as u64)? as u64;
                if reference_offset != 0 {
                    written_bits += writer.write_block_count(self.blocks.len() as _)? as u64;
                    if !self.blocks.is_empty() {
                        for i in 0..self.blocks.len() {
                            // Block lengths are stored decremented by one
                            // (entries are always >= 1 after `diff_comp`
                            // adjusts `blocks[0]`).
                            written_bits += writer.write_block((self.blocks[i] - 1) as u64)? as u64;
                        }
                    }
                }
            }
        }
        // Interval fields are emitted only when intervalization is enabled.
        if !self.extra_nodes.is_empty() && min_interval_length != Self::NO_INTERVALS {
            written_bits += writer.write_interval_count(self.left_interval.len() as _)? as u64;
            if !self.left_interval.is_empty() {
                // The first left extreme is gap-coded against the node id;
                // the signed gap is mapped to a natural number via `to_nat`.
                written_bits += writer.write_interval_start(
                    (self.left_interval[0] as i64 - curr_node as i64).to_nat(),
                )? as u64;
                // Interval lengths are stored minus the minimum length.
                written_bits += writer
                    .write_interval_len((self.len_interval[0] - min_interval_length) as u64)?
                    as u64;
                let mut prev = self.left_interval[0] + self.len_interval[0];
                for i in 1..self.left_interval.len() {
                    // Subsequent extremes are gap-coded against the end of
                    // the previous interval; gaps are at least 1, so one is
                    // subtracted.
                    written_bits += writer
                        .write_interval_start((self.left_interval[i] - prev - 1) as u64)?
                        as u64;
                    written_bits += writer
                        .write_interval_len((self.len_interval[i] - min_interval_length) as u64)?
                        as u64;
                    prev = self.left_interval[i] + self.len_interval[i];
                }
            }
        }
        // Hint the number of residuals to the encoder before writing them.
        writer.num_of_residuals(self.residuals.len());
        if !self.residuals.is_empty() {
            // The first residual is gap-coded against the node id (signed,
            // mapped to a natural number).
            written_bits += writer
                .write_first_residual((self.residuals[0] as i64 - curr_node as i64).to_nat())?
                as u64;
            for i in 1..self.residuals.len() {
                // Residuals are strictly increasing, so successive gaps are
                // stored minus one.
                written_bits += writer
                    .write_residual((self.residuals[i] - self.residuals[i - 1] - 1) as u64)?
                    as u64;
            }
        }
        written_bits += writer.end_node(curr_node)? as u64;
        Ok(written_bits)
    }

    /// Resets all scratch state so the compressor can be reused, keeping the
    /// buffers' capacity.
    #[inline(always)]
    pub(crate) fn clear(&mut self) {
        self.outdegree = 0;
        self.blocks.clear();
        self.extra_nodes.clear();
        self.left_interval.clear();
        self.len_interval.clear();
        self.residuals.clear();
    }

    /// Decomposes `curr_list` (the successors of the current node) into the
    /// copy-block / interval / residual representation, differentially
    /// against `ref_list` when one is provided.
    pub(crate) fn compress(
        &mut self,
        curr_list: &[usize],
        ref_list: Option<&[usize]>,
        min_interval_length: usize,
    ) -> anyhow::Result<()> {
        self.clear();
        self.outdegree = curr_list.len();
        if self.outdegree != 0 {
            if let Some(ref_list) = ref_list {
                // Copy what we can from the reference; the values that are
                // not copied end up in `extra_nodes`.
                self.diff_comp(curr_list, ref_list);
            } else {
                self.extra_nodes.extend(curr_list)
            }
            if !self.extra_nodes.is_empty() {
                if min_interval_length != Self::NO_INTERVALS {
                    self.intervalize(min_interval_length);
                } else {
                    // Intervalization disabled: everything is a residual.
                    self.residuals.extend(&self.extra_nodes);
                }
            }
        }
        debug_assert_eq!(self.left_interval.len(), self.len_interval.len());
        Ok(())
    }

    /// Scans `extra_nodes` (assumed strictly increasing) and splits it into
    /// maximal runs of consecutive values: runs of length at least
    /// `min_interval_length` become intervals, the remaining values become
    /// residuals.
    fn intervalize(&mut self, min_interval_length: usize) {
        let vl = self.extra_nodes.len();
        let mut i = 0;
        while i < vl {
            // `j` will hold the length of the consecutive run starting at
            // `i` (it stays 0 when no run of length >= 2 starts here).
            let mut j = 0;
            if i < vl - 1 && self.extra_nodes[i] + 1 == self.extra_nodes[i + 1] {
                j += 1;
                // Extend the run while values remain consecutive.
                while i + j < vl - 1 && self.extra_nodes[i + j] + 1 == self.extra_nodes[i + j + 1] {
                    j += 1;
                }
                j += 1;
                if j >= min_interval_length {
                    self.left_interval.push(self.extra_nodes[i]);
                    self.len_interval.push(j);
                    // Jump to the last element of the run; the trailing
                    // `i += 1` below moves past it.
                    i += j - 1;
                }
            }
            // Elements not absorbed into an interval become residuals (a
            // too-short run is consumed one element per outer iteration).
            if j < min_interval_length {
                self.residuals.push(self.extra_nodes[i]);
            }
            i += 1;
        }
    }

    /// Computes the differential representation of `curr_list` against
    /// `ref_list` (both assumed strictly increasing): `blocks` receives the
    /// lengths of alternating copy/skip runs over the reference list
    /// (starting with a copy run), and the successors not copied from the
    /// reference are pushed into `extra_nodes`.
    fn diff_comp(&mut self, curr_list: &[usize], ref_list: &[usize]) {
        let mut j = 0; // index into curr_list
        let mut k = 0; // index into ref_list
        let mut curr_block_len = 0;
        // `copying` is true while the current run is a copy run.
        let mut copying = true;
        while j < curr_list.len() && k < ref_list.len() {
            if copying {
                match curr_list[j].cmp(&ref_list[k]) {
                    Ordering::Greater => {
                        // The reference value is absent from curr_list:
                        // close the copy run and open a skip run.
                        self.blocks.push(curr_block_len);
                        copying = false;
                        curr_block_len = 0;
                    }
                    Ordering::Less => {
                        // Value not present in the reference: emit verbatim.
                        self.extra_nodes.push(curr_list[j]);
                        j += 1;
                    }
                    Ordering::Equal => {
                        // Shared value: extend the copy run.
                        j += 1;
                        k += 1;
                        curr_block_len += 1;
                    }
                }
            } else {
                match curr_list[j].cmp(&ref_list[k]) {
                    Ordering::Greater => {
                        // Keep skipping reference values.
                        k += 1;
                        curr_block_len += 1;
                    }
                    Ordering::Less => {
                        self.extra_nodes.push(curr_list[j]);
                        j += 1;
                    }
                    Ordering::Equal => {
                        // Shared value again: close the skip run and reopen
                        // a copy run (the value itself is counted on the
                        // next iteration, now in copying mode).
                        self.blocks.push(curr_block_len);
                        copying = true;
                        curr_block_len = 0;
                    }
                }
            }
        }
        // A trailing copy run is recorded only if part of the reference list
        // remains (so the decoder knows where copying stops).
        if copying && k < ref_list.len() {
            self.blocks.push(curr_block_len);
        }
        // Whatever is left of curr_list cannot be copied from the reference.
        while j < curr_list.len() {
            self.extra_nodes.push(curr_list[j]);
            j += 1;
        }
        // The first block is stored incremented by one, compensating the
        // uniform decrement applied by `write` to every block length.
        if !self.blocks.is_empty() {
            self.blocks[0] += 1;
        }
    }
}
impl<E: EncodeAndEstimate, W: Write> BvComp<E, W> {
    /// Re-export of [`Compressor::NO_INTERVALS`]: pass as
    /// `min_interval_length` to disable intervalization.
    pub const NO_INTERVALS: usize = Compressor::NO_INTERVALS;

    /// Creates a new compressor writing to `encoder`, starting at
    /// `start_node`.
    ///
    /// `compression_window` is the number of previous nodes considered as
    /// candidate references, `max_ref_count` bounds the length of reference
    /// chains, and `min_interval_length` is the minimum run length turned
    /// into an interval (use [`Self::NO_INTERVALS`] to disable).
    pub fn new(
        encoder: E,
        offsets_writer: OffsetsWriter<W>,
        compression_window: usize,
        max_ref_count: usize,
        min_interval_length: usize,
        start_node: usize,
    ) -> Self {
        BvComp {
            // One slot per window position plus one for the current node.
            backrefs: CircularBuffer::new(compression_window + 1),
            ref_counts: CircularBuffer::new(compression_window + 1),
            encoder,
            offsets_writer,
            min_interval_length,
            compression_window,
            max_ref_count,
            start_node,
            curr_node: start_node,
            // One scratch compressor per candidate reference delta
            // (delta 0 = no reference).
            compressors: (0..compression_window + 1)
                .map(|_| Compressor::new())
                .collect(),
            stats: CompStats::default(),
        }
    }

    /// Compresses the successor list of the current node and advances to the
    /// next one.
    ///
    /// When the window is non-empty, each admissible previous node is tried
    /// as a reference and the representation with the smallest estimated bit
    /// cost is written; the bit length of the record is then pushed to the
    /// offsets writer.
    pub fn push<I: IntoIterator<Item = usize>>(&mut self, succ_iter: I) -> anyhow::Result<()> {
        {
            // Store the successors in the circular buffer so that later
            // nodes can use them as a reference list.
            let succ_vec = &mut self.backrefs[self.curr_node];
            succ_vec.clear();
            succ_vec.extend(succ_iter);
            // Shrink buffers that became much larger than needed to bound
            // memory usage over long runs.
            if succ_vec.len() < succ_vec.capacity() / 4 {
                succ_vec.shrink_to(succ_vec.capacity() / 2);
            }
        }
        let curr_list = &self.backrefs[self.curr_node];
        self.stats.num_nodes += 1;
        self.stats.num_arcs += curr_list.len() as u64;
        // Compressor 0 always holds the no-reference decomposition.
        let compressor = &mut self.compressors[0];
        compressor.compress(curr_list, None, self.min_interval_length)?;
        if self.compression_window == 0 {
            // No references possible: write the list directly and return.
            let written_bits = compressor.write(
                &mut self.encoder,
                self.curr_node,
                None,
                self.min_interval_length,
            )?;
            self.curr_node += 1;
            self.stats.offsets_written_bits += self.offsets_writer.push(written_bits)? as u64;
            self.stats.written_bits += written_bits;
            return Ok(());
        }
        let mut ref_delta = 0;
        // Baseline: estimated cost of writing without a reference.
        let mut min_bits = {
            let mut estimator = self.encoder.estimator();
            compressor.write(
                &mut estimator,
                self.curr_node,
                Some(0),
                self.min_interval_length,
            )?
        };
        let mut ref_count = 0;
        // Candidate deltas are limited both by the window size and by how
        // many nodes this instance has already compressed.
        let deltas = 1 + self
            .compression_window
            .min(self.curr_node - self.start_node);
        for delta in 1..deltas {
            let ref_node = self.curr_node - delta;
            let count = self.ref_counts[ref_node];
            // Skip candidates whose reference chain is already maximal.
            if count >= self.max_ref_count {
                continue;
            }
            let ref_list = &self.backrefs[ref_node];
            // An empty list cannot be a useful reference.
            if ref_list.is_empty() {
                continue;
            }
            let compressor = &mut self.compressors[delta];
            compressor.compress(curr_list, Some(ref_list), self.min_interval_length)?;
            // Estimate this candidate's cost without writing anything.
            let bits = {
                let mut estimator = self.encoder.estimator();
                compressor.write(
                    &mut estimator,
                    self.curr_node,
                    Some(delta),
                    self.min_interval_length,
                )?
            };
            if bits < min_bits {
                min_bits = bits;
                ref_delta = delta;
                // Referencing extends the chain of the referenced node.
                ref_count = count + 1;
            }
        }
        // Write the cheapest representation for real.
        let compressor = &mut self.compressors[ref_delta];
        let written_bits = compressor.write(
            &mut self.encoder,
            self.curr_node,
            Some(ref_delta),
            self.min_interval_length,
        )?;
        self.ref_counts[self.curr_node] = ref_count;
        self.curr_node += 1;
        self.stats.offsets_written_bits += self.offsets_writer.push(written_bits)? as u64;
        self.stats.written_bits += written_bits;
        Ok(())
    }

    /// Flushes the encoder and the offsets writer, consuming the compressor
    /// and returning the accumulated statistics.
    pub fn flush(mut self) -> anyhow::Result<CompStats> {
        self.encoder.flush()?;
        self.offsets_writer.flush()?;
        Ok(self.stats)
    }

    /// Pushes the successor list of every node yielded by `iter_nodes`.
    ///
    /// NOTE(review): the node ids produced by the lender are discarded
    /// (`(_, succ)`); nodes are presumably expected to arrive in order
    /// starting at the current node — confirm against callers.
    pub fn extend<L>(&mut self, iter_nodes: L) -> anyhow::Result<()>
    where
        L: IntoLender,
        L::Lender: for<'next> NodeLabelsLender<'next, Label = usize>,
    {
        for_! ( (_, succ) in iter_nodes {
            self.push(succ.into_iter())?;
        });
        Ok(())
    }
}
#[cfg(test)]
mod test {
    use super::*;
    use dsi_bitstream::prelude::*;
    use tempfile::Builder;

    #[test]
    fn test_compressor_no_ref() -> anyhow::Result<()> {
        // Without a reference list every successor is an extra node; runs of
        // consecutive values of length >= 2 become intervals, the rest
        // residuals.
        let mut c = Compressor::new();
        c.compress(&[0, 1, 2, 5, 7, 8, 9], None, 2)?;
        let expected = Compressor {
            outdegree: 7,
            blocks: vec![],
            extra_nodes: vec![0, 1, 2, 5, 7, 8, 9],
            left_interval: vec![0, 7],
            len_interval: vec![3, 3],
            residuals: vec![5],
        };
        assert_eq!(c, expected);
        Ok(())
    }

    #[test]
    fn test_compressor1() -> anyhow::Result<()> {
        // The whole reference list [0, 1, 2] is copied, so no block is
        // recorded; the remaining successors are extra nodes.
        let mut c = Compressor::new();
        c.compress(&[0, 1, 2, 5, 7, 8, 9], Some(&[0, 1, 2]), 2)?;
        let expected = Compressor {
            outdegree: 7,
            blocks: vec![],
            extra_nodes: vec![5, 7, 8, 9],
            left_interval: vec![7],
            len_interval: vec![3],
            residuals: vec![5],
        };
        assert_eq!(c, expected);
        Ok(())
    }

    #[test]
    fn test_compressor2() -> anyhow::Result<()> {
        // The trailing reference value 100 is not copied, so a single copy
        // block is recorded (3 copied values, first block stored +1).
        let mut c = Compressor::new();
        c.compress(&[0, 1, 2, 5, 7, 8, 9], Some(&[0, 1, 2, 100]), 2)?;
        let expected = Compressor {
            outdegree: 7,
            blocks: vec![4],
            extra_nodes: vec![5, 7, 8, 9],
            left_interval: vec![7],
            len_interval: vec![3],
            residuals: vec![5],
        };
        assert_eq!(c, expected);
        Ok(())
    }

    #[test]
    fn test_compressor3() -> anyhow::Result<()> {
        // Alternating copy and skip runs over the reference yield several
        // blocks; the extra nodes 5 and 100 are not consecutive, so both
        // become residuals.
        let mut c = Compressor::new();
        c.compress(
            &[0, 1, 2, 5, 7, 8, 9, 100],
            Some(&[0, 1, 2, 4, 7, 8, 9, 101]),
            2,
        )?;
        let expected = Compressor {
            outdegree: 8,
            blocks: vec![4, 1, 3],
            extra_nodes: vec![5, 100],
            left_interval: vec![],
            len_interval: vec![],
            residuals: vec![5, 100],
        };
        assert_eq!(c, expected);
        Ok(())
    }

    #[test]
    fn test_writer_window_zero() -> anyhow::Result<()> {
        for min_interval_length in 0..=2 {
            test_compression(0, min_interval_length)?;
        }
        Ok(())
    }

    #[test]
    fn test_writer_window_one() -> anyhow::Result<()> {
        for min_interval_length in 0..=2 {
            test_compression(1, min_interval_length)?;
        }
        Ok(())
    }

    #[test]
    fn test_writer_window_two() -> anyhow::Result<()> {
        for min_interval_length in 0..=2 {
            test_compression(2, min_interval_length)?;
        }
        Ok(())
    }

    #[test]
    fn test_writer_cnr() -> anyhow::Result<()> {
        let original = BvGraphSeq::with_basename("../data/cnr-2000")
            .endianness::<BE>()
            .load()?;
        let tmp_dir = Builder::new().prefix("bvcomp_test").tempdir()?;
        let basename = tmp_dir.path().join("cnr-2000");

        // Sequential compression must round-trip.
        BvComp::with_basename(&basename).comp_graph::<BE>(&original)?;
        let recompressed = BvGraphSeq::with_basename(&basename).load()?;
        labels::eq_sorted(&original, &recompressed)?;

        // Parallel compression must round-trip as well.
        BvComp::with_basename(&basename).par_comp_graph::<BE>(&original)?;
        let recompressed = BvGraphSeq::with_basename(&basename).load()?;
        labels::eq_sorted(&original, &recompressed)?;
        Ok(())
    }

    /// Compresses cnr-2000 with the given parameters and checks that the
    /// recompressed graph is label-identical to the original.
    fn test_compression(
        compression_window: usize,
        min_interval_length: usize,
    ) -> anyhow::Result<()> {
        let original = BvGraphSeq::with_basename("../data/cnr-2000").load()?;
        let tmp_dir = Builder::new().prefix("bvcomp_test").tempdir()?;
        let basename = tmp_dir.path().join("cnr-2000");
        let flags = CompFlags {
            compression_window,
            min_interval_length,
            ..Default::default()
        };
        BvComp::with_basename(&basename)
            .with_comp_flags(flags)
            .comp_graph::<BE>(&original)?;
        let recompressed = BvGraphSeq::with_basename(&basename).load()?;
        labels::eq_sorted(&original, &recompressed)?;
        Ok(())
    }
}