polars_plan/dsl/file_scan/mod.rs

use std::hash::Hash;
use std::sync::Mutex;

use polars_io::cloud::CloudOptions;
#[cfg(feature = "csv")]
use polars_io::csv::read::CsvReadOptions;
#[cfg(feature = "ipc")]
use polars_io::ipc::IpcScanOptions;
#[cfg(feature = "parquet")]
use polars_io::parquet::metadata::FileMetadataRef;
#[cfg(feature = "parquet")]
use polars_io::parquet::read::ParquetOptions;
use polars_io::{HiveOptions, RowIndex};
use polars_utils::slice_enum::Slice;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
use strum_macros::IntoStaticStr;

use super::*;

#[cfg(feature = "python")]
pub mod python_dataset;
#[cfg(feature = "python")]
pub use python_dataset::{DATASET_PROVIDER_VTABLE, PythonDatasetProviderVTable};

bitflags::bitflags! {
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub struct ScanFlags : u32 {
        const SPECIALIZED_PREDICATE_FILTER = 0x01;
    }
}
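
// Usage sketch (standard `bitflags` API, illustrative; not from the original
// source): a consumer can check e.g.
// `scan.flags().contains(ScanFlags::SPECIALIZED_PREDICATE_FILTER)` to see
// whether the scan implements its own specialized predicate filtering.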

#[derive(Clone, Debug, IntoStaticStr)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
// TODO: Arc<> some of the options and the cloud options.
pub enum FileScan {
    #[cfg(feature = "csv")]
    Csv { options: CsvReadOptions },

    #[cfg(feature = "json")]
    NDJson { options: NDJsonReadOptions },

    #[cfg(feature = "parquet")]
    Parquet {
        options: ParquetOptions,
        #[cfg_attr(feature = "serde", serde(skip))]
        metadata: Option<FileMetadataRef>,
    },

    #[cfg(feature = "ipc")]
    Ipc {
        options: IpcScanOptions,
        #[cfg_attr(feature = "serde", serde(skip))]
        metadata: Option<Arc<arrow::io::ipc::read::FileMetadata>>,
    },

    #[cfg(feature = "python")]
    PythonDataset {
        dataset_object: Arc<python_dataset::PythonDatasetProvider>,

        #[cfg_attr(feature = "serde", serde(skip, default))]
        cached_ir: Arc<Mutex<Option<ExpandedDataset>>>,
    },

    #[cfg_attr(feature = "serde", serde(skip))]
    Anonymous {
        options: Arc<AnonymousScanOptions>,
        function: Arc<dyn AnonymousScan>,
    },
}

impl FileScan {
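    /// Capability flags for this scan type; currently only the Parquet scan
    /// advertises `SPECIALIZED_PREDICATE_FILTER`.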
    pub fn flags(&self) -> ScanFlags {
        match self {
            #[cfg(feature = "csv")]
            Self::Csv { .. } => ScanFlags::empty(),
            #[cfg(feature = "ipc")]
            Self::Ipc { .. } => ScanFlags::empty(),
            #[cfg(feature = "parquet")]
            Self::Parquet { .. } => ScanFlags::SPECIALIZED_PREDICATE_FILTER,
            #[cfg(feature = "json")]
            Self::NDJson { .. } => ScanFlags::empty(),
            #[allow(unreachable_patterns)]
            _ => ScanFlags::empty(),
        }
    }

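    /// Whether the column projection should be sorted before reading
    /// (presumably so columns are materialized in file order): always for CSV,
    /// only when a row index is present for IPC, never for Parquet.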
    pub(crate) fn sort_projection(&self, _has_row_index: bool) -> bool {
        match self {
            #[cfg(feature = "csv")]
            Self::Csv { .. } => true,
            #[cfg(feature = "ipc")]
            Self::Ipc { .. } => _has_row_index,
            #[cfg(feature = "parquet")]
            Self::Parquet { .. } => false,
            #[allow(unreachable_patterns)]
            _ => false,
        }
    }

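    /// Whether this scan type supports streaming execution.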
    pub fn streamable(&self) -> bool {
        match self {
            #[cfg(feature = "csv")]
            Self::Csv { .. } => true,
            #[cfg(feature = "ipc")]
            Self::Ipc { .. } => false,
            #[cfg(feature = "parquet")]
            Self::Parquet { .. } => true,
            #[cfg(feature = "json")]
            Self::NDJson { .. } => false,
            #[allow(unreachable_patterns)]
            _ => false,
        }
    }
}

#[derive(Debug, Clone, PartialEq, Eq, Default, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum MissingColumnsPolicy {
    #[default]
    Raise,
    /// Inserts full-NULL columns for the missing ones.
    Insert,
}

#[derive(Debug, Clone, PartialEq, Eq, Default, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum CastColumnsPolicy {
    /// Raise an error if the datatypes do not match.
    #[default]
    ErrorOnMismatch,
}

#[derive(Debug, Clone, PartialEq, Eq, Default, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum ExtraColumnsPolicy {
    /// Error if there are extra columns outside the target schema.
    #[default]
    Raise,
    Ignore,
}

/// Scan arguments shared across different scan types.
#[derive(Debug, Clone, PartialEq, Eq, Default, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct UnifiedScanArgs {
    /// User-provided schema of the file. Will be inferred during IR conversion
    /// if None.
    pub schema: Option<SchemaRef>,
    pub cloud_options: Option<CloudOptions>,
    pub hive_options: HiveOptions,

    pub rechunk: bool,
    pub cache: bool,
    pub glob: bool,

    pub projection: Option<Arc<[PlSmallStr]>>,
    pub row_index: Option<RowIndex>,
    /// Slice applied before predicates.
    pub pre_slice: Option<Slice>,

    pub cast_columns_policy: CastColumnsPolicy,
    pub missing_columns_policy: MissingColumnsPolicy,
    pub include_file_paths: Option<PlSmallStr>,
}
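
// Illustrative construction (hypothetical values; `Default` fills the remaining
// fields, since the struct derives it):
// let args = UnifiedScanArgs { rechunk: true, glob: true, ..Default::default() };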

/// Manual impls of Eq/Hash, as some fields are `Arc<T>` where T does not have Eq/Hash. For these
/// fields we compare the pointer addresses instead.
mod _file_scan_eq_hash {
    use std::hash::{Hash, Hasher};
    use std::sync::Arc;

    use super::FileScan;

    impl PartialEq for FileScan {
        fn eq(&self, other: &Self) -> bool {
            FileScanEqHashWrap::from(self) == FileScanEqHashWrap::from(other)
        }
    }

    impl Eq for FileScan {}

    impl Hash for FileScan {
        fn hash<H: Hasher>(&self, state: &mut H) {
            FileScanEqHashWrap::from(self).hash(state)
        }
    }

    /// # Hash / Eq safety
    /// * All usizes originate from `Arc<>`s, and the lifetime of this enum is bound to that of the
    ///   input ref.
    #[derive(PartialEq, Hash)]
    pub enum FileScanEqHashWrap<'a> {
        #[cfg(feature = "csv")]
        Csv {
            options: &'a polars_io::csv::read::CsvReadOptions,
        },

        #[cfg(feature = "json")]
        NDJson {
            options: &'a crate::prelude::NDJsonReadOptions,
        },

        #[cfg(feature = "parquet")]
        Parquet {
            options: &'a polars_io::prelude::ParquetOptions,
            metadata: Option<usize>,
        },

        #[cfg(feature = "ipc")]
        Ipc {
            options: &'a polars_io::prelude::IpcScanOptions,
            metadata: Option<usize>,
        },

        #[cfg(feature = "python")]
        PythonDataset {
            dataset_object: usize,
            cached_ir: usize,
        },

        Anonymous {
            options: &'a crate::dsl::AnonymousScanOptions,
            function: usize,
        },

        /// Variant to ensure the lifetime is used regardless of feature gate combination.
        #[expect(unused)]
        Phantom(&'a ()),
    }

    impl<'a> From<&'a FileScan> for FileScanEqHashWrap<'a> {
        fn from(value: &'a FileScan) -> Self {
            match value {
                #[cfg(feature = "csv")]
                FileScan::Csv { options } => FileScanEqHashWrap::Csv { options },

                #[cfg(feature = "json")]
                FileScan::NDJson { options } => FileScanEqHashWrap::NDJson { options },

                #[cfg(feature = "parquet")]
                FileScan::Parquet { options, metadata } => FileScanEqHashWrap::Parquet {
                    options,
                    metadata: metadata.as_ref().map(arc_as_ptr),
                },

                #[cfg(feature = "ipc")]
                FileScan::Ipc { options, metadata } => FileScanEqHashWrap::Ipc {
                    options,
                    metadata: metadata.as_ref().map(arc_as_ptr),
                },

                #[cfg(feature = "python")]
                FileScan::PythonDataset {
                    dataset_object,
                    cached_ir,
                } => FileScanEqHashWrap::PythonDataset {
                    dataset_object: arc_as_ptr(dataset_object),
                    cached_ir: arc_as_ptr(cached_ir),
                },

                FileScan::Anonymous { options, function } => FileScanEqHashWrap::Anonymous {
                    options,
                    function: arc_as_ptr(function),
                },
            }
        }
    }

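    /// Pointer-identity key: the `Arc`'s data pointer cast to a thin pointer,
    /// so that unsized payloads (e.g. `dyn AnonymousScan`) also work.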
    fn arc_as_ptr<T: ?Sized>(arc: &Arc<T>) -> usize {
        Arc::as_ptr(arc) as *const () as usize
    }
}
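
// A minimal sketch (hypothetical test, not part of the original file) of the
// pointer-identity semantics the manual Eq/Hash impls rely on: two handles to
// the same allocation compare equal, while two allocations with identical
// contents do not.
#[cfg(test)]
mod _file_scan_eq_hash_tests {
    use std::sync::Arc;

    #[test]
    fn pointer_identity_distinguishes_allocations() {
        // Mirrors `arc_as_ptr`: use the data pointer as the comparison key.
        fn key<T: ?Sized>(arc: &Arc<T>) -> usize {
            Arc::as_ptr(arc) as *const () as usize
        }

        let a = Arc::new(String::from("metadata"));
        let b = Arc::clone(&a); // same allocation
        let c = Arc::new(String::from("metadata")); // equal contents, new allocation

        assert_eq!(key(&a), key(&b));
        assert_ne!(key(&a), key(&c));
    }
}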