1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
use std::cmp::min;
use std::fmt::Debug;
use std::marker::PhantomData;

use better_io::BetterBufRead;

use crate::bit_reader::{BitReader, BitReaderBuilder};
use crate::constants::{FULL_BATCH_N, PAGE_PADDING};
use crate::data_types::{NumberLike, UnsignedLike};
use crate::delta::DeltaMoments;
use crate::errors::{PcoError, PcoResult};
use crate::latent_batch_decompressor::LatentBatchDecompressor;
use crate::page_meta::PageMeta;
use crate::progress::Progress;
use crate::wrapped::page_decompressor::SecondaryLatents::Nonconstant;
use crate::wrapped::SecondaryLatents::Constant;
use crate::Mode::*;
use crate::{bit_reader, int_mult_utils, ChunkMeta, Mode};
use crate::{delta, float_mult_utils};

const PERFORMANT_BUF_READ_CAPACITY: usize = 8192;

/// Mutable state threaded through batch-by-batch decompression of one page.
#[derive(Clone, Debug)]
pub struct State<U: UnsignedLike> {
  // count of numbers decompressed from this page so far
  n_processed: usize,
  // one decompressor per latent variable
  latent_batch_decompressors: Vec<LatentBatchDecompressor<U>>,
  delta_momentss: Vec<DeltaMoments<U>>, // one per latent variable
  // scratch buffer holding one batch of secondary latents
  secondary_latents: [U; FULL_BATCH_N],
}

/// Holds metadata about a page and supports decompression.
pub struct PageDecompressor<T: NumberLike, R: BetterBufRead> {
  // immutable
  // total count of numbers in the page
  n: usize,
  // compression mode copied from the chunk metadata
  mode: Mode<T::Unsigned>,
  // Some(value) when the secondary latent variable is a known constant with
  // no delta encoding, allowing its decompression to be skipped per batch
  maybe_constant_secondary: Option<T::Unsigned>,
  // ties the unsigned-latent state back to the number type T
  phantom: PhantomData<T>,

  // mutable
  // wraps the compressed source; `with_reader` lends out padded BitReaders
  reader_builder: BitReaderBuilder<R>,
  state: State<T::Unsigned>,
}

// Converts each decompressed unsigned latent into its number type, then
// stores the number's bit pattern back into the same slot, so the caller can
// reinterpret the slice as `[T]` afterward.
#[inline(never)]
fn unsigneds_to_nums_in_place<T: NumberLike>(dst: &mut [T::Unsigned]) {
  dst
    .iter_mut()
    .for_each(|u| *u = T::transmute_to_unsigned(T::from_unsigned(*u)));
}

/// The secondary latents for one batch: either a slice of per-number values
/// or a single value shared by every number in the page.
pub(crate) enum SecondaryLatents<'a, U: UnsignedLike> {
  // varies per number; the slice may be consumed/mutated while joining
  Nonconstant(&'a mut [U]),
  // identical for every number, so no per-batch decompression is needed
  Constant(U),
}

// Combines the primary and secondary latent streams into final unsigneds,
// according to the chunk's mode.
fn join_latents<U: UnsignedLike>(mode: Mode<U>, primary: &mut [U], secondary: SecondaryLatents<U>) {
  match mode {
    // in classic mode the primary latents already hold the numbers
    Classic => {}
    FloatMult(config) => {
      float_mult_utils::join_latents(config.base, primary, secondary);
    }
    IntMult(gcd) => {
      int_mult_utils::join_latents(gcd, primary, secondary);
    }
  }
}

// Reads one batch of raw latents into `dst`, then delta-decodes in place.
//
// Only min(dst.len(), n_remaining - delta order) raw latents are read from
// the page (saturating at 0); presumably the trailing values near the end of
// the page are reconstructed from the delta moments during decoding.
fn decompress_latents_w_delta<U: UnsignedLike>(
  reader: &mut BitReader,
  delta_moments: &mut DeltaMoments<U>,
  lbd: &mut LatentBatchDecompressor<U>,
  dst: &mut [U],
  n_remaining: usize,
) -> PcoResult<()> {
  let n_stored = n_remaining.saturating_sub(delta_moments.order());
  let pre_delta_len = dst.len().min(n_stored);
  lbd.decompress_latent_batch(reader, &mut dst[..pre_delta_len])?;
  delta::decode_in_place(delta_moments, dst);
  Ok(())
}

impl<T: NumberLike, R: BetterBufRead> PageDecompressor<T, R> {
  /// Creates a page decompressor by parsing page metadata from `src`.
  ///
  /// `chunk_meta` is the already-parsed metadata of the enclosing chunk and
  /// `n` is the total count of numbers in this page.
  ///
  /// Returns a corruption error if a latent variable has no bins while
  /// latents would still need to be decoded.
  pub(crate) fn new(mut src: R, chunk_meta: &ChunkMeta<T::Unsigned>, n: usize) -> PcoResult<Self> {
    bit_reader::ensure_buf_read_capacity(&mut src, PERFORMANT_BUF_READ_CAPACITY);
    let mut reader_builder = BitReaderBuilder::new(src, PAGE_PADDING, 0);

    let page_meta = reader_builder
      .with_reader(|reader| PageMeta::<T::Unsigned>::parse_from(reader, chunk_meta))?;

    let mode = chunk_meta.mode;
    // one DeltaMoments per latent variable
    let delta_momentss = page_meta
      .per_var
      .iter()
      .map(|latent| latent.delta_moments.clone())
      .collect::<Vec<_>>();

    let mut latent_batch_decompressors = Vec::new();
    for latent_idx in 0..mode.n_latent_vars() {
      let chunk_latent_meta = &chunk_meta.per_latent_var[latent_idx];
      // with no bins we can't decode any latents, so the page must not need
      // more numbers than the delta moments alone provide
      if chunk_latent_meta.bins.is_empty() && n > chunk_meta.delta_encoding_order {
        return Err(PcoError::corruption(format!(
          "unable to decompress chunk with no bins and {} deltas",
          n - chunk_meta.delta_encoding_order,
        )));
      }

      latent_batch_decompressors.push(LatentBatchDecompressor::new(
        chunk_latent_meta,
        &page_meta.per_var[latent_idx],
      )?);
    }

    // If the secondary latent variable is a known constant and isn't
    // delta-encoded, we can skip decompressing it on every batch.
    let maybe_constant_secondary =
      if latent_batch_decompressors.len() >= 2 && delta_momentss[1].order() == 0 {
        latent_batch_decompressors[1].maybe_constant_value
      } else {
        None
      };

    // we don't store the whole ChunkMeta because it can get large due to bins
    Ok(Self {
      n,
      mode,
      maybe_constant_secondary,
      phantom: PhantomData,
      reader_builder,
      state: State {
        n_processed: 0,
        latent_batch_decompressors,
        delta_momentss,
        secondary_latents: [T::Unsigned::default(); FULL_BATCH_N],
      },
    })
  }

  /// Decompresses one batch of `primary_dst.len()` numbers (at most
  /// `FULL_BATCH_N`) into `primary_dst`, advancing the reader and state.
  fn decompress_batch(&mut self, primary_dst: &mut [T]) -> PcoResult<()> {
    let batch_n = primary_dst.len();
    // primary latents are decompressed directly into the destination buffer
    let primary_latents = T::transmute_to_unsigned_slice(primary_dst);
    let n = self.n;
    let mode = self.mode;
    // destructure state so the borrows below are disjoint from reader_builder
    let State {
      latent_batch_decompressors,
      delta_momentss,
      secondary_latents,
      n_processed,
      ..
    } = &mut self.state;

    let secondary_latents = &mut secondary_latents[..batch_n];
    let n_latents = latent_batch_decompressors.len();

    self.reader_builder.with_reader(|reader| {
      decompress_latents_w_delta(
        reader,
        &mut delta_momentss[0],
        &mut latent_batch_decompressors[0],
        primary_latents,
        n - *n_processed,
      )
    })?;

    // secondary latents only need decompression when they aren't constant
    if n_latents >= 2 && self.maybe_constant_secondary.is_none() {
      self.reader_builder.with_reader(|reader| {
        decompress_latents_w_delta(
          reader,
          &mut delta_momentss[1],
          &mut latent_batch_decompressors[1],
          secondary_latents,
          n - *n_processed,
        )
      })?;
    }
    let secondary = match self.maybe_constant_secondary {
      Some(u) => Constant(u),
      None => Nonconstant(secondary_latents),
    };

    join_latents(mode, primary_latents, secondary);
    unsigneds_to_nums_in_place::<T>(primary_latents);

    *n_processed += batch_n;
    // at the end of the page, verify the remaining bits of the byte are zero
    if *n_processed == n {
      self.reader_builder.with_reader(|reader| {
        reader.drain_empty_byte("expected trailing bits at end of page to be empty")
      })?;
    }

    Ok(())
  }

  /// Reads the next decompressed numbers into the destination, returning
  /// progress into the page and advancing along the compressed data.
  ///
  /// Will return an error if corruptions or insufficient data are found.
  ///
  /// `dst` must have length either a multiple of 256 or be at least the count
  /// of numbers remaining in the page.
  pub fn decompress(&mut self, num_dst: &mut [T]) -> PcoResult<Progress> {
    if num_dst.len() % FULL_BATCH_N != 0 && num_dst.len() < self.n_remaining() {
      return Err(PcoError::invalid_argument(format!(
        "num_dst's length must either be a multiple of {} or be \
         at least the count of numbers remaining ({} < {})",
        FULL_BATCH_N,
        num_dst.len(),
        self.n_remaining(),
      )));
    }

    let n_to_process = min(num_dst.len(), self.n_remaining());

    // process in batches of at most FULL_BATCH_N numbers
    let mut n_processed = 0;
    while n_processed < n_to_process {
      let dst_batch_end = min(n_processed + FULL_BATCH_N, n_to_process);
      self.decompress_batch(&mut num_dst[n_processed..dst_batch_end])?;
      n_processed = dst_batch_end;
    }

    Ok(Progress {
      n_processed,
      finished: self.n_remaining() == 0,
    })
  }

  // count of numbers in the page not yet decompressed
  fn n_remaining(&self) -> usize {
    self.n - self.state.n_processed
  }

  /// Returns the rest of the compressed data source.
  pub fn into_src(self) -> R {
    self.reader_builder.into_inner()
  }
}