// parquet_lite/reader.rs

use std::convert::TryFrom;

use crate::codecs;
use crate::metadata::MetadataReader;
use crate::types::*;

/// Decoded column data variants.
///
/// One variant per in-memory representation produced by the reader; several
/// Parquet physical types share a variant (see the notes on each one).
#[derive(Debug, Clone)]
pub enum ColumnData {
    /// Boolean values, unpacked to one byte per value (`0` or `1`).
    Boolean(Vec<u8>),
    /// 32-bit signed integers.
    Int32(Vec<i32>),
    /// 64-bit signed integers; Int96 columns are also surfaced through this variant.
    Int64(Vec<i64>),
    /// 32-bit floats.
    Float(Vec<f32>),
    /// 64-bit floats.
    Double(Vec<f64>),
    /// Variable-length byte strings; fixed-length byte arrays also use this variant.
    ByteArray(Vec<Vec<u8>>),
}

16/// Main Parquet file reader.
17///
18/// Holds a reference to the raw file bytes and parsed metadata.
19/// Columns are read on demand via `read_column()`.
20pub struct ParquetReader {
21    data: Vec<u8>,
22    metadata: ParquetMetadata,
23}
24
25impl ParquetReader {
26    /// Create a new reader from raw Parquet file bytes.
27    pub fn new(data: &[u8]) -> Result<Self> {
28        let metadata = MetadataReader::read_metadata(data)?;
29        Ok(ParquetReader {
30            data: data.to_vec(),
31            metadata,
32        })
33    }
34
35    /// Get parsed metadata
36    pub fn metadata(&self) -> &ParquetMetadata {
37        &self.metadata
38    }
39
40    /// Total number of rows
41    pub fn num_rows(&self) -> i64 {
42        self.metadata.num_rows
43    }
44
45    /// Number of columns
46    pub fn num_columns(&self) -> usize {
47        self.metadata.num_columns
48    }
49
50    /// Column names
51    pub fn column_names(&self) -> Vec<&str> {
52        self.metadata
53            .columns
54            .iter()
55            .map(|c| c.name.as_str())
56            .collect()
57    }
58
59    /// Read and decode a single column by index.
60    pub fn read_column(&self, column_index: usize) -> Result<ColumnData> {
61        if column_index >= self.metadata.columns.len() {
62            return Err(ParquetError::ColumnOutOfRange(column_index));
63        }
64
65        let col_meta = &self.metadata.columns[column_index];
66        let offset = col_meta.data_offset as usize;
67
68        if offset >= self.data.len() {
69            return Err(ParquetError::DataError(format!(
70                "Column {} data offset {} exceeds file size {}",
71                column_index,
72                offset,
73                self.data.len()
74            )));
75        }
76
77        // Read column chunk data — may span multiple pages
78        let compressed_size = col_meta.total_compressed_size as usize;
79        let end = std::cmp::min(offset + compressed_size, self.data.len());
80        let raw_data = &self.data[offset..end];
81
82        // Parse pages within this column chunk
83        self.decode_column_pages(raw_data, col_meta)
84    }
85
86    /// Decode pages within a column chunk.
87    ///
88    /// Each page has a header (Thrift-encoded) followed by page data.
89    /// We read data pages and concatenate the decoded values.
90    fn decode_column_pages(
91        &self,
92        chunk_data: &[u8],
93        col_meta: &ColumnMetadata,
94    ) -> Result<ColumnData> {
95        let mut pos = 0;
96        let mut all_values = ColumnDataAccumulator::new(col_meta.physical_type);
97        let mut values_read: i64 = 0;
98
99        while pos < chunk_data.len() && values_read < col_meta.num_values {
100            // Parse page header (Thrift Compact Protocol)
101            let page_result = self.parse_page_header(&chunk_data[pos..])?;
102
103            pos += page_result.header_size;
104
105            let page_data_end = std::cmp::min(pos + page_result.compressed_size, chunk_data.len());
106            let page_data = &chunk_data[pos..page_data_end];
107
108            match page_result.page_type {
109                PageType::DictionaryPage => {
110                    // Skip dictionary pages for now (plain encoding doesn't use them)
111                    pos = page_data_end;
112                    continue;
113                }
114                PageType::DataPage | PageType::DataPageV2 => {
115                    // Decompress if needed
116                    let decompressed = if col_meta.compression == Compression::Uncompressed {
117                        page_data.to_vec()
118                    } else {
119                        let codec = codecs::get_codec(col_meta.compression)?;
120                        codec.decompress(page_data, page_result.uncompressed_size)?
121                    };
122
123                    // For DataPageV1, skip repetition & definition level data
124                    // In flat schemas with required columns, these are absent or minimal
125                    let value_data = self.skip_levels(&decompressed, col_meta)?;
126
127                    // Decode the values
128                    let num_values = page_result.num_values as usize;
129                    all_values.decode_and_append(value_data, col_meta.physical_type, num_values)?;
130                    values_read += page_result.num_values as i64;
131                }
132                _ => {
133                    // Skip index pages, etc.
134                }
135            }
136
137            pos = page_data_end;
138        }
139
140        all_values.into_column_data()
141    }
142
143    /// Skip repetition and definition level data in a data page.
144    ///
145    /// For flat schemas with all-required columns, levels are either absent
146    /// or encoded with bit-width 0 (which takes 0 bytes). This is a
147    /// simplified implementation that works for the common case.
148    fn skip_levels<'a>(
149        &self,
150        data: &'a [u8],
151        _col_meta: &ColumnMetadata,
152    ) -> Result<&'a [u8]> {
153        // In plain flat schemas, there are no repetition/definition levels
154        // The entire page data is values
155        Ok(data)
156    }
157
158    /// Parse a page header from the Thrift stream.
159    fn parse_page_header(&self, data: &[u8]) -> Result<PageHeaderInfo> {
160        let mut pos = 0;
161
162        // Read page_type (field 1, i32 zigzag)
163        let (page_type_code, _bytes_read) = read_thrift_field_and_varint(data, &mut pos)?;
164
165        // Read uncompressed_page_size (field 2, i32 zigzag)
166        let (uncompressed_size, _) = read_thrift_field_and_varint(data, &mut pos)?;
167
168        // Read compressed_page_size (field 3, i32 zigzag)
169        let (compressed_size, _) = read_thrift_field_and_varint(data, &mut pos)?;
170
171        let page_type = PageType::from_thrift(page_type_code)?;
172
173        // For data pages, read the DataPageHeader struct to get num_values
174        let mut num_values: i32 = 0;
175
176        if page_type == PageType::DataPage || page_type == PageType::DataPageV2 {
177            // Field 5 (DataPageHeader) or field 8 (DataPageHeaderV2) is a struct
178            // We need to skip to it and parse num_values from it
179            // For simplicity, scan for the next struct field and parse num_values
180            num_values = self.parse_data_page_header_num_values(data, &mut pos)?;
181        }
182
183        Ok(PageHeaderInfo {
184            page_type,
185            uncompressed_size: uncompressed_size as usize,
186            compressed_size: compressed_size as usize,
187            num_values,
188            header_size: pos,
189        })
190    }
191
192    /// Extract num_values from a DataPageHeader or DataPageHeaderV2.
193    fn parse_data_page_header_num_values(&self, data: &[u8], pos: &mut usize) -> Result<i32> {
194        // Skip to the struct field (field 5 for DataPageHeader)
195        // Read the field header byte
196        while *pos < data.len() {
197            let byte = data[*pos];
198            if byte == 0 {
199                *pos += 1;
200                break; // stop byte — shouldn't happen here but be safe
201            }
202
203            let field_type = byte & 0x0F;
204            let field_delta = (byte >> 4) & 0x0F;
205            *pos += 1;
206
207            if field_delta == 0 {
208                // Long form field ID
209                let _ = read_zigzag_varint(data, pos)?;
210            }
211
212            if field_type == 12 {
213                // Struct — this is our DataPageHeader
214                // First field inside should be num_values (field 1, i32)
215                if *pos < data.len() {
216                    let inner_byte = data[*pos];
217                    let inner_type = inner_byte & 0x0F;
218                    *pos += 1;
219
220                    if inner_type == 5 {
221                        // i32 zigzag
222                        let val = read_zigzag_varint(data, pos)?;
223                        // Skip the rest of the struct
224                        self.skip_remaining_struct(data, pos);
225                        // Skip any remaining fields in the page header
226                        self.skip_remaining_struct(data, pos);
227                        return Ok(val);
228                    }
229                }
230                self.skip_remaining_struct(data, pos);
231            } else {
232                skip_thrift_value(data, pos, field_type)?;
233            }
234        }
235
236        // Fallback: couldn't parse num_values, estimate from metadata
237        Ok(0)
238    }
239
240    /// Skip remaining fields in a Thrift struct until the stop byte.
241    fn skip_remaining_struct(&self, data: &[u8], pos: &mut usize) {
242        while *pos < data.len() {
243            let byte = data[*pos];
244            if byte == 0 {
245                *pos += 1;
246                return;
247            }
248            *pos += 1;
249
250            let field_type = byte & 0x0F;
251            let field_delta = (byte >> 4) & 0x0F;
252
253            if field_delta == 0 && *pos < data.len() {
254                let _ = read_zigzag_varint(data, pos);
255            }
256
257            let _ = skip_thrift_value(data, pos, field_type);
258        }
259    }
260}

// ---------------------------------------------------------------------------
// Page header parsing helpers
// ---------------------------------------------------------------------------

266struct PageHeaderInfo {
267    page_type: PageType,
268    uncompressed_size: usize,
269    compressed_size: usize,
270    num_values: i32,
271    header_size: usize,
272}
273
274/// Read a Thrift field header byte + zigzag varint value.
275fn read_thrift_field_and_varint(data: &[u8], pos: &mut usize) -> Result<(i32, usize)> {
276    if *pos >= data.len() {
277        return Err(ParquetError::InvalidFile("Unexpected end of page header".into()));
278    }
279
280    let byte = data[*pos];
281    *pos += 1;
282
283    let _field_type = byte & 0x0F;
284    let field_delta = (byte >> 4) & 0x0F;
285
286    if field_delta == 0 {
287        // Long form — field ID follows
288        let _ = read_zigzag_varint(data, pos)?;
289    }
290
291    let value = read_zigzag_varint(data, pos)?;
292    Ok((value, 0))
293}
294
295/// Read a zigzag-encoded varint from data at pos.
296fn read_zigzag_varint(data: &[u8], pos: &mut usize) -> Result<i32> {
297    let mut result: u32 = 0;
298    let mut shift: u32 = 0;
299    loop {
300        if *pos >= data.len() {
301            return Err(ParquetError::InvalidFile("Varint extends past data".into()));
302        }
303        let b = data[*pos] as u32;
304        *pos += 1;
305        result |= (b & 0x7F) << shift;
306        if b & 0x80 == 0 {
307            break;
308        }
309        shift += 7;
310        if shift >= 32 {
311            return Err(ParquetError::InvalidFile("Varint too long".into()));
312        }
313    }
314    // Zigzag decode
315    Ok(((result >> 1) as i32) ^ -((result & 1) as i32))
316}
317
318/// Skip a Thrift value of the given type.
319fn skip_thrift_value(data: &[u8], pos: &mut usize, field_type: u8) -> Result<()> {
320    match field_type {
321        1 | 2 => {} // bool
322        3..=6 => {
323            // i8/i16/i32/i64 varint
324            let _ = read_zigzag_varint(data, pos)?;
325        }
326        7 => {
327            // double — 8 bytes
328            *pos += 8;
329        }
330        8 => {
331            // binary/string
332            let len = {
333                let mut result: u32 = 0;
334                let mut shift: u32 = 0;
335                loop {
336                    if *pos >= data.len() {
337                        return Ok(());
338                    }
339                    let b = data[*pos] as u32;
340                    *pos += 1;
341                    result |= (b & 0x7F) << shift;
342                    if b & 0x80 == 0 {
343                        break;
344                    }
345                    shift += 7;
346                }
347                result as usize
348            };
349            *pos += len;
350        }
351        9 | 10 => {
352            // list/set
353            if *pos >= data.len() {
354                return Ok(());
355            }
356            let header = data[*pos];
357            *pos += 1;
358            let count = ((header >> 4) & 0x0F) as usize;
359            let elem_type = header & 0x0F;
360            let actual_count = if count == 0x0F {
361                
362                read_zigzag_varint(data, pos)? as usize
363            } else {
364                count
365            };
366            for _ in 0..actual_count {
367                skip_thrift_value(data, pos, elem_type)?;
368            }
369        }
370        12 => {
371            // struct
372            loop {
373                if *pos >= data.len() {
374                    return Ok(());
375                }
376                let byte = data[*pos];
377                if byte == 0 {
378                    *pos += 1;
379                    return Ok(());
380                }
381                *pos += 1;
382                let ft = byte & 0x0F;
383                let fd = (byte >> 4) & 0x0F;
384                if fd == 0 {
385                    let _ = read_zigzag_varint(data, pos)?;
386                }
387                skip_thrift_value(data, pos, ft)?;
388            }
389        }
390        _ => {
391            let _ = read_zigzag_varint(data, pos);
392        }
393    }
394    Ok(())
395}

// ---------------------------------------------------------------------------
// Column data accumulator for building up values across pages
// ---------------------------------------------------------------------------

401struct ColumnDataAccumulator {
402    booleans: Vec<u8>,
403    int32s: Vec<i32>,
404    int64s: Vec<i64>,
405    floats: Vec<f32>,
406    doubles: Vec<f64>,
407    byte_arrays: Vec<Vec<u8>>,
408    physical_type: ParquetType,
409}
410
411impl ColumnDataAccumulator {
412    fn new(physical_type: ParquetType) -> Self {
413        ColumnDataAccumulator {
414            booleans: Vec::new(),
415            int32s: Vec::new(),
416            int64s: Vec::new(),
417            floats: Vec::new(),
418            doubles: Vec::new(),
419            byte_arrays: Vec::new(),
420            physical_type,
421        }
422    }
423
424    fn decode_and_append(
425        &mut self,
426        data: &[u8],
427        physical_type: ParquetType,
428        num_values: usize,
429    ) -> Result<()> {
430        match physical_type {
431            ParquetType::Boolean => {
432                // Booleans are bit-packed
433                for i in 0..num_values {
434                    let byte_idx = i / 8;
435                    let bit_idx = i % 8;
436                    if byte_idx < data.len() {
437                        let val = (data[byte_idx] >> bit_idx) & 1;
438                        self.booleans.push(val);
439                    }
440                }
441            }
442            ParquetType::Int32 => {
443                let values = decode_plain_i32(data, num_values);
444                self.int32s.extend(values);
445            }
446            ParquetType::Int64 => {
447                let values = decode_plain_i64(data, num_values);
448                self.int64s.extend(values);
449            }
450            ParquetType::Int96 => {
451                // Int96 = 12 bytes each, typically timestamps
452                // Convert to i64 nanoseconds
453                for i in 0..num_values {
454                    let offset = i * 12;
455                    if offset + 12 <= data.len() {
456                        let nanos = i64::from_le_bytes([
457                            data[offset],
458                            data[offset + 1],
459                            data[offset + 2],
460                            data[offset + 3],
461                            data[offset + 4],
462                            data[offset + 5],
463                            data[offset + 6],
464                            data[offset + 7],
465                        ]);
466                        self.int64s.push(nanos);
467                    }
468                }
469            }
470            ParquetType::Float => {
471                let values = decode_plain_f32(data, num_values);
472                self.floats.extend(values);
473            }
474            ParquetType::Double => {
475                let values = decode_plain_f64(data, num_values);
476                self.doubles.extend(values);
477            }
478            ParquetType::ByteArray => {
479                let values = decode_plain_byte_array(data, num_values);
480                self.byte_arrays.extend(values);
481            }
482            ParquetType::FixedLenByteArray(len) => {
483                let fixed_len = len as usize;
484                for i in 0..num_values {
485                    let start = i * fixed_len;
486                    let end = start + fixed_len;
487                    if end <= data.len() {
488                        self.byte_arrays.push(data[start..end].to_vec());
489                    }
490                }
491            }
492        }
493        Ok(())
494    }
495
496    fn into_column_data(self) -> Result<ColumnData> {
497        match self.physical_type {
498            ParquetType::Boolean => Ok(ColumnData::Boolean(self.booleans)),
499            ParquetType::Int32 => Ok(ColumnData::Int32(self.int32s)),
500            ParquetType::Int64 | ParquetType::Int96 => Ok(ColumnData::Int64(self.int64s)),
501            ParquetType::Float => Ok(ColumnData::Float(self.floats)),
502            ParquetType::Double => Ok(ColumnData::Double(self.doubles)),
503            ParquetType::ByteArray | ParquetType::FixedLenByteArray(_) => {
504                Ok(ColumnData::ByteArray(self.byte_arrays))
505            }
506        }
507    }
508}

// ---------------------------------------------------------------------------
// Plain encoding decoders
// ---------------------------------------------------------------------------

/// Decode up to `num_values` little-endian `i32` values from `data`.
///
/// Stops at whichever comes first: `num_values` values or the last complete
/// 4-byte group in `data`; trailing bytes that do not form a full value are
/// ignored. Allocation and work are bounded by `data.len()`, so an
/// implausibly large `num_values` is harmless.
fn decode_plain_i32(data: &[u8], num_values: usize) -> Vec<i32> {
    data.chunks_exact(4)
        .take(num_values)
        .map(|chunk| i32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]]))
        .collect()
}

/// Decode up to `num_values` little-endian `i64` values from `data`.
///
/// Stops at whichever comes first: `num_values` values or the last complete
/// 8-byte group in `data`; trailing bytes that do not form a full value are
/// ignored. Allocation and work are bounded by `data.len()`, so an
/// implausibly large `num_values` is harmless.
fn decode_plain_i64(data: &[u8], num_values: usize) -> Vec<i64> {
    data.chunks_exact(8)
        .take(num_values)
        .map(|chunk| {
            i64::from_le_bytes([
                chunk[0], chunk[1], chunk[2], chunk[3], chunk[4], chunk[5], chunk[6], chunk[7],
            ])
        })
        .collect()
}

/// Decode up to `num_values` little-endian `f32` values from `data`.
///
/// Stops at whichever comes first: `num_values` values or the last complete
/// 4-byte group in `data`; trailing bytes that do not form a full value are
/// ignored. Allocation and work are bounded by `data.len()`, so an
/// implausibly large `num_values` is harmless.
fn decode_plain_f32(data: &[u8], num_values: usize) -> Vec<f32> {
    data.chunks_exact(4)
        .take(num_values)
        .map(|chunk| f32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]]))
        .collect()
}

/// Decode up to `num_values` little-endian `f64` values from `data`.
///
/// Stops at whichever comes first: `num_values` values or the last complete
/// 8-byte group in `data`; trailing bytes that do not form a full value are
/// ignored. Allocation and work are bounded by `data.len()`, so an
/// implausibly large `num_values` is harmless.
fn decode_plain_f64(data: &[u8], num_values: usize) -> Vec<f64> {
    data.chunks_exact(8)
        .take(num_values)
        .map(|chunk| {
            f64::from_le_bytes([
                chunk[0], chunk[1], chunk[2], chunk[3], chunk[4], chunk[5], chunk[6], chunk[7],
            ])
        })
        .collect()
}

/// Decode up to `num_values` plain-encoded byte arrays from `data`.
///
/// Each value is a 4-byte little-endian length followed by that many bytes.
/// Decoding stops early — returning the values read so far — when the
/// remaining bytes cannot hold a length prefix or the announced payload.
fn decode_plain_byte_array(data: &[u8], num_values: usize) -> Vec<Vec<u8>> {
    // Every value needs at least its 4-byte prefix, which bounds how many can
    // exist; never size the allocation from the untrusted count alone.
    let mut values = Vec::with_capacity(num_values.min(data.len() / 4));
    let mut rest = data;

    while values.len() < num_values {
        if rest.len() < 4 {
            break;
        }
        let (prefix, tail) = rest.split_at(4);
        let len = u32::from_le_bytes([prefix[0], prefix[1], prefix[2], prefix[3]]) as usize;
        if len > tail.len() {
            break;
        }
        let (value, remaining) = tail.split_at(len);
        values.push(value.to_vec());
        rest = remaining;
    }
    values
}