1use crate::codecs;
2use crate::metadata::MetadataReader;
3use crate::types::*;
4
/// Decoded values of one column, with one variant per storage representation.
///
/// `Int96` columns are returned as `Int64`, and fixed-length byte arrays are returned as
/// `ByteArray`. Booleans hold one byte (0 or 1) per value.
///
/// `PartialEq` is derived so decoded columns can be compared directly (e.g. in tests).
#[derive(Debug, Clone, PartialEq)]
pub enum ColumnData {
    Boolean(Vec<u8>),
    Int32(Vec<i32>),
    Int64(Vec<i64>),
    Float(Vec<f32>),
    Double(Vec<f64>),
    ByteArray(Vec<Vec<u8>>),
}
15
16pub struct ParquetReader {
21 data: Vec<u8>,
22 metadata: ParquetMetadata,
23}
24
25impl ParquetReader {
26 pub fn new(data: &[u8]) -> Result<Self> {
28 let metadata = MetadataReader::read_metadata(data)?;
29 Ok(ParquetReader {
30 data: data.to_vec(),
31 metadata,
32 })
33 }
34
35 pub fn metadata(&self) -> &ParquetMetadata {
37 &self.metadata
38 }
39
40 pub fn num_rows(&self) -> i64 {
42 self.metadata.num_rows
43 }
44
45 pub fn num_columns(&self) -> usize {
47 self.metadata.num_columns
48 }
49
50 pub fn column_names(&self) -> Vec<&str> {
52 self.metadata
53 .columns
54 .iter()
55 .map(|c| c.name.as_str())
56 .collect()
57 }
58
59 pub fn read_column(&self, column_index: usize) -> Result<ColumnData> {
61 if column_index >= self.metadata.columns.len() {
62 return Err(ParquetError::ColumnOutOfRange(column_index));
63 }
64
65 let col_meta = &self.metadata.columns[column_index];
66 let offset = col_meta.data_offset as usize;
67
68 if offset >= self.data.len() {
69 return Err(ParquetError::DataError(format!(
70 "Column {} data offset {} exceeds file size {}",
71 column_index,
72 offset,
73 self.data.len()
74 )));
75 }
76
77 let compressed_size = col_meta.total_compressed_size as usize;
79 let end = std::cmp::min(offset + compressed_size, self.data.len());
80 let raw_data = &self.data[offset..end];
81
82 self.decode_column_pages(raw_data, col_meta)
84 }
85
86 fn decode_column_pages(
91 &self,
92 chunk_data: &[u8],
93 col_meta: &ColumnMetadata,
94 ) -> Result<ColumnData> {
95 let mut pos = 0;
96 let mut all_values = ColumnDataAccumulator::new(col_meta.physical_type);
97 let mut values_read: i64 = 0;
98
99 while pos < chunk_data.len() && values_read < col_meta.num_values {
100 let page_result = self.parse_page_header(&chunk_data[pos..])?;
102
103 pos += page_result.header_size;
104
105 let page_data_end = std::cmp::min(pos + page_result.compressed_size, chunk_data.len());
106 let page_data = &chunk_data[pos..page_data_end];
107
108 match page_result.page_type {
109 PageType::DictionaryPage => {
110 pos = page_data_end;
112 continue;
113 }
114 PageType::DataPage | PageType::DataPageV2 => {
115 let decompressed = if col_meta.compression == Compression::Uncompressed {
117 page_data.to_vec()
118 } else {
119 let codec = codecs::get_codec(col_meta.compression)?;
120 codec.decompress(page_data, page_result.uncompressed_size)?
121 };
122
123 let value_data = self.skip_levels(&decompressed, col_meta)?;
126
127 let num_values = page_result.num_values as usize;
129 all_values.decode_and_append(value_data, col_meta.physical_type, num_values)?;
130 values_read += page_result.num_values as i64;
131 }
132 _ => {
133 }
135 }
136
137 pos = page_data_end;
138 }
139
140 all_values.into_column_data()
141 }
142
143 fn skip_levels<'a>(
149 &self,
150 data: &'a [u8],
151 _col_meta: &ColumnMetadata,
152 ) -> Result<&'a [u8]> {
153 Ok(data)
156 }
157
158 fn parse_page_header(&self, data: &[u8]) -> Result<PageHeaderInfo> {
160 let mut pos = 0;
161
162 let (page_type_code, _bytes_read) = read_thrift_field_and_varint(data, &mut pos)?;
164
165 let (uncompressed_size, _) = read_thrift_field_and_varint(data, &mut pos)?;
167
168 let (compressed_size, _) = read_thrift_field_and_varint(data, &mut pos)?;
170
171 let page_type = PageType::from_thrift(page_type_code)?;
172
173 let mut num_values: i32 = 0;
175
176 if page_type == PageType::DataPage || page_type == PageType::DataPageV2 {
177 num_values = self.parse_data_page_header_num_values(data, &mut pos)?;
181 }
182
183 Ok(PageHeaderInfo {
184 page_type,
185 uncompressed_size: uncompressed_size as usize,
186 compressed_size: compressed_size as usize,
187 num_values,
188 header_size: pos,
189 })
190 }
191
192 fn parse_data_page_header_num_values(&self, data: &[u8], pos: &mut usize) -> Result<i32> {
194 while *pos < data.len() {
197 let byte = data[*pos];
198 if byte == 0 {
199 *pos += 1;
200 break; }
202
203 let field_type = byte & 0x0F;
204 let field_delta = (byte >> 4) & 0x0F;
205 *pos += 1;
206
207 if field_delta == 0 {
208 let _ = read_zigzag_varint(data, pos)?;
210 }
211
212 if field_type == 12 {
213 if *pos < data.len() {
216 let inner_byte = data[*pos];
217 let inner_type = inner_byte & 0x0F;
218 *pos += 1;
219
220 if inner_type == 5 {
221 let val = read_zigzag_varint(data, pos)?;
223 self.skip_remaining_struct(data, pos);
225 self.skip_remaining_struct(data, pos);
227 return Ok(val);
228 }
229 }
230 self.skip_remaining_struct(data, pos);
231 } else {
232 skip_thrift_value(data, pos, field_type)?;
233 }
234 }
235
236 Ok(0)
238 }
239
240 fn skip_remaining_struct(&self, data: &[u8], pos: &mut usize) {
242 while *pos < data.len() {
243 let byte = data[*pos];
244 if byte == 0 {
245 *pos += 1;
246 return;
247 }
248 *pos += 1;
249
250 let field_type = byte & 0x0F;
251 let field_delta = (byte >> 4) & 0x0F;
252
253 if field_delta == 0 && *pos < data.len() {
254 let _ = read_zigzag_varint(data, pos);
255 }
256
257 let _ = skip_thrift_value(data, pos, field_type);
258 }
259 }
260}
261
262struct PageHeaderInfo {
267 page_type: PageType,
268 uncompressed_size: usize,
269 compressed_size: usize,
270 num_values: i32,
271 header_size: usize,
272}
273
274fn read_thrift_field_and_varint(data: &[u8], pos: &mut usize) -> Result<(i32, usize)> {
276 if *pos >= data.len() {
277 return Err(ParquetError::InvalidFile("Unexpected end of page header".into()));
278 }
279
280 let byte = data[*pos];
281 *pos += 1;
282
283 let _field_type = byte & 0x0F;
284 let field_delta = (byte >> 4) & 0x0F;
285
286 if field_delta == 0 {
287 let _ = read_zigzag_varint(data, pos)?;
289 }
290
291 let value = read_zigzag_varint(data, pos)?;
292 Ok((value, 0))
293}
294
295fn read_zigzag_varint(data: &[u8], pos: &mut usize) -> Result<i32> {
297 let mut result: u32 = 0;
298 let mut shift: u32 = 0;
299 loop {
300 if *pos >= data.len() {
301 return Err(ParquetError::InvalidFile("Varint extends past data".into()));
302 }
303 let b = data[*pos] as u32;
304 *pos += 1;
305 result |= (b & 0x7F) << shift;
306 if b & 0x80 == 0 {
307 break;
308 }
309 shift += 7;
310 if shift >= 32 {
311 return Err(ParquetError::InvalidFile("Varint too long".into()));
312 }
313 }
314 Ok(((result >> 1) as i32) ^ -((result & 1) as i32))
316}
317
318fn skip_thrift_value(data: &[u8], pos: &mut usize, field_type: u8) -> Result<()> {
320 match field_type {
321 1 | 2 => {} 3..=6 => {
323 let _ = read_zigzag_varint(data, pos)?;
325 }
326 7 => {
327 *pos += 8;
329 }
330 8 => {
331 let len = {
333 let mut result: u32 = 0;
334 let mut shift: u32 = 0;
335 loop {
336 if *pos >= data.len() {
337 return Ok(());
338 }
339 let b = data[*pos] as u32;
340 *pos += 1;
341 result |= (b & 0x7F) << shift;
342 if b & 0x80 == 0 {
343 break;
344 }
345 shift += 7;
346 }
347 result as usize
348 };
349 *pos += len;
350 }
351 9 | 10 => {
352 if *pos >= data.len() {
354 return Ok(());
355 }
356 let header = data[*pos];
357 *pos += 1;
358 let count = ((header >> 4) & 0x0F) as usize;
359 let elem_type = header & 0x0F;
360 let actual_count = if count == 0x0F {
361
362 read_zigzag_varint(data, pos)? as usize
363 } else {
364 count
365 };
366 for _ in 0..actual_count {
367 skip_thrift_value(data, pos, elem_type)?;
368 }
369 }
370 12 => {
371 loop {
373 if *pos >= data.len() {
374 return Ok(());
375 }
376 let byte = data[*pos];
377 if byte == 0 {
378 *pos += 1;
379 return Ok(());
380 }
381 *pos += 1;
382 let ft = byte & 0x0F;
383 let fd = (byte >> 4) & 0x0F;
384 if fd == 0 {
385 let _ = read_zigzag_varint(data, pos)?;
386 }
387 skip_thrift_value(data, pos, ft)?;
388 }
389 }
390 _ => {
391 let _ = read_zigzag_varint(data, pos);
392 }
393 }
394 Ok(())
395}
396
397struct ColumnDataAccumulator {
402 booleans: Vec<u8>,
403 int32s: Vec<i32>,
404 int64s: Vec<i64>,
405 floats: Vec<f32>,
406 doubles: Vec<f64>,
407 byte_arrays: Vec<Vec<u8>>,
408 physical_type: ParquetType,
409}
410
411impl ColumnDataAccumulator {
412 fn new(physical_type: ParquetType) -> Self {
413 ColumnDataAccumulator {
414 booleans: Vec::new(),
415 int32s: Vec::new(),
416 int64s: Vec::new(),
417 floats: Vec::new(),
418 doubles: Vec::new(),
419 byte_arrays: Vec::new(),
420 physical_type,
421 }
422 }
423
424 fn decode_and_append(
425 &mut self,
426 data: &[u8],
427 physical_type: ParquetType,
428 num_values: usize,
429 ) -> Result<()> {
430 match physical_type {
431 ParquetType::Boolean => {
432 for i in 0..num_values {
434 let byte_idx = i / 8;
435 let bit_idx = i % 8;
436 if byte_idx < data.len() {
437 let val = (data[byte_idx] >> bit_idx) & 1;
438 self.booleans.push(val);
439 }
440 }
441 }
442 ParquetType::Int32 => {
443 let values = decode_plain_i32(data, num_values);
444 self.int32s.extend(values);
445 }
446 ParquetType::Int64 => {
447 let values = decode_plain_i64(data, num_values);
448 self.int64s.extend(values);
449 }
450 ParquetType::Int96 => {
451 for i in 0..num_values {
454 let offset = i * 12;
455 if offset + 12 <= data.len() {
456 let nanos = i64::from_le_bytes([
457 data[offset],
458 data[offset + 1],
459 data[offset + 2],
460 data[offset + 3],
461 data[offset + 4],
462 data[offset + 5],
463 data[offset + 6],
464 data[offset + 7],
465 ]);
466 self.int64s.push(nanos);
467 }
468 }
469 }
470 ParquetType::Float => {
471 let values = decode_plain_f32(data, num_values);
472 self.floats.extend(values);
473 }
474 ParquetType::Double => {
475 let values = decode_plain_f64(data, num_values);
476 self.doubles.extend(values);
477 }
478 ParquetType::ByteArray => {
479 let values = decode_plain_byte_array(data, num_values);
480 self.byte_arrays.extend(values);
481 }
482 ParquetType::FixedLenByteArray(len) => {
483 let fixed_len = len as usize;
484 for i in 0..num_values {
485 let start = i * fixed_len;
486 let end = start + fixed_len;
487 if end <= data.len() {
488 self.byte_arrays.push(data[start..end].to_vec());
489 }
490 }
491 }
492 }
493 Ok(())
494 }
495
496 fn into_column_data(self) -> Result<ColumnData> {
497 match self.physical_type {
498 ParquetType::Boolean => Ok(ColumnData::Boolean(self.booleans)),
499 ParquetType::Int32 => Ok(ColumnData::Int32(self.int32s)),
500 ParquetType::Int64 | ParquetType::Int96 => Ok(ColumnData::Int64(self.int64s)),
501 ParquetType::Float => Ok(ColumnData::Float(self.floats)),
502 ParquetType::Double => Ok(ColumnData::Double(self.doubles)),
503 ParquetType::ByteArray | ParquetType::FixedLenByteArray(_) => {
504 Ok(ColumnData::ByteArray(self.byte_arrays))
505 }
506 }
507 }
508}
509
/// Decodes up to `num_values` PLAIN-encoded little-endian `i32` values from `data`.
///
/// Decoding stops early, without error, when `data` runs out, so a truncated page yields
/// fewer values. The output is sized by what `data` can hold rather than by the untrusted
/// `num_values`, so a corrupt count cannot trigger a huge allocation.
fn decode_plain_i32(data: &[u8], num_values: usize) -> Vec<i32> {
    data.chunks_exact(4)
        .take(num_values)
        .map(|b| i32::from_le_bytes([b[0], b[1], b[2], b[3]]))
        .collect()
}
529
/// Decodes up to `num_values` PLAIN-encoded little-endian `i64` values from `data`.
///
/// Decoding stops early, without error, when `data` runs out. The output is sized by what
/// `data` can hold rather than by the untrusted `num_values`, so a corrupt count cannot
/// trigger a huge allocation.
fn decode_plain_i64(data: &[u8], num_values: usize) -> Vec<i64> {
    data.chunks_exact(8)
        .take(num_values)
        .map(|b| i64::from_le_bytes([b[0], b[1], b[2], b[3], b[4], b[5], b[6], b[7]]))
        .collect()
}
549
/// Decodes up to `num_values` PLAIN-encoded little-endian `f32` values from `data`.
///
/// Decoding stops early, without error, when `data` runs out. The output is sized by what
/// `data` can hold rather than by the untrusted `num_values`, so a corrupt count cannot
/// trigger a huge allocation.
fn decode_plain_f32(data: &[u8], num_values: usize) -> Vec<f32> {
    data.chunks_exact(4)
        .take(num_values)
        .map(|b| f32::from_le_bytes([b[0], b[1], b[2], b[3]]))
        .collect()
}
565
/// Decodes up to `num_values` PLAIN-encoded little-endian `f64` values from `data`.
///
/// Decoding stops early, without error, when `data` runs out. The output is sized by what
/// `data` can hold rather than by the untrusted `num_values`, so a corrupt count cannot
/// trigger a huge allocation.
fn decode_plain_f64(data: &[u8], num_values: usize) -> Vec<f64> {
    data.chunks_exact(8)
        .take(num_values)
        .map(|b| f64::from_le_bytes([b[0], b[1], b[2], b[3], b[4], b[5], b[6], b[7]]))
        .collect()
}
585
/// Decodes up to `num_values` PLAIN-encoded byte arrays from `data`.
///
/// Each value is a 4-byte little-endian length followed by that many bytes. Decoding stops
/// early, without error, at the first value whose prefix or body is truncated.
fn decode_plain_byte_array(data: &[u8], num_values: usize) -> Vec<Vec<u8>> {
    // Every value needs at least its 4-byte length prefix. That bounds the count up front, so
    // a corrupt `num_values` cannot trigger a huge allocation.
    let mut values = Vec::with_capacity(num_values.min(data.len() / 4));
    let mut rest = data;
    while values.len() < num_values && rest.len() >= 4 {
        let len = u32::from_le_bytes([rest[0], rest[1], rest[2], rest[3]]) as usize;
        let body = &rest[4..];
        // Compare against the remaining length instead of adding offsets, which could overflow.
        if len > body.len() {
            break;
        }
        values.push(body[..len].to_vec());
        rest = &body[len..];
    }
    values
}
603}