arrow2/io/parquet/read/
file.rs1use std::io::{Read, Seek};
2
3use parquet2::indexes::FilteredPage;
4
5use crate::array::Array;
6use crate::chunk::Chunk;
7use crate::datatypes::Schema;
8use crate::error::Result;
9use crate::io::parquet::read::read_columns_many;
10
11use super::{RowGroupDeserializer, RowGroupMetaData};
12
13pub struct FileReader<R: Read + Seek> {
21 row_groups: RowGroupReader<R>,
22 remaining_rows: usize,
23 current_row_group: Option<RowGroupDeserializer>,
24}
25
26impl<R: Read + Seek> FileReader<R> {
27 pub fn new(
29 reader: R,
30 row_groups: Vec<RowGroupMetaData>,
31 schema: Schema,
32 chunk_size: Option<usize>,
33 limit: Option<usize>,
34 page_indexes: Option<Vec<Vec<Vec<Vec<FilteredPage>>>>>,
35 ) -> Self {
36 let row_groups =
37 RowGroupReader::new(reader, schema, row_groups, chunk_size, limit, page_indexes);
38
39 Self {
40 row_groups,
41 remaining_rows: limit.unwrap_or(usize::MAX),
42 current_row_group: None,
43 }
44 }
45
46 fn next_row_group(&mut self) -> Result<Option<RowGroupDeserializer>> {
47 let result = self.row_groups.next().transpose()?;
48
49 if self.current_row_group.is_some() {
51 self.remaining_rows = self.remaining_rows.saturating_sub(
52 result
53 .as_ref()
54 .map(|x| x.num_rows())
55 .unwrap_or(self.remaining_rows),
56 );
57 }
58 Ok(result)
59 }
60
61 pub fn schema(&self) -> &Schema {
63 &self.row_groups.schema
64 }
65}
66
67impl<R: Read + Seek> Iterator for FileReader<R> {
68 type Item = Result<Chunk<Box<dyn Array>>>;
69
70 fn next(&mut self) -> Option<Self::Item> {
71 if self.remaining_rows == 0 {
72 return None;
74 }
75
76 if let Some(row_group) = &mut self.current_row_group {
77 match row_group.next() {
78 None => match self.next_row_group() {
80 Ok(Some(row_group)) => {
81 self.current_row_group = Some(row_group);
82 self.next()
84 }
85 Ok(None) => {
86 self.current_row_group = None;
87 None
88 }
89 Err(e) => Some(Err(e)),
90 },
91 other => other,
92 }
93 } else {
94 match self.next_row_group() {
95 Ok(Some(row_group)) => {
96 self.current_row_group = Some(row_group);
97 self.next()
98 }
99 Ok(None) => {
100 self.current_row_group = None;
101 None
102 }
103 Err(e) => Some(Err(e)),
104 }
105 }
106 }
107}
108
109pub struct RowGroupReader<R: Read + Seek> {
115 reader: R,
116 schema: Schema,
117 row_groups: std::vec::IntoIter<RowGroupMetaData>,
118 chunk_size: Option<usize>,
119 remaining_rows: usize,
120 page_indexes: Option<std::vec::IntoIter<Vec<Vec<Vec<FilteredPage>>>>>,
121}
122
123impl<R: Read + Seek> RowGroupReader<R> {
124 pub fn new(
126 reader: R,
127 schema: Schema,
128 row_groups: Vec<RowGroupMetaData>,
129 chunk_size: Option<usize>,
130 limit: Option<usize>,
131 page_indexes: Option<Vec<Vec<Vec<Vec<FilteredPage>>>>>,
132 ) -> Self {
133 if let Some(pages) = &page_indexes {
134 assert_eq!(pages.len(), row_groups.len())
135 }
136 Self {
137 reader,
138 schema,
139 row_groups: row_groups.into_iter(),
140 chunk_size,
141 remaining_rows: limit.unwrap_or(usize::MAX),
142 page_indexes: page_indexes.map(|pages| pages.into_iter()),
143 }
144 }
145
146 #[inline]
147 fn _next(&mut self) -> Result<Option<RowGroupDeserializer>> {
148 if self.schema.fields.is_empty() {
149 return Ok(None);
150 }
151 if self.remaining_rows == 0 {
152 return Ok(None);
154 }
155
156 let row_group = if let Some(row_group) = self.row_groups.next() {
157 row_group
158 } else {
159 return Ok(None);
160 };
161
162 let pages = self.page_indexes.as_mut().and_then(|iter| iter.next());
163
164 let num_rows = pages
166 .as_ref()
167 .map(|x| {
168 x[0][0]
170 .iter()
171 .map(|page| {
172 page.selected_rows
173 .iter()
174 .map(|interval| interval.length)
175 .sum::<usize>()
176 })
177 .sum()
178 })
179 .unwrap_or_else(|| row_group.num_rows());
180
181 let column_chunks = read_columns_many(
182 &mut self.reader,
183 &row_group,
184 self.schema.fields.clone(),
185 self.chunk_size,
186 Some(self.remaining_rows),
187 pages,
188 )?;
189
190 let result = RowGroupDeserializer::new(column_chunks, num_rows, Some(self.remaining_rows));
191 self.remaining_rows = self.remaining_rows.saturating_sub(num_rows);
192 Ok(Some(result))
193 }
194}
195
196impl<R: Read + Seek> Iterator for RowGroupReader<R> {
197 type Item = Result<RowGroupDeserializer>;
198
199 fn next(&mut self) -> Option<Self::Item> {
200 self._next().transpose()
201 }
202
203 fn size_hint(&self) -> (usize, Option<usize>) {
204 self.row_groups.size_hint()
205 }
206}