// parquet2/read/mod.rs

mod column;
mod compression;
mod indexes;
pub mod levels;
mod metadata;
mod page;
#[cfg(feature = "async")]
mod stream;

use std::io::{Read, Seek, SeekFrom};
use std::sync::Arc;

pub use column::*;
pub use compression::{decompress, BasicDecompressor, Decompressor};
pub use metadata::{deserialize_metadata, read_metadata};
#[cfg(feature = "async")]
#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
pub use page::{get_page_stream, get_page_stream_from_column_start};
pub use page::{IndexedPageReader, PageFilter, PageIterator, PageMetaData, PageReader};

#[cfg(feature = "async")]
#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
pub use stream::read_metadata as read_metadata_async;

use crate::metadata::{ColumnChunkMetaData, RowGroupMetaData};
use crate::{error::Result, metadata::FileMetaData};

pub use indexes::{read_columns_indexes, read_pages_locations};
29
30/// Filters row group metadata to only those row groups,
31/// for which the predicate function returns true
32pub fn filter_row_groups(
33    metadata: &FileMetaData,
34    predicate: &dyn Fn(&RowGroupMetaData, usize) -> bool,
35) -> FileMetaData {
36    let mut filtered_row_groups = Vec::<RowGroupMetaData>::new();
37    for (i, row_group_metadata) in metadata.row_groups.iter().enumerate() {
38        if predicate(row_group_metadata, i) {
39            filtered_row_groups.push(row_group_metadata.clone());
40        }
41    }
42    let mut metadata = metadata.clone();
43    metadata.row_groups = filtered_row_groups;
44    metadata
45}
46
47/// Returns a new [`PageReader`] by seeking `reader` to the begining of `column_chunk`.
48pub fn get_page_iterator<R: Read + Seek>(
49    column_chunk: &ColumnChunkMetaData,
50    mut reader: R,
51    pages_filter: Option<PageFilter>,
52    scratch: Vec<u8>,
53    max_page_size: usize,
54) -> Result<PageReader<R>> {
55    let pages_filter = pages_filter.unwrap_or_else(|| Arc::new(|_, _| true));
56
57    let (col_start, _) = column_chunk.byte_range();
58    reader.seek(SeekFrom::Start(col_start))?;
59    Ok(PageReader::new(
60        reader,
61        column_chunk,
62        pages_filter,
63        scratch,
64        max_page_size,
65    ))
66}
67
68/// Returns all [`ColumnChunkMetaData`] associated to `field_name`.
69/// For non-nested types, this returns an iterator with a single column
70pub fn get_field_columns<'a>(
71    columns: &'a [ColumnChunkMetaData],
72    field_name: &'a str,
73) -> impl Iterator<Item = &'a ColumnChunkMetaData> {
74    columns
75        .iter()
76        .filter(move |x| x.descriptor().path_in_schema[0] == field_name)
77}
78
#[cfg(test)]
mod tests {
    use std::fs::File;

    use crate::FallibleStreamingIterator;

    use super::*;

    use crate::tests::get_path;

    /// Opens `file_name` from the test-data directory and reads its metadata.
    fn open_with_metadata(file_name: &str) -> Result<(File, crate::metadata::FileMetaData)> {
        let mut path = get_path();
        path.push(file_name);
        let mut file = File::open(path).unwrap();
        let metadata = read_metadata(&mut file)?;
        Ok((file, metadata))
    }

    #[test]
    fn basic() -> Result<()> {
        let (mut file, metadata) = open_with_metadata("alltypes_plain.parquet")?;

        // First column chunk of the first row group.
        let chunk_meta = &metadata.row_groups[0].columns()[0];
        let scratch = vec![];
        let mut pages = get_page_iterator(chunk_meta, &mut file, None, scratch, 1024 * 1024)?;

        // The dictionary page comes first and carries no values...
        let dict_page = pages.next().unwrap().unwrap();
        assert_eq!(dict_page.num_values(), 0);
        // ...followed by a data page with all 8 rows.
        let data_page = pages.next().unwrap().unwrap();
        assert_eq!(data_page.num_values(), 8);
        Ok(())
    }

    #[test]
    fn reuse_buffer() -> Result<()> {
        let (mut file, metadata) = open_with_metadata("alltypes_plain.snappy.parquet")?;

        let chunk_meta = &metadata.row_groups[0].columns()[0];
        let scratch = vec![0];
        let pages = get_page_iterator(chunk_meta, &mut file, None, scratch, 1024 * 1024)?;

        let mut decompressor = Decompressor::new(pages, vec![]);

        let _dict = decompressor.next()?.unwrap();
        let _page = decompressor.next()?.unwrap();

        assert!(decompressor.next()?.is_none());
        let (compressed, decompressed) = decompressor.into_buffers();
        assert_eq!(compressed.len(), 11); // note: compressed is higher in this example.
        assert_eq!(decompressed.len(), 9);

        Ok(())
    }

    #[test]
    fn reuse_buffer_decompress() -> Result<()> {
        let (mut file, metadata) = open_with_metadata("alltypes_plain.parquet")?;

        let chunk_meta = &metadata.row_groups[0].columns()[0];
        let scratch = vec![1];
        let pages = get_page_iterator(chunk_meta, &mut file, None, scratch, 1024 * 1024)?;

        let mut decompressor = Decompressor::new(pages, vec![]);

        // dict
        decompressor.next()?.unwrap();
        // page
        decompressor.next()?.unwrap();

        assert!(decompressor.next()?.is_none());
        let (compressed, decompressed) = decompressor.into_buffers();

        assert_eq!(compressed.len(), 11);
        // the decompressed buffer is never used because it is always swapped
        // with the other buffer.
        assert_eq!(decompressed.len(), 0);

        Ok(())
    }

    #[test]
    fn column_iter() -> Result<()> {
        let (mut file, metadata) = open_with_metadata("alltypes_plain.parquet")?;

        let chunk_meta = &metadata.row_groups[0].columns()[0];
        let pages: Vec<_> =
            get_page_iterator(chunk_meta, &mut file, None, vec![], usize::MAX)?.collect();

        let field = metadata.schema().fields()[0].clone();
        let mut iter = ReadColumnIterator::new(field, vec![(pages, chunk_meta.clone())]);

        loop {
            match iter.advance()? {
                State::Some(mut next_iter) => {
                    if let Some((pages, _descriptor)) = next_iter.get() {
                        let mut decompressor = BasicDecompressor::new(pages, vec![]);
                        while let Some(_page) = decompressor.next()? {
                            // do something with it
                        }
                        let _internal_buffer = decompressor.into_inner();
                    }
                    iter = next_iter;
                }
                State::Finished(buffer) => {
                    // data is uncompressed => buffer is always moved
                    assert!(buffer.is_empty());
                    break;
                }
            }
        }
        Ok(())
    }

    #[test]
    fn basics_column_iterator() -> Result<()> {
        let (file, metadata) = open_with_metadata("alltypes_plain.parquet")?;

        let mut iter = ColumnIterator::new(
            file,
            metadata.row_groups[0].columns().to_vec(),
            None,
            vec![],
            usize::MAX, // we trust the file is correct
        );

        loop {
            match iter.advance()? {
                State::Some(mut next_iter) => {
                    if let Some((pages, _descriptor)) = next_iter.get() {
                        let mut decompressor = BasicDecompressor::new(pages, vec![]);
                        while let Some(_page) = decompressor.next()? {
                            // do something with it
                        }
                        let _internal_buffer = decompressor.into_inner();
                    }
                    iter = next_iter;
                }
                State::Finished(buffer) => {
                    // data is uncompressed => buffer is always moved
                    assert!(buffer.is_empty());
                    break;
                }
            }
        }
        Ok(())
    }
}