Skip to main content

compact_thrift_parquet/
lib.rs

1use std::hint::unreachable_unchecked;
2use std::io::{Read, Seek, SeekFrom, Error as IOError};
3use std::ops::Range;
4use compact_thrift_runtime::{CompactThriftProtocol, CompactThriftInputSlice, ThriftError};
5use crate::format::{ColumnChunk, ColumnIndex, FileMetaData, OffsetIndex};
6
7#[rustfmt::skip]
8pub mod format;
9mod encodings;
10mod path_in_schema;
11
12pub use encodings::EncodingSet;
13pub use path_in_schema::PathInSchema;
14
15#[derive(Debug)]
16pub enum ParquetError {
17    Thrift(ThriftError),
18    InvalidMagic,
19    Io(IOError),
20    InvalidOffset,
21    Schema(&'static str),
22    UnknownType(i32),
23}
24
25impl From<IOError> for ParquetError {
26    fn from(value: IOError) -> Self {
27        Self::Io(value)
28    }
29}
30
31impl From<ThriftError> for ParquetError {
32    fn from(value: ThriftError) -> Self {
33        Self::Thrift(value)
34    }
35}
36
/// Four-byte marker expected at the very end of a Parquet file (checked by `get_metadata_chunk`).
const PARQUET_MAGIC: [u8; 4] = *b"PAR1";
38
39#[expect(clippy::uninit_vec)]
40pub fn get_metadata_chunk<R: Read + Seek>(input: &mut R) -> Result<Vec<u8>, ParquetError> {
41    let _pos = input.seek(SeekFrom::End(-8))?;
42    let mut footer = [0_u8; 8];
43    input.read_exact(&mut footer)?;
44    if footer[4..] != PARQUET_MAGIC {
45        return Err(ParquetError::InvalidMagic);
46    }
47
48    let metadata_len = u32::from_le_bytes(footer[0..4].try_into().unwrap()) as usize;
49    input.seek(SeekFrom::End(-(metadata_len as i64) - 8))?;
50
51    let mut buf = Vec::with_capacity(metadata_len);
52    // Safety: Readers should not look at uninitialized bytes, only write into the slice
53    unsafe {
54        buf.set_len(metadata_len);
55    }
56
57    input.read_exact(&mut buf)?;
58    Ok(buf)
59}
60
61pub fn get_page_index_range(file_metadata: &FileMetaData) -> Result<Option<Range<usize>>, ParquetError> {
62    let mut min = usize::MAX;
63    let mut max = 0_usize;
64
65    for rg in &file_metadata.row_groups {
66        for c in &rg.columns {
67            if let (Some(oo), Some(ol), Some(co), Some(cl)) = (c.offset_index_offset, c.offset_index_length, c.column_index_offset, c.column_index_length) {
68                if oo < 0 || ol < 0 || co < 0 || cl < 0 {
69                    return Err(ParquetError::InvalidOffset);
70                }
71
72                min = min.min(oo as usize);
73                max = max.max(oo as usize + ol as usize);
74
75                min = min.min(co as usize);
76                max = max.max(co as usize + cl as usize);
77            }
78        }
79    }
80
81    if min < max {
82        Ok(Some(min..max))
83    } else {
84        Ok(None)
85    }
86}
87
88#[expect(clippy::uninit_vec)]
89#[expect(clippy::type_complexity)]
90pub fn get_page_index_chunk<R: Read + Seek>(input: &mut R, file_metadata: &FileMetaData) -> Result<Option<(Vec<u8>, Range<usize>)>, ParquetError> {
91    if let Some(range) = get_page_index_range(file_metadata)? {
92        input.seek(SeekFrom::Start(range.start as u64))?;
93
94        let mut buf = Vec::with_capacity(range.len());
95        // Safety: Readers should not look at uninitialized bytes, only write into the slice
96        unsafe {
97            buf.set_len(range.len());
98        }
99
100        input.read_exact(&mut buf)?;
101
102        Ok(Some((buf, range)))
103    } else {
104        Ok(None)
105    }
106}
107
108fn read_page_index_for_column_chunk(chunk: &[u8], chunk_offset: usize, column_chunks: &[ColumnChunk]) -> Result<Vec<Option<(OffsetIndex, ColumnIndex)>>, ParquetError> {
109    column_chunks.iter().map(|cc| {
110        let offset_index_range = if let (Some(offset), Some(length)) = (cc.offset_index_offset, cc.offset_index_length) {
111            assert!(offset as usize >= chunk_offset);
112            assert!(length > 0);
113            let start = offset as usize - chunk_offset;
114            start..start+length as usize
115        } else {
116            return Ok(None)
117        };
118        let column_index_range = if let (Some(offset), Some(length)) = (cc.column_index_offset, cc.column_index_length) {
119            assert!(offset as usize >= chunk_offset);
120            assert!(length > 0);
121            let start = offset as usize - chunk_offset;
122            start..start+length as usize
123        } else {
124            return Ok(None)
125        };
126
127        let mut res = Ok(Some((OffsetIndex::default(), ColumnIndex::default())));
128
129        match &mut res {
130            Ok(Some((offset_index, column_index))) => {
131                offset_index.fill_thrift(&mut CompactThriftInputSlice::new(&chunk[offset_index_range]))?;
132                column_index.fill_thrift(&mut CompactThriftInputSlice::new(&chunk[column_index_range]))?;
133            }
134            _ => unsafe { unreachable_unchecked() }
135        }
136
137        res
138    }).collect()
139}
140
141#[expect(clippy::type_complexity)]
142pub fn read_page_index(chunk: &[u8], chunk_offset: usize, file_metadata: &FileMetaData) -> Result<Vec<Vec<Option<(OffsetIndex, ColumnIndex)>>>, ParquetError> {
143    file_metadata.row_groups.iter().map(|rg| {
144        read_page_index_for_column_chunk(chunk, chunk_offset, &rg.columns)
145    }).collect()
146}
147
#[cfg(test)]
mod tests {
    use std::fs::File;
    use std::io::Cursor;
    use compact_thrift_runtime::{CompactThriftProtocol, CompactThriftInputSlice};
    use crate::format::FileMetaData;
    use crate::{get_metadata_chunk, ParquetError};

    /// Smoke test: the footer of a real file can be located and decoded as `FileMetaData`.
    #[test]
    fn test_read_metadata() {
        let mut file = File::open("data/alltypes_tiny_pages.parquet").unwrap();
        let metadata_chunk = get_metadata_chunk(&mut file).unwrap();
        dbg!(metadata_chunk.len());

        let mut input = CompactThriftInputSlice::new(&metadata_chunk);
        let metadata = FileMetaData::read_thrift(&mut input).unwrap();
        dbg!(&metadata);
    }

    /// The metadata chunk is exactly the `len` bytes preceding the length field and magic.
    #[test]
    fn test_metadata_chunk_from_synthetic_footer() {
        let mut bytes = b"PAR1".to_vec();
        bytes.extend_from_slice(b"META");
        bytes.extend_from_slice(&4_u32.to_le_bytes());
        bytes.extend_from_slice(b"PAR1");

        let chunk = get_metadata_chunk(&mut Cursor::new(bytes)).unwrap();
        assert_eq!(chunk, b"META");
    }

    /// A trailer that does not end in `PAR1` is rejected with `InvalidMagic`.
    #[test]
    fn test_metadata_chunk_rejects_bad_magic() {
        let bytes = b"PAR1\0\0\0\0PARX".to_vec();
        let res = get_metadata_chunk(&mut Cursor::new(bytes));
        assert!(matches!(res, Err(ParquetError::InvalidMagic)));
    }
}
165}