polars_arrow/io/ipc/read/
reader.rs1use std::io::{Read, Seek};
2
3use arrow_format::ipc::KeyValueRef;
4use polars_error::{PolarsResult, polars_err};
5use polars_utils::bool::UnsafeBool;
6
7use super::common::*;
8use super::file::{get_message_from_block, get_message_from_block_offset, get_record_batch};
9use super::{Dictionaries, FileMetadata, read_batch, read_file_dictionaries};
10use crate::array::Array;
11use crate::datatypes::ArrowSchema;
12use crate::record_batch::RecordBatchT;
13
14pub struct FileReader<R: Read + Seek> {
16 reader: R,
17 metadata: FileMetadata,
18 dictionaries: Option<Dictionaries>,
20 current_block: usize,
21 projection: Option<ProjectionInfo>,
22 remaining: usize,
23 data_scratch: Vec<u8>,
24 message_scratch: Vec<u8>,
25 checked: UnsafeBool,
26}
27
28impl<R: Read + Seek> FileReader<R> {
29 pub fn new(
33 reader: R,
34 metadata: FileMetadata,
35 projection: Option<Vec<usize>>,
36 limit: Option<usize>,
37 ) -> Self {
38 let projection =
39 projection.map(|projection| prepare_projection(&metadata.schema, projection));
40 Self {
41 reader,
42 metadata,
43 dictionaries: Default::default(),
44 projection,
45 remaining: limit.unwrap_or(usize::MAX),
46 current_block: 0,
47 data_scratch: Default::default(),
48 message_scratch: Default::default(),
49 checked: Default::default(),
50 }
51 }
52
53 pub unsafe fn unchecked(mut self) -> Self {
57 unsafe {
58 self.checked = UnsafeBool::new_false();
59 }
60 self
61 }
62
63 pub fn new_with_projection_info(
67 reader: R,
68 metadata: FileMetadata,
69 projection: Option<ProjectionInfo>,
70 limit: Option<usize>,
71 ) -> Self {
72 Self {
73 reader,
74 metadata,
75 dictionaries: Default::default(),
76 projection,
77 remaining: limit.unwrap_or(usize::MAX),
78 current_block: 0,
79 data_scratch: Default::default(),
80 message_scratch: Default::default(),
81 checked: Default::default(),
82 }
83 }
84
85 pub fn schema(&self) -> &ArrowSchema {
87 self.projection
88 .as_ref()
89 .map(|x| &x.schema)
90 .unwrap_or(&self.metadata.schema)
91 }
92
93 pub fn metadata(&self) -> &FileMetadata {
95 &self.metadata
96 }
97
98 pub fn into_inner(self) -> R {
100 self.reader
101 }
102
103 pub fn set_current_block(&mut self, idx: usize) {
104 self.current_block = idx;
105 }
106
107 pub fn get_current_block(&self) -> usize {
108 self.current_block
109 }
110
111 pub fn take_projection_info(&mut self) -> Option<ProjectionInfo> {
114 std::mem::take(&mut self.projection)
115 }
116
117 pub fn take_scratches(&mut self) -> (Vec<u8>, Vec<u8>) {
120 (
121 std::mem::take(&mut self.data_scratch),
122 std::mem::take(&mut self.message_scratch),
123 )
124 }
125
126 pub fn set_scratches(&mut self, scratches: (Vec<u8>, Vec<u8>)) {
129 (self.data_scratch, self.message_scratch) = scratches;
130 }
131
132 pub fn read_dictionaries(&mut self) -> PolarsResult<()> {
133 if self.dictionaries.is_none() {
134 self.dictionaries = Some(read_file_dictionaries(
135 &mut self.reader,
136 &self.metadata,
137 &mut self.data_scratch,
138 self.checked,
139 )?);
140 };
141 Ok(())
142 }
143
144 pub fn skip_blocks_till_limit(&mut self, offset: u64) -> PolarsResult<u64> {
150 let mut remaining_offset = offset;
151
152 for (i, block) in self.metadata.blocks.iter().enumerate() {
153 let message =
154 get_message_from_block(&mut self.reader, block, &mut self.message_scratch)?;
155 let record_batch = get_record_batch(message)?;
156
157 let length = record_batch.length()?;
158 let length = length as u64;
159
160 if length > remaining_offset {
161 self.current_block = i;
162 return Ok(remaining_offset);
163 }
164
165 remaining_offset -= length;
166 }
167
168 self.current_block = self.metadata.blocks.len();
169 Ok(remaining_offset)
170 }
171
172 pub fn next_record_batch(
173 &mut self,
174 ) -> Option<PolarsResult<arrow_format::ipc::RecordBatchRef<'_>>> {
175 let block = self.metadata.blocks.get(self.current_block)?;
176 self.current_block += 1;
177 let message = get_message_from_block(&mut self.reader, block, &mut self.message_scratch);
178 Some(message.and_then(|m| get_record_batch(m)))
179 }
180}
181
182impl<R: Read + Seek> Iterator for FileReader<R> {
183 type Item = PolarsResult<RecordBatchT<Box<dyn Array>>>;
184
185 fn next(&mut self) -> Option<Self::Item> {
186 if self.current_block == self.metadata.blocks.len() {
188 return None;
189 }
190
191 match self.read_dictionaries() {
192 Ok(_) => {},
193 Err(e) => return Some(Err(e)),
194 };
195
196 let block = self.current_block;
197 self.current_block += 1;
198
199 let chunk = read_batch(
200 &mut self.reader,
201 self.dictionaries.as_ref().unwrap(),
202 &self.metadata,
203 self.projection.as_ref().map(|x| x.columns.as_ref()),
204 Some(self.remaining),
205 block,
206 false,
207 &mut self.message_scratch,
208 &mut self.data_scratch,
209 self.checked,
210 );
211 self.remaining -= chunk.as_ref().map(|x| x.len()).unwrap_or_default();
212
213 let chunk = if let Some(ProjectionInfo { map, .. }) = &self.projection {
214 chunk.map(|chunk| apply_projection(chunk, map))
216 } else {
217 chunk
218 };
219 Some(chunk)
220 }
221}
222
/// Thin wrapper around a reader whose methods read the message located at
/// offset 0 of that reader.
pub struct BlockReader<R>
where
    R: Read + Seek,
{
    /// Underlying byte source.
    pub reader: R,
}
229
230impl<R: Read + Seek> BlockReader<R> {
231 pub fn new(reader: R) -> Self {
232 Self { reader }
233 }
234
235 pub fn record_batch_num_rows(&mut self, message_scratch: &mut Vec<u8>) -> PolarsResult<usize> {
237 let offset: u64 = 0;
238
239 let message = get_message_from_block_offset(&mut self.reader, offset, message_scratch)?;
240 let batch = get_record_batch(message)?;
241 let out = batch.length().map(|l| usize::try_from(l).unwrap())?;
242 Ok(out)
243 }
244
245 pub fn record_batch_custom_metadata<'a>(
247 &mut self,
248 message_scratch: &'a mut Vec<u8>,
249 ) -> PolarsResult<Option<Vec<KeyValueRef<'a>>>> {
250 let offset: u64 = 0;
251 let message = get_message_from_block_offset(&mut self.reader, offset, message_scratch)?;
252 let custom_metadata = message.custom_metadata()?;
253
254 custom_metadata
255 .map(|kv_results| {
256 kv_results
257 .into_iter()
258 .map(|res| {
259 res.map_err(|e| {
260 polars_err!(
261 ComputeError:
262 "failed to get KeyValue from IPC custom metadata: {}",
263 e
264 )
265 })
266 })
267 .collect::<Result<Vec<KeyValueRef>, _>>()
268 })
269 .transpose()
270 }
271}