arrow2/io/parquet/read/
file.rs

1use std::io::{Read, Seek};
2
3use parquet2::indexes::FilteredPage;
4
5use crate::array::Array;
6use crate::chunk::Chunk;
7use crate::datatypes::Schema;
8use crate::error::Result;
9use crate::io::parquet::read::read_columns_many;
10
11use super::{RowGroupDeserializer, RowGroupMetaData};
12
13/// An iterator of [`Chunk`]s coming from row groups of a parquet file.
14///
15/// This can be thought of a flatten chain of [`Iterator<Item=Chunk>`] - each row group is sequentially
16/// mapped to an [`Iterator<Item=Chunk>`] and each iterator is iterated upon until either the limit
17/// or the last iterator ends.
18/// # Implementation
19/// This iterator is single threaded on both IO-bounded and CPU-bounded tasks, and mixes them.
20pub struct FileReader<R: Read + Seek> {
21    row_groups: RowGroupReader<R>,
22    remaining_rows: usize,
23    current_row_group: Option<RowGroupDeserializer>,
24}
25
26impl<R: Read + Seek> FileReader<R> {
27    /// Returns a new [`FileReader`].
28    pub fn new(
29        reader: R,
30        row_groups: Vec<RowGroupMetaData>,
31        schema: Schema,
32        chunk_size: Option<usize>,
33        limit: Option<usize>,
34        page_indexes: Option<Vec<Vec<Vec<Vec<FilteredPage>>>>>,
35    ) -> Self {
36        let row_groups =
37            RowGroupReader::new(reader, schema, row_groups, chunk_size, limit, page_indexes);
38
39        Self {
40            row_groups,
41            remaining_rows: limit.unwrap_or(usize::MAX),
42            current_row_group: None,
43        }
44    }
45
46    fn next_row_group(&mut self) -> Result<Option<RowGroupDeserializer>> {
47        let result = self.row_groups.next().transpose()?;
48
49        // If current_row_group is None, then there will be no elements to remove.
50        if self.current_row_group.is_some() {
51            self.remaining_rows = self.remaining_rows.saturating_sub(
52                result
53                    .as_ref()
54                    .map(|x| x.num_rows())
55                    .unwrap_or(self.remaining_rows),
56            );
57        }
58        Ok(result)
59    }
60
61    /// Returns the [`Schema`] associated to this file.
62    pub fn schema(&self) -> &Schema {
63        &self.row_groups.schema
64    }
65}
66
67impl<R: Read + Seek> Iterator for FileReader<R> {
68    type Item = Result<Chunk<Box<dyn Array>>>;
69
70    fn next(&mut self) -> Option<Self::Item> {
71        if self.remaining_rows == 0 {
72            // reached the limit
73            return None;
74        }
75
76        if let Some(row_group) = &mut self.current_row_group {
77            match row_group.next() {
78                // no more chunks in the current row group => try a new one
79                None => match self.next_row_group() {
80                    Ok(Some(row_group)) => {
81                        self.current_row_group = Some(row_group);
82                        // new found => pull again
83                        self.next()
84                    }
85                    Ok(None) => {
86                        self.current_row_group = None;
87                        None
88                    }
89                    Err(e) => Some(Err(e)),
90                },
91                other => other,
92            }
93        } else {
94            match self.next_row_group() {
95                Ok(Some(row_group)) => {
96                    self.current_row_group = Some(row_group);
97                    self.next()
98                }
99                Ok(None) => {
100                    self.current_row_group = None;
101                    None
102                }
103                Err(e) => Some(Err(e)),
104            }
105        }
106    }
107}
108
109/// An [`Iterator<Item=RowGroupDeserializer>`] from row groups of a parquet file.
110///
111/// # Implementation
112/// Advancing this iterator is IO-bounded - each iteration reads all the column chunks from the file
113/// to memory and attaches [`RowGroupDeserializer`] to them so that they can be iterated in chunks.
114pub struct RowGroupReader<R: Read + Seek> {
115    reader: R,
116    schema: Schema,
117    row_groups: std::vec::IntoIter<RowGroupMetaData>,
118    chunk_size: Option<usize>,
119    remaining_rows: usize,
120    page_indexes: Option<std::vec::IntoIter<Vec<Vec<Vec<FilteredPage>>>>>,
121}
122
123impl<R: Read + Seek> RowGroupReader<R> {
124    /// Returns a new [`RowGroupReader`]
125    pub fn new(
126        reader: R,
127        schema: Schema,
128        row_groups: Vec<RowGroupMetaData>,
129        chunk_size: Option<usize>,
130        limit: Option<usize>,
131        page_indexes: Option<Vec<Vec<Vec<Vec<FilteredPage>>>>>,
132    ) -> Self {
133        if let Some(pages) = &page_indexes {
134            assert_eq!(pages.len(), row_groups.len())
135        }
136        Self {
137            reader,
138            schema,
139            row_groups: row_groups.into_iter(),
140            chunk_size,
141            remaining_rows: limit.unwrap_or(usize::MAX),
142            page_indexes: page_indexes.map(|pages| pages.into_iter()),
143        }
144    }
145
146    #[inline]
147    fn _next(&mut self) -> Result<Option<RowGroupDeserializer>> {
148        if self.schema.fields.is_empty() {
149            return Ok(None);
150        }
151        if self.remaining_rows == 0 {
152            // reached the limit
153            return Ok(None);
154        }
155
156        let row_group = if let Some(row_group) = self.row_groups.next() {
157            row_group
158        } else {
159            return Ok(None);
160        };
161
162        let pages = self.page_indexes.as_mut().and_then(|iter| iter.next());
163
164        // the number of rows depends on whether indexes are selected or not.
165        let num_rows = pages
166            .as_ref()
167            .map(|x| {
168                // first field, first column within that field
169                x[0][0]
170                    .iter()
171                    .map(|page| {
172                        page.selected_rows
173                            .iter()
174                            .map(|interval| interval.length)
175                            .sum::<usize>()
176                    })
177                    .sum()
178            })
179            .unwrap_or_else(|| row_group.num_rows());
180
181        let column_chunks = read_columns_many(
182            &mut self.reader,
183            &row_group,
184            self.schema.fields.clone(),
185            self.chunk_size,
186            Some(self.remaining_rows),
187            pages,
188        )?;
189
190        let result = RowGroupDeserializer::new(column_chunks, num_rows, Some(self.remaining_rows));
191        self.remaining_rows = self.remaining_rows.saturating_sub(num_rows);
192        Ok(Some(result))
193    }
194}
195
196impl<R: Read + Seek> Iterator for RowGroupReader<R> {
197    type Item = Result<RowGroupDeserializer>;
198
199    fn next(&mut self) -> Option<Self::Item> {
200        self._next().transpose()
201    }
202
203    fn size_hint(&self) -> (usize, Option<usize>) {
204        self.row_groups.size_hint()
205    }
206}