pco/wrapped/page_decompressor.rs

1use std::cmp::min;
2use std::fmt::Debug;
3use std::marker::PhantomData;
4
5use better_io::BetterBufRead;
6
7use crate::bit_reader;
8use crate::bit_reader::BitReaderBuilder;
9use crate::constants::{FULL_BATCH_N, PAGE_PADDING};
10use crate::data_types::Number;
11use crate::errors::{PcoError, PcoResult};
12use crate::latent_page_decompressor::DynLatentPageDecompressor;
13use crate::macros::match_latent_enum;
14use crate::metadata::page::PageMeta;
15use crate::metadata::per_latent_var::{PerLatentVar, PerLatentVarBuilder};
16use crate::metadata::{ChunkMeta, DeltaEncoding, DynBins, DynLatents, Mode};
17use crate::progress::Progress;
18
/// Read-buffer capacity (8 KiB) requested from the page's source via
/// `bit_reader::ensure_buf_read_capacity` before any page data is read.
const PERFORMANT_BUF_READ_CAPACITY: usize = 8 * 1024;
20
/// Reusable per-batch buffer for one non-primary latent variable (the delta
/// or secondary latents of a page).
#[derive(Debug)]
struct LatentScratch {
  // True when the variable's decompressor reported a constant value. In that
  // case `dst` was pre-filled with the constant and batches skip decoding it.
  is_constant: bool,
  // `FULL_BATCH_N` latents that each batch decodes into (or the constant).
  dst: DynLatents,
}
26
/// The part of a page decompressor that does not depend on the number type
/// `T`; `PageDecompressor<T, R>` wraps it.
struct PageDecompressorInner<R: BetterBufRead> {
  // immutable
  // Total count of numbers in the page.
  n: usize,
  // Chunk mode, used when joining primary and secondary latents.
  mode: Mode,
  // Chunk delta encoding, used to size each batch's delta latents.
  delta_encoding: DeltaEncoding,

  // mutable
  // Bit-level reader over the page's compressed bytes.
  reader_builder: BitReaderBuilder<R>,
  // Count of numbers decompressed so far; kept <= `n` by `decompress`.
  n_processed: usize,
  // One decompressor per latent variable: primary always, delta/secondary
  // only when the chunk has them.
  latent_decompressors: PerLatentVar<DynLatentPageDecompressor>,
  // Batch buffers for the delta and secondary latents, when present.
  delta_scratch: Option<LatentScratch>,
  secondary_scratch: Option<LatentScratch>,
}
40
/// Holds metadata about a page and supports decompression.
///
/// `T` is the number type produced and `R` is the buffered source of
/// compressed data. Numbers are read incrementally with `decompress`, and
/// `into_src` returns the rest of the source when the caller is done.
pub struct PageDecompressor<T: Number, R: BetterBufRead> {
  inner: PageDecompressorInner<R>,
  // `T` appears only in method signatures, so it is tracked here.
  phantom: PhantomData<T>,
}
46
47fn convert_from_latents_to_numbers<T: Number>(dst: &mut [T]) {
48  // we wrote the joined latents to dst, so we can convert them in place
49  for l_and_dst in dst {
50    *l_and_dst = T::from_latent_ordered(l_and_dst.transmute_to_latent());
51  }
52}
53
54fn make_latent_scratch(lpd: Option<&DynLatentPageDecompressor>) -> Option<LatentScratch> {
55  let lpd = lpd?;
56
57  match_latent_enum!(
58    lpd,
59    DynLatentPageDecompressor<L>(inner) => {
60      let maybe_constant_value = inner.maybe_constant_value;
61      Some(LatentScratch {
62        is_constant: maybe_constant_value.is_some(),
63        dst: DynLatents::new(vec![maybe_constant_value.unwrap_or_default(); FULL_BATCH_N]).unwrap(),
64      })
65    }
66  )
67}
68
69fn make_latent_decompressors(
70  chunk_meta: &ChunkMeta,
71  page_meta: &PageMeta,
72  n: usize,
73) -> PcoResult<PerLatentVar<DynLatentPageDecompressor>> {
74  let mut states = PerLatentVarBuilder::default();
75  for (key, (chunk_latent_var_meta, page_latent_var_meta)) in chunk_meta
76    .per_latent_var
77    .as_ref()
78    .zip_exact(page_meta.per_latent_var.as_ref())
79    .enumerated()
80  {
81    let var_delta_encoding = chunk_meta.delta_encoding.for_latent_var(key);
82    let n_in_body = n.saturating_sub(var_delta_encoding.n_latents_per_state());
83    let state = match_latent_enum!(
84      &chunk_latent_var_meta.bins,
85      DynBins<L>(bins) => {
86        let delta_state = page_latent_var_meta
87          .delta_state
88          .downcast_ref::<L>()
89          .unwrap()
90          .clone();
91
92        if bins.is_empty() && n_in_body > 0 {
93          return Err(PcoError::corruption(format!(
94            "unable to decompress chunk with no bins and {} latents",
95            n_in_body
96          )));
97        }
98
99        DynLatentPageDecompressor::create(
100          chunk_latent_var_meta.ans_size_log,
101          bins,
102          var_delta_encoding,
103          page_latent_var_meta.ans_final_state_idxs,
104          delta_state,
105        )?
106      }
107    );
108
109    states.set(key, state);
110  }
111  Ok(states.into())
112}
113
114impl<R: BetterBufRead> PageDecompressorInner<R> {
115  pub(crate) fn new(mut src: R, chunk_meta: &ChunkMeta, n: usize) -> PcoResult<Self> {
116    bit_reader::ensure_buf_read_capacity(&mut src, PERFORMANT_BUF_READ_CAPACITY);
117    let mut reader_builder = BitReaderBuilder::new(src, PAGE_PADDING, 0);
118
119    let page_meta =
120      reader_builder.with_reader(|reader| unsafe { PageMeta::read_from(reader, chunk_meta) })?;
121
122    let mode = chunk_meta.mode;
123    let latent_decompressors = make_latent_decompressors(chunk_meta, &page_meta, n)?;
124
125    let delta_scratch = make_latent_scratch(latent_decompressors.delta.as_ref());
126    let secondary_scratch = make_latent_scratch(latent_decompressors.secondary.as_ref());
127
128    // we don't store the whole ChunkMeta because it can get large due to bins
129    Ok(Self {
130      n,
131      mode,
132      delta_encoding: chunk_meta.delta_encoding,
133      reader_builder,
134      n_processed: 0,
135      latent_decompressors,
136      delta_scratch,
137      secondary_scratch,
138    })
139  }
140
141  fn n_remaining(&self) -> usize {
142    self.n - self.n_processed
143  }
144}
145
146impl<T: Number, R: BetterBufRead> PageDecompressor<T, R> {
147  #[inline(never)]
148  pub(crate) fn new(src: R, chunk_meta: &ChunkMeta, n: usize) -> PcoResult<Self> {
149    Ok(Self {
150      inner: PageDecompressorInner::new(src, chunk_meta, n)?,
151      phantom: PhantomData::<T>,
152    })
153  }
154
155  fn decompress_batch(&mut self, dst: &mut [T]) -> PcoResult<()> {
156    let batch_n = dst.len();
157    let inner = &mut self.inner;
158    let n = inner.n;
159    let n_remaining = inner.n_remaining();
160    let mode = inner.mode;
161
162    // DELTA LATENTS
163    if let Some(LatentScratch {
164      is_constant: false,
165      dst,
166    }) = &mut inner.delta_scratch
167    {
168      let dyn_lpd = inner.latent_decompressors.delta.as_mut().unwrap();
169      let limit = min(
170        n_remaining.saturating_sub(inner.delta_encoding.n_latents_per_state()),
171        batch_n,
172      );
173      inner.reader_builder.with_reader(|reader| unsafe {
174        match_latent_enum!(
175          dyn_lpd,
176          DynLatentPageDecompressor<L>(lpd) => {
177            // Delta latents only line up with pre-delta length of the other
178            // latents.
179            // We never apply delta encoding to delta latents, so we just
180            // skip straight to the inner LatentBatchDecompressor
181            lpd.decompress_batch_pre_delta(
182              reader,
183              &mut dst.downcast_mut::<L>().unwrap()[..limit]
184            )
185          }
186        );
187        Ok(())
188      })?;
189    }
190    let delta_latents = inner.delta_scratch.as_ref().map(|scratch| &scratch.dst);
191
192    // PRIMARY LATENTS
193    inner.reader_builder.with_reader(|reader| unsafe {
194      let primary_dst = T::transmute_to_latents(dst);
195      let dyn_lpd = inner
196        .latent_decompressors
197        .primary
198        .downcast_mut::<T::L>()
199        .unwrap();
200      dyn_lpd.decompress_batch(
201        delta_latents,
202        n_remaining,
203        reader,
204        primary_dst,
205      )
206    })?;
207
208    // SECONDARY LATENTS
209    if let Some(LatentScratch {
210      is_constant: false,
211      dst,
212    }) = &mut inner.secondary_scratch
213    {
214      let dyn_lpd = inner.latent_decompressors.secondary.as_mut().unwrap();
215      inner.reader_builder.with_reader(|reader| unsafe {
216        match_latent_enum!(
217          dyn_lpd,
218          DynLatentPageDecompressor<L>(lpd) => {
219            // We never apply delta encoding to delta latents, so we just
220            // skip straight to the inner LatentBatchDecompressor
221            lpd.decompress_batch(
222              delta_latents,
223              n_remaining,
224              reader,
225              &mut dst.downcast_mut::<L>().unwrap()[..batch_n]
226            )
227          }
228        )
229      })?;
230    }
231
232    T::join_latents(
233      mode,
234      T::transmute_to_latents(dst),
235      inner.secondary_scratch.as_ref().map(|scratch| &scratch.dst),
236    );
237    convert_from_latents_to_numbers(dst);
238
239    inner.n_processed += batch_n;
240    if inner.n_processed == n {
241      inner.reader_builder.with_reader(|reader| {
242        reader.drain_empty_byte("expected trailing bits at end of page to be empty")
243      })?;
244    }
245
246    Ok(())
247  }
248
249  /// Reads the next decompressed numbers into the destination, returning
250  /// progress into the page and advancing along the compressed data.
251  ///
252  /// Will return an error if corruptions or insufficient data are found.
253  ///
254  /// `dst` must have length either a multiple of 256 or be at least the count
255  /// of numbers remaining in the page.
256  pub fn decompress(&mut self, num_dst: &mut [T]) -> PcoResult<Progress> {
257    let n_remaining = self.inner.n_remaining();
258    if !num_dst.len().is_multiple_of(FULL_BATCH_N) && num_dst.len() < n_remaining {
259      return Err(PcoError::invalid_argument(format!(
260        "num_dst's length must either be a multiple of {} or be \
261         at least the count of numbers remaining ({} < {})",
262        FULL_BATCH_N,
263        num_dst.len(),
264        n_remaining,
265      )));
266    }
267
268    let n_to_process = min(num_dst.len(), n_remaining);
269
270    let mut n_processed = 0;
271    while n_processed < n_to_process {
272      let dst_batch_end = min(n_processed + FULL_BATCH_N, n_to_process);
273      self.decompress_batch(&mut num_dst[n_processed..dst_batch_end])?;
274      n_processed = dst_batch_end;
275    }
276
277    Ok(Progress {
278      n_processed,
279      finished: self.inner.n_remaining() == 0,
280    })
281  }
282
283  /// Returns the rest of the compressed data source.
284  pub fn into_src(self) -> R {
285    self.inner.reader_builder.into_inner()
286  }
287}