polars_plan/dsl/
builder_dsl.rs

1use std::sync::Arc;
2
3use polars_core::prelude::*;
4#[cfg(feature = "csv")]
5use polars_io::csv::read::CsvReadOptions;
6#[cfg(feature = "ipc")]
7use polars_io::ipc::IpcScanOptions;
8#[cfg(feature = "parquet")]
9use polars_io::parquet::read::ParquetOptions;
10
11#[cfg(feature = "python")]
12use crate::dsl::python_dsl::PythonFunction;
13use crate::prelude::*;
14
/// Chainable builder around a [`DslPlan`], used to assemble logical plans
/// node by node; finish with [`DslBuilder::build`] to take the plan out.
pub struct DslBuilder(pub DslPlan);
16
17impl From<DslPlan> for DslBuilder {
18    fn from(lp: DslPlan) -> Self {
19        DslBuilder(lp)
20    }
21}
22
23impl DslBuilder {
24    pub fn anonymous_scan(
25        function: Arc<dyn AnonymousScan>,
26        options: AnonymousScanOptions,
27        unified_scan_args: UnifiedScanArgs,
28    ) -> PolarsResult<Self> {
29        let schema = unified_scan_args.schema.clone().ok_or_else(|| {
30            polars_err!(
31                ComputeError:
32                "anonymous scan requires schema to be specified in unified_scan_args"
33            )
34        })?;
35
36        Ok(DslPlan::Scan {
37            sources: ScanSources::default(),
38            file_info: Some(FileInfo {
39                schema: schema.clone(),
40                reader_schema: Some(either::Either::Right(schema)),
41                ..Default::default()
42            }),
43            unified_scan_args: Box::new(unified_scan_args),
44            scan_type: Box::new(FileScan::Anonymous {
45                function,
46                options: Arc::new(options),
47            }),
48            cached_ir: Default::default(),
49        }
50        .into())
51    }
52
53    #[cfg(feature = "parquet")]
54    #[allow(clippy::too_many_arguments)]
55    pub fn scan_parquet(
56        sources: ScanSources,
57        options: ParquetOptions,
58        unified_scan_args: UnifiedScanArgs,
59    ) -> PolarsResult<Self> {
60        Ok(DslPlan::Scan {
61            sources,
62            file_info: None,
63            unified_scan_args: Box::new(unified_scan_args),
64            scan_type: Box::new(FileScan::Parquet {
65                options,
66                metadata: None,
67            }),
68            cached_ir: Default::default(),
69        }
70        .into())
71    }
72
73    #[cfg(feature = "ipc")]
74    #[allow(clippy::too_many_arguments)]
75    pub fn scan_ipc(
76        sources: ScanSources,
77        options: IpcScanOptions,
78        unified_scan_args: UnifiedScanArgs,
79    ) -> PolarsResult<Self> {
80        Ok(DslPlan::Scan {
81            sources,
82            file_info: None,
83            unified_scan_args: Box::new(unified_scan_args),
84            scan_type: Box::new(FileScan::Ipc {
85                options,
86                metadata: None,
87            }),
88            cached_ir: Default::default(),
89        }
90        .into())
91    }
92
93    #[allow(clippy::too_many_arguments)]
94    #[cfg(feature = "csv")]
95    pub fn scan_csv(
96        sources: ScanSources,
97        options: CsvReadOptions,
98        unified_scan_args: UnifiedScanArgs,
99    ) -> PolarsResult<Self> {
100        Ok(DslPlan::Scan {
101            sources,
102            file_info: None,
103            unified_scan_args: Box::new(unified_scan_args),
104            scan_type: Box::new(FileScan::Csv { options }),
105            cached_ir: Default::default(),
106        }
107        .into())
108    }
109
110    #[cfg(feature = "python")]
111    pub fn scan_python_dataset(
112        dataset_object: polars_utils::python_function::PythonObject,
113    ) -> DslBuilder {
114        use super::python_dataset::PythonDatasetProvider;
115
116        DslPlan::Scan {
117            sources: ScanSources::default(),
118            file_info: None,
119            unified_scan_args: Default::default(),
120            scan_type: Box::new(FileScan::PythonDataset {
121                dataset_object: Arc::new(PythonDatasetProvider::new(dataset_object)),
122                cached_ir: Default::default(),
123            }),
124            cached_ir: Default::default(),
125        }
126        .into()
127    }
128
129    pub fn cache(self) -> Self {
130        let input = Arc::new(self.0);
131        let id = input.as_ref() as *const DslPlan as usize;
132        DslPlan::Cache { input, id }.into()
133    }
134
135    pub fn drop(self, to_drop: Vec<Selector>, strict: bool) -> Self {
136        self.map_private(DslFunction::Drop(DropFunction { to_drop, strict }))
137    }
138
139    pub fn project(self, exprs: Vec<Expr>, options: ProjectionOptions) -> Self {
140        DslPlan::Select {
141            expr: exprs,
142            input: Arc::new(self.0),
143            options,
144        }
145        .into()
146    }
147
148    pub fn fill_null(self, fill_value: Expr) -> Self {
149        self.project(
150            vec![all().fill_null(fill_value)],
151            ProjectionOptions {
152                duplicate_check: false,
153                ..Default::default()
154            },
155        )
156    }
157
158    pub fn drop_nans(self, subset: Option<Vec<Expr>>) -> Self {
159        let is_nan = match subset {
160            Some(subset) if subset.is_empty() => return self,
161            Some(subset) => subset.into_iter().map(Expr::is_nan).collect(),
162            None => vec![dtype_cols([DataType::Float32, DataType::Float64]).is_nan()],
163        };
164        self.remove(any_horizontal(is_nan).unwrap())
165    }
166
167    pub fn drop_nulls(self, subset: Option<Vec<Expr>>) -> Self {
168        let is_not_null = match subset {
169            Some(subset) if subset.is_empty() => return self,
170            Some(subset) => subset.into_iter().map(Expr::is_not_null).collect(),
171            None => vec![all().is_not_null()],
172        };
173        self.filter(all_horizontal(is_not_null).unwrap())
174    }
175
176    pub fn fill_nan(self, fill_value: Expr) -> Self {
177        self.map_private(DslFunction::FillNan(fill_value))
178    }
179
180    pub fn with_columns(self, exprs: Vec<Expr>, options: ProjectionOptions) -> Self {
181        if exprs.is_empty() {
182            return self;
183        }
184
185        DslPlan::HStack {
186            input: Arc::new(self.0),
187            exprs,
188            options,
189        }
190        .into()
191    }
192
193    pub fn match_to_schema(
194        self,
195        match_schema: SchemaRef,
196        per_column: Arc<[MatchToSchemaPerColumn]>,
197        extra_columns: ExtraColumnsPolicy,
198    ) -> Self {
199        DslPlan::MatchToSchema {
200            input: Arc::new(self.0),
201            match_schema,
202            per_column,
203            extra_columns,
204        }
205        .into()
206    }
207
208    pub fn with_context(self, contexts: Vec<DslPlan>) -> Self {
209        DslPlan::ExtContext {
210            input: Arc::new(self.0),
211            contexts,
212        }
213        .into()
214    }
215
216    /// Apply a filter predicate, keeping the rows that match it.
217    pub fn filter(self, predicate: Expr) -> Self {
218        DslPlan::Filter {
219            predicate,
220            input: Arc::new(self.0),
221        }
222        .into()
223    }
224
225    /// Remove rows matching a filter predicate (note that rows
226    /// where the predicate resolves to `null` are *not* removed).
227    pub fn remove(self, predicate: Expr) -> Self {
228        DslPlan::Filter {
229            predicate: predicate.neq_missing(lit(true)),
230            input: Arc::new(self.0),
231        }
232        .into()
233    }
234
235    pub fn group_by<E: AsRef<[Expr]>>(
236        self,
237        keys: Vec<Expr>,
238        aggs: E,
239        apply: Option<(Arc<dyn DataFrameUdf>, SchemaRef)>,
240        maintain_order: bool,
241        #[cfg(feature = "dynamic_group_by")] dynamic_options: Option<DynamicGroupOptions>,
242        #[cfg(feature = "dynamic_group_by")] rolling_options: Option<RollingGroupOptions>,
243    ) -> Self {
244        let aggs = aggs.as_ref().to_vec();
245        let options = GroupbyOptions {
246            #[cfg(feature = "dynamic_group_by")]
247            dynamic: dynamic_options,
248            #[cfg(feature = "dynamic_group_by")]
249            rolling: rolling_options,
250            slice: None,
251        };
252
253        DslPlan::GroupBy {
254            input: Arc::new(self.0),
255            keys,
256            aggs,
257            apply,
258            maintain_order,
259            options: Arc::new(options),
260        }
261        .into()
262    }
263
264    pub fn build(self) -> DslPlan {
265        self.0
266    }
267
268    pub fn from_existing_df(df: DataFrame) -> Self {
269        let schema = df.schema().clone();
270        DslPlan::DataFrameScan {
271            df: Arc::new(df),
272            schema,
273        }
274        .into()
275    }
276
277    pub fn sort(self, by_column: Vec<Expr>, sort_options: SortMultipleOptions) -> Self {
278        DslPlan::Sort {
279            input: Arc::new(self.0),
280            by_column,
281            slice: None,
282            sort_options,
283        }
284        .into()
285    }
286
287    pub fn explode(self, columns: Vec<Selector>, allow_empty: bool) -> Self {
288        DslPlan::MapFunction {
289            input: Arc::new(self.0),
290            function: DslFunction::Explode {
291                columns,
292                allow_empty,
293            },
294        }
295        .into()
296    }
297
298    #[cfg(feature = "pivot")]
299    pub fn unpivot(self, args: UnpivotArgsDSL) -> Self {
300        DslPlan::MapFunction {
301            input: Arc::new(self.0),
302            function: DslFunction::Unpivot { args },
303        }
304        .into()
305    }
306
307    pub fn row_index(self, name: PlSmallStr, offset: Option<IdxSize>) -> Self {
308        DslPlan::MapFunction {
309            input: Arc::new(self.0),
310            function: DslFunction::RowIndex { name, offset },
311        }
312        .into()
313    }
314
315    pub fn distinct(self, options: DistinctOptionsDSL) -> Self {
316        DslPlan::Distinct {
317            input: Arc::new(self.0),
318            options,
319        }
320        .into()
321    }
322
323    pub fn slice(self, offset: i64, len: IdxSize) -> Self {
324        DslPlan::Slice {
325            input: Arc::new(self.0),
326            offset,
327            len,
328        }
329        .into()
330    }
331
332    pub fn join(
333        self,
334        other: DslPlan,
335        left_on: Vec<Expr>,
336        right_on: Vec<Expr>,
337        options: Arc<JoinOptions>,
338    ) -> Self {
339        DslPlan::Join {
340            input_left: Arc::new(self.0),
341            input_right: Arc::new(other),
342            left_on,
343            right_on,
344            predicates: Default::default(),
345            options,
346        }
347        .into()
348    }
349    pub fn map_private(self, function: DslFunction) -> Self {
350        DslPlan::MapFunction {
351            input: Arc::new(self.0),
352            function,
353        }
354        .into()
355    }
356
357    #[cfg(feature = "python")]
358    pub fn map_python(
359        self,
360        function: PythonFunction,
361        optimizations: AllowedOptimizations,
362        schema: Option<SchemaRef>,
363        validate_output: bool,
364    ) -> Self {
365        DslPlan::MapFunction {
366            input: Arc::new(self.0),
367            function: DslFunction::OpaquePython(OpaquePythonUdf {
368                function,
369                schema,
370                predicate_pd: optimizations.contains(OptFlags::PREDICATE_PUSHDOWN),
371                projection_pd: optimizations.contains(OptFlags::PROJECTION_PUSHDOWN),
372                streamable: optimizations.contains(OptFlags::STREAMING),
373                validate_output,
374            }),
375        }
376        .into()
377    }
378
379    pub fn map<F>(
380        self,
381        function: F,
382        optimizations: AllowedOptimizations,
383        schema: Option<Arc<dyn UdfSchema>>,
384        name: PlSmallStr,
385    ) -> Self
386    where
387        F: DataFrameUdf + 'static,
388    {
389        let function = Arc::new(function);
390
391        DslPlan::MapFunction {
392            input: Arc::new(self.0),
393            function: DslFunction::FunctionIR(FunctionIR::Opaque {
394                function,
395                schema,
396                predicate_pd: optimizations.contains(OptFlags::PREDICATE_PUSHDOWN),
397                projection_pd: optimizations.contains(OptFlags::PROJECTION_PUSHDOWN),
398                streamable: optimizations.contains(OptFlags::STREAMING),
399                fmt_str: name,
400            }),
401        }
402        .into()
403    }
404}