compact_thrift_parquet/
lib.rs1use std::hint::unreachable_unchecked;
2use std::io::{Read, Seek, SeekFrom, Error as IOError};
3use std::ops::Range;
4use compact_thrift_runtime::{CompactThriftProtocol, CompactThriftInputSlice, ThriftError};
5use crate::format::{ColumnChunk, ColumnIndex, FileMetaData, OffsetIndex};
6
7#[rustfmt::skip]
8pub mod format;
9mod encodings;
10mod path_in_schema;
11
12pub use encodings::EncodingSet;
13pub use path_in_schema::PathInSchema;
14
15#[derive(Debug)]
16pub enum ParquetError {
17 Thrift(ThriftError),
18 InvalidMagic,
19 Io(IOError),
20 InvalidOffset,
21 Schema(&'static str),
22 UnknownType(i32),
23}
24
25impl From<IOError> for ParquetError {
26 fn from(value: IOError) -> Self {
27 Self::Io(value)
28 }
29}
30
31impl From<ThriftError> for ParquetError {
32 fn from(value: ThriftError) -> Self {
33 Self::Thrift(value)
34 }
35}
36
37const PARQUET_MAGIC: [u8; 4] = [b'P', b'A', b'R', b'1'];
38
39#[expect(clippy::uninit_vec)]
40pub fn get_metadata_chunk<R: Read + Seek>(input: &mut R) -> Result<Vec<u8>, ParquetError> {
41 let _pos = input.seek(SeekFrom::End(-8))?;
42 let mut footer = [0_u8; 8];
43 input.read_exact(&mut footer)?;
44 if footer[4..] != PARQUET_MAGIC {
45 return Err(ParquetError::InvalidMagic);
46 }
47
48 let metadata_len = u32::from_le_bytes(footer[0..4].try_into().unwrap()) as usize;
49 input.seek(SeekFrom::End(-(metadata_len as i64) - 8))?;
50
51 let mut buf = Vec::with_capacity(metadata_len);
52 unsafe {
54 buf.set_len(metadata_len);
55 }
56
57 input.read_exact(&mut buf)?;
58 Ok(buf)
59}
60
61pub fn get_page_index_range(file_metadata: &FileMetaData) -> Result<Option<Range<usize>>, ParquetError> {
62 let mut min = usize::MAX;
63 let mut max = 0_usize;
64
65 for rg in &file_metadata.row_groups {
66 for c in &rg.columns {
67 if let (Some(oo), Some(ol), Some(co), Some(cl)) = (c.offset_index_offset, c.offset_index_length, c.column_index_offset, c.column_index_length) {
68 if oo < 0 || ol < 0 || co < 0 || cl < 0 {
69 return Err(ParquetError::InvalidOffset);
70 }
71
72 min = min.min(oo as usize);
73 max = max.max(oo as usize + ol as usize);
74
75 min = min.min(co as usize);
76 max = max.max(co as usize + cl as usize);
77 }
78 }
79 }
80
81 if min < max {
82 Ok(Some(min..max))
83 } else {
84 Ok(None)
85 }
86}
87
88#[expect(clippy::uninit_vec)]
89#[expect(clippy::type_complexity)]
90pub fn get_page_index_chunk<R: Read + Seek>(input: &mut R, file_metadata: &FileMetaData) -> Result<Option<(Vec<u8>, Range<usize>)>, ParquetError> {
91 if let Some(range) = get_page_index_range(file_metadata)? {
92 input.seek(SeekFrom::Start(range.start as u64))?;
93
94 let mut buf = Vec::with_capacity(range.len());
95 unsafe {
97 buf.set_len(range.len());
98 }
99
100 input.read_exact(&mut buf)?;
101
102 Ok(Some((buf, range)))
103 } else {
104 Ok(None)
105 }
106}
107
108fn read_page_index_for_column_chunk(chunk: &[u8], chunk_offset: usize, column_chunks: &[ColumnChunk]) -> Result<Vec<Option<(OffsetIndex, ColumnIndex)>>, ParquetError> {
109 column_chunks.iter().map(|cc| {
110 let offset_index_range = if let (Some(offset), Some(length)) = (cc.offset_index_offset, cc.offset_index_length) {
111 assert!(offset as usize >= chunk_offset);
112 assert!(length > 0);
113 let start = offset as usize - chunk_offset;
114 start..start+length as usize
115 } else {
116 return Ok(None)
117 };
118 let column_index_range = if let (Some(offset), Some(length)) = (cc.column_index_offset, cc.column_index_length) {
119 assert!(offset as usize >= chunk_offset);
120 assert!(length > 0);
121 let start = offset as usize - chunk_offset;
122 start..start+length as usize
123 } else {
124 return Ok(None)
125 };
126
127 let mut res = Ok(Some((OffsetIndex::default(), ColumnIndex::default())));
128
129 match &mut res {
130 Ok(Some((offset_index, column_index))) => {
131 offset_index.fill_thrift(&mut CompactThriftInputSlice::new(&chunk[offset_index_range]))?;
132 column_index.fill_thrift(&mut CompactThriftInputSlice::new(&chunk[column_index_range]))?;
133 }
134 _ => unsafe { unreachable_unchecked() }
135 }
136
137 res
138 }).collect()
139}
140
141#[expect(clippy::type_complexity)]
142pub fn read_page_index(chunk: &[u8], chunk_offset: usize, file_metadata: &FileMetaData) -> Result<Vec<Vec<Option<(OffsetIndex, ColumnIndex)>>>, ParquetError> {
143 file_metadata.row_groups.iter().map(|rg| {
144 read_page_index_for_column_chunk(chunk, chunk_offset, &rg.columns)
145 }).collect()
146}
147
148#[cfg(test)]
149mod tests {
150 use std::fs::File;
151 use compact_thrift_runtime::{CompactThriftProtocol, CompactThriftInputSlice};
152 use crate::format::FileMetaData;
153 use crate::get_metadata_chunk;
154
155 #[test]
156 fn test_read_metadata() {
157 let mut file = File::open("data/alltypes_tiny_pages.parquet").unwrap();
158 let metadata_chunk = get_metadata_chunk(&mut file).unwrap();
159 dbg!(metadata_chunk.len());
160
161 let mut input = CompactThriftInputSlice::new(&metadata_chunk);
162 let metadata = FileMetaData::read_thrift(&mut input).unwrap();
163 dbg!(&metadata);
164 }
165}