Skip to main content

polars_io/ipc/
ipc_file.rs

1//! # (De)serializing Arrows IPC format.
2//!
3//! Arrow IPC is a [binary format](https://arrow.apache.org/docs/python/ipc.html).
4//! It is the recommended way to serialize and deserialize Polars DataFrames as this is most true
5//! to the data schema.
6//!
7//! ## Example
8//!
9//! ```rust
10//! use polars_core::prelude::*;
11//! use polars_io::prelude::*;
12//! use std::io::Cursor;
13//!
14//!
15//! let s0 = Column::new("days".into(), &[0, 1, 2, 3, 4]);
16//! let s1 = Column::new("temp".into(), &[22.1, 19.9, 7., 2., 3.]);
17//! let mut df = DataFrame::new_infer_height(vec![s0, s1]).unwrap();
18//!
19//! // Create an in memory file handler.
20//! // Vec<u8>: Read + Write
21//! // Cursor<T>: Seek
22//!
23//! let mut buf: Cursor<Vec<u8>> = Cursor::new(Vec::new());
24//!
25//! // write to the in memory buffer
26//! IpcWriter::new(&mut buf).finish(&mut df).expect("ipc writer");
27//!
28//! // reset the buffers index after writing to the beginning of the buffer
29//! buf.set_position(0);
30//!
31//! // read the buffer into a DataFrame
32//! let df_read = IpcReader::new(buf).finish().unwrap();
33//! assert!(df.equals(&df_read));
34//! ```
35use std::io::{Read, Seek};
36use std::path::PathBuf;
37
38use arrow::datatypes::{ArrowSchemaRef, Metadata};
39use arrow::io::ipc::read::{self, get_row_count};
40use arrow::record_batch::RecordBatch;
41use polars_core::prelude::*;
42use polars_utils::bool::UnsafeBool;
43use polars_utils::pl_str::PlRefStr;
44#[cfg(feature = "serde")]
45use serde::{Deserialize, Serialize};
46
47use crate::RowIndex;
48use crate::hive::materialize_hive_partitions;
49use crate::mmap::MmapBytesReader;
50use crate::predicates::PhysicalIoExpr;
51use crate::prelude::*;
52use crate::shared::{ArrowReader, finish_reader};
53
54#[derive(Clone, Debug, PartialEq, Hash)]
55#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
56#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
57pub struct IpcScanOptions {
58    /// Read StatisticsFlags from the record batch custom metadata.
59    #[cfg_attr(feature = "serde", serde(default))]
60    pub record_batch_statistics: bool,
61    #[cfg_attr(feature = "serde", serde(default))]
62    pub checked: UnsafeBool,
63}
64
65#[expect(clippy::derivable_impls)]
66impl Default for IpcScanOptions {
67    fn default() -> Self {
68        Self {
69            record_batch_statistics: false,
70            checked: Default::default(),
71        }
72    }
73}
74
75/// Read Arrows IPC format into a DataFrame
76///
77/// # Example
78/// ```
79/// use polars_core::prelude::*;
80/// use std::fs::File;
81/// use polars_io::ipc::IpcReader;
82/// use polars_io::SerReader;
83///
84/// fn example() -> PolarsResult<DataFrame> {
85///     let file = File::open("file.ipc").expect("file not found");
86///
87///     IpcReader::new(file)
88///         .finish()
89/// }
90/// ```
91#[must_use]
92pub struct IpcReader<R: MmapBytesReader> {
93    /// File or Stream object
94    pub(super) reader: R,
95    /// Aggregates chunks afterwards to a single chunk.
96    rechunk: bool,
97    pub(super) n_rows: Option<usize>,
98    pub(super) projection: Option<Vec<usize>>,
99    pub(crate) columns: Option<Vec<String>>,
100    hive_partition_columns: Option<Vec<Series>>,
101    include_file_path: Option<(PlSmallStr, PlRefStr)>,
102    pub(super) row_index: Option<RowIndex>,
103    // Stores the as key semaphore to make sure we don't write to the memory mapped file.
104    pub(super) memory_map: Option<PathBuf>,
105    metadata: Option<read::FileMetadata>,
106    schema: Option<ArrowSchemaRef>,
107}
108
109fn check_mmap_err(err: PolarsError) -> PolarsResult<()> {
110    if let PolarsError::ComputeError(s) = &err {
111        if s.as_ref() == "memory_map can only be done on uncompressed IPC files" {
112            eprintln!(
113                "Could not memory_map compressed IPC file, defaulting to normal read. \
114                Toggle off 'memory_map' to silence this warning."
115            );
116            return Ok(());
117        }
118    }
119    Err(err)
120}
121
122impl<R: MmapBytesReader> IpcReader<R> {
123    fn get_metadata(&mut self) -> PolarsResult<&read::FileMetadata> {
124        if self.metadata.is_none() {
125            let metadata = read::read_file_metadata(&mut self.reader)?;
126            self.schema = Some(metadata.schema.clone());
127            self.metadata = Some(metadata);
128        }
129        Ok(self.metadata.as_ref().unwrap())
130    }
131
132    /// Get arrow schema of the Ipc File.
133    pub fn schema(&mut self) -> PolarsResult<ArrowSchemaRef> {
134        self.get_metadata()?;
135        Ok(self.schema.as_ref().unwrap().clone())
136    }
137
138    /// Get schema-level custom metadata of the Ipc file
139    pub fn custom_metadata(&mut self) -> PolarsResult<Option<Arc<Metadata>>> {
140        self.get_metadata()?;
141        Ok(self
142            .metadata
143            .as_ref()
144            .and_then(|meta| meta.custom_schema_metadata.clone()))
145    }
146
147    /// Stop reading when `n` rows are read.
148    pub fn with_n_rows(mut self, num_rows: Option<usize>) -> Self {
149        self.n_rows = num_rows;
150        self
151    }
152
153    /// Columns to select/ project
154    pub fn with_columns(mut self, columns: Option<Vec<String>>) -> Self {
155        self.columns = columns;
156        self
157    }
158
159    pub fn with_hive_partition_columns(mut self, columns: Option<Vec<Series>>) -> Self {
160        self.hive_partition_columns = columns;
161        self
162    }
163
164    pub fn with_include_file_path(
165        mut self,
166        include_file_path: Option<(PlSmallStr, PlRefStr)>,
167    ) -> Self {
168        self.include_file_path = include_file_path;
169        self
170    }
171
172    /// Add a row index column.
173    pub fn with_row_index(mut self, row_index: Option<RowIndex>) -> Self {
174        self.row_index = row_index;
175        self
176    }
177
178    /// Set the reader's column projection. This counts from 0, meaning that
179    /// `vec![0, 4]` would select the 1st and 5th column.
180    pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
181        self.projection = projection;
182        self
183    }
184
185    /// Set if the file is to be memory_mapped. Only works with uncompressed files.
186    /// The file name must be passed to register the memory mapped file.
187    pub fn memory_mapped(mut self, path_buf: Option<PathBuf>) -> Self {
188        self.memory_map = path_buf;
189        self
190    }
191
192    // todo! hoist to lazy crate
193    #[cfg(feature = "lazy")]
194    pub fn finish_with_scan_ops(
195        mut self,
196        predicate: Option<Arc<dyn PhysicalIoExpr>>,
197        verbose: bool,
198    ) -> PolarsResult<DataFrame> {
199        if self.memory_map.is_some() && self.reader.to_file().is_some() {
200            if verbose {
201                eprintln!("memory map ipc file")
202            }
203            match self.finish_memmapped(predicate.clone()) {
204                Ok(df) => return Ok(df),
205                Err(err) => check_mmap_err(err)?,
206            }
207        }
208        let rechunk = self.rechunk;
209        let metadata = read::read_file_metadata(&mut self.reader)?;
210
211        // NOTE: For some code paths this already happened. See
212        // https://github.com/pola-rs/polars/pull/14984#discussion_r1520125000
213        // where this was introduced.
214        if let Some(columns) = &self.columns {
215            self.projection = Some(columns_to_projection(columns, &metadata.schema)?);
216        }
217
218        let schema = if let Some(projection) = &self.projection {
219            Arc::new(apply_projection(&metadata.schema, projection))
220        } else {
221            metadata.schema.clone()
222        };
223
224        let reader = read::FileReader::new(self.reader, metadata, self.projection, self.n_rows);
225
226        finish_reader(reader, rechunk, None, predicate, &schema, self.row_index)
227    }
228}
229
230impl<R: MmapBytesReader> ArrowReader for read::FileReader<R>
231where
232    R: Read + Seek,
233{
234    fn next_record_batch(&mut self) -> PolarsResult<Option<RecordBatch>> {
235        self.next().map_or(Ok(None), |v| v.map(Some))
236    }
237}
238
239impl<R: MmapBytesReader> SerReader<R> for IpcReader<R> {
240    fn new(reader: R) -> Self {
241        IpcReader {
242            reader,
243            rechunk: true,
244            n_rows: None,
245            columns: None,
246            hive_partition_columns: None,
247            include_file_path: None,
248            projection: None,
249            row_index: None,
250            memory_map: None,
251            metadata: None,
252            schema: None,
253        }
254    }
255
256    fn set_rechunk(mut self, rechunk: bool) -> Self {
257        self.rechunk = rechunk;
258        self
259    }
260
261    fn finish(mut self) -> PolarsResult<DataFrame> {
262        let reader_schema = if let Some(ref schema) = self.schema {
263            schema.clone()
264        } else {
265            self.get_metadata()?.schema.clone()
266        };
267        let reader_schema = reader_schema.as_ref();
268
269        let hive_partition_columns = self.hive_partition_columns.take();
270        let include_file_path = self.include_file_path.take();
271
272        // In case only hive columns are projected, the df would be empty, but we need the row count
273        // of the file in order to project the correct number of rows for the hive columns.
274        let mut df = (|| {
275            if self.projection.as_ref().is_some_and(|x| x.is_empty()) {
276                let row_count = if let Some(v) = self.n_rows {
277                    v
278                } else {
279                    get_row_count(&mut self.reader)? as usize
280                };
281                let mut df = DataFrame::empty_with_height(row_count);
282
283                if let Some(ri) = &self.row_index {
284                    unsafe { df.with_row_index_mut(ri.name.clone(), Some(ri.offset)) };
285                }
286                return PolarsResult::Ok(df);
287            }
288
289            if self.memory_map.is_some() && self.reader.to_file().is_some() {
290                match self.finish_memmapped(None) {
291                    Ok(df) => {
292                        return Ok(df);
293                    },
294                    Err(err) => check_mmap_err(err)?,
295                }
296            }
297            let rechunk = self.rechunk;
298            let schema = self.get_metadata()?.schema.clone();
299
300            if let Some(columns) = &self.columns {
301                let prj = columns_to_projection(columns, schema.as_ref())?;
302                self.projection = Some(prj);
303            }
304
305            let schema = if let Some(projection) = &self.projection {
306                Arc::new(apply_projection(schema.as_ref(), projection))
307            } else {
308                schema
309            };
310
311            let metadata = self.get_metadata()?.clone();
312
313            let ipc_reader =
314                read::FileReader::new(self.reader, metadata, self.projection, self.n_rows);
315            let df = finish_reader(ipc_reader, rechunk, None, None, &schema, self.row_index)?;
316            Ok(df)
317        })()?;
318
319        if let Some(hive_cols) = hive_partition_columns {
320            materialize_hive_partitions(&mut df, reader_schema, Some(hive_cols.as_slice()));
321        };
322
323        if let Some((col, value)) = include_file_path {
324            unsafe {
325                df.push_column_unchecked(Column::new_scalar(
326                    col,
327                    Scalar::new(
328                        DataType::String,
329                        AnyValue::StringOwned(value.as_str().into()),
330                    ),
331                    df.height(),
332                ))
333            };
334        }
335
336        Ok(df)
337    }
338}