Skip to main content

polars_arrow/io/ipc/read/
file.rs

1use std::io::{Read, Seek, SeekFrom};
2use std::sync::Arc;
3
4use arrow_format::ipc::FooterRef;
5use arrow_format::ipc::planus::ReadAsRoot;
6use polars_error::{PolarsResult, polars_bail, polars_err};
7use polars_utils::aliases::{InitHashMaps, PlHashMap};
8use polars_utils::bool::UnsafeBool;
9
10use super::super::{ARROW_MAGIC_V1, ARROW_MAGIC_V2, CONTINUATION_MARKER};
11use super::common::*;
12use super::schema::fb_to_schema;
13use super::{Dictionaries, OutOfSpecKind, SendableIterator};
14use crate::array::Array;
15use crate::datatypes::{ArrowSchemaRef, Metadata};
16use crate::io::ipc::IpcSchema;
17use crate::record_batch::RecordBatchT;
18
/// Metadata of an Arrow IPC file, as written in the footer of the file.
#[derive(Debug, Clone)]
pub struct FileMetadata {
    /// The schema read from the file footer.
    pub schema: ArrowSchemaRef,

    /// The custom metadata read from the schema, if any.
    pub custom_schema_metadata: Option<Arc<Metadata>>,

    /// The file's [`IpcSchema`].
    pub ipc_schema: IpcSchema,

    /// The record batch blocks listed in the footer.
    ///
    /// A block indicates the region of the file to read to get data.
    pub blocks: Vec<arrow_format::ipc::Block>,

    /// The dictionary blocks listed in the footer, or `None` when the footer
    /// has no dictionaries entry.
    pub dictionaries: Option<Vec<arrow_format::ipc::Block>>,

    /// The size of the file in bytes, as supplied to `deserialize_footer`
    /// (`read_file_metadata` passes the distance from the reader's position
    /// at call time to the end of the file).
    pub size: u64,
}
42
43/// Read the row count by summing the length of the of the record batches
44pub fn get_row_count<R: Read + Seek>(reader: &mut R) -> PolarsResult<i64> {
45    let (_, footer_len) = read_footer_len(reader)?;
46    let footer = read_footer(reader, footer_len)?;
47    let (_, blocks) = deserialize_footer_blocks(&footer)?;
48
49    get_row_count_from_blocks(reader, &blocks)
50}
51
52///  Read the row count by summing the length of the of the record batches in blocks
53pub fn get_row_count_from_blocks<R: Read + Seek>(
54    reader: &mut R,
55    blocks: &[arrow_format::ipc::Block],
56) -> PolarsResult<i64> {
57    let mut message_scratch: Vec<u8> = Default::default();
58
59    blocks
60        .iter()
61        .map(|block| {
62            let message = get_message_from_block(reader, block, &mut message_scratch)?;
63            let record_batch = get_record_batch(message)?;
64            record_batch.length().map_err(|e| e.into())
65        })
66        .sum()
67}
68
69pub(crate) fn get_dictionary_batch<'a>(
70    message: &'a arrow_format::ipc::MessageRef,
71) -> PolarsResult<arrow_format::ipc::DictionaryBatchRef<'a>> {
72    let header = message
73        .header()
74        .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferHeader(err)))?
75        .ok_or_else(|| polars_err!(oos = OutOfSpecKind::MissingMessageHeader))?;
76    match header {
77        arrow_format::ipc::MessageHeaderRef::DictionaryBatch(batch) => Ok(batch),
78        _ => polars_bail!(oos = OutOfSpecKind::UnexpectedMessageType),
79    }
80}
81
82#[allow(clippy::too_many_arguments)]
83pub fn read_dictionary_block<R: Read + Seek>(
84    reader: &mut R,
85    metadata: &FileMetadata,
86    block: &arrow_format::ipc::Block,
87    // When true, the underlying reader bytestream represents a standalone IPC Block
88    // rather than a complete IPC File.
89    force_zero_offset: bool,
90    dictionaries: &mut Dictionaries,
91    message_scratch: &mut Vec<u8>,
92    dictionary_scratch: &mut Vec<u8>,
93    checked: UnsafeBool,
94) -> PolarsResult<()> {
95    let offset: u64 = if force_zero_offset {
96        0
97    } else {
98        block
99            .offset
100            .try_into()
101            .map_err(|_| polars_err!(oos = OutOfSpecKind::UnexpectedNegativeInteger))?
102    };
103
104    let length: u64 = block
105        .meta_data_length
106        .try_into()
107        .map_err(|_| polars_err!(oos = OutOfSpecKind::UnexpectedNegativeInteger))?;
108
109    let message = get_message_from_block_offset(reader, offset, message_scratch)?;
110    let batch = get_dictionary_batch(&message)?;
111
112    read_dictionary(
113        batch,
114        &metadata.schema,
115        &metadata.ipc_schema,
116        dictionaries,
117        reader,
118        offset + length,
119        dictionary_scratch,
120        checked,
121    )
122}
123
124/// Reads all file's dictionaries, if any
125/// This function is IO-bounded
126pub fn read_file_dictionaries<R: Read + Seek>(
127    reader: &mut R,
128    metadata: &FileMetadata,
129    scratch: &mut Vec<u8>,
130    checked: UnsafeBool,
131) -> PolarsResult<Dictionaries> {
132    let mut dictionaries = Default::default();
133
134    let blocks = if let Some(blocks) = &metadata.dictionaries {
135        blocks
136    } else {
137        return Ok(PlHashMap::new());
138    };
139    // use a temporary smaller scratch for the messages
140    let mut message_scratch = Default::default();
141
142    for block in blocks {
143        read_dictionary_block(
144            reader,
145            metadata,
146            block,
147            false,
148            &mut dictionaries,
149            &mut message_scratch,
150            scratch,
151            checked,
152        )?;
153    }
154    Ok(dictionaries)
155}
156
157pub(super) fn decode_footer_len(footer: [u8; 10], end: u64) -> PolarsResult<(u64, usize)> {
158    let footer_len = i32::from_le_bytes(footer[..4].try_into().unwrap());
159
160    if footer[4..] != ARROW_MAGIC_V2 {
161        if footer[..4] == ARROW_MAGIC_V1 {
162            polars_bail!(ComputeError: "feather v1 not supported");
163        }
164        return Err(polars_err!(oos = OutOfSpecKind::InvalidFooter));
165    }
166    let footer_len = footer_len
167        .try_into()
168        .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
169
170    Ok((end, footer_len))
171}
172
173/// Reads the footer's length and magic number in footer
174fn read_footer_len<R: Read + Seek>(reader: &mut R) -> PolarsResult<(u64, usize)> {
175    // read footer length and magic number in footer
176    let end = reader.seek(SeekFrom::End(-10))? + 10;
177
178    let mut footer: [u8; 10] = [0; 10];
179
180    reader.read_exact(&mut footer)?;
181    decode_footer_len(footer, end)
182}
183
184fn read_footer<R: Read + Seek>(reader: &mut R, footer_len: usize) -> PolarsResult<Vec<u8>> {
185    // read footer
186    reader.seek(SeekFrom::End(-10 - footer_len as i64))?;
187
188    let mut serialized_footer = vec![];
189    serialized_footer.try_reserve(footer_len)?;
190    reader
191        .by_ref()
192        .take(footer_len as u64)
193        .read_to_end(&mut serialized_footer)?;
194    Ok(serialized_footer)
195}
196
197fn deserialize_footer_blocks(
198    footer_data: &[u8],
199) -> PolarsResult<(FooterRef<'_>, Vec<arrow_format::ipc::Block>)> {
200    let footer = arrow_format::ipc::FooterRef::read_as_root(footer_data)
201        .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferFooter(err)))?;
202
203    let blocks = footer
204        .record_batches()
205        .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferRecordBatches(err)))?
206        .ok_or_else(|| polars_err!(oos = OutOfSpecKind::MissingRecordBatches))?;
207
208    let blocks = blocks
209        .iter()
210        .map(|blockref| Ok(<arrow_format::ipc::Block>::from(blockref)))
211        .collect::<PolarsResult<Vec<_>>>()?;
212    Ok((footer, blocks))
213}
214
215pub(super) fn deserialize_footer_ref(footer_data: &[u8]) -> PolarsResult<FooterRef<'_>> {
216    arrow_format::ipc::FooterRef::read_as_root(footer_data)
217        .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferFooter(err)))
218}
219
220pub(super) fn deserialize_schema_ref_from_footer(
221    footer: arrow_format::ipc::FooterRef<'_>,
222) -> PolarsResult<arrow_format::ipc::SchemaRef<'_>> {
223    footer
224        .schema()
225        .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferSchema(err)))?
226        .ok_or_else(|| polars_err!(oos = OutOfSpecKind::MissingSchema))
227}
228
229/// Get the IPC blocks from the footer containing record batches
230pub(super) fn iter_recordbatch_blocks_from_footer(
231    footer: arrow_format::ipc::FooterRef<'_>,
232) -> PolarsResult<impl SendableIterator<Item = PolarsResult<arrow_format::ipc::Block>> + '_> {
233    let blocks = footer
234        .record_batches()
235        .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferRecordBatches(err)))?
236        .ok_or_else(|| polars_err!(oos = OutOfSpecKind::MissingRecordBatches))?;
237
238    Ok(blocks
239        .into_iter()
240        .map(|blockref| Ok(<arrow_format::ipc::Block>::from(blockref))))
241}
242
243pub(super) fn iter_dictionary_blocks_from_footer(
244    footer: arrow_format::ipc::FooterRef<'_>,
245) -> PolarsResult<Option<impl SendableIterator<Item = PolarsResult<arrow_format::ipc::Block>> + '_>>
246{
247    let dictionaries = footer
248        .dictionaries()
249        .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferDictionaries(err)))?;
250
251    Ok(dictionaries.map(|dicts| {
252        dicts
253            .into_iter()
254            .map(|blockref| Ok(<arrow_format::ipc::Block>::from(blockref)))
255    }))
256}
257
258pub fn deserialize_footer(footer_data: &[u8], size: u64) -> PolarsResult<FileMetadata> {
259    let footer = deserialize_footer_ref(footer_data)?;
260    let blocks = iter_recordbatch_blocks_from_footer(footer)?.collect::<PolarsResult<Vec<_>>>()?;
261    let dictionaries = iter_dictionary_blocks_from_footer(footer)?
262        .map(|dicts| dicts.collect::<PolarsResult<Vec<_>>>())
263        .transpose()?;
264    let ipc_schema = deserialize_schema_ref_from_footer(footer)?;
265    let (schema, ipc_schema, custom_schema_metadata) = fb_to_schema(ipc_schema)?;
266
267    Ok(FileMetadata {
268        schema: Arc::new(schema),
269        ipc_schema,
270        blocks,
271        dictionaries,
272        size,
273        custom_schema_metadata: custom_schema_metadata.map(Arc::new),
274    })
275}
276
277/// Read the Arrow IPC file's metadata
278pub fn read_file_metadata<R: Read + Seek>(reader: &mut R) -> PolarsResult<FileMetadata> {
279    let start = reader.stream_position()?;
280    let (end, footer_len) = read_footer_len(reader)?;
281    let serialized_footer = read_footer(reader, footer_len)?;
282    deserialize_footer(&serialized_footer, end - start)
283}
284
285pub(crate) fn get_record_batch(
286    message: arrow_format::ipc::MessageRef,
287) -> PolarsResult<arrow_format::ipc::RecordBatchRef> {
288    let header = message
289        .header()
290        .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferHeader(err)))?
291        .ok_or_else(|| polars_err!(oos = OutOfSpecKind::MissingMessageHeader))?;
292    match header {
293        arrow_format::ipc::MessageHeaderRef::RecordBatch(batch) => Ok(batch),
294        _ => polars_bail!(oos = OutOfSpecKind::UnexpectedMessageType),
295    }
296}
297
298pub fn get_message_from_block_offset<'a, R: Read + Seek>(
299    reader: &mut R,
300    offset: u64,
301    message_scratch: &'a mut Vec<u8>,
302) -> PolarsResult<arrow_format::ipc::MessageRef<'a>> {
303    reader.seek(SeekFrom::Start(offset))?;
304    let mut meta_buf = [0; 4];
305    reader.read_exact(&mut meta_buf)?;
306    if meta_buf == CONTINUATION_MARKER {
307        // continuation marker encountered, read message next
308        reader.read_exact(&mut meta_buf)?;
309    }
310
311    let meta_len = i32::from_le_bytes(meta_buf)
312        .try_into()
313        .map_err(|_| polars_err!(oos = OutOfSpecKind::UnexpectedNegativeInteger))?;
314
315    message_scratch.clear();
316    message_scratch.try_reserve(meta_len)?;
317    reader
318        .by_ref()
319        .take(meta_len as u64)
320        .read_to_end(message_scratch)?;
321
322    arrow_format::ipc::MessageRef::read_as_root(message_scratch)
323        .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferMessage(err)))
324}
325
326pub(super) fn get_message_from_block<'a, R: Read + Seek>(
327    reader: &mut R,
328    block: &arrow_format::ipc::Block,
329    message_scratch: &'a mut Vec<u8>,
330) -> PolarsResult<arrow_format::ipc::MessageRef<'a>> {
331    let offset: u64 = block
332        .offset
333        .try_into()
334        .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
335
336    get_message_from_block_offset(reader, offset, message_scratch)
337}
338
339/// Reads the record batch at position `index` from the reader.
340///
341/// This function is useful for random access to the file. For example, if
342/// you have indexed the file somewhere else, this allows pruning
343/// certain parts of the file.
344/// # Panics
345/// This function panics iff `index >= metadata.blocks.len()`
346#[allow(clippy::too_many_arguments)]
347pub fn read_batch<R: Read + Seek>(
348    reader: &mut R,
349    dictionaries: &Dictionaries,
350    metadata: &FileMetadata,
351    projection: Option<&[usize]>,
352    limit: Option<usize>,
353    index: usize,
354    // When true, the reader object is handled as an IPC Block.
355    force_zero_offset: bool,
356    message_scratch: &mut Vec<u8>,
357    data_scratch: &mut Vec<u8>,
358    checked: UnsafeBool,
359) -> PolarsResult<RecordBatchT<Box<dyn Array>>> {
360    let block = metadata.blocks[index];
361
362    let offset: u64 = if force_zero_offset {
363        0
364    } else {
365        block
366            .offset
367            .try_into()
368            .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?
369    };
370
371    let length: u64 = block
372        .meta_data_length
373        .try_into()
374        .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
375
376    let message = get_message_from_block_offset(reader, offset, message_scratch)?;
377    let batch = get_record_batch(message)?;
378
379    read_record_batch(
380        batch,
381        &metadata.schema,
382        &metadata.ipc_schema,
383        projection,
384        limit,
385        dictionaries,
386        message
387            .version()
388            .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferVersion(err)))?,
389        reader,
390        offset + length,
391        data_scratch,
392        checked,
393    )
394}