1use std::cmp::min;
2use std::fmt::Debug;
3use std::marker::PhantomData;
4
5use better_io::BetterBufRead;
6
7use crate::bit_reader;
8use crate::bit_reader::BitReaderBuilder;
9use crate::constants::{FULL_BATCH_N, PAGE_PADDING};
10use crate::data_types::Number;
11use crate::errors::{PcoError, PcoResult};
12use crate::latent_page_decompressor::DynLatentPageDecompressor;
13use crate::macros::match_latent_enum;
14use crate::metadata::page::PageMeta;
15use crate::metadata::per_latent_var::{PerLatentVar, PerLatentVarBuilder};
16use crate::metadata::{ChunkMeta, DeltaEncoding, DynBins, DynLatents, Mode};
17use crate::progress::Progress;
18
/// Buffer capacity (8 KiB) requested from the source reader before a page is
/// decompressed; passed to `bit_reader::ensure_buf_read_capacity`.
const PERFORMANT_BUF_READ_CAPACITY: usize = 8 * 1024;
20
/// Per-page scratch buffer for one non-primary latent variable (the delta and
/// secondary latents each get one).
#[derive(Debug)]
struct LatentScratch {
    /// True when the latent variable's decompressor reported a constant
    /// value. Batch decompression only reads into scratches where this is
    /// false; constant scratches keep their pre-filled contents.
    is_constant: bool,
    /// Buffer of `FULL_BATCH_N` latents, created pre-filled with the constant
    /// value (or the latent type's default when there is no constant).
    dst: DynLatents,
}
26
/// Number-type-agnostic state of a page decompressor: the source reader, the
/// per-latent-variable decompressors, and progress counters.
struct PageDecompressorInner<R: BetterBufRead> {
    /// Count of numbers this page decompressor was constructed for.
    n: usize,
    /// Mode copied from the chunk metadata; passed to `join_latents`.
    mode: Mode,
    /// Delta encoding copied from the chunk metadata.
    delta_encoding: DeltaEncoding,

    /// Builds bit readers over the underlying source `R`.
    reader_builder: BitReaderBuilder<R>,
    /// How many numbers have been decompressed so far; never exceeds `n`
    /// as long as batches are sized from `n_remaining()`.
    n_processed: usize,
    /// One decompressor per latent variable (primary, and optionally delta
    /// and secondary).
    latent_decompressors: PerLatentVar<DynLatentPageDecompressor>,
    /// Scratch for the delta latent variable; `None` when there is none.
    delta_scratch: Option<LatentScratch>,
    /// Scratch for the secondary latent variable; `None` when there is none.
    secondary_scratch: Option<LatentScratch>,
}
40
/// Decompresses numbers of type `T` from a single page read from the source
/// `R`.
///
/// All state lives in the type-erased `inner`; `T` only selects how latents
/// are converted back into numbers.
pub struct PageDecompressor<T: Number, R: BetterBufRead> {
    /// Reader, latent decompressors, and progress counters.
    inner: PageDecompressorInner<R>,
    /// Ties the decompressor to the number type without storing a `T`.
    phantom: PhantomData<T>,
}
46
47fn convert_from_latents_to_numbers<T: Number>(dst: &mut [T]) {
48 for l_and_dst in dst {
50 *l_and_dst = T::from_latent_ordered(l_and_dst.transmute_to_latent());
51 }
52}
53
54fn make_latent_scratch(lpd: Option<&DynLatentPageDecompressor>) -> Option<LatentScratch> {
55 let lpd = lpd?;
56
57 match_latent_enum!(
58 lpd,
59 DynLatentPageDecompressor<L>(inner) => {
60 let maybe_constant_value = inner.maybe_constant_value;
61 Some(LatentScratch {
62 is_constant: maybe_constant_value.is_some(),
63 dst: DynLatents::new(vec![maybe_constant_value.unwrap_or_default(); FULL_BATCH_N]).unwrap(),
64 })
65 }
66 )
67}
68
69fn make_latent_decompressors(
70 chunk_meta: &ChunkMeta,
71 page_meta: &PageMeta,
72 n: usize,
73) -> PcoResult<PerLatentVar<DynLatentPageDecompressor>> {
74 let mut states = PerLatentVarBuilder::default();
75 for (key, (chunk_latent_var_meta, page_latent_var_meta)) in chunk_meta
76 .per_latent_var
77 .as_ref()
78 .zip_exact(page_meta.per_latent_var.as_ref())
79 .enumerated()
80 {
81 let var_delta_encoding = chunk_meta.delta_encoding.for_latent_var(key);
82 let n_in_body = n.saturating_sub(var_delta_encoding.n_latents_per_state());
83 let state = match_latent_enum!(
84 &chunk_latent_var_meta.bins,
85 DynBins<L>(bins) => {
86 let delta_state = page_latent_var_meta
87 .delta_state
88 .downcast_ref::<L>()
89 .unwrap()
90 .clone();
91
92 if bins.is_empty() && n_in_body > 0 {
93 return Err(PcoError::corruption(format!(
94 "unable to decompress chunk with no bins and {} latents",
95 n_in_body
96 )));
97 }
98
99 DynLatentPageDecompressor::create(
100 chunk_latent_var_meta.ans_size_log,
101 bins,
102 var_delta_encoding,
103 page_latent_var_meta.ans_final_state_idxs,
104 delta_state,
105 )?
106 }
107 );
108
109 states.set(key, state);
110 }
111 Ok(states.into())
112}
113
114impl<R: BetterBufRead> PageDecompressorInner<R> {
115 pub(crate) fn new(mut src: R, chunk_meta: &ChunkMeta, n: usize) -> PcoResult<Self> {
116 bit_reader::ensure_buf_read_capacity(&mut src, PERFORMANT_BUF_READ_CAPACITY);
117 let mut reader_builder = BitReaderBuilder::new(src, PAGE_PADDING, 0);
118
119 let page_meta =
120 reader_builder.with_reader(|reader| unsafe { PageMeta::read_from(reader, chunk_meta) })?;
121
122 let mode = chunk_meta.mode;
123 let latent_decompressors = make_latent_decompressors(chunk_meta, &page_meta, n)?;
124
125 let delta_scratch = make_latent_scratch(latent_decompressors.delta.as_ref());
126 let secondary_scratch = make_latent_scratch(latent_decompressors.secondary.as_ref());
127
128 Ok(Self {
130 n,
131 mode,
132 delta_encoding: chunk_meta.delta_encoding,
133 reader_builder,
134 n_processed: 0,
135 latent_decompressors,
136 delta_scratch,
137 secondary_scratch,
138 })
139 }
140
141 fn n_remaining(&self) -> usize {
142 self.n - self.n_processed
143 }
144}
145
146impl<T: Number, R: BetterBufRead> PageDecompressor<T, R> {
147 #[inline(never)]
148 pub(crate) fn new(src: R, chunk_meta: &ChunkMeta, n: usize) -> PcoResult<Self> {
149 Ok(Self {
150 inner: PageDecompressorInner::new(src, chunk_meta, n)?,
151 phantom: PhantomData::<T>,
152 })
153 }
154
155 fn decompress_batch(&mut self, dst: &mut [T]) -> PcoResult<()> {
156 let batch_n = dst.len();
157 let inner = &mut self.inner;
158 let n = inner.n;
159 let n_remaining = inner.n_remaining();
160 let mode = inner.mode;
161
162 if let Some(LatentScratch {
164 is_constant: false,
165 dst,
166 }) = &mut inner.delta_scratch
167 {
168 let dyn_lpd = inner.latent_decompressors.delta.as_mut().unwrap();
169 let limit = min(
170 n_remaining.saturating_sub(inner.delta_encoding.n_latents_per_state()),
171 batch_n,
172 );
173 inner.reader_builder.with_reader(|reader| unsafe {
174 match_latent_enum!(
175 dyn_lpd,
176 DynLatentPageDecompressor<L>(lpd) => {
177 lpd.decompress_batch_pre_delta(
182 reader,
183 &mut dst.downcast_mut::<L>().unwrap()[..limit]
184 )
185 }
186 );
187 Ok(())
188 })?;
189 }
190 let delta_latents = inner.delta_scratch.as_ref().map(|scratch| &scratch.dst);
191
192 inner.reader_builder.with_reader(|reader| unsafe {
194 let primary_dst = T::transmute_to_latents(dst);
195 let dyn_lpd = inner
196 .latent_decompressors
197 .primary
198 .downcast_mut::<T::L>()
199 .unwrap();
200 dyn_lpd.decompress_batch(
201 delta_latents,
202 n_remaining,
203 reader,
204 primary_dst,
205 )
206 })?;
207
208 if let Some(LatentScratch {
210 is_constant: false,
211 dst,
212 }) = &mut inner.secondary_scratch
213 {
214 let dyn_lpd = inner.latent_decompressors.secondary.as_mut().unwrap();
215 inner.reader_builder.with_reader(|reader| unsafe {
216 match_latent_enum!(
217 dyn_lpd,
218 DynLatentPageDecompressor<L>(lpd) => {
219 lpd.decompress_batch(
222 delta_latents,
223 n_remaining,
224 reader,
225 &mut dst.downcast_mut::<L>().unwrap()[..batch_n]
226 )
227 }
228 )
229 })?;
230 }
231
232 T::join_latents(
233 mode,
234 T::transmute_to_latents(dst),
235 inner.secondary_scratch.as_ref().map(|scratch| &scratch.dst),
236 );
237 convert_from_latents_to_numbers(dst);
238
239 inner.n_processed += batch_n;
240 if inner.n_processed == n {
241 inner.reader_builder.with_reader(|reader| {
242 reader.drain_empty_byte("expected trailing bits at end of page to be empty")
243 })?;
244 }
245
246 Ok(())
247 }
248
249 pub fn decompress(&mut self, num_dst: &mut [T]) -> PcoResult<Progress> {
257 let n_remaining = self.inner.n_remaining();
258 if !num_dst.len().is_multiple_of(FULL_BATCH_N) && num_dst.len() < n_remaining {
259 return Err(PcoError::invalid_argument(format!(
260 "num_dst's length must either be a multiple of {} or be \
261 at least the count of numbers remaining ({} < {})",
262 FULL_BATCH_N,
263 num_dst.len(),
264 n_remaining,
265 )));
266 }
267
268 let n_to_process = min(num_dst.len(), n_remaining);
269
270 let mut n_processed = 0;
271 while n_processed < n_to_process {
272 let dst_batch_end = min(n_processed + FULL_BATCH_N, n_to_process);
273 self.decompress_batch(&mut num_dst[n_processed..dst_batch_end])?;
274 n_processed = dst_batch_end;
275 }
276
277 Ok(Progress {
278 n_processed,
279 finished: self.inner.n_remaining() == 0,
280 })
281 }
282
283 pub fn into_src(self) -> R {
285 self.inner.reader_builder.into_inner()
286 }
287}