1mod column;
2mod compression;
3mod indexes;
4pub mod levels;
5mod metadata;
6mod page;
7#[cfg(feature = "async")]
8mod stream;
9
10use std::io::{Read, Seek, SeekFrom};
11use std::sync::Arc;
12
13pub use column::*;
14pub use compression::{decompress, BasicDecompressor, Decompressor};
15pub use metadata::{deserialize_metadata, read_metadata};
16#[cfg(feature = "async")]
17#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
18pub use page::{get_page_stream, get_page_stream_from_column_start};
19pub use page::{IndexedPageReader, PageFilter, PageIterator, PageMetaData, PageReader};
20
21#[cfg(feature = "async")]
22#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
23pub use stream::read_metadata as read_metadata_async;
24
25use crate::metadata::{ColumnChunkMetaData, RowGroupMetaData};
26use crate::{error::Result, metadata::FileMetaData};
27
28pub use indexes::{read_columns_indexes, read_pages_locations};
29
30pub fn filter_row_groups(
33 metadata: &FileMetaData,
34 predicate: &dyn Fn(&RowGroupMetaData, usize) -> bool,
35) -> FileMetaData {
36 let mut filtered_row_groups = Vec::<RowGroupMetaData>::new();
37 for (i, row_group_metadata) in metadata.row_groups.iter().enumerate() {
38 if predicate(row_group_metadata, i) {
39 filtered_row_groups.push(row_group_metadata.clone());
40 }
41 }
42 let mut metadata = metadata.clone();
43 metadata.row_groups = filtered_row_groups;
44 metadata
45}
46
47pub fn get_page_iterator<R: Read + Seek>(
49 column_chunk: &ColumnChunkMetaData,
50 mut reader: R,
51 pages_filter: Option<PageFilter>,
52 scratch: Vec<u8>,
53 max_page_size: usize,
54) -> Result<PageReader<R>> {
55 let pages_filter = pages_filter.unwrap_or_else(|| Arc::new(|_, _| true));
56
57 let (col_start, _) = column_chunk.byte_range();
58 reader.seek(SeekFrom::Start(col_start))?;
59 Ok(PageReader::new(
60 reader,
61 column_chunk,
62 pages_filter,
63 scratch,
64 max_page_size,
65 ))
66}
67
68pub fn get_field_columns<'a>(
71 columns: &'a [ColumnChunkMetaData],
72 field_name: &'a str,
73) -> impl Iterator<Item = &'a ColumnChunkMetaData> {
74 columns
75 .iter()
76 .filter(move |x| x.descriptor().path_in_schema[0] == field_name)
77}
78
#[cfg(test)]
mod tests {
    use std::fs::File;

    use crate::FallibleStreamingIterator;

    use super::*;

    use crate::tests::get_path;

    /// Reads the first column of the first row group and checks the page
    /// sequence: a dictionary page (0 values) followed by a data page.
    #[test]
    fn basic() -> Result<()> {
        let mut testdata = get_path();
        testdata.push("alltypes_plain.parquet");
        let mut file = File::open(testdata).unwrap();

        let metadata = read_metadata(&mut file)?;

        let row_group = 0;
        let column = 0;
        let column_metadata = &metadata.row_groups[row_group].columns()[column];
        let buffer = vec![];
        let mut iter = get_page_iterator(column_metadata, &mut file, None, buffer, 1024 * 1024)?;

        let dict = iter.next().unwrap().unwrap();
        assert_eq!(dict.num_values(), 0);
        let page = iter.next().unwrap().unwrap();
        assert_eq!(page.num_values(), 8);
        Ok(())
    }

    /// Checks that the decompressor hands back both internal buffers
    /// (compressed and decompressed) after iteration over a snappy file.
    #[test]
    fn reuse_buffer() -> Result<()> {
        let mut testdata = get_path();
        testdata.push("alltypes_plain.snappy.parquet");
        let mut file = File::open(testdata).unwrap();

        let metadata = read_metadata(&mut file)?;

        let row_group = 0;
        let column = 0;
        let column_metadata = &metadata.row_groups[row_group].columns()[column];
        let buffer = vec![0];
        let iterator = get_page_iterator(column_metadata, &mut file, None, buffer, 1024 * 1024)?;

        let buffer = vec![];
        let mut iterator = Decompressor::new(iterator, buffer);

        let _dict = iterator.next()?.unwrap();
        let _page = iterator.next()?.unwrap();

        assert!(iterator.next()?.is_none());
        let (a, b) = iterator.into_buffers();
        assert_eq!(a.len(), 11);
        assert_eq!(b.len(), 9);

        Ok(())
    }

    /// Same as `reuse_buffer` but on an uncompressed file: the decompression
    /// buffer stays unused (length 0).
    #[test]
    fn reuse_buffer_decompress() -> Result<()> {
        let mut testdata = get_path();
        testdata.push("alltypes_plain.parquet");
        let mut file = File::open(testdata).unwrap();

        let metadata = read_metadata(&mut file)?;

        let row_group = 0;
        let column = 0;
        let column_metadata = &metadata.row_groups[row_group].columns()[column];
        let buffer = vec![1];
        let iterator = get_page_iterator(column_metadata, &mut file, None, buffer, 1024 * 1024)?;

        let buffer = vec![];
        let mut iterator = Decompressor::new(iterator, buffer);

        iterator.next()?.unwrap();
        iterator.next()?.unwrap();

        assert!(iterator.next()?.is_none());
        let (a, b) = iterator.into_buffers();

        assert_eq!(a.len(), 11);
        // uncompressed input: the decompression buffer is never filled
        assert_eq!(b.len(), 0);
        Ok(())
    }

    /// Drives a `ReadColumnIterator` built from pre-collected pages through
    /// its advance/get state machine until it finishes.
    #[test]
    fn column_iter() -> Result<()> {
        let mut testdata = get_path();
        testdata.push("alltypes_plain.parquet");
        let mut file = File::open(testdata).unwrap();

        let metadata = read_metadata(&mut file)?;

        let row_group = 0;
        let column = 0;
        let column_metadata = &metadata.row_groups[row_group].columns()[column];
        let iter: Vec<_> =
            get_page_iterator(column_metadata, &mut file, None, vec![], usize::MAX)?.collect();

        let field = metadata.schema().fields()[0].clone();
        let mut iter = ReadColumnIterator::new(field, vec![(iter, column_metadata.clone())]);

        loop {
            match iter.advance()? {
                State::Some(mut new_iter) => {
                    if let Some((pages, _descriptor)) = new_iter.get() {
                        let mut iterator = BasicDecompressor::new(pages, vec![]);
                        // exhaust all pages of this column
                        while let Some(_page) = iterator.next()? {}
                        let _internal_buffer = iterator.into_inner();
                    }
                    iter = new_iter;
                }
                State::Finished(_buffer) => {
                    assert!(_buffer.is_empty());
                    break;
                }
            }
        }
        Ok(())
    }

    /// Drives a `ColumnIterator` reading directly from the file through its
    /// advance/get state machine until it finishes.
    #[test]
    fn basics_column_iterator() -> Result<()> {
        let mut testdata = get_path();
        testdata.push("alltypes_plain.parquet");
        let mut file = File::open(testdata).unwrap();

        let metadata = read_metadata(&mut file)?;

        let mut iter = ColumnIterator::new(
            file,
            metadata.row_groups[0].columns().to_vec(),
            None,
            vec![],
            usize::MAX,
        );

        loop {
            match iter.advance()? {
                State::Some(mut new_iter) => {
                    if let Some((pages, _descriptor)) = new_iter.get() {
                        let mut iterator = BasicDecompressor::new(pages, vec![]);
                        // exhaust all pages of this column
                        while let Some(_page) = iterator.next()? {}
                        let _internal_buffer = iterator.into_inner();
                    }
                    iter = new_iter;
                }
                State::Finished(_buffer) => {
                    assert!(_buffer.is_empty());
                    break;
                }
            }
        }
        Ok(())
    }
}