pancake_db_core/compression/
traits.rs

1use pancake_db_idl::dml::FieldValue;
2use q_compress::{BitReader, BitWords, Decompressor};
3
4use crate::errors::CoreResult;
5use crate::primitives::Primitive;
6use crate::rep_levels;
7use crate::rep_levels::{RepLevelsAndAtoms, RepLevelsAndBytes};
8use crate::rep_levels::AtomNester;
9
10pub trait Codec: Send + Sync {
11  type P: Primitive;
12
13  fn compress_atoms(&self, atoms: &[<<Self as Codec>::P as Primitive>::A]) -> CoreResult<Vec<u8>>;
14  fn decompress_atoms(&self, bytes: &[u8]) -> CoreResult<Vec<<<Self as Codec>::P as Primitive>::A>>;
15}
16
17pub trait ValueCodec: Send + Sync {
18  fn compress(&self, values: &[FieldValue], nested_list_depth: u8) -> CoreResult<Vec<u8>>;
19
20  fn decompress_rep_levels(&self, bytes: &[u8]) -> CoreResult<RepLevelsAndBytes>;
21  fn decompress(&self, bytes: &[u8], nested_list_depth: u8) -> CoreResult<Vec<FieldValue>>;
22}
23
24impl<P: Primitive> ValueCodec for Box<dyn Codec<P=P>> {
25  fn compress(&self, field_values: &[FieldValue], nested_list_depth: u8) -> CoreResult<Vec<u8>> {
26    let RepLevelsAndAtoms { levels, atoms } = rep_levels::extract_levels_and_atoms::<P>(
27      field_values,
28      nested_list_depth,
29    )?;
30    let mut res = rep_levels::compress_rep_levels(levels)?;
31    res.extend(self.compress_atoms(&atoms)?);
32    Ok(res)
33  }
34
35  fn decompress_rep_levels(&self, bytes: &[u8]) -> CoreResult<RepLevelsAndBytes> {
36    let decompressor = Decompressor::<u32>::default();
37    let words = BitWords::from(bytes);
38    let mut reader = BitReader::from(&words);
39    let flags = decompressor.header(&mut reader)?;
40    let mut rep_levels = Vec::new();
41    while let Some(chunk) = decompressor.chunk(&mut reader, &flags)? {
42      rep_levels.extend(
43        chunk.nums
44          .iter()
45          .map(|&l| l as u8)
46      );
47    }
48
49    let byte_idx = reader.aligned_byte_idx()?;
50    Ok(RepLevelsAndBytes {
51      remaining_bytes: reader.read_aligned_bytes(bytes.len() - byte_idx)?.to_vec(),
52      levels: rep_levels,
53    })
54  }
55
56  fn decompress(&self, bytes: &[u8], nested_list_depth: u8) -> CoreResult<Vec<FieldValue>> {
57    let RepLevelsAndBytes { remaining_bytes, levels } = self.decompress_rep_levels(bytes)?;
58    let atoms: Vec<P::A> = self.decompress_atoms(&remaining_bytes)?;
59    let mut nester = AtomNester::<P>::from_levels_and_values(
60      levels,
61      atoms,
62      nested_list_depth,
63    );
64    nester.nested_field_values()
65  }
66}