polars_arrow/io/ipc/read/
file.rs1use std::io::{Read, Seek, SeekFrom};
2use std::sync::Arc;
3
4use arrow_format::ipc::FooterRef;
5use arrow_format::ipc::planus::ReadAsRoot;
6use polars_error::{PolarsResult, polars_bail, polars_err};
7use polars_utils::aliases::{InitHashMaps, PlHashMap};
8use polars_utils::bool::UnsafeBool;
9
10use super::super::{ARROW_MAGIC_V1, ARROW_MAGIC_V2, CONTINUATION_MARKER};
11use super::common::*;
12use super::schema::fb_to_schema;
13use super::{Dictionaries, OutOfSpecKind, SendableIterator};
14use crate::array::Array;
15use crate::datatypes::{ArrowSchemaRef, Metadata};
16use crate::io::ipc::IpcSchema;
17use crate::record_batch::RecordBatchT;
18
#[derive(Debug, Clone)]
/// Metadata of an Arrow IPC file, as decoded from the file footer.
pub struct FileMetadata {
    /// The Arrow schema obtained by converting the footer's schema with `fb_to_schema`.
    pub schema: ArrowSchemaRef,

    /// The custom schema metadata returned by `fb_to_schema`, if there is any.
    pub custom_schema_metadata: Option<Arc<Metadata>>,

    /// The [`IpcSchema`] returned by `fb_to_schema` alongside `schema`.
    pub ipc_schema: IpcSchema,

    /// The record-batch blocks listed in the footer.
    ///
    /// Each block carries the `offset` and `meta_data_length` used to locate a message.
    pub blocks: Vec<arrow_format::ipc::Block>,

    /// The dictionary blocks listed in the footer; `None` when the footer has no
    /// dictionaries entry.
    pub dictionaries: Option<Vec<arrow_format::ipc::Block>>,

    /// The size passed to `deserialize_footer`. `read_file_metadata` passes the number of
    /// bytes between the reader's initial position and the end of the stream.
    pub size: u64,
}
42
43pub fn get_row_count<R: Read + Seek>(reader: &mut R) -> PolarsResult<i64> {
45 let (_, footer_len) = read_footer_len(reader)?;
46 let footer = read_footer(reader, footer_len)?;
47 let (_, blocks) = deserialize_footer_blocks(&footer)?;
48
49 get_row_count_from_blocks(reader, &blocks)
50}
51
52pub fn get_row_count_from_blocks<R: Read + Seek>(
54 reader: &mut R,
55 blocks: &[arrow_format::ipc::Block],
56) -> PolarsResult<i64> {
57 let mut message_scratch: Vec<u8> = Default::default();
58
59 blocks
60 .iter()
61 .map(|block| {
62 let message = get_message_from_block(reader, block, &mut message_scratch)?;
63 let record_batch = get_record_batch(message)?;
64 record_batch.length().map_err(|e| e.into())
65 })
66 .sum()
67}
68
69pub(crate) fn get_dictionary_batch<'a>(
70 message: &'a arrow_format::ipc::MessageRef,
71) -> PolarsResult<arrow_format::ipc::DictionaryBatchRef<'a>> {
72 let header = message
73 .header()
74 .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferHeader(err)))?
75 .ok_or_else(|| polars_err!(oos = OutOfSpecKind::MissingMessageHeader))?;
76 match header {
77 arrow_format::ipc::MessageHeaderRef::DictionaryBatch(batch) => Ok(batch),
78 _ => polars_bail!(oos = OutOfSpecKind::UnexpectedMessageType),
79 }
80}
81
82#[allow(clippy::too_many_arguments)]
83pub fn read_dictionary_block<R: Read + Seek>(
84 reader: &mut R,
85 metadata: &FileMetadata,
86 block: &arrow_format::ipc::Block,
87 force_zero_offset: bool,
90 dictionaries: &mut Dictionaries,
91 message_scratch: &mut Vec<u8>,
92 dictionary_scratch: &mut Vec<u8>,
93 checked: UnsafeBool,
94) -> PolarsResult<()> {
95 let offset: u64 = if force_zero_offset {
96 0
97 } else {
98 block
99 .offset
100 .try_into()
101 .map_err(|_| polars_err!(oos = OutOfSpecKind::UnexpectedNegativeInteger))?
102 };
103
104 let length: u64 = block
105 .meta_data_length
106 .try_into()
107 .map_err(|_| polars_err!(oos = OutOfSpecKind::UnexpectedNegativeInteger))?;
108
109 let message = get_message_from_block_offset(reader, offset, message_scratch)?;
110 let batch = get_dictionary_batch(&message)?;
111
112 read_dictionary(
113 batch,
114 &metadata.schema,
115 &metadata.ipc_schema,
116 dictionaries,
117 reader,
118 offset + length,
119 dictionary_scratch,
120 checked,
121 )
122}
123
124pub fn read_file_dictionaries<R: Read + Seek>(
127 reader: &mut R,
128 metadata: &FileMetadata,
129 scratch: &mut Vec<u8>,
130 checked: UnsafeBool,
131) -> PolarsResult<Dictionaries> {
132 let mut dictionaries = Default::default();
133
134 let blocks = if let Some(blocks) = &metadata.dictionaries {
135 blocks
136 } else {
137 return Ok(PlHashMap::new());
138 };
139 let mut message_scratch = Default::default();
141
142 for block in blocks {
143 read_dictionary_block(
144 reader,
145 metadata,
146 block,
147 false,
148 &mut dictionaries,
149 &mut message_scratch,
150 scratch,
151 checked,
152 )?;
153 }
154 Ok(dictionaries)
155}
156
157pub(super) fn decode_footer_len(footer: [u8; 10], end: u64) -> PolarsResult<(u64, usize)> {
158 let footer_len = i32::from_le_bytes(footer[..4].try_into().unwrap());
159
160 if footer[4..] != ARROW_MAGIC_V2 {
161 if footer[..4] == ARROW_MAGIC_V1 {
162 polars_bail!(ComputeError: "feather v1 not supported");
163 }
164 return Err(polars_err!(oos = OutOfSpecKind::InvalidFooter));
165 }
166 let footer_len = footer_len
167 .try_into()
168 .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
169
170 Ok((end, footer_len))
171}
172
173fn read_footer_len<R: Read + Seek>(reader: &mut R) -> PolarsResult<(u64, usize)> {
175 let end = reader.seek(SeekFrom::End(-10))? + 10;
177
178 let mut footer: [u8; 10] = [0; 10];
179
180 reader.read_exact(&mut footer)?;
181 decode_footer_len(footer, end)
182}
183
184fn read_footer<R: Read + Seek>(reader: &mut R, footer_len: usize) -> PolarsResult<Vec<u8>> {
185 reader.seek(SeekFrom::End(-10 - footer_len as i64))?;
187
188 let mut serialized_footer = vec![];
189 serialized_footer.try_reserve(footer_len)?;
190 reader
191 .by_ref()
192 .take(footer_len as u64)
193 .read_to_end(&mut serialized_footer)?;
194 Ok(serialized_footer)
195}
196
197fn deserialize_footer_blocks(
198 footer_data: &[u8],
199) -> PolarsResult<(FooterRef<'_>, Vec<arrow_format::ipc::Block>)> {
200 let footer = arrow_format::ipc::FooterRef::read_as_root(footer_data)
201 .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferFooter(err)))?;
202
203 let blocks = footer
204 .record_batches()
205 .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferRecordBatches(err)))?
206 .ok_or_else(|| polars_err!(oos = OutOfSpecKind::MissingRecordBatches))?;
207
208 let blocks = blocks
209 .iter()
210 .map(|blockref| Ok(<arrow_format::ipc::Block>::from(blockref)))
211 .collect::<PolarsResult<Vec<_>>>()?;
212 Ok((footer, blocks))
213}
214
215pub(super) fn deserialize_footer_ref(footer_data: &[u8]) -> PolarsResult<FooterRef<'_>> {
216 arrow_format::ipc::FooterRef::read_as_root(footer_data)
217 .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferFooter(err)))
218}
219
220pub(super) fn deserialize_schema_ref_from_footer(
221 footer: arrow_format::ipc::FooterRef<'_>,
222) -> PolarsResult<arrow_format::ipc::SchemaRef<'_>> {
223 footer
224 .schema()
225 .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferSchema(err)))?
226 .ok_or_else(|| polars_err!(oos = OutOfSpecKind::MissingSchema))
227}
228
229pub(super) fn iter_recordbatch_blocks_from_footer(
231 footer: arrow_format::ipc::FooterRef<'_>,
232) -> PolarsResult<impl SendableIterator<Item = PolarsResult<arrow_format::ipc::Block>> + '_> {
233 let blocks = footer
234 .record_batches()
235 .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferRecordBatches(err)))?
236 .ok_or_else(|| polars_err!(oos = OutOfSpecKind::MissingRecordBatches))?;
237
238 Ok(blocks
239 .into_iter()
240 .map(|blockref| Ok(<arrow_format::ipc::Block>::from(blockref))))
241}
242
243pub(super) fn iter_dictionary_blocks_from_footer(
244 footer: arrow_format::ipc::FooterRef<'_>,
245) -> PolarsResult<Option<impl SendableIterator<Item = PolarsResult<arrow_format::ipc::Block>> + '_>>
246{
247 let dictionaries = footer
248 .dictionaries()
249 .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferDictionaries(err)))?;
250
251 Ok(dictionaries.map(|dicts| {
252 dicts
253 .into_iter()
254 .map(|blockref| Ok(<arrow_format::ipc::Block>::from(blockref)))
255 }))
256}
257
258pub fn deserialize_footer(footer_data: &[u8], size: u64) -> PolarsResult<FileMetadata> {
259 let footer = deserialize_footer_ref(footer_data)?;
260 let blocks = iter_recordbatch_blocks_from_footer(footer)?.collect::<PolarsResult<Vec<_>>>()?;
261 let dictionaries = iter_dictionary_blocks_from_footer(footer)?
262 .map(|dicts| dicts.collect::<PolarsResult<Vec<_>>>())
263 .transpose()?;
264 let ipc_schema = deserialize_schema_ref_from_footer(footer)?;
265 let (schema, ipc_schema, custom_schema_metadata) = fb_to_schema(ipc_schema)?;
266
267 Ok(FileMetadata {
268 schema: Arc::new(schema),
269 ipc_schema,
270 blocks,
271 dictionaries,
272 size,
273 custom_schema_metadata: custom_schema_metadata.map(Arc::new),
274 })
275}
276
277pub fn read_file_metadata<R: Read + Seek>(reader: &mut R) -> PolarsResult<FileMetadata> {
279 let start = reader.stream_position()?;
280 let (end, footer_len) = read_footer_len(reader)?;
281 let serialized_footer = read_footer(reader, footer_len)?;
282 deserialize_footer(&serialized_footer, end - start)
283}
284
285pub(crate) fn get_record_batch(
286 message: arrow_format::ipc::MessageRef,
287) -> PolarsResult<arrow_format::ipc::RecordBatchRef> {
288 let header = message
289 .header()
290 .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferHeader(err)))?
291 .ok_or_else(|| polars_err!(oos = OutOfSpecKind::MissingMessageHeader))?;
292 match header {
293 arrow_format::ipc::MessageHeaderRef::RecordBatch(batch) => Ok(batch),
294 _ => polars_bail!(oos = OutOfSpecKind::UnexpectedMessageType),
295 }
296}
297
298pub fn get_message_from_block_offset<'a, R: Read + Seek>(
299 reader: &mut R,
300 offset: u64,
301 message_scratch: &'a mut Vec<u8>,
302) -> PolarsResult<arrow_format::ipc::MessageRef<'a>> {
303 reader.seek(SeekFrom::Start(offset))?;
304 let mut meta_buf = [0; 4];
305 reader.read_exact(&mut meta_buf)?;
306 if meta_buf == CONTINUATION_MARKER {
307 reader.read_exact(&mut meta_buf)?;
309 }
310
311 let meta_len = i32::from_le_bytes(meta_buf)
312 .try_into()
313 .map_err(|_| polars_err!(oos = OutOfSpecKind::UnexpectedNegativeInteger))?;
314
315 message_scratch.clear();
316 message_scratch.try_reserve(meta_len)?;
317 reader
318 .by_ref()
319 .take(meta_len as u64)
320 .read_to_end(message_scratch)?;
321
322 arrow_format::ipc::MessageRef::read_as_root(message_scratch)
323 .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferMessage(err)))
324}
325
326pub(super) fn get_message_from_block<'a, R: Read + Seek>(
327 reader: &mut R,
328 block: &arrow_format::ipc::Block,
329 message_scratch: &'a mut Vec<u8>,
330) -> PolarsResult<arrow_format::ipc::MessageRef<'a>> {
331 let offset: u64 = block
332 .offset
333 .try_into()
334 .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
335
336 get_message_from_block_offset(reader, offset, message_scratch)
337}
338
339#[allow(clippy::too_many_arguments)]
347pub fn read_batch<R: Read + Seek>(
348 reader: &mut R,
349 dictionaries: &Dictionaries,
350 metadata: &FileMetadata,
351 projection: Option<&[usize]>,
352 limit: Option<usize>,
353 index: usize,
354 force_zero_offset: bool,
356 message_scratch: &mut Vec<u8>,
357 data_scratch: &mut Vec<u8>,
358 checked: UnsafeBool,
359) -> PolarsResult<RecordBatchT<Box<dyn Array>>> {
360 let block = metadata.blocks[index];
361
362 let offset: u64 = if force_zero_offset {
363 0
364 } else {
365 block
366 .offset
367 .try_into()
368 .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?
369 };
370
371 let length: u64 = block
372 .meta_data_length
373 .try_into()
374 .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
375
376 let message = get_message_from_block_offset(reader, offset, message_scratch)?;
377 let batch = get_record_batch(message)?;
378
379 read_record_batch(
380 batch,
381 &metadata.schema,
382 &metadata.ipc_schema,
383 projection,
384 limit,
385 dictionaries,
386 message
387 .version()
388 .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferVersion(err)))?,
389 reader,
390 offset + length,
391 data_scratch,
392 checked,
393 )
394}