1use std::sync::Arc;
2
3use polars_core::prelude::*;
4#[cfg(feature = "csv")]
5use polars_io::csv::read::CsvReadOptions;
6#[cfg(feature = "ipc")]
7use polars_io::ipc::IpcScanOptions;
8#[cfg(feature = "parquet")]
9use polars_io::parquet::read::ParquetOptions;
10
11#[cfg(feature = "python")]
12use crate::dsl::python_dsl::PythonFunction;
13use crate::prelude::*;
14
/// Thin builder over a [`DslPlan`] node; each method consumes `self` and wraps
/// it in a new plan node, so calls chain into a logical-plan tree.
pub struct DslBuilder(pub DslPlan);
16
17impl From<DslPlan> for DslBuilder {
18 fn from(lp: DslPlan) -> Self {
19 DslBuilder(lp)
20 }
21}
22
impl DslBuilder {
    /// Build a `Scan` node for a user-provided [`AnonymousScan`] implementation.
    ///
    /// # Errors
    /// Returns a `ComputeError` if `unified_scan_args.schema` is `None`: an
    /// anonymous scan has no underlying file to infer the schema from, so the
    /// caller must supply it.
    pub fn anonymous_scan(
        function: Arc<dyn AnonymousScan>,
        options: AnonymousScanOptions,
        unified_scan_args: UnifiedScanArgs,
    ) -> PolarsResult<Self> {
        let schema = unified_scan_args.schema.clone().ok_or_else(|| {
            polars_err!(
                ComputeError:
                "anonymous scan requires schema to be specified in unified_scan_args"
            )
        })?;

        Ok(DslPlan::Scan {
            sources: ScanSources::default(),
            // The schema is known up-front, so `file_info` is resolved here
            // (unlike the file-based scans below, which leave it `None`).
            file_info: Some(FileInfo {
                schema: schema.clone(),
                reader_schema: Some(either::Either::Right(schema)),
                ..Default::default()
            }),
            unified_scan_args: Box::new(unified_scan_args),
            scan_type: Box::new(FileScan::Anonymous {
                function,
                options: Arc::new(options),
            }),
            cached_ir: Default::default(),
        }
        .into())
    }

    /// Build a `Scan` node over Parquet sources.
    ///
    /// `file_info` is left as `None`; schema resolution is deferred to a later
    /// stage (presumably DSL -> IR conversion — confirm against the consumer
    /// of `DslPlan::Scan`).
    #[cfg(feature = "parquet")]
    #[allow(clippy::too_many_arguments)]
    pub fn scan_parquet(
        sources: ScanSources,
        options: ParquetOptions,
        unified_scan_args: UnifiedScanArgs,
    ) -> PolarsResult<Self> {
        Ok(DslPlan::Scan {
            sources,
            file_info: None,
            unified_scan_args: Box::new(unified_scan_args),
            scan_type: Box::new(FileScan::Parquet {
                options,
                metadata: None,
            }),
            cached_ir: Default::default(),
        }
        .into())
    }

    /// Build a `Scan` node over Arrow IPC sources.
    ///
    /// Like [`Self::scan_parquet`], schema/metadata resolution is deferred
    /// (`file_info: None`, `metadata: None`).
    #[cfg(feature = "ipc")]
    #[allow(clippy::too_many_arguments)]
    pub fn scan_ipc(
        sources: ScanSources,
        options: IpcScanOptions,
        unified_scan_args: UnifiedScanArgs,
    ) -> PolarsResult<Self> {
        Ok(DslPlan::Scan {
            sources,
            file_info: None,
            unified_scan_args: Box::new(unified_scan_args),
            scan_type: Box::new(FileScan::Ipc {
                options,
                metadata: None,
            }),
            cached_ir: Default::default(),
        }
        .into())
    }

    /// Build a `Scan` node over CSV sources; schema inference is deferred
    /// (`file_info: None`).
    #[allow(clippy::too_many_arguments)]
    #[cfg(feature = "csv")]
    pub fn scan_csv(
        sources: ScanSources,
        options: CsvReadOptions,
        unified_scan_args: UnifiedScanArgs,
    ) -> PolarsResult<Self> {
        Ok(DslPlan::Scan {
            sources,
            file_info: None,
            unified_scan_args: Box::new(unified_scan_args),
            scan_type: Box::new(FileScan::Csv { options }),
            cached_ir: Default::default(),
        }
        .into())
    }

    /// Build a `Scan` node backed by a Python dataset object, wrapped in a
    /// [`PythonDatasetProvider`]. Sources and scan args use their defaults.
    #[cfg(feature = "python")]
    pub fn scan_python_dataset(
        dataset_object: polars_utils::python_function::PythonObject,
    ) -> DslBuilder {
        use super::python_dataset::PythonDatasetProvider;

        DslPlan::Scan {
            sources: ScanSources::default(),
            file_info: None,
            unified_scan_args: Default::default(),
            scan_type: Box::new(FileScan::PythonDataset {
                dataset_object: Arc::new(PythonDatasetProvider::new(dataset_object)),
                cached_ir: Default::default(),
            }),
            cached_ir: Default::default(),
        }
        .into()
    }

    /// Wrap the current plan in a `Cache` node.
    ///
    /// NOTE(review): the cache `id` is the address of the `Arc`-allocated plan
    /// node, so it is unique while this `Arc` is alive — presumably equal ids
    /// are how downstream code detects the same cached subplan; confirm
    /// against the `DslPlan::Cache` consumer.
    pub fn cache(self) -> Self {
        let input = Arc::new(self.0);
        let id = input.as_ref() as *const DslPlan as usize;
        DslPlan::Cache { input, id }.into()
    }

    /// Drop the columns matched by `to_drop`. With `strict`, selectors that
    /// match nothing are presumably an error — confirm in `DropFunction`.
    pub fn drop(self, to_drop: Vec<Selector>, strict: bool) -> Self {
        self.map_private(DslFunction::Drop(DropFunction { to_drop, strict }))
    }

    /// Project (select) the given expressions, producing a `Select` node.
    pub fn project(self, exprs: Vec<Expr>, options: ProjectionOptions) -> Self {
        DslPlan::Select {
            expr: exprs,
            input: Arc::new(self.0),
            options,
        }
        .into()
    }

    /// Replace nulls in every column with `fill_value`.
    ///
    /// Implemented as `select(all().fill_null(..))`; `duplicate_check` is
    /// disabled because the projection re-emits the existing columns.
    pub fn fill_null(self, fill_value: Expr) -> Self {
        self.project(
            vec![all().fill_null(fill_value)],
            ProjectionOptions {
                duplicate_check: false,
                ..Default::default()
            },
        )
    }

    /// Remove rows containing NaN in any of the `subset` columns.
    ///
    /// - `Some([])`: no-op (returns `self` unchanged).
    /// - `None`: checks all Float32/Float64 columns.
    ///
    /// The `unwrap` is safe: the horizontal-any input is never empty here,
    /// since the empty-subset case returned early.
    pub fn drop_nans(self, subset: Option<Vec<Expr>>) -> Self {
        let is_nan = match subset {
            Some(subset) if subset.is_empty() => return self,
            Some(subset) => subset.into_iter().map(Expr::is_nan).collect(),
            None => vec![dtype_cols([DataType::Float32, DataType::Float64]).is_nan()],
        };
        self.remove(any_horizontal(is_nan).unwrap())
    }

    /// Keep only rows with no nulls in the `subset` columns.
    ///
    /// - `Some([])`: no-op (returns `self` unchanged).
    /// - `None`: checks all columns.
    ///
    /// The `unwrap` is safe for the same reason as in [`Self::drop_nans`].
    pub fn drop_nulls(self, subset: Option<Vec<Expr>>) -> Self {
        let is_not_null = match subset {
            Some(subset) if subset.is_empty() => return self,
            Some(subset) => subset.into_iter().map(Expr::is_not_null).collect(),
            None => vec![all().is_not_null()],
        };
        self.filter(all_horizontal(is_not_null).unwrap())
    }

    /// Replace NaN values with `fill_value` via the dedicated DSL function.
    pub fn fill_nan(self, fill_value: Expr) -> Self {
        self.map_private(DslFunction::FillNan(fill_value))
    }

    /// Add or replace columns (`HStack`). An empty expression list is a no-op.
    pub fn with_columns(self, exprs: Vec<Expr>, options: ProjectionOptions) -> Self {
        if exprs.is_empty() {
            return self;
        }

        DslPlan::HStack {
            input: Arc::new(self.0),
            exprs,
            options,
        }
        .into()
    }

    /// Coerce this plan's output to `match_schema`, applying the per-column
    /// policies and the policy for columns not present in the target schema.
    pub fn match_to_schema(
        self,
        match_schema: SchemaRef,
        per_column: Arc<[MatchToSchemaPerColumn]>,
        extra_columns: ExtraColumnsPolicy,
    ) -> Self {
        DslPlan::MatchToSchema {
            input: Arc::new(self.0),
            match_schema,
            per_column,
            extra_columns,
        }
        .into()
    }

    /// Attach additional plans as expression contexts (`ExtContext`).
    pub fn with_context(self, contexts: Vec<DslPlan>) -> Self {
        DslPlan::ExtContext {
            input: Arc::new(self.0),
            contexts,
        }
        .into()
    }

    /// Keep rows where `predicate` evaluates to `true`.
    pub fn filter(self, predicate: Expr) -> Self {
        DslPlan::Filter {
            predicate,
            input: Arc::new(self.0),
        }
        .into()
    }

    /// Remove rows where `predicate` evaluates to `true`.
    ///
    /// Implemented as a filter on `predicate != true` using the missing-aware
    /// comparison, so rows where the predicate is null are KEPT (they are not
    /// equal to `true`), which a plain negation would not guarantee.
    pub fn remove(self, predicate: Expr) -> Self {
        DslPlan::Filter {
            predicate: predicate.neq_missing(lit(true)),
            input: Arc::new(self.0),
        }
        .into()
    }

    /// Build a `GroupBy` node.
    ///
    /// - `apply`: optional UDF applied per group together with its output schema.
    /// - `maintain_order`: preserve the order of group keys as first seen.
    /// - `dynamic_options` / `rolling_options` (feature `dynamic_group_by`):
    ///   temporal window grouping; the slice is always `None` at build time.
    pub fn group_by<E: AsRef<[Expr]>>(
        self,
        keys: Vec<Expr>,
        aggs: E,
        apply: Option<(Arc<dyn DataFrameUdf>, SchemaRef)>,
        maintain_order: bool,
        #[cfg(feature = "dynamic_group_by")] dynamic_options: Option<DynamicGroupOptions>,
        #[cfg(feature = "dynamic_group_by")] rolling_options: Option<RollingGroupOptions>,
    ) -> Self {
        let aggs = aggs.as_ref().to_vec();
        let options = GroupbyOptions {
            #[cfg(feature = "dynamic_group_by")]
            dynamic: dynamic_options,
            #[cfg(feature = "dynamic_group_by")]
            rolling: rolling_options,
            slice: None,
        };

        DslPlan::GroupBy {
            input: Arc::new(self.0),
            keys,
            aggs,
            apply,
            maintain_order,
            options: Arc::new(options),
        }
        .into()
    }

    /// Finish building and return the inner [`DslPlan`].
    pub fn build(self) -> DslPlan {
        self.0
    }

    /// Start a plan from an in-memory `DataFrame` (`DataFrameScan`).
    pub fn from_existing_df(df: DataFrame) -> Self {
        let schema = df.schema().clone();
        DslPlan::DataFrameScan {
            df: Arc::new(df),
            schema,
        }
        .into()
    }

    /// Sort by the given expressions; the slice is always `None` at build time.
    pub fn sort(self, by_column: Vec<Expr>, sort_options: SortMultipleOptions) -> Self {
        DslPlan::Sort {
            input: Arc::new(self.0),
            by_column,
            slice: None,
            sort_options,
        }
        .into()
    }

    /// Explode list columns into rows. `allow_empty` presumably controls
    /// whether an empty selection is accepted — confirm in `DslFunction::Explode`.
    pub fn explode(self, columns: Vec<Selector>, allow_empty: bool) -> Self {
        DslPlan::MapFunction {
            input: Arc::new(self.0),
            function: DslFunction::Explode {
                columns,
                allow_empty,
            },
        }
        .into()
    }

    /// Unpivot (wide -> long) with the given arguments.
    #[cfg(feature = "pivot")]
    pub fn unpivot(self, args: UnpivotArgsDSL) -> Self {
        DslPlan::MapFunction {
            input: Arc::new(self.0),
            function: DslFunction::Unpivot { args },
        }
        .into()
    }

    /// Add a row-index column named `name`, starting at `offset` (or a
    /// default start when `None` — confirm in `DslFunction::RowIndex`).
    pub fn row_index(self, name: PlSmallStr, offset: Option<IdxSize>) -> Self {
        DslPlan::MapFunction {
            input: Arc::new(self.0),
            function: DslFunction::RowIndex { name, offset },
        }
        .into()
    }

    /// Drop duplicate rows according to `options`.
    pub fn distinct(self, options: DistinctOptionsDSL) -> Self {
        DslPlan::Distinct {
            input: Arc::new(self.0),
            options,
        }
        .into()
    }

    /// Take `len` rows starting at `offset` (negative offsets count from the
    /// end, per the usual slice semantics — confirm in `DslPlan::Slice`).
    pub fn slice(self, offset: i64, len: IdxSize) -> Self {
        DslPlan::Slice {
            input: Arc::new(self.0),
            offset,
            len,
        }
        .into()
    }

    /// Join this plan (left side) with `other` (right side) on the given key
    /// expressions. Extra join predicates start out empty.
    pub fn join(
        self,
        other: DslPlan,
        left_on: Vec<Expr>,
        right_on: Vec<Expr>,
        options: Arc<JoinOptions>,
    ) -> Self {
        DslPlan::Join {
            input_left: Arc::new(self.0),
            input_right: Arc::new(other),
            left_on,
            right_on,
            predicates: Default::default(),
            options,
        }
        .into()
    }
    /// Wrap the plan in a `MapFunction` node; shared plumbing for the
    /// `DslFunction`-based builders above.
    pub fn map_private(self, function: DslFunction) -> Self {
        DslPlan::MapFunction {
            input: Arc::new(self.0),
            function,
        }
        .into()
    }

    /// Apply an opaque Python UDF over the full materialized frame.
    ///
    /// The allowed-optimization flags are translated into per-UDF pushdown /
    /// streaming capabilities; `validate_output` asks the engine to check the
    /// UDF result against `schema` when provided.
    #[cfg(feature = "python")]
    pub fn map_python(
        self,
        function: PythonFunction,
        optimizations: AllowedOptimizations,
        schema: Option<SchemaRef>,
        validate_output: bool,
    ) -> Self {
        DslPlan::MapFunction {
            input: Arc::new(self.0),
            function: DslFunction::OpaquePython(OpaquePythonUdf {
                function,
                schema,
                predicate_pd: optimizations.contains(OptFlags::PREDICATE_PUSHDOWN),
                projection_pd: optimizations.contains(OptFlags::PROJECTION_PUSHDOWN),
                streamable: optimizations.contains(OptFlags::STREAMING),
                validate_output,
            }),
        }
        .into()
    }

    /// Apply an opaque Rust UDF over the full materialized frame.
    ///
    /// `name` is only used when formatting/explaining the plan; the
    /// optimization flags map to pushdown/streaming capabilities as in
    /// [`Self::map_python`].
    pub fn map<F>(
        self,
        function: F,
        optimizations: AllowedOptimizations,
        schema: Option<Arc<dyn UdfSchema>>,
        name: PlSmallStr,
    ) -> Self
    where
        F: DataFrameUdf + 'static,
    {
        let function = Arc::new(function);

        DslPlan::MapFunction {
            input: Arc::new(self.0),
            function: DslFunction::FunctionIR(FunctionIR::Opaque {
                function,
                schema,
                predicate_pd: optimizations.contains(OptFlags::PREDICATE_PUSHDOWN),
                projection_pd: optimizations.contains(OptFlags::PROJECTION_PUSHDOWN),
                streamable: optimizations.contains(OptFlags::STREAMING),
                fmt_str: name,
            }),
        }
        .into()
    }
}