polars_plan/dsl/file_scan/
mod.rs1use std::hash::Hash;
2use std::sync::Mutex;
3
4use polars_io::cloud::CloudOptions;
5#[cfg(feature = "csv")]
6use polars_io::csv::read::CsvReadOptions;
7#[cfg(feature = "ipc")]
8use polars_io::ipc::IpcScanOptions;
9#[cfg(feature = "parquet")]
10use polars_io::parquet::metadata::FileMetadataRef;
11#[cfg(feature = "parquet")]
12use polars_io::parquet::read::ParquetOptions;
13use polars_io::{HiveOptions, RowIndex};
14use polars_utils::slice_enum::Slice;
15#[cfg(feature = "serde")]
16use serde::{Deserialize, Serialize};
17use strum_macros::IntoStaticStr;
18
19use super::*;
20
21#[cfg(feature = "python")]
22pub mod python_dataset;
23#[cfg(feature = "python")]
24pub use python_dataset::{DATASET_PROVIDER_VTABLE, PythonDatasetProviderVTable};
25
bitflags::bitflags! {
    /// Capability flags advertised by a [`FileScan`] source.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub struct ScanFlags : u32 {
        /// The reader can apply predicate filters itself, so filters may be
        /// pushed down to the scan. NOTE(review): inferred from the name and
        /// from `FileScan::flags`, where only `Parquet` sets it — confirm.
        const SPECIALIZED_PREDICATE_FILTER = 0x01;
    }
}
32
/// The concrete source/format of a file scan.
///
/// Most variants are feature-gated; fields holding runtime-only state
/// (cached metadata, trait objects, IR caches) are excluded from serde
/// serialization via `serde(skip)`.
#[derive(Clone, Debug, IntoStaticStr)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum FileScan {
    #[cfg(feature = "csv")]
    Csv { options: CsvReadOptions },

    #[cfg(feature = "json")]
    NDJson { options: NDJsonReadOptions },

    #[cfg(feature = "parquet")]
    Parquet {
        options: ParquetOptions,
        /// Cached parquet file metadata; skipped by serde, so it is `None`
        /// after deserialization and must be re-fetched.
        #[cfg_attr(feature = "serde", serde(skip))]
        metadata: Option<FileMetadataRef>,
    },

    #[cfg(feature = "ipc")]
    Ipc {
        options: IpcScanOptions,
        /// Cached IPC file metadata; skipped by serde (see `Parquet`).
        #[cfg_attr(feature = "serde", serde(skip))]
        metadata: Option<Arc<arrow::io::ipc::read::FileMetadata>>,
    },

    #[cfg(feature = "python")]
    PythonDataset {
        dataset_object: Arc<python_dataset::PythonDatasetProvider>,

        /// Lazily-populated expansion cache — NOTE(review): population site is
        /// not visible in this file; presumably filled during IR lowering.
        #[cfg_attr(feature = "serde", serde(skip, default))]
        cached_ir: Arc<Mutex<Option<ExpandedDataset>>>,
    },

    /// User-supplied scan function. The whole variant is skipped by serde:
    /// the boxed `dyn AnonymousScan` cannot be serialized.
    #[cfg_attr(feature = "serde", serde(skip))]
    Anonymous {
        options: Arc<AnonymousScanOptions>,
        function: Arc<dyn AnonymousScan>,
    },
}
71
72impl FileScan {
73 pub fn flags(&self) -> ScanFlags {
74 match self {
75 #[cfg(feature = "csv")]
76 Self::Csv { .. } => ScanFlags::empty(),
77 #[cfg(feature = "ipc")]
78 Self::Ipc { .. } => ScanFlags::empty(),
79 #[cfg(feature = "parquet")]
80 Self::Parquet { .. } => ScanFlags::SPECIALIZED_PREDICATE_FILTER,
81 #[cfg(feature = "json")]
82 Self::NDJson { .. } => ScanFlags::empty(),
83 #[allow(unreachable_patterns)]
84 _ => ScanFlags::empty(),
85 }
86 }
87
88 pub(crate) fn sort_projection(&self, _has_row_index: bool) -> bool {
89 match self {
90 #[cfg(feature = "csv")]
91 Self::Csv { .. } => true,
92 #[cfg(feature = "ipc")]
93 Self::Ipc { .. } => _has_row_index,
94 #[cfg(feature = "parquet")]
95 Self::Parquet { .. } => false,
96 #[allow(unreachable_patterns)]
97 _ => false,
98 }
99 }
100
101 pub fn streamable(&self) -> bool {
102 match self {
103 #[cfg(feature = "csv")]
104 Self::Csv { .. } => true,
105 #[cfg(feature = "ipc")]
106 Self::Ipc { .. } => false,
107 #[cfg(feature = "parquet")]
108 Self::Parquet { .. } => true,
109 #[cfg(feature = "json")]
110 Self::NDJson { .. } => false,
111 #[allow(unreachable_patterns)]
112 _ => false,
113 }
114 }
115}
116
/// What to do when a requested column is missing from a scanned file.
#[derive(Debug, Clone, PartialEq, Eq, Default, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum MissingColumnsPolicy {
    /// Raise an error (the default).
    #[default]
    Raise,
    /// Insert the missing column — NOTE(review): the fill value (presumably
    /// nulls) is decided by the reader, not visible in this file; confirm.
    Insert,
}
125
/// What to do when a file's column dtype does not match the expected dtype.
/// Currently only erroring is supported.
#[derive(Debug, Clone, PartialEq, Eq, Default, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum CastColumnsPolicy {
    /// Raise an error on any dtype mismatch (the default and only variant).
    #[default]
    ErrorOnMismatch,
}
133
/// What to do when a scanned file contains columns not present in the
/// expected schema.
#[derive(Debug, Clone, PartialEq, Eq, Default, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum ExtraColumnsPolicy {
    /// Raise an error (the default).
    #[default]
    Raise,
    /// Silently ignore the extra columns.
    Ignore,
}
142
/// Scan arguments shared by every file format (format-specific options live
/// in the [`FileScan`] variants instead).
#[derive(Debug, Clone, PartialEq, Eq, Default, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct UnifiedScanArgs {
    /// User-provided schema, if any — NOTE(review): presumably inferred later
    /// when `None`; inference is not visible in this file.
    pub schema: Option<SchemaRef>,
    /// Cloud storage configuration for remote paths.
    pub cloud_options: Option<CloudOptions>,
    pub hive_options: HiveOptions,

    pub rechunk: bool,
    pub cache: bool,
    /// Whether paths are expanded as glob patterns.
    pub glob: bool,

    /// Column subset to read; `None` reads all columns.
    pub projection: Option<Arc<[PlSmallStr]>>,
    /// Optional row-index column to add to the output.
    pub row_index: Option<RowIndex>,
    /// Slice applied before materialization (offset/length pushdown).
    pub pre_slice: Option<Slice>,

    pub cast_columns_policy: CastColumnsPolicy,
    pub missing_columns_policy: MissingColumnsPolicy,
    /// If set, adds a column with this name containing the source file path.
    pub include_file_paths: Option<PlSmallStr>,
}
166
/// Manual `PartialEq` / `Eq` / `Hash` for [`FileScan`].
///
/// `FileScan` holds values that do not implement these traits themselves
/// (cached metadata `Arc`s, a `Mutex`, a `dyn AnonymousScan`). Both
/// operations therefore go through [`FileScanEqHashWrap`], which borrows the
/// comparable option structs and reduces every `Arc`-held value to its
/// allocation address, i.e. pointer identity.
mod _file_scan_eq_hash {
    use std::hash::{Hash, Hasher};
    use std::sync::Arc;

    use super::FileScan;

    impl PartialEq for FileScan {
        fn eq(&self, other: &Self) -> bool {
            FileScanEqHashWrap::from(self) == FileScanEqHashWrap::from(other)
        }
    }

    impl Eq for FileScan {}

    impl Hash for FileScan {
        fn hash<H: Hasher>(&self, state: &mut H) {
            FileScanEqHashWrap::from(self).hash(state)
        }
    }

    /// Mirror of [`FileScan`] in which every field is either a borrow of a
    /// `PartialEq + Hash` options struct or a `usize` pointer address.
    ///
    /// Note: because `Arc`s compare by address, two scans with identical
    /// contents but separately-allocated metadata compare unequal.
    #[derive(PartialEq, Hash)]
    pub enum FileScanEqHashWrap<'a> {
        #[cfg(feature = "csv")]
        Csv {
            options: &'a polars_io::csv::read::CsvReadOptions,
        },

        #[cfg(feature = "json")]
        NDJson {
            options: &'a crate::prelude::NDJsonReadOptions,
        },

        #[cfg(feature = "parquet")]
        Parquet {
            options: &'a polars_io::prelude::ParquetOptions,
            // Address of the metadata Arc, if present.
            metadata: Option<usize>,
        },

        #[cfg(feature = "ipc")]
        Ipc {
            options: &'a polars_io::prelude::IpcScanOptions,
            // Address of the metadata Arc, if present.
            metadata: Option<usize>,
        },

        #[cfg(feature = "python")]
        PythonDataset {
            dataset_object: usize,
            cached_ir: usize,
        },

        Anonymous {
            options: &'a crate::dsl::AnonymousScanOptions,
            // Address of the Arc'd scan function (trait objects can't be
            // compared by value).
            function: usize,
        },

        /// Keeps the lifetime parameter used — NOTE(review): presumably
        /// guards against an "unused lifetime" error under some feature
        /// combination; confirm which configuration needs it.
        #[expect(unused)]
        Phantom(&'a ()),
    }

    impl<'a> From<&'a FileScan> for FileScanEqHashWrap<'a> {
        fn from(value: &'a FileScan) -> Self {
            match value {
                #[cfg(feature = "csv")]
                FileScan::Csv { options } => FileScanEqHashWrap::Csv { options },

                #[cfg(feature = "json")]
                FileScan::NDJson { options } => FileScanEqHashWrap::NDJson { options },

                #[cfg(feature = "parquet")]
                FileScan::Parquet { options, metadata } => FileScanEqHashWrap::Parquet {
                    options,
                    metadata: metadata.as_ref().map(arc_as_ptr),
                },

                #[cfg(feature = "ipc")]
                FileScan::Ipc { options, metadata } => FileScanEqHashWrap::Ipc {
                    options,
                    metadata: metadata.as_ref().map(arc_as_ptr),
                },

                #[cfg(feature = "python")]
                FileScan::PythonDataset {
                    dataset_object,
                    cached_ir,
                } => FileScanEqHashWrap::PythonDataset {
                    dataset_object: arc_as_ptr(dataset_object),
                    cached_ir: arc_as_ptr(cached_ir),
                },

                FileScan::Anonymous { options, function } => FileScanEqHashWrap::Anonymous {
                    options,
                    function: arc_as_ptr(function),
                },
            }
        }
    }

    /// Address of the `Arc`'s allocation. The cast through `*const ()`
    /// discards fat-pointer metadata (vtable / length), so this also works
    /// for `T: ?Sized` such as `dyn AnonymousScan`.
    fn arc_as_ptr<T: ?Sized>(arc: &Arc<T>) -> usize {
        Arc::as_ptr(arc) as *const () as usize
    }
}
273}