// arrow2/io/parquet/read/deserialize/primitive/basic.rs

use std::collections::VecDeque;

use parquet2::{
    deserialize::SliceFilteredIter,
    encoding::{hybrid_rle, Encoding},
    page::{split_buffer, DataPage, DictPage},
    schema::Repetition,
    types::decode,
    types::NativeType as ParquetNativeType,
};

use crate::{
    array::MutablePrimitiveArray,
    bitmap::MutableBitmap,
    datatypes::DataType,
    error::{Error, Result},
    types::NativeType,
};

use super::super::utils;
use super::super::utils::{get_selected_rows, FilteredOptionalPageValidity, OptionalPageValidity};
use super::super::Pages;

21#[derive(Debug)]
22pub(super) struct FilteredRequiredValues<'a> {
23    values: SliceFilteredIter<std::slice::ChunksExact<'a, u8>>,
24}
25
26impl<'a> FilteredRequiredValues<'a> {
27    pub fn try_new<P: ParquetNativeType>(page: &'a DataPage) -> Result<Self> {
28        let (_, _, values) = split_buffer(page)?;
29        assert_eq!(values.len() % std::mem::size_of::<P>(), 0);
30
31        let values = values.chunks_exact(std::mem::size_of::<P>());
32
33        let rows = get_selected_rows(page);
34        let values = SliceFilteredIter::new(values, rows);
35
36        Ok(Self { values })
37    }
38
39    #[inline]
40    pub fn len(&self) -> usize {
41        self.values.size_hint().0
42    }
43}
44
45#[derive(Debug)]
46pub(super) struct Values<'a> {
47    pub values: std::slice::ChunksExact<'a, u8>,
48}
49
50impl<'a> Values<'a> {
51    pub fn try_new<P: ParquetNativeType>(page: &'a DataPage) -> Result<Self> {
52        let (_, _, values) = split_buffer(page)?;
53        assert_eq!(values.len() % std::mem::size_of::<P>(), 0);
54        Ok(Self {
55            values: values.chunks_exact(std::mem::size_of::<P>()),
56        })
57    }
58
59    #[inline]
60    pub fn len(&self) -> usize {
61        self.values.size_hint().0
62    }
63}
64
65#[derive(Debug)]
66pub(super) struct ValuesDictionary<'a, T>
67where
68    T: NativeType,
69{
70    pub values: hybrid_rle::HybridRleDecoder<'a>,
71    pub dict: &'a Vec<T>,
72}
73
74impl<'a, T> ValuesDictionary<'a, T>
75where
76    T: NativeType,
77{
78    pub fn try_new(page: &'a DataPage, dict: &'a Vec<T>) -> Result<Self> {
79        let values = utils::dict_indices_decoder(page)?;
80
81        Ok(Self { dict, values })
82    }
83
84    #[inline]
85    pub fn len(&self) -> usize {
86        self.values.size_hint().0
87    }
88}
89
90// The state of a `DataPage` of `Primitive` parquet primitive type
91#[derive(Debug)]
92pub(super) enum State<'a, T>
93where
94    T: NativeType,
95{
96    Optional(OptionalPageValidity<'a>, Values<'a>),
97    Required(Values<'a>),
98    RequiredDictionary(ValuesDictionary<'a, T>),
99    OptionalDictionary(OptionalPageValidity<'a>, ValuesDictionary<'a, T>),
100    FilteredRequired(FilteredRequiredValues<'a>),
101    FilteredOptional(FilteredOptionalPageValidity<'a>, Values<'a>),
102}
103
104impl<'a, T> utils::PageState<'a> for State<'a, T>
105where
106    T: NativeType,
107{
108    fn len(&self) -> usize {
109        match self {
110            State::Optional(optional, _) => optional.len(),
111            State::Required(values) => values.len(),
112            State::RequiredDictionary(values) => values.len(),
113            State::OptionalDictionary(optional, _) => optional.len(),
114            State::FilteredRequired(values) => values.len(),
115            State::FilteredOptional(optional, _) => optional.len(),
116        }
117    }
118}
119
120#[derive(Debug)]
121pub(super) struct PrimitiveDecoder<T, P, F>
122where
123    T: NativeType,
124    P: ParquetNativeType,
125    F: Fn(P) -> T,
126{
127    phantom: std::marker::PhantomData<T>,
128    phantom_p: std::marker::PhantomData<P>,
129    pub op: F,
130}
131
132impl<T, P, F> PrimitiveDecoder<T, P, F>
133where
134    T: NativeType,
135    P: ParquetNativeType,
136    F: Fn(P) -> T,
137{
138    #[inline]
139    pub(super) fn new(op: F) -> Self {
140        Self {
141            phantom: std::marker::PhantomData,
142            phantom_p: std::marker::PhantomData,
143            op,
144        }
145    }
146}
147
148impl<T: std::fmt::Debug> utils::DecodedState for (Vec<T>, MutableBitmap) {
149    fn len(&self) -> usize {
150        self.0.len()
151    }
152}
153
154impl<'a, T, P, F> utils::Decoder<'a> for PrimitiveDecoder<T, P, F>
155where
156    T: NativeType,
157    P: ParquetNativeType,
158    F: Copy + Fn(P) -> T,
159{
160    type State = State<'a, T>;
161    type Dict = Vec<T>;
162    type DecodedState = (Vec<T>, MutableBitmap);
163
164    fn build_state(&self, page: &'a DataPage, dict: Option<&'a Self::Dict>) -> Result<Self::State> {
165        let is_optional =
166            page.descriptor.primitive_type.field_info.repetition == Repetition::Optional;
167        let is_filtered = page.selected_rows().is_some();
168
169        match (page.encoding(), dict, is_optional, is_filtered) {
170            (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false, false) => {
171                ValuesDictionary::try_new(page, dict).map(State::RequiredDictionary)
172            }
173            (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true, false) => {
174                Ok(State::OptionalDictionary(
175                    OptionalPageValidity::try_new(page)?,
176                    ValuesDictionary::try_new(page, dict)?,
177                ))
178            }
179            (Encoding::Plain, _, true, false) => {
180                let validity = OptionalPageValidity::try_new(page)?;
181                let values = Values::try_new::<P>(page)?;
182
183                Ok(State::Optional(validity, values))
184            }
185            (Encoding::Plain, _, false, false) => Ok(State::Required(Values::try_new::<P>(page)?)),
186            (Encoding::Plain, _, false, true) => {
187                FilteredRequiredValues::try_new::<P>(page).map(State::FilteredRequired)
188            }
189            (Encoding::Plain, _, true, true) => Ok(State::FilteredOptional(
190                FilteredOptionalPageValidity::try_new(page)?,
191                Values::try_new::<P>(page)?,
192            )),
193            _ => Err(utils::not_implemented(page)),
194        }
195    }
196
197    fn with_capacity(&self, capacity: usize) -> Self::DecodedState {
198        (
199            Vec::<T>::with_capacity(capacity),
200            MutableBitmap::with_capacity(capacity),
201        )
202    }
203
204    fn extend_from_state(
205        &self,
206        state: &mut Self::State,
207        decoded: &mut Self::DecodedState,
208        remaining: usize,
209    ) {
210        let (values, validity) = decoded;
211        match state {
212            State::Optional(page_validity, page_values) => utils::extend_from_decoder(
213                validity,
214                page_validity,
215                Some(remaining),
216                values,
217                page_values.values.by_ref().map(decode).map(self.op),
218            ),
219            State::Required(page) => {
220                values.extend(
221                    page.values
222                        .by_ref()
223                        .map(decode)
224                        .map(self.op)
225                        .take(remaining),
226                );
227            }
228            State::OptionalDictionary(page_validity, page_values) => {
229                let op1 = |index: u32| page_values.dict[index as usize];
230                utils::extend_from_decoder(
231                    validity,
232                    page_validity,
233                    Some(remaining),
234                    values,
235                    &mut page_values.values.by_ref().map(|x| x.unwrap()).map(op1),
236                )
237            }
238            State::RequiredDictionary(page) => {
239                let op1 = |index: u32| page.dict[index as usize];
240                values.extend(
241                    page.values
242                        .by_ref()
243                        .map(|x| x.unwrap())
244                        .map(op1)
245                        .take(remaining),
246                );
247            }
248            State::FilteredRequired(page) => {
249                values.extend(
250                    page.values
251                        .by_ref()
252                        .map(decode)
253                        .map(self.op)
254                        .take(remaining),
255                );
256            }
257            State::FilteredOptional(page_validity, page_values) => {
258                utils::extend_from_decoder(
259                    validity,
260                    page_validity,
261                    Some(remaining),
262                    values,
263                    page_values.values.by_ref().map(decode).map(self.op),
264                );
265            }
266        }
267    }
268
269    fn deserialize_dict(&self, page: &DictPage) -> Self::Dict {
270        deserialize_plain(&page.buffer, self.op)
271    }
272}
273
274pub(super) fn finish<T: NativeType>(
275    data_type: &DataType,
276    values: Vec<T>,
277    validity: MutableBitmap,
278) -> MutablePrimitiveArray<T> {
279    let validity = if validity.is_empty() {
280        None
281    } else {
282        Some(validity)
283    };
284    MutablePrimitiveArray::try_new(data_type.clone(), values, validity).unwrap()
285}
286
287/// An [`Iterator`] adapter over [`Pages`] assumed to be encoded as primitive arrays
288#[derive(Debug)]
289pub struct Iter<T, I, P, F>
290where
291    I: Pages,
292    T: NativeType,
293    P: ParquetNativeType,
294    F: Fn(P) -> T,
295{
296    iter: I,
297    data_type: DataType,
298    items: VecDeque<(Vec<T>, MutableBitmap)>,
299    remaining: usize,
300    chunk_size: Option<usize>,
301    dict: Option<Vec<T>>,
302    op: F,
303    phantom: std::marker::PhantomData<P>,
304}
305
306impl<T, I, P, F> Iter<T, I, P, F>
307where
308    I: Pages,
309    T: NativeType,
310
311    P: ParquetNativeType,
312    F: Copy + Fn(P) -> T,
313{
314    pub fn new(
315        iter: I,
316        data_type: DataType,
317        num_rows: usize,
318        chunk_size: Option<usize>,
319        op: F,
320    ) -> Self {
321        Self {
322            iter,
323            data_type,
324            items: VecDeque::new(),
325            dict: None,
326            remaining: num_rows,
327            chunk_size,
328            op,
329            phantom: Default::default(),
330        }
331    }
332}
333
334impl<T, I, P, F> Iterator for Iter<T, I, P, F>
335where
336    I: Pages,
337    T: NativeType,
338    P: ParquetNativeType,
339    F: Copy + Fn(P) -> T,
340{
341    type Item = Result<MutablePrimitiveArray<T>>;
342
343    fn next(&mut self) -> Option<Self::Item> {
344        let maybe_state = utils::next(
345            &mut self.iter,
346            &mut self.items,
347            &mut self.dict,
348            &mut self.remaining,
349            self.chunk_size,
350            &PrimitiveDecoder::new(self.op),
351        );
352        match maybe_state {
353            utils::MaybeNext::Some(Ok((values, validity))) => {
354                Some(Ok(finish(&self.data_type, values, validity)))
355            }
356            utils::MaybeNext::Some(Err(e)) => Some(Err(e)),
357            utils::MaybeNext::None => None,
358            utils::MaybeNext::More => self.next(),
359        }
360    }
361}
362
363pub(super) fn deserialize_plain<T, P, F>(values: &[u8], op: F) -> Vec<T>
364where
365    T: NativeType,
366    P: ParquetNativeType,
367    F: Copy + Fn(P) -> T,
368{
369    values
370        .chunks_exact(std::mem::size_of::<P>())
371        .map(decode)
372        .map(op)
373        .collect::<Vec<_>>()
374}