1use std::io::{Read, Seek};
36use std::path::PathBuf;
37
38use arrow::datatypes::{ArrowSchemaRef, Metadata};
39use arrow::io::ipc::read::{self, get_row_count};
40use arrow::record_batch::RecordBatch;
41use polars_core::prelude::*;
42use polars_utils::bool::UnsafeBool;
43use polars_utils::pl_str::PlRefStr;
44#[cfg(feature = "serde")]
45use serde::{Deserialize, Serialize};
46
47use crate::RowIndex;
48use crate::hive::materialize_hive_partitions;
49use crate::mmap::MmapBytesReader;
50use crate::predicates::PhysicalIoExpr;
51use crate::prelude::*;
52use crate::shared::{ArrowReader, finish_reader};
53
/// Options controlling how IPC (Arrow/Feather) files are scanned.
#[derive(Clone, Debug, PartialEq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub struct IpcScanOptions {
    // NOTE(review): presumably toggles use of per-record-batch statistics
    // during scanning — confirm against the scan executor. Defaults to
    // `false` (see `Default` impl below; `serde(default)` mirrors that).
    #[cfg_attr(feature = "serde", serde(default))]
    pub record_batch_statistics: bool,
    // Wrapped in `UnsafeBool`; defaults via `Default`. Semantics of the
    // "checked" flag are defined by the consumer of these options.
    #[cfg_attr(feature = "serde", serde(default))]
    pub checked: UnsafeBool,
}
64
65#[expect(clippy::derivable_impls)]
66impl Default for IpcScanOptions {
67 fn default() -> Self {
68 Self {
69 record_batch_statistics: false,
70 checked: Default::default(),
71 }
72 }
73}
74
/// Reader for Arrow IPC files, configured via builder-style `with_*`
/// methods and consumed by `finish`/`finish_with_scan_ops`.
#[must_use]
pub struct IpcReader<R: MmapBytesReader> {
    /// Underlying byte source.
    pub(super) reader: R,
    /// Rechunk the result into a single chunk after reading (set via
    /// `SerReader::set_rechunk`; `true` by default).
    rechunk: bool,
    /// Stop after reading this many rows, if set.
    pub(super) n_rows: Option<usize>,
    /// Column indices to read; derived from `columns` when that is set.
    pub(super) projection: Option<Vec<usize>>,
    /// Column names to read; converted to `projection` at read time.
    pub(crate) columns: Option<Vec<String>>,
    /// Hive-partition columns materialized into the result after reading.
    hive_partition_columns: Option<Vec<Series>>,
    /// If set, append a string column `(name, value)` with the file path.
    include_file_path: Option<(PlSmallStr, PlRefStr)>,
    /// Optional row-index column to add to the result.
    pub(super) row_index: Option<RowIndex>,
    /// When set (and the reader is file-backed), attempt a memory-mapped read.
    pub(super) memory_map: Option<PathBuf>,
    /// Cached file metadata; populated lazily by `get_metadata`.
    metadata: Option<read::FileMetadata>,
    /// Cached schema; populated together with `metadata`.
    schema: Option<ArrowSchemaRef>,
}
108
109fn check_mmap_err(err: PolarsError) -> PolarsResult<()> {
110 if let PolarsError::ComputeError(s) = &err {
111 if s.as_ref() == "memory_map can only be done on uncompressed IPC files" {
112 eprintln!(
113 "Could not memory_map compressed IPC file, defaulting to normal read. \
114 Toggle off 'memory_map' to silence this warning."
115 );
116 return Ok(());
117 }
118 }
119 Err(err)
120}
121
impl<R: MmapBytesReader> IpcReader<R> {
    /// Read and cache the IPC file metadata (and schema) on first call;
    /// subsequent calls return the cached value without touching the reader.
    fn get_metadata(&mut self) -> PolarsResult<&read::FileMetadata> {
        if self.metadata.is_none() {
            let metadata = read::read_file_metadata(&mut self.reader)?;
            self.schema = Some(metadata.schema.clone());
            self.metadata = Some(metadata);
        }
        // Unwrap is fine: the branch above guarantees `metadata` is `Some`.
        Ok(self.metadata.as_ref().unwrap())
    }

    /// Get the schema of the IPC file; reads file metadata if not done yet.
    pub fn schema(&mut self) -> PolarsResult<ArrowSchemaRef> {
        self.get_metadata()?;
        // Unwrap is fine: `get_metadata` populates `schema` alongside `metadata`.
        Ok(self.schema.as_ref().unwrap().clone())
    }

    /// Get any custom schema-level metadata stored in the IPC file,
    /// reading the file metadata first if necessary.
    pub fn custom_metadata(&mut self) -> PolarsResult<Option<Arc<Metadata>>> {
        self.get_metadata()?;
        Ok(self
            .metadata
            .as_ref()
            .and_then(|meta| meta.custom_schema_metadata.clone()))
    }

    /// Stop reading after `num_rows` rows (read everything when `None`).
    pub fn with_n_rows(mut self, num_rows: Option<usize>) -> Self {
        self.n_rows = num_rows;
        self
    }

    /// Select columns by name; converted to an index projection at read time.
    pub fn with_columns(mut self, columns: Option<Vec<String>>) -> Self {
        self.columns = columns;
        self
    }

    /// Hive-partition columns to materialize into the resulting DataFrame.
    pub fn with_hive_partition_columns(mut self, columns: Option<Vec<Series>>) -> Self {
        self.hive_partition_columns = columns;
        self
    }

    /// If set, append a string column `(name, value)` holding the file path.
    pub fn with_include_file_path(
        mut self,
        include_file_path: Option<(PlSmallStr, PlRefStr)>,
    ) -> Self {
        self.include_file_path = include_file_path;
        self
    }

    /// Add a row-index column with the given name and offset to the result.
    pub fn with_row_index(mut self, row_index: Option<RowIndex>) -> Self {
        self.row_index = row_index;
        self
    }

    /// Select columns by index. Takes precedence-by-assignment: a later
    /// `columns` setting will overwrite this at read time.
    pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
        self.projection = projection;
        self
    }

    /// Attempt to memory-map the file at `path_buf` instead of reading it;
    /// falls back to a normal read for compressed files (see `check_mmap_err`).
    pub fn memory_mapped(mut self, path_buf: Option<PathBuf>) -> Self {
        self.memory_map = path_buf;
        self
    }

    /// Finish the read with an optional pushdown predicate (lazy engine entry
    /// point). Tries a memory-mapped read first when configured and the
    /// reader is file-backed, then falls back to a streaming read.
    #[cfg(feature = "lazy")]
    pub fn finish_with_scan_ops(
        mut self,
        predicate: Option<Arc<dyn PhysicalIoExpr>>,
        verbose: bool,
    ) -> PolarsResult<DataFrame> {
        if self.memory_map.is_some() && self.reader.to_file().is_some() {
            if verbose {
                eprintln!("memory map ipc file")
            }
            match self.finish_memmapped(predicate.clone()) {
                Ok(df) => return Ok(df),
                // Compressed files cannot be mmapped; swallow that specific
                // error and continue with a normal read below.
                Err(err) => check_mmap_err(err)?,
            }
        }
        let rechunk = self.rechunk;
        let metadata = read::read_file_metadata(&mut self.reader)?;

        // A by-name column selection overrides any index projection set earlier.
        if let Some(columns) = &self.columns {
            self.projection = Some(columns_to_projection(columns, &metadata.schema)?);
        }

        // The output schema must reflect the projection, if any.
        let schema = if let Some(projection) = &self.projection {
            Arc::new(apply_projection(&metadata.schema, projection))
        } else {
            metadata.schema.clone()
        };

        let reader = read::FileReader::new(self.reader, metadata, self.projection, self.n_rows);

        finish_reader(reader, rechunk, None, predicate, &schema, self.row_index)
    }
}
229
230impl<R: MmapBytesReader> ArrowReader for read::FileReader<R>
231where
232 R: Read + Seek,
233{
234 fn next_record_batch(&mut self) -> PolarsResult<Option<RecordBatch>> {
235 self.next().map_or(Ok(None), |v| v.map(Some))
236 }
237}
238
impl<R: MmapBytesReader> SerReader<R> for IpcReader<R> {
    /// Create a reader with default settings: rechunk enabled, no limits,
    /// no projection, no mmap, metadata not yet read.
    fn new(reader: R) -> Self {
        IpcReader {
            reader,
            rechunk: true,
            n_rows: None,
            columns: None,
            hive_partition_columns: None,
            include_file_path: None,
            projection: None,
            row_index: None,
            memory_map: None,
            metadata: None,
            schema: None,
        }
    }

    /// Whether to rechunk the resulting DataFrame into a single chunk.
    fn set_rechunk(mut self, rechunk: bool) -> Self {
        self.rechunk = rechunk;
        self
    }

    /// Read the file into a DataFrame, applying projection, row limit,
    /// row index, hive columns, and the optional file-path column.
    fn finish(mut self) -> PolarsResult<DataFrame> {
        // Resolve the full reader schema up front; it is needed later for
        // hive-column materialization even after `self` is partially consumed.
        let reader_schema = if let Some(ref schema) = self.schema {
            schema.clone()
        } else {
            self.get_metadata()?.schema.clone()
        };
        let reader_schema = reader_schema.as_ref();

        // Taken out of `self` so the closure below can move `self.reader`.
        let hive_partition_columns = self.hive_partition_columns.take();
        let include_file_path = self.include_file_path.take();

        let mut df = (|| {
            // Fast path: an explicitly empty projection means "no columns" —
            // produce a column-less frame with the correct height only.
            if self.projection.as_ref().is_some_and(|x| x.is_empty()) {
                let row_count = if let Some(v) = self.n_rows {
                    v
                } else {
                    get_row_count(&mut self.reader)? as usize
                };
                let mut df = DataFrame::empty_with_height(row_count);

                if let Some(ri) = &self.row_index {
                    // NOTE(review): relies on `with_row_index_mut`'s safety
                    // contract; the frame has no other columns here.
                    unsafe { df.with_row_index_mut(ri.name.clone(), Some(ri.offset)) };
                }
                return PolarsResult::Ok(df);
            }

            // Try the memory-mapped path first when configured and file-backed.
            if self.memory_map.is_some() && self.reader.to_file().is_some() {
                match self.finish_memmapped(None) {
                    Ok(df) => {
                        return Ok(df);
                    },
                    // Compressed files cannot be mmapped; swallow that
                    // specific error and fall through to a normal read.
                    Err(err) => check_mmap_err(err)?,
                }
            }
            let rechunk = self.rechunk;
            let schema = self.get_metadata()?.schema.clone();

            // A by-name column selection overrides any index projection.
            if let Some(columns) = &self.columns {
                let prj = columns_to_projection(columns, schema.as_ref())?;
                self.projection = Some(prj);
            }

            // The output schema must reflect the projection, if any.
            let schema = if let Some(projection) = &self.projection {
                Arc::new(apply_projection(schema.as_ref(), projection))
            } else {
                schema
            };

            let metadata = self.get_metadata()?.clone();

            let ipc_reader =
                read::FileReader::new(self.reader, metadata, self.projection, self.n_rows);
            let df = finish_reader(ipc_reader, rechunk, None, None, &schema, self.row_index)?;
            Ok(df)
        })()?;

        if let Some(hive_cols) = hive_partition_columns {
            materialize_hive_partitions(&mut df, reader_schema, Some(hive_cols.as_slice()));
        };

        if let Some((col, value)) = include_file_path {
            // NOTE(review): relies on `push_column_unchecked`'s contract; the
            // scalar column is constructed with `df.height()`, so lengths match.
            unsafe {
                df.push_column_unchecked(Column::new_scalar(
                    col,
                    Scalar::new(
                        DataType::String,
                        AnyValue::StringOwned(value.as_str().into()),
                    ),
                    df.height(),
                ))
            };
        }

        Ok(df)
    }
}