Skip to main content

polars_arrow/io/ipc/read/
read_basic.rs

1use std::collections::VecDeque;
2use std::io::{Read, Seek, SeekFrom};
3
4use polars_buffer::Buffer;
5use polars_error::{PolarsResult, polars_bail, polars_ensure, polars_err};
6
7use super::super::compression;
8use super::super::endianness::is_native_little_endian;
9use super::{Compression, IpcBuffer, Node, OutOfSpecKind};
10use crate::bitmap::Bitmap;
11use crate::types::NativeType;
12
13fn read_swapped<T: NativeType, R: Read + Seek>(
14    reader: &mut R,
15    length: usize,
16    buffer: &mut Vec<T>,
17    is_little_endian: bool,
18) -> PolarsResult<()> {
19    // Slow case where we must reverse bits.
20    #[expect(clippy::slow_vector_initialization)] // Avoid alloc_zeroed, leads to syscall.
21    let mut slice = Vec::new();
22    slice.resize(length * size_of::<T>(), 0);
23    reader.read_exact(&mut slice)?;
24
25    let chunks = slice.chunks_exact(size_of::<T>());
26    if !is_little_endian {
27        // machine is little endian, file is big endian
28        buffer
29            .as_mut_slice()
30            .iter_mut()
31            .zip(chunks)
32            .try_for_each(|(slot, chunk)| {
33                let a: T::Bytes = match chunk.try_into() {
34                    Ok(a) => a,
35                    Err(_) => unreachable!(),
36                };
37                *slot = T::from_be_bytes(a);
38                PolarsResult::Ok(())
39            })?;
40    } else {
41        // machine is big endian, file is little endian
42        polars_bail!(ComputeError:
43            "Reading little endian files from big endian machines",
44        )
45    }
46    Ok(())
47}
48
49fn read_uncompressed_bytes<R: Read + Seek>(
50    reader: &mut R,
51    buffer_length: usize,
52    is_little_endian: bool,
53) -> PolarsResult<Vec<u8>> {
54    if is_native_little_endian() == is_little_endian {
55        let mut buffer = Vec::with_capacity(buffer_length);
56        let _ = reader
57            .take(buffer_length as u64)
58            .read_to_end(&mut buffer)
59            .unwrap();
60
61        polars_ensure!(buffer.len() == buffer_length, ComputeError: "Malformed IPC file: expected compressed buffer of len {buffer_length}, got {}", buffer.len());
62
63        Ok(buffer)
64    } else {
65        unreachable!()
66    }
67}
68
69fn read_uncompressed_buffer<T: NativeType, R: Read + Seek>(
70    reader: &mut R,
71    buffer_length: usize,
72    length: usize,
73    is_little_endian: bool,
74) -> PolarsResult<Vec<T>> {
75    let required_number_of_bytes = length.saturating_mul(size_of::<T>());
76    if required_number_of_bytes > buffer_length {
77        polars_bail!(
78            oos = OutOfSpecKind::InvalidBuffer {
79                length,
80                type_name: std::any::type_name::<T>(),
81                required_number_of_bytes,
82                buffer_length,
83            }
84        );
85    }
86
87    // it is undefined behavior to call read_exact on un-initialized, https://doc.rust-lang.org/std/io/trait.Read.html#tymethod.read
88    // see also https://github.com/MaikKlein/ash/issues/354#issue-781730580
89    let mut buffer = vec![T::default(); length];
90
91    if is_native_little_endian() == is_little_endian {
92        // fast case where we can just copy the contents
93        let slice = bytemuck::cast_slice_mut(&mut buffer);
94        reader.read_exact(slice)?;
95    } else {
96        read_swapped(reader, length, &mut buffer, is_little_endian)?;
97    }
98    Ok(buffer)
99}
100
101fn read_compressed_buffer<T: NativeType, R: Read + Seek>(
102    reader: &mut R,
103    buffer_length: usize,
104    // Upper bound for the number of rows to be returned.
105    row_limit: Option<usize>,
106    is_little_endian: bool,
107    compression: Compression,
108    scratch: &mut Vec<u8>,
109) -> PolarsResult<Vec<T>> {
110    if row_limit == Some(0) {
111        return Ok(vec![]);
112    }
113
114    if is_little_endian != is_native_little_endian() {
115        polars_bail!(ComputeError:
116            "Reading compressed and big endian IPC".to_string(),
117        )
118    }
119
120    // Decompress first.
121    scratch.clear();
122    scratch.try_reserve(buffer_length)?;
123    reader
124        .by_ref()
125        .take(buffer_length as u64)
126        .read_to_end(scratch)?;
127
128    polars_ensure!(scratch.len() == buffer_length, ComputeError: "Malformed IPC file: expected compressed buffer of len {buffer_length}, got {}", scratch.len());
129
130    let decompressed_len_field = i64::from_le_bytes(scratch[..8].try_into().unwrap());
131    let decompressed_bytes: usize = if decompressed_len_field == -1 {
132        buffer_length - 8
133    } else {
134        decompressed_len_field.try_into().map_err(|_| {
135            polars_err!(ComputeError: "Malformed IPC file: got invalid decompressed length {decompressed_len_field}")
136        })?
137    };
138
139    polars_ensure!(decompressed_bytes.is_multiple_of(size_of::<T>()),
140            ComputeError: "Malformed IPC file: got decompressed buffer length which is not a multiple of the data type");
141    let n_rows_in_array = decompressed_bytes / size_of::<T>();
142
143    if decompressed_len_field == -1 {
144        return Ok(bytemuck::cast_slice(&scratch[8..]).to_vec());
145    }
146
147    // It is undefined behavior to call read_exact on un-initialized, https://doc.rust-lang.org/std/io/trait.Read.html#tymethod.read
148    // see also https://github.com/MaikKlein/ash/issues/354#issue-781730580
149
150    let n_rows_exact = row_limit
151        .map(|limit| std::cmp::min(limit, n_rows_in_array))
152        .unwrap_or(n_rows_in_array);
153
154    let mut buffer = vec![T::default(); n_rows_exact];
155    let out_slice = bytemuck::cast_slice_mut(&mut buffer);
156
157    let compression = compression
158        .codec()
159        .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferCompression(err)))?;
160
161    match compression {
162        arrow_format::ipc::CompressionType::Lz4Frame => {
163            compression::decompress_lz4(&scratch[8..], out_slice)?;
164        },
165        arrow_format::ipc::CompressionType::Zstd => {
166            compression::decompress_zstd(&scratch[8..], out_slice)?;
167        },
168    }
169    Ok(buffer)
170}
171
172fn read_compressed_bytes<R: Read + Seek>(
173    reader: &mut R,
174    buffer_length: usize,
175    is_little_endian: bool,
176    compression: Compression,
177    scratch: &mut Vec<u8>,
178) -> PolarsResult<Vec<u8>> {
179    read_compressed_buffer::<u8, _>(
180        reader,
181        buffer_length,
182        None,
183        is_little_endian,
184        compression,
185        scratch,
186    )
187}
188
189pub fn read_bytes<R: Read + Seek>(
190    buf: &mut VecDeque<IpcBuffer>,
191    reader: &mut R,
192    block_offset: u64,
193    is_little_endian: bool,
194    compression: Option<Compression>,
195    scratch: &mut Vec<u8>,
196) -> PolarsResult<Buffer<u8>> {
197    let buf = buf
198        .pop_front()
199        .ok_or_else(|| polars_err!(oos = OutOfSpecKind::ExpectedBuffer))?;
200
201    let offset: u64 = buf
202        .offset()
203        .try_into()
204        .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
205
206    let buffer_length: usize = buf
207        .length()
208        .try_into()
209        .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
210
211    reader.seek(SeekFrom::Start(block_offset + offset))?;
212
213    if let Some(compression) = compression {
214        Ok(read_compressed_bytes(
215            reader,
216            buffer_length,
217            is_little_endian,
218            compression,
219            scratch,
220        )?
221        .into())
222    } else {
223        Ok(read_uncompressed_bytes(reader, buffer_length, is_little_endian)?.into())
224    }
225}
226
227pub fn read_buffer<T: NativeType, R: Read + Seek>(
228    buf: &mut VecDeque<IpcBuffer>,
229    length: usize, // in slots
230    reader: &mut R,
231    block_offset: u64,
232    is_little_endian: bool,
233    compression: Option<Compression>,
234    scratch: &mut Vec<u8>,
235) -> PolarsResult<Buffer<T>> {
236    let buf = buf
237        .pop_front()
238        .ok_or_else(|| polars_err!(oos = OutOfSpecKind::ExpectedBuffer))?;
239
240    let offset: u64 = buf
241        .offset()
242        .try_into()
243        .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
244
245    let buffer_length: usize = buf
246        .length()
247        .try_into()
248        .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
249
250    reader.seek(SeekFrom::Start(block_offset + offset))?;
251
252    if let Some(compression) = compression {
253        Ok(read_compressed_buffer(
254            reader,
255            buffer_length,
256            Some(length),
257            is_little_endian,
258            compression,
259            scratch,
260        )?
261        .into())
262    } else {
263        Ok(read_uncompressed_buffer(reader, buffer_length, length, is_little_endian)?.into())
264    }
265}
266
267fn read_uncompressed_bitmap<R: Read + Seek>(
268    row_limit: usize,
269    bytes: usize,
270    reader: &mut R,
271) -> PolarsResult<Vec<u8>> {
272    if row_limit > bytes * 8 {
273        polars_bail!(
274            oos = OutOfSpecKind::InvalidBitmap {
275                length: row_limit,
276                number_of_bits: bytes * 8,
277            }
278        )
279    }
280
281    let mut buffer = vec![];
282    buffer.try_reserve(bytes)?;
283    reader
284        .by_ref()
285        .take(bytes as u64)
286        .read_to_end(&mut buffer)?;
287
288    polars_ensure!(buffer.len() == bytes, ComputeError: "Malformed IPC file: expected compressed buffer of len {bytes}, got {}", buffer.len());
289
290    Ok(buffer)
291}
292
293fn read_compressed_bitmap<R: Read + Seek>(
294    row_limit: usize,
295    bytes: usize,
296    compression: Compression,
297    reader: &mut R,
298    scratch: &mut Vec<u8>,
299) -> PolarsResult<Vec<u8>> {
300    scratch.clear();
301    scratch.try_reserve(bytes)?;
302    reader.by_ref().take(bytes as u64).read_to_end(scratch)?;
303    if scratch.len() != bytes {
304        polars_bail!(ComputeError: "Malformed IPC file: expected compressed buffer of len {bytes}, got {}", scratch.len());
305    }
306
307    let decompressed_len_field = i64::from_le_bytes(scratch[..8].try_into().unwrap());
308    let decompressed_bytes: usize = if decompressed_len_field == -1 {
309        scratch.len() - 8
310    } else {
311        decompressed_len_field.try_into().map_err(|_| {
312            polars_err!(ComputeError: "Malformed IPC file: got invalid decompressed length {decompressed_len_field}")
313        })?
314    };
315
316    // In addition to the slicing use case, we allow for excess bytes in untruncated buffers,
317    // see https://github.com/pola-rs/polars/issues/26126
318    // and https://github.com/apache/arrow/issues/48883
319    polars_ensure!(decompressed_bytes >= row_limit.div_ceil(8),
320        ComputeError: "Malformed IPC file: got unexpected decompressed output length {decompressed_bytes}, expected {}", row_limit.div_ceil(8));
321
322    if decompressed_len_field == -1 {
323        return Ok(bytemuck::cast_slice(&scratch[8..]).to_vec());
324    }
325
326    #[expect(clippy::slow_vector_initialization)] // Avoid alloc_zeroed, leads to syscall.
327    let mut buffer = Vec::new();
328    buffer.resize(decompressed_bytes, 0);
329
330    let compression = compression
331        .codec()
332        .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferCompression(err)))?;
333
334    match compression {
335        arrow_format::ipc::CompressionType::Lz4Frame => {
336            compression::decompress_lz4(&scratch[8..], &mut buffer)?;
337        },
338        arrow_format::ipc::CompressionType::Zstd => {
339            compression::decompress_zstd(&scratch[8..], &mut buffer)?;
340        },
341    }
342    Ok(buffer)
343}
344
345pub fn read_bitmap<R: Read + Seek>(
346    buf: &mut VecDeque<IpcBuffer>,
347    row_limit: usize,
348    reader: &mut R,
349    block_offset: u64,
350    _: bool,
351    compression: Option<Compression>,
352    scratch: &mut Vec<u8>,
353) -> PolarsResult<Bitmap> {
354    let buf = buf
355        .pop_front()
356        .ok_or_else(|| polars_err!(oos = OutOfSpecKind::ExpectedBuffer))?;
357
358    let offset: u64 = buf
359        .offset()
360        .try_into()
361        .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
362
363    let bytes: usize = buf
364        .length()
365        .try_into()
366        .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
367
368    reader.seek(SeekFrom::Start(block_offset + offset))?;
369
370    let buffer = if let Some(compression) = compression {
371        read_compressed_bitmap(row_limit, bytes, compression, reader, scratch)
372    } else {
373        read_uncompressed_bitmap(row_limit, bytes, reader)
374    }?;
375
376    Bitmap::try_new(buffer, row_limit)
377}
378
379#[allow(clippy::too_many_arguments)]
380pub fn read_validity<R: Read + Seek>(
381    buffers: &mut VecDeque<IpcBuffer>,
382    field_node: Node,
383    reader: &mut R,
384    block_offset: u64,
385    is_little_endian: bool,
386    compression: Option<Compression>,
387    limit: Option<usize>,
388    scratch: &mut Vec<u8>,
389) -> PolarsResult<Option<Bitmap>> {
390    let length: usize = field_node
391        .length()
392        .try_into()
393        .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
394    let row_limit = limit.map(|limit| limit.min(length)).unwrap_or(length);
395
396    Ok(if field_node.null_count() > 0 {
397        Some(read_bitmap(
398            buffers,
399            row_limit,
400            reader,
401            block_offset,
402            is_little_endian,
403            compression,
404            scratch,
405        )?)
406    } else {
407        let _ = buffers
408            .pop_front()
409            .ok_or_else(|| polars_err!(oos = OutOfSpecKind::ExpectedBuffer))?;
410        None
411    })
412}