polars_arrow/io/ipc/read/
read_basic.rs1use std::collections::VecDeque;
2use std::io::{Read, Seek, SeekFrom};
3
4use polars_buffer::Buffer;
5use polars_error::{PolarsResult, polars_bail, polars_ensure, polars_err};
6
7use super::super::compression;
8use super::super::endianness::is_native_little_endian;
9use super::{Compression, IpcBuffer, Node, OutOfSpecKind};
10use crate::bitmap::Bitmap;
11use crate::types::NativeType;
12
13fn read_swapped<T: NativeType, R: Read + Seek>(
14 reader: &mut R,
15 length: usize,
16 buffer: &mut Vec<T>,
17 is_little_endian: bool,
18) -> PolarsResult<()> {
19 #[expect(clippy::slow_vector_initialization)] let mut slice = Vec::new();
22 slice.resize(length * size_of::<T>(), 0);
23 reader.read_exact(&mut slice)?;
24
25 let chunks = slice.chunks_exact(size_of::<T>());
26 if !is_little_endian {
27 buffer
29 .as_mut_slice()
30 .iter_mut()
31 .zip(chunks)
32 .try_for_each(|(slot, chunk)| {
33 let a: T::Bytes = match chunk.try_into() {
34 Ok(a) => a,
35 Err(_) => unreachable!(),
36 };
37 *slot = T::from_be_bytes(a);
38 PolarsResult::Ok(())
39 })?;
40 } else {
41 polars_bail!(ComputeError:
43 "Reading little endian files from big endian machines",
44 )
45 }
46 Ok(())
47}
48
49fn read_uncompressed_bytes<R: Read + Seek>(
50 reader: &mut R,
51 buffer_length: usize,
52 is_little_endian: bool,
53) -> PolarsResult<Vec<u8>> {
54 if is_native_little_endian() == is_little_endian {
55 let mut buffer = Vec::with_capacity(buffer_length);
56 let _ = reader
57 .take(buffer_length as u64)
58 .read_to_end(&mut buffer)
59 .unwrap();
60
61 polars_ensure!(buffer.len() == buffer_length, ComputeError: "Malformed IPC file: expected compressed buffer of len {buffer_length}, got {}", buffer.len());
62
63 Ok(buffer)
64 } else {
65 unreachable!()
66 }
67}
68
69fn read_uncompressed_buffer<T: NativeType, R: Read + Seek>(
70 reader: &mut R,
71 buffer_length: usize,
72 length: usize,
73 is_little_endian: bool,
74) -> PolarsResult<Vec<T>> {
75 let required_number_of_bytes = length.saturating_mul(size_of::<T>());
76 if required_number_of_bytes > buffer_length {
77 polars_bail!(
78 oos = OutOfSpecKind::InvalidBuffer {
79 length,
80 type_name: std::any::type_name::<T>(),
81 required_number_of_bytes,
82 buffer_length,
83 }
84 );
85 }
86
87 let mut buffer = vec![T::default(); length];
90
91 if is_native_little_endian() == is_little_endian {
92 let slice = bytemuck::cast_slice_mut(&mut buffer);
94 reader.read_exact(slice)?;
95 } else {
96 read_swapped(reader, length, &mut buffer, is_little_endian)?;
97 }
98 Ok(buffer)
99}
100
101fn read_compressed_buffer<T: NativeType, R: Read + Seek>(
102 reader: &mut R,
103 buffer_length: usize,
104 row_limit: Option<usize>,
106 is_little_endian: bool,
107 compression: Compression,
108 scratch: &mut Vec<u8>,
109) -> PolarsResult<Vec<T>> {
110 if row_limit == Some(0) {
111 return Ok(vec![]);
112 }
113
114 if is_little_endian != is_native_little_endian() {
115 polars_bail!(ComputeError:
116 "Reading compressed and big endian IPC".to_string(),
117 )
118 }
119
120 scratch.clear();
122 scratch.try_reserve(buffer_length)?;
123 reader
124 .by_ref()
125 .take(buffer_length as u64)
126 .read_to_end(scratch)?;
127
128 polars_ensure!(scratch.len() == buffer_length, ComputeError: "Malformed IPC file: expected compressed buffer of len {buffer_length}, got {}", scratch.len());
129
130 let decompressed_len_field = i64::from_le_bytes(scratch[..8].try_into().unwrap());
131 let decompressed_bytes: usize = if decompressed_len_field == -1 {
132 buffer_length - 8
133 } else {
134 decompressed_len_field.try_into().map_err(|_| {
135 polars_err!(ComputeError: "Malformed IPC file: got invalid decompressed length {decompressed_len_field}")
136 })?
137 };
138
139 polars_ensure!(decompressed_bytes.is_multiple_of(size_of::<T>()),
140 ComputeError: "Malformed IPC file: got decompressed buffer length which is not a multiple of the data type");
141 let n_rows_in_array = decompressed_bytes / size_of::<T>();
142
143 if decompressed_len_field == -1 {
144 return Ok(bytemuck::cast_slice(&scratch[8..]).to_vec());
145 }
146
147 let n_rows_exact = row_limit
151 .map(|limit| std::cmp::min(limit, n_rows_in_array))
152 .unwrap_or(n_rows_in_array);
153
154 let mut buffer = vec![T::default(); n_rows_exact];
155 let out_slice = bytemuck::cast_slice_mut(&mut buffer);
156
157 let compression = compression
158 .codec()
159 .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferCompression(err)))?;
160
161 match compression {
162 arrow_format::ipc::CompressionType::Lz4Frame => {
163 compression::decompress_lz4(&scratch[8..], out_slice)?;
164 },
165 arrow_format::ipc::CompressionType::Zstd => {
166 compression::decompress_zstd(&scratch[8..], out_slice)?;
167 },
168 }
169 Ok(buffer)
170}
171
172fn read_compressed_bytes<R: Read + Seek>(
173 reader: &mut R,
174 buffer_length: usize,
175 is_little_endian: bool,
176 compression: Compression,
177 scratch: &mut Vec<u8>,
178) -> PolarsResult<Vec<u8>> {
179 read_compressed_buffer::<u8, _>(
180 reader,
181 buffer_length,
182 None,
183 is_little_endian,
184 compression,
185 scratch,
186 )
187}
188
189pub fn read_bytes<R: Read + Seek>(
190 buf: &mut VecDeque<IpcBuffer>,
191 reader: &mut R,
192 block_offset: u64,
193 is_little_endian: bool,
194 compression: Option<Compression>,
195 scratch: &mut Vec<u8>,
196) -> PolarsResult<Buffer<u8>> {
197 let buf = buf
198 .pop_front()
199 .ok_or_else(|| polars_err!(oos = OutOfSpecKind::ExpectedBuffer))?;
200
201 let offset: u64 = buf
202 .offset()
203 .try_into()
204 .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
205
206 let buffer_length: usize = buf
207 .length()
208 .try_into()
209 .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
210
211 reader.seek(SeekFrom::Start(block_offset + offset))?;
212
213 if let Some(compression) = compression {
214 Ok(read_compressed_bytes(
215 reader,
216 buffer_length,
217 is_little_endian,
218 compression,
219 scratch,
220 )?
221 .into())
222 } else {
223 Ok(read_uncompressed_bytes(reader, buffer_length, is_little_endian)?.into())
224 }
225}
226
227pub fn read_buffer<T: NativeType, R: Read + Seek>(
228 buf: &mut VecDeque<IpcBuffer>,
229 length: usize, reader: &mut R,
231 block_offset: u64,
232 is_little_endian: bool,
233 compression: Option<Compression>,
234 scratch: &mut Vec<u8>,
235) -> PolarsResult<Buffer<T>> {
236 let buf = buf
237 .pop_front()
238 .ok_or_else(|| polars_err!(oos = OutOfSpecKind::ExpectedBuffer))?;
239
240 let offset: u64 = buf
241 .offset()
242 .try_into()
243 .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
244
245 let buffer_length: usize = buf
246 .length()
247 .try_into()
248 .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
249
250 reader.seek(SeekFrom::Start(block_offset + offset))?;
251
252 if let Some(compression) = compression {
253 Ok(read_compressed_buffer(
254 reader,
255 buffer_length,
256 Some(length),
257 is_little_endian,
258 compression,
259 scratch,
260 )?
261 .into())
262 } else {
263 Ok(read_uncompressed_buffer(reader, buffer_length, length, is_little_endian)?.into())
264 }
265}
266
267fn read_uncompressed_bitmap<R: Read + Seek>(
268 row_limit: usize,
269 bytes: usize,
270 reader: &mut R,
271) -> PolarsResult<Vec<u8>> {
272 if row_limit > bytes * 8 {
273 polars_bail!(
274 oos = OutOfSpecKind::InvalidBitmap {
275 length: row_limit,
276 number_of_bits: bytes * 8,
277 }
278 )
279 }
280
281 let mut buffer = vec![];
282 buffer.try_reserve(bytes)?;
283 reader
284 .by_ref()
285 .take(bytes as u64)
286 .read_to_end(&mut buffer)?;
287
288 polars_ensure!(buffer.len() == bytes, ComputeError: "Malformed IPC file: expected compressed buffer of len {bytes}, got {}", buffer.len());
289
290 Ok(buffer)
291}
292
293fn read_compressed_bitmap<R: Read + Seek>(
294 row_limit: usize,
295 bytes: usize,
296 compression: Compression,
297 reader: &mut R,
298 scratch: &mut Vec<u8>,
299) -> PolarsResult<Vec<u8>> {
300 scratch.clear();
301 scratch.try_reserve(bytes)?;
302 reader.by_ref().take(bytes as u64).read_to_end(scratch)?;
303 if scratch.len() != bytes {
304 polars_bail!(ComputeError: "Malformed IPC file: expected compressed buffer of len {bytes}, got {}", scratch.len());
305 }
306
307 let decompressed_len_field = i64::from_le_bytes(scratch[..8].try_into().unwrap());
308 let decompressed_bytes: usize = if decompressed_len_field == -1 {
309 scratch.len() - 8
310 } else {
311 decompressed_len_field.try_into().map_err(|_| {
312 polars_err!(ComputeError: "Malformed IPC file: got invalid decompressed length {decompressed_len_field}")
313 })?
314 };
315
316 polars_ensure!(decompressed_bytes >= row_limit.div_ceil(8),
320 ComputeError: "Malformed IPC file: got unexpected decompressed output length {decompressed_bytes}, expected {}", row_limit.div_ceil(8));
321
322 if decompressed_len_field == -1 {
323 return Ok(bytemuck::cast_slice(&scratch[8..]).to_vec());
324 }
325
326 #[expect(clippy::slow_vector_initialization)] let mut buffer = Vec::new();
328 buffer.resize(decompressed_bytes, 0);
329
330 let compression = compression
331 .codec()
332 .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferCompression(err)))?;
333
334 match compression {
335 arrow_format::ipc::CompressionType::Lz4Frame => {
336 compression::decompress_lz4(&scratch[8..], &mut buffer)?;
337 },
338 arrow_format::ipc::CompressionType::Zstd => {
339 compression::decompress_zstd(&scratch[8..], &mut buffer)?;
340 },
341 }
342 Ok(buffer)
343}
344
345pub fn read_bitmap<R: Read + Seek>(
346 buf: &mut VecDeque<IpcBuffer>,
347 row_limit: usize,
348 reader: &mut R,
349 block_offset: u64,
350 _: bool,
351 compression: Option<Compression>,
352 scratch: &mut Vec<u8>,
353) -> PolarsResult<Bitmap> {
354 let buf = buf
355 .pop_front()
356 .ok_or_else(|| polars_err!(oos = OutOfSpecKind::ExpectedBuffer))?;
357
358 let offset: u64 = buf
359 .offset()
360 .try_into()
361 .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
362
363 let bytes: usize = buf
364 .length()
365 .try_into()
366 .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
367
368 reader.seek(SeekFrom::Start(block_offset + offset))?;
369
370 let buffer = if let Some(compression) = compression {
371 read_compressed_bitmap(row_limit, bytes, compression, reader, scratch)
372 } else {
373 read_uncompressed_bitmap(row_limit, bytes, reader)
374 }?;
375
376 Bitmap::try_new(buffer, row_limit)
377}
378
379#[allow(clippy::too_many_arguments)]
380pub fn read_validity<R: Read + Seek>(
381 buffers: &mut VecDeque<IpcBuffer>,
382 field_node: Node,
383 reader: &mut R,
384 block_offset: u64,
385 is_little_endian: bool,
386 compression: Option<Compression>,
387 limit: Option<usize>,
388 scratch: &mut Vec<u8>,
389) -> PolarsResult<Option<Bitmap>> {
390 let length: usize = field_node
391 .length()
392 .try_into()
393 .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
394 let row_limit = limit.map(|limit| limit.min(length)).unwrap_or(length);
395
396 Ok(if field_node.null_count() > 0 {
397 Some(read_bitmap(
398 buffers,
399 row_limit,
400 reader,
401 block_offset,
402 is_little_endian,
403 compression,
404 scratch,
405 )?)
406 } else {
407 let _ = buffers
408 .pop_front()
409 .ok_or_else(|| polars_err!(oos = OutOfSpecKind::ExpectedBuffer))?;
410 None
411 })
412}