1use std::collections::VecDeque;
2
3use parquet2::{
4 deserialize::SliceFilteredIter,
5 encoding::{hybrid_rle, Encoding},
6 page::{split_buffer, DataPage, DictPage},
7 schema::Repetition,
8 types::decode,
9 types::NativeType as ParquetNativeType,
10};
11
12use crate::{
13 array::MutablePrimitiveArray, bitmap::MutableBitmap, datatypes::DataType, error::Result,
14 types::NativeType,
15};
16
17use super::super::utils;
18use super::super::utils::{get_selected_rows, FilteredOptionalPageValidity, OptionalPageValidity};
19use super::super::Pages;
20
21#[derive(Debug)]
22pub(super) struct FilteredRequiredValues<'a> {
23 values: SliceFilteredIter<std::slice::ChunksExact<'a, u8>>,
24}
25
26impl<'a> FilteredRequiredValues<'a> {
27 pub fn try_new<P: ParquetNativeType>(page: &'a DataPage) -> Result<Self> {
28 let (_, _, values) = split_buffer(page)?;
29 assert_eq!(values.len() % std::mem::size_of::<P>(), 0);
30
31 let values = values.chunks_exact(std::mem::size_of::<P>());
32
33 let rows = get_selected_rows(page);
34 let values = SliceFilteredIter::new(values, rows);
35
36 Ok(Self { values })
37 }
38
39 #[inline]
40 pub fn len(&self) -> usize {
41 self.values.size_hint().0
42 }
43}
44
45#[derive(Debug)]
46pub(super) struct Values<'a> {
47 pub values: std::slice::ChunksExact<'a, u8>,
48}
49
50impl<'a> Values<'a> {
51 pub fn try_new<P: ParquetNativeType>(page: &'a DataPage) -> Result<Self> {
52 let (_, _, values) = split_buffer(page)?;
53 assert_eq!(values.len() % std::mem::size_of::<P>(), 0);
54 Ok(Self {
55 values: values.chunks_exact(std::mem::size_of::<P>()),
56 })
57 }
58
59 #[inline]
60 pub fn len(&self) -> usize {
61 self.values.size_hint().0
62 }
63}
64
65#[derive(Debug)]
66pub(super) struct ValuesDictionary<'a, T>
67where
68 T: NativeType,
69{
70 pub values: hybrid_rle::HybridRleDecoder<'a>,
71 pub dict: &'a Vec<T>,
72}
73
74impl<'a, T> ValuesDictionary<'a, T>
75where
76 T: NativeType,
77{
78 pub fn try_new(page: &'a DataPage, dict: &'a Vec<T>) -> Result<Self> {
79 let values = utils::dict_indices_decoder(page)?;
80
81 Ok(Self { dict, values })
82 }
83
84 #[inline]
85 pub fn len(&self) -> usize {
86 self.values.size_hint().0
87 }
88}
89
90#[derive(Debug)]
92pub(super) enum State<'a, T>
93where
94 T: NativeType,
95{
96 Optional(OptionalPageValidity<'a>, Values<'a>),
97 Required(Values<'a>),
98 RequiredDictionary(ValuesDictionary<'a, T>),
99 OptionalDictionary(OptionalPageValidity<'a>, ValuesDictionary<'a, T>),
100 FilteredRequired(FilteredRequiredValues<'a>),
101 FilteredOptional(FilteredOptionalPageValidity<'a>, Values<'a>),
102}
103
104impl<'a, T> utils::PageState<'a> for State<'a, T>
105where
106 T: NativeType,
107{
108 fn len(&self) -> usize {
109 match self {
110 State::Optional(optional, _) => optional.len(),
111 State::Required(values) => values.len(),
112 State::RequiredDictionary(values) => values.len(),
113 State::OptionalDictionary(optional, _) => optional.len(),
114 State::FilteredRequired(values) => values.len(),
115 State::FilteredOptional(optional, _) => optional.len(),
116 }
117 }
118}
119
120#[derive(Debug)]
121pub(super) struct PrimitiveDecoder<T, P, F>
122where
123 T: NativeType,
124 P: ParquetNativeType,
125 F: Fn(P) -> T,
126{
127 phantom: std::marker::PhantomData<T>,
128 phantom_p: std::marker::PhantomData<P>,
129 pub op: F,
130}
131
132impl<T, P, F> PrimitiveDecoder<T, P, F>
133where
134 T: NativeType,
135 P: ParquetNativeType,
136 F: Fn(P) -> T,
137{
138 #[inline]
139 pub(super) fn new(op: F) -> Self {
140 Self {
141 phantom: std::marker::PhantomData,
142 phantom_p: std::marker::PhantomData,
143 op,
144 }
145 }
146}
147
148impl<T: std::fmt::Debug> utils::DecodedState for (Vec<T>, MutableBitmap) {
149 fn len(&self) -> usize {
150 self.0.len()
151 }
152}
153
154impl<'a, T, P, F> utils::Decoder<'a> for PrimitiveDecoder<T, P, F>
155where
156 T: NativeType,
157 P: ParquetNativeType,
158 F: Copy + Fn(P) -> T,
159{
160 type State = State<'a, T>;
161 type Dict = Vec<T>;
162 type DecodedState = (Vec<T>, MutableBitmap);
163
164 fn build_state(&self, page: &'a DataPage, dict: Option<&'a Self::Dict>) -> Result<Self::State> {
165 let is_optional =
166 page.descriptor.primitive_type.field_info.repetition == Repetition::Optional;
167 let is_filtered = page.selected_rows().is_some();
168
169 match (page.encoding(), dict, is_optional, is_filtered) {
170 (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false, false) => {
171 ValuesDictionary::try_new(page, dict).map(State::RequiredDictionary)
172 }
173 (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true, false) => {
174 Ok(State::OptionalDictionary(
175 OptionalPageValidity::try_new(page)?,
176 ValuesDictionary::try_new(page, dict)?,
177 ))
178 }
179 (Encoding::Plain, _, true, false) => {
180 let validity = OptionalPageValidity::try_new(page)?;
181 let values = Values::try_new::<P>(page)?;
182
183 Ok(State::Optional(validity, values))
184 }
185 (Encoding::Plain, _, false, false) => Ok(State::Required(Values::try_new::<P>(page)?)),
186 (Encoding::Plain, _, false, true) => {
187 FilteredRequiredValues::try_new::<P>(page).map(State::FilteredRequired)
188 }
189 (Encoding::Plain, _, true, true) => Ok(State::FilteredOptional(
190 FilteredOptionalPageValidity::try_new(page)?,
191 Values::try_new::<P>(page)?,
192 )),
193 _ => Err(utils::not_implemented(page)),
194 }
195 }
196
197 fn with_capacity(&self, capacity: usize) -> Self::DecodedState {
198 (
199 Vec::<T>::with_capacity(capacity),
200 MutableBitmap::with_capacity(capacity),
201 )
202 }
203
204 fn extend_from_state(
205 &self,
206 state: &mut Self::State,
207 decoded: &mut Self::DecodedState,
208 remaining: usize,
209 ) {
210 let (values, validity) = decoded;
211 match state {
212 State::Optional(page_validity, page_values) => utils::extend_from_decoder(
213 validity,
214 page_validity,
215 Some(remaining),
216 values,
217 page_values.values.by_ref().map(decode).map(self.op),
218 ),
219 State::Required(page) => {
220 values.extend(
221 page.values
222 .by_ref()
223 .map(decode)
224 .map(self.op)
225 .take(remaining),
226 );
227 }
228 State::OptionalDictionary(page_validity, page_values) => {
229 let op1 = |index: u32| page_values.dict[index as usize];
230 utils::extend_from_decoder(
231 validity,
232 page_validity,
233 Some(remaining),
234 values,
235 &mut page_values.values.by_ref().map(|x| x.unwrap()).map(op1),
236 )
237 }
238 State::RequiredDictionary(page) => {
239 let op1 = |index: u32| page.dict[index as usize];
240 values.extend(
241 page.values
242 .by_ref()
243 .map(|x| x.unwrap())
244 .map(op1)
245 .take(remaining),
246 );
247 }
248 State::FilteredRequired(page) => {
249 values.extend(
250 page.values
251 .by_ref()
252 .map(decode)
253 .map(self.op)
254 .take(remaining),
255 );
256 }
257 State::FilteredOptional(page_validity, page_values) => {
258 utils::extend_from_decoder(
259 validity,
260 page_validity,
261 Some(remaining),
262 values,
263 page_values.values.by_ref().map(decode).map(self.op),
264 );
265 }
266 }
267 }
268
269 fn deserialize_dict(&self, page: &DictPage) -> Self::Dict {
270 deserialize_plain(&page.buffer, self.op)
271 }
272}
273
274pub(super) fn finish<T: NativeType>(
275 data_type: &DataType,
276 values: Vec<T>,
277 validity: MutableBitmap,
278) -> MutablePrimitiveArray<T> {
279 let validity = if validity.is_empty() {
280 None
281 } else {
282 Some(validity)
283 };
284 MutablePrimitiveArray::try_new(data_type.clone(), values, validity).unwrap()
285}
286
287#[derive(Debug)]
289pub struct Iter<T, I, P, F>
290where
291 I: Pages,
292 T: NativeType,
293 P: ParquetNativeType,
294 F: Fn(P) -> T,
295{
296 iter: I,
297 data_type: DataType,
298 items: VecDeque<(Vec<T>, MutableBitmap)>,
299 remaining: usize,
300 chunk_size: Option<usize>,
301 dict: Option<Vec<T>>,
302 op: F,
303 phantom: std::marker::PhantomData<P>,
304}
305
306impl<T, I, P, F> Iter<T, I, P, F>
307where
308 I: Pages,
309 T: NativeType,
310
311 P: ParquetNativeType,
312 F: Copy + Fn(P) -> T,
313{
314 pub fn new(
315 iter: I,
316 data_type: DataType,
317 num_rows: usize,
318 chunk_size: Option<usize>,
319 op: F,
320 ) -> Self {
321 Self {
322 iter,
323 data_type,
324 items: VecDeque::new(),
325 dict: None,
326 remaining: num_rows,
327 chunk_size,
328 op,
329 phantom: Default::default(),
330 }
331 }
332}
333
334impl<T, I, P, F> Iterator for Iter<T, I, P, F>
335where
336 I: Pages,
337 T: NativeType,
338 P: ParquetNativeType,
339 F: Copy + Fn(P) -> T,
340{
341 type Item = Result<MutablePrimitiveArray<T>>;
342
343 fn next(&mut self) -> Option<Self::Item> {
344 let maybe_state = utils::next(
345 &mut self.iter,
346 &mut self.items,
347 &mut self.dict,
348 &mut self.remaining,
349 self.chunk_size,
350 &PrimitiveDecoder::new(self.op),
351 );
352 match maybe_state {
353 utils::MaybeNext::Some(Ok((values, validity))) => {
354 Some(Ok(finish(&self.data_type, values, validity)))
355 }
356 utils::MaybeNext::Some(Err(e)) => Some(Err(e)),
357 utils::MaybeNext::None => None,
358 utils::MaybeNext::More => self.next(),
359 }
360 }
361}
362
363pub(super) fn deserialize_plain<T, P, F>(values: &[u8], op: F) -> Vec<T>
364where
365 T: NativeType,
366 P: ParquetNativeType,
367 F: Copy + Fn(P) -> T,
368{
369 values
370 .chunks_exact(std::mem::size_of::<P>())
371 .map(decode)
372 .map(op)
373 .collect::<Vec<_>>()
374}