Skip to main content

polars_arrow/io/ipc/read/
reader.rs

1use std::io::{Read, Seek};
2
3use arrow_format::ipc::KeyValueRef;
4use polars_error::{PolarsResult, polars_err};
5use polars_utils::bool::UnsafeBool;
6
7use super::common::*;
8use super::file::{get_message_from_block, get_message_from_block_offset, get_record_batch};
9use super::{Dictionaries, FileMetadata, read_batch, read_file_dictionaries};
10use crate::array::Array;
11use crate::datatypes::ArrowSchema;
12use crate::record_batch::RecordBatchT;
13
/// An iterator of [`RecordBatchT`]s from an Arrow IPC file.
pub struct FileReader<R: Read + Seek> {
    // Underlying seekable byte source the blocks are read from.
    reader: R,
    // Metadata of the file; provides the block list and the full schema.
    metadata: FileMetadata,
    // Dictionaries of the file. `None` until `read_dictionaries` has populated them.
    dictionaries: Option<Dictionaries>,
    // Index into `metadata.blocks` of the next block to read.
    current_block: usize,
    // Optional column projection; when set it also determines the schema reported by `schema()`.
    projection: Option<ProjectionInfo>,
    // Row budget that is still left; initialised from `limit` (`usize::MAX` when no limit is given).
    remaining: usize,
    // Reusable buffers handed to the read helpers, see `take_scratches` / `set_scratches`.
    data_scratch: Vec<u8>,
    message_scratch: Vec<u8>,
    // Flag forwarded to `read_file_dictionaries` and `read_batch`; changed by `unchecked`.
    checked: UnsafeBool,
}
27
28impl<R: Read + Seek> FileReader<R> {
29    /// Creates a new [`FileReader`]. Use `projection` to only take certain columns.
30    /// # Panic
31    /// Panics iff the projection is not in increasing order (e.g. `[1, 0]` nor `[0, 1, 1]` are valid)
32    pub fn new(
33        reader: R,
34        metadata: FileMetadata,
35        projection: Option<Vec<usize>>,
36        limit: Option<usize>,
37    ) -> Self {
38        let projection =
39            projection.map(|projection| prepare_projection(&metadata.schema, projection));
40        Self {
41            reader,
42            metadata,
43            dictionaries: Default::default(),
44            projection,
45            remaining: limit.unwrap_or(usize::MAX),
46            current_block: 0,
47            data_scratch: Default::default(),
48            message_scratch: Default::default(),
49            checked: Default::default(),
50        }
51    }
52
53    /// # Safety
54    /// Don't do expensive checks.
55    /// This means the data source has to be trusted to be correct.
56    pub unsafe fn unchecked(mut self) -> Self {
57        unsafe {
58            self.checked = UnsafeBool::new_false();
59        }
60        self
61    }
62
63    /// Creates a new [`FileReader`]. Use `projection` to only take certain columns.
64    /// # Panic
65    /// Panics iff the projection is not in increasing order (e.g. `[1, 0]` nor `[0, 1, 1]` are valid)
66    pub fn new_with_projection_info(
67        reader: R,
68        metadata: FileMetadata,
69        projection: Option<ProjectionInfo>,
70        limit: Option<usize>,
71    ) -> Self {
72        Self {
73            reader,
74            metadata,
75            dictionaries: Default::default(),
76            projection,
77            remaining: limit.unwrap_or(usize::MAX),
78            current_block: 0,
79            data_scratch: Default::default(),
80            message_scratch: Default::default(),
81            checked: Default::default(),
82        }
83    }
84
85    /// Return the schema of the file
86    pub fn schema(&self) -> &ArrowSchema {
87        self.projection
88            .as_ref()
89            .map(|x| &x.schema)
90            .unwrap_or(&self.metadata.schema)
91    }
92
93    /// Returns the [`FileMetadata`]
94    pub fn metadata(&self) -> &FileMetadata {
95        &self.metadata
96    }
97
98    /// Consumes this FileReader, returning the underlying reader
99    pub fn into_inner(self) -> R {
100        self.reader
101    }
102
103    pub fn set_current_block(&mut self, idx: usize) {
104        self.current_block = idx;
105    }
106
107    pub fn get_current_block(&self) -> usize {
108        self.current_block
109    }
110
111    /// Get the inner memory scratches so they can be reused in a new writer.
112    /// This can be utilized to save memory allocations for performance reasons.
113    pub fn take_projection_info(&mut self) -> Option<ProjectionInfo> {
114        std::mem::take(&mut self.projection)
115    }
116
117    /// Get the inner memory scratches so they can be reused in a new writer.
118    /// This can be utilized to save memory allocations for performance reasons.
119    pub fn take_scratches(&mut self) -> (Vec<u8>, Vec<u8>) {
120        (
121            std::mem::take(&mut self.data_scratch),
122            std::mem::take(&mut self.message_scratch),
123        )
124    }
125
126    /// Set the inner memory scratches so they can be reused in a new writer.
127    /// This can be utilized to save memory allocations for performance reasons.
128    pub fn set_scratches(&mut self, scratches: (Vec<u8>, Vec<u8>)) {
129        (self.data_scratch, self.message_scratch) = scratches;
130    }
131
132    pub fn read_dictionaries(&mut self) -> PolarsResult<()> {
133        if self.dictionaries.is_none() {
134            self.dictionaries = Some(read_file_dictionaries(
135                &mut self.reader,
136                &self.metadata,
137                &mut self.data_scratch,
138                self.checked,
139            )?);
140        };
141        Ok(())
142    }
143
144    /// Skip over blocks until we have seen at most `offset` rows, returning how many rows we are
145    /// still too see.  
146    ///
147    /// This will never go over the `offset`. Meaning that if the `offset < current_block.len()`,
148    /// the block will not be skipped.
149    pub fn skip_blocks_till_limit(&mut self, offset: u64) -> PolarsResult<u64> {
150        let mut remaining_offset = offset;
151
152        for (i, block) in self.metadata.blocks.iter().enumerate() {
153            let message =
154                get_message_from_block(&mut self.reader, block, &mut self.message_scratch)?;
155            let record_batch = get_record_batch(message)?;
156
157            let length = record_batch.length()?;
158            let length = length as u64;
159
160            if length > remaining_offset {
161                self.current_block = i;
162                return Ok(remaining_offset);
163            }
164
165            remaining_offset -= length;
166        }
167
168        self.current_block = self.metadata.blocks.len();
169        Ok(remaining_offset)
170    }
171
172    pub fn next_record_batch(
173        &mut self,
174    ) -> Option<PolarsResult<arrow_format::ipc::RecordBatchRef<'_>>> {
175        let block = self.metadata.blocks.get(self.current_block)?;
176        self.current_block += 1;
177        let message = get_message_from_block(&mut self.reader, block, &mut self.message_scratch);
178        Some(message.and_then(|m| get_record_batch(m)))
179    }
180}
181
182impl<R: Read + Seek> Iterator for FileReader<R> {
183    type Item = PolarsResult<RecordBatchT<Box<dyn Array>>>;
184
185    fn next(&mut self) -> Option<Self::Item> {
186        // get current block
187        if self.current_block == self.metadata.blocks.len() {
188            return None;
189        }
190
191        match self.read_dictionaries() {
192            Ok(_) => {},
193            Err(e) => return Some(Err(e)),
194        };
195
196        let block = self.current_block;
197        self.current_block += 1;
198
199        let chunk = read_batch(
200            &mut self.reader,
201            self.dictionaries.as_ref().unwrap(),
202            &self.metadata,
203            self.projection.as_ref().map(|x| x.columns.as_ref()),
204            Some(self.remaining),
205            block,
206            false,
207            &mut self.message_scratch,
208            &mut self.data_scratch,
209            self.checked,
210        );
211        self.remaining -= chunk.as_ref().map(|x| x.len()).unwrap_or_default();
212
213        let chunk = if let Some(ProjectionInfo { map, .. }) = &self.projection {
214            // re-order according to projection
215            chunk.map(|chunk| apply_projection(chunk, map))
216        } else {
217            chunk
218        };
219        Some(chunk)
220    }
221}
222
/// A reader that has access to exactly one standalone IPC Block of an Arrow IPC file.
/// The block contains either a `RecordBatch` or a `DictionaryBatch`.
///
/// NOTE(review): an earlier version of this comment required a `dictionaries` field to be
/// initialized prior to decoding a `RecordBatch`. This struct has no such field; presumably
/// the dictionaries are managed by the caller — confirm.
pub struct BlockReader<R: Read + Seek> {
    /// Byte source positioned on the block; the methods read the message at offset `0`.
    pub reader: R,
}
229
230impl<R: Read + Seek> BlockReader<R> {
231    pub fn new(reader: R) -> Self {
232        Self { reader }
233    }
234
235    /// Reads the record batch header and returns its length (i.e., number of rows).
236    pub fn record_batch_num_rows(&mut self, message_scratch: &mut Vec<u8>) -> PolarsResult<usize> {
237        let offset: u64 = 0;
238
239        let message = get_message_from_block_offset(&mut self.reader, offset, message_scratch)?;
240        let batch = get_record_batch(message)?;
241        let out = batch.length().map(|l| usize::try_from(l).unwrap())?;
242        Ok(out)
243    }
244
245    /// Reads the record batch header and returns the custom_metadata.
246    pub fn record_batch_custom_metadata<'a>(
247        &mut self,
248        message_scratch: &'a mut Vec<u8>,
249    ) -> PolarsResult<Option<Vec<KeyValueRef<'a>>>> {
250        let offset: u64 = 0;
251        let message = get_message_from_block_offset(&mut self.reader, offset, message_scratch)?;
252        let custom_metadata = message.custom_metadata()?;
253
254        custom_metadata
255            .map(|kv_results| {
256                kv_results
257                    .into_iter()
258                    .map(|res| {
259                        res.map_err(|e| {
260                            polars_err!(
261                                ComputeError:
262                                "failed to get KeyValue from IPC custom metadata: {}",
263                                e
264                            )
265                        })
266                    })
267                    .collect::<Result<Vec<KeyValueRef>, _>>()
268            })
269            .transpose()
270    }
271}