Skip to main content

polars_python/lazyframe/visitor/nodes.rs

1#[cfg(feature = "iejoin")]
2use polars::prelude::JoinTypeOptionsIR;
3use polars::prelude::deletion::DeletionFilesList;
4use polars::prelude::python_dsl::PythonScanSource;
5use polars::prelude::{ColumnMapping, PredicateFileSkip};
6use polars_core::prelude::IdxSize;
7use polars_io::cloud::CloudOptions;
8use polars_ops::prelude::JoinType;
9use polars_plan::plans::{HintIR, IR};
10use polars_plan::prelude::{FileScanIR, FunctionIR, PythonPredicate, UnifiedScanArgs};
11use pyo3::IntoPyObjectExt;
12use pyo3::exceptions::{PyNotImplementedError, PyValueError};
13use pyo3::prelude::*;
14use pyo3::types::{PyDict, PyList, PyString};
15
16use super::expr_nodes::PyGroupbyOptions;
17use crate::PyDataFrame;
18use crate::lazyframe::visit::PyExprIR;
19
20fn scan_type_to_pyobject(
21    py: Python<'_>,
22    scan_type: &FileScanIR,
23    cloud_options: &Option<CloudOptions>,
24) -> PyResult<Py<PyAny>> {
25    match scan_type {
26        #[cfg(feature = "csv")]
27        FileScanIR::Csv { options } => {
28            let options = serde_json::to_string(options)
29                .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
30            let cloud_options = serde_json::to_string(cloud_options)
31                .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
32            Ok(("csv", options, cloud_options).into_py_any(py)?)
33        },
34        #[cfg(feature = "parquet")]
35        FileScanIR::Parquet { options, .. } => {
36            let options = serde_json::to_string(options)
37                .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
38            let cloud_options = serde_json::to_string(cloud_options)
39                .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
40            Ok(("parquet", options, cloud_options).into_py_any(py)?)
41        },
42        #[cfg(feature = "ipc")]
43        FileScanIR::Ipc { .. } => Err(PyNotImplementedError::new_err("ipc scan")),
44        #[cfg(feature = "json")]
45        FileScanIR::NDJson { options, .. } => {
46            let options = serde_json::to_string(options)
47                .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
48            Ok(("ndjson", options).into_py_any(py)?)
49        },
50        #[cfg(feature = "scan_lines")]
51        FileScanIR::Lines { name } => Ok(("lines", name.as_str()).into_py_any(py)?),
52        FileScanIR::PythonDataset { .. } => {
53            Err(PyNotImplementedError::new_err("python dataset scan"))
54        },
55        FileScanIR::Anonymous { .. } => Err(PyNotImplementedError::new_err("anonymous scan")),
56    }
57}
58
59#[pyclass(frozen)]
60/// Scan a table with an optional predicate from a python function
61pub struct PythonScan {
62    #[pyo3(get)]
63    options: Py<PyAny>,
64}
65
66#[pyclass(frozen)]
67/// Slice the table
68pub struct Slice {
69    #[pyo3(get)]
70    input: usize,
71    #[pyo3(get)]
72    offset: i64,
73    #[pyo3(get)]
74    len: IdxSize,
75}
76
77#[pyclass(frozen)]
78/// Filter the table with a boolean expression
79pub struct Filter {
80    #[pyo3(get)]
81    input: usize,
82    #[pyo3(get)]
83    predicate: PyExprIR,
84}
85
// Python-visible wrapper around a scan's `UnifiedScanArgs`; the getters in the
// `#[pymethods]` impl for this type expose selected fields of `inner`.
#[pyclass(frozen)]
#[derive(Clone)]
pub struct PyFileOptions {
    // Scan arguments, cloned from the `IR::Scan` node being converted.
    inner: UnifiedScanArgs,
}
91
92#[pymethods]
93impl PyFileOptions {
94    #[getter]
95    fn n_rows(&self) -> Option<(i64, usize)> {
96        self.inner
97            .pre_slice
98            .clone()
99            .map(|slice| <(i64, usize)>::try_from(slice).unwrap())
100    }
101    #[getter]
102    fn with_columns(&self) -> Option<Vec<&str>> {
103        self.inner
104            .projection
105            .as_ref()?
106            .iter()
107            .map(|x| x.as_str())
108            .collect::<Vec<_>>()
109            .into()
110    }
111    #[getter]
112    fn cache(&self, _py: Python<'_>) -> bool {
113        self.inner.cache
114    }
115    #[getter]
116    fn row_index(&self) -> Option<(&str, IdxSize)> {
117        self.inner
118            .row_index
119            .as_ref()
120            .map(|n| (n.name.as_str(), n.offset))
121    }
122    #[getter]
123    fn rechunk(&self, _py: Python<'_>) -> bool {
124        self.inner.rechunk
125    }
126    #[getter]
127    fn hive_options(&self, _py: Python<'_>) -> PyResult<Py<PyAny>> {
128        Err(PyNotImplementedError::new_err("hive options"))
129    }
130    #[getter]
131    fn include_file_paths(&self, _py: Python<'_>) -> Option<&str> {
132        self.inner.include_file_paths.as_deref()
133    }
134
135    /// One of:
136    /// * None
137    /// * ("iceberg-position-delete", dict[int, list[str]])
138    #[getter]
139    fn deletion_files(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
140        Ok(match &self.inner.deletion_files {
141            None => py.None().into_any(),
142
143            Some(DeletionFilesList::IcebergPositionDelete(paths)) => {
144                let out = PyDict::new(py);
145
146                for (k, v) in paths.iter() {
147                    out.set_item(*k, v.as_ref())?;
148                }
149
150                ("iceberg-position-delete", out)
151                    .into_pyobject(py)?
152                    .into_any()
153                    .unbind()
154            },
155        })
156    }
157
158    /// One of:
159    /// * None
160    /// * ("iceberg-column-mapping", <unimplemented>)
161    #[getter]
162    fn column_mapping(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
163        Ok(match &self.inner.column_mapping {
164            None => py.None().into_any(),
165
166            Some(ColumnMapping::Iceberg { .. }) => unimplemented!(),
167        })
168    }
169}
170
171#[pyclass(frozen)]
172/// Scan a table from file
173pub struct Scan {
174    #[pyo3(get)]
175    paths: Py<PyAny>,
176    #[pyo3(get)]
177    file_info: Py<PyAny>,
178    #[pyo3(get)]
179    predicate: Option<PyExprIR>,
180    #[pyo3(get)]
181    file_options: PyFileOptions,
182    #[pyo3(get)]
183    scan_type: Py<PyAny>,
184}
185
186#[pyclass(frozen)]
187/// Scan a table from an existing dataframe
188pub struct DataFrameScan {
189    #[pyo3(get)]
190    df: PyDataFrame,
191    #[pyo3(get)]
192    projection: Py<PyAny>,
193    #[pyo3(get)]
194    selection: Option<PyExprIR>,
195}
196
197#[pyclass(frozen)]
198/// Project out columns from a table
199pub struct SimpleProjection {
200    #[pyo3(get)]
201    input: usize,
202}
203
204#[pyclass(frozen)]
205/// Column selection
206pub struct Select {
207    #[pyo3(get)]
208    input: usize,
209    #[pyo3(get)]
210    expr: Vec<PyExprIR>,
211    #[pyo3(get)]
212    should_broadcast: bool,
213}
214
215#[pyclass(frozen)]
216/// Sort the table
217pub struct Sort {
218    #[pyo3(get)]
219    input: usize,
220    #[pyo3(get)]
221    by_column: Vec<PyExprIR>,
222    #[pyo3(get)]
223    sort_options: (bool, Vec<bool>, Vec<bool>),
224    #[pyo3(get)]
225    slice: Option<(i64, usize)>,
226}
227
228#[pyclass(frozen)]
229/// Cache the input at this point in the LP
230pub struct Cache {
231    #[pyo3(get)]
232    input: usize,
233    #[pyo3(get)]
234    id_: u128,
235}
236
237#[pyclass(frozen)]
238/// Groupby aggregation
239pub struct GroupBy {
240    #[pyo3(get)]
241    input: usize,
242    #[pyo3(get)]
243    keys: Vec<PyExprIR>,
244    #[pyo3(get)]
245    aggs: Vec<PyExprIR>,
246    #[pyo3(get)]
247    apply: (),
248    #[pyo3(get)]
249    maintain_order: bool,
250    #[pyo3(get)]
251    options: Py<PyAny>,
252}
253
254#[pyclass(frozen)]
255/// Join operation
256pub struct Join {
257    #[pyo3(get)]
258    input_left: usize,
259    #[pyo3(get)]
260    input_right: usize,
261    #[pyo3(get)]
262    left_on: Vec<PyExprIR>,
263    #[pyo3(get)]
264    right_on: Vec<PyExprIR>,
265    #[pyo3(get)]
266    options: Py<PyAny>,
267}
268
269#[pyclass(frozen)]
270/// Merge sorted operation
271pub struct MergeSorted {
272    #[pyo3(get)]
273    input_left: usize,
274    #[pyo3(get)]
275    input_right: usize,
276    #[pyo3(get)]
277    key: String,
278}
279
280#[pyclass(frozen)]
281/// Adding columns to the table without a Join
282pub struct HStack {
283    #[pyo3(get)]
284    input: usize,
285    #[pyo3(get)]
286    exprs: Vec<PyExprIR>,
287    #[pyo3(get)]
288    should_broadcast: bool,
289}
290
291#[pyclass(frozen)]
292/// Like Select, but all operations produce a single row.
293pub struct Reduce {
294    #[pyo3(get)]
295    input: usize,
296    #[pyo3(get)]
297    exprs: Vec<PyExprIR>,
298}
299
300#[pyclass(frozen)]
301/// Remove duplicates from the table
302pub struct Distinct {
303    #[pyo3(get)]
304    input: usize,
305    #[pyo3(get)]
306    options: Py<PyAny>,
307}
308#[pyclass(frozen)]
309/// A (User Defined) Function
310pub struct MapFunction {
311    #[pyo3(get)]
312    input: usize,
313    #[pyo3(get)]
314    function: Py<PyAny>,
315}
316#[pyclass(frozen)]
317pub struct Union {
318    #[pyo3(get)]
319    inputs: Vec<usize>,
320    #[pyo3(get)]
321    options: Option<(i64, usize)>,
322}
323#[pyclass(frozen)]
324/// Horizontal concatenation of multiple plans
325pub struct HConcat {
326    #[pyo3(get)]
327    inputs: Vec<usize>,
328    #[pyo3(get)]
329    options: (),
330}
331#[pyclass(frozen)]
332/// This allows expressions to access other tables
333pub struct ExtContext {
334    #[pyo3(get)]
335    input: usize,
336    #[pyo3(get)]
337    contexts: Vec<usize>,
338}
339
340#[pyclass(frozen)]
341pub struct Sink {
342    #[pyo3(get)]
343    input: usize,
344    #[pyo3(get)]
345    payload: Py<PyAny>,
346}
347
348pub(crate) fn into_py(py: Python<'_>, plan: &IR) -> PyResult<Py<PyAny>> {
349    match plan {
350        IR::PythonScan { options } => {
351            let python_src = match options.python_source {
352                PythonScanSource::Pyarrow => "pyarrow",
353                PythonScanSource::Cuda => "cuda",
354                PythonScanSource::IOPlugin => "io_plugin",
355            };
356
357            PythonScan {
358                options: (
359                    options
360                        .scan_fn
361                        .as_ref()
362                        .map_or_else(|| py.None(), |s| s.0.clone_ref(py)),
363                    options.with_columns.as_ref().map_or_else(
364                        || Ok(py.None()),
365                        |cols| {
366                            cols.iter()
367                                .map(|x| x.as_str())
368                                .collect::<Vec<_>>()
369                                .into_py_any(py)
370                        },
371                    )?,
372                    python_src,
373                    match &options.predicate {
374                        PythonPredicate::None => py.None(),
375                        PythonPredicate::PyArrow(s) => ("pyarrow", s).into_py_any(py)?,
376                        PythonPredicate::Polars(e) => ("polars", e.node().0).into_py_any(py)?,
377                    },
378                    options
379                        .n_rows
380                        .map_or_else(|| Ok(py.None()), |s| s.into_py_any(py))?,
381                )
382                    .into_py_any(py)?,
383            }
384            .into_py_any(py)
385        },
386        IR::Slice { input, offset, len } => Slice {
387            input: input.0,
388            offset: *offset,
389            len: *len,
390        }
391        .into_py_any(py),
392        IR::Filter { input, predicate } => Filter {
393            input: input.0,
394            predicate: predicate.into(),
395        }
396        .into_py_any(py),
397        IR::Scan {
398            hive_parts: Some(_),
399            ..
400        } => Err(PyNotImplementedError::new_err(
401            "scan with hive partitioning",
402        )),
403        IR::Scan {
404            sources,
405            file_info: _,
406            hive_parts: _,
407            predicate,
408            predicate_file_skip_applied,
409            output_schema: _,
410            scan_type,
411            unified_scan_args,
412        } => {
413            Scan {
414                paths: {
415                    let paths = sources
416                        .into_paths()
417                        .ok_or_else(|| PyNotImplementedError::new_err("scan with BytesIO"))?;
418
419                    let out = PyList::new(py, [] as [(); 0])?;
420
421                    // Manual conversion to preserve `uri://...` - converting Rust `Path` to `PosixPath`
422                    // will corrupt to `uri:/...`
423                    for path in paths.iter() {
424                        out.append(path.as_str())?;
425                    }
426
427                    out.into_py_any(py)?
428                },
429                // TODO: file info
430                file_info: py.None(),
431                predicate: predicate
432                    .as_ref()
433                    .filter(|_| {
434                        !matches!(
435                            predicate_file_skip_applied,
436                            Some(PredicateFileSkip {
437                                no_residual_predicate: true,
438                                original_len: _,
439                            })
440                        )
441                    })
442                    .map(|e| e.into()),
443                file_options: PyFileOptions {
444                    inner: (**unified_scan_args).clone(),
445                },
446                scan_type: scan_type_to_pyobject(py, scan_type, &unified_scan_args.cloud_options)?,
447            }
448        }
449        .into_py_any(py),
450        IR::DataFrameScan {
451            df,
452            schema: _,
453            output_schema,
454        } => DataFrameScan {
455            df: PyDataFrame::new((**df).clone()),
456            projection: output_schema.as_ref().map_or_else(
457                || Ok(py.None()),
458                |s| {
459                    s.iter_names()
460                        .map(|s| s.as_str())
461                        .collect::<Vec<_>>()
462                        .into_py_any(py)
463                },
464            )?,
465            selection: None,
466        }
467        .into_py_any(py),
468        IR::SimpleProjection { input, columns: _ } => {
469            SimpleProjection { input: input.0 }.into_py_any(py)
470        },
471        IR::Select {
472            input,
473            expr,
474            schema: _,
475            options,
476        } => Select {
477            expr: expr.iter().map(|e| e.into()).collect(),
478            input: input.0,
479            should_broadcast: options.should_broadcast,
480        }
481        .into_py_any(py),
482        IR::Sort {
483            input,
484            by_column,
485            slice,
486            sort_options,
487        } => Sort {
488            input: input.0,
489            by_column: by_column.iter().map(|e| e.into()).collect(),
490            sort_options: (
491                sort_options.maintain_order,
492                sort_options.nulls_last.clone(),
493                sort_options.descending.clone(),
494            ),
495            slice: *slice,
496        }
497        .into_py_any(py),
498        IR::Cache { input, id } => Cache {
499            input: input.0,
500            id_: id.as_u128(),
501        }
502        .into_py_any(py),
503        IR::GroupBy {
504            input,
505            keys,
506            aggs,
507            schema: _,
508            apply,
509            maintain_order,
510            options,
511        } => GroupBy {
512            input: input.0,
513            keys: keys.iter().map(|e| e.into()).collect(),
514            aggs: aggs.iter().map(|e| e.into()).collect(),
515            apply: apply.as_ref().map_or(Ok(()), |_| {
516                Err(PyNotImplementedError::new_err(format!(
517                    "apply inside GroupBy {plan:?}"
518                )))
519            })?,
520            maintain_order: *maintain_order,
521            options: PyGroupbyOptions::new(options.as_ref().clone()).into_py_any(py)?,
522        }
523        .into_py_any(py),
524        IR::Join {
525            input_left,
526            input_right,
527            schema: _,
528            left_on,
529            right_on,
530            options,
531        } => Join {
532            input_left: input_left.0,
533            input_right: input_right.0,
534            left_on: left_on.iter().map(|e| e.into()).collect(),
535            right_on: right_on.iter().map(|e| e.into()).collect(),
536            options: {
537                let how = &options.args.how;
538                let name = Into::<&str>::into(how).into_pyobject(py)?;
539                (
540                    match how {
541                        #[cfg(feature = "asof_join")]
542                        JoinType::AsOf(_) => {
543                            return Err(PyNotImplementedError::new_err("asof join"));
544                        },
545                        #[cfg(feature = "iejoin")]
546                        JoinType::IEJoin => {
547                            let Some(JoinTypeOptionsIR::IEJoin(ie_options)) = &options.options
548                            else {
549                                unreachable!()
550                            };
551                            (
552                                name,
553                                crate::Wrap(ie_options.operator1).into_py_any(py)?,
554                                ie_options.operator2.as_ref().map_or_else(
555                                    || Ok(py.None()),
556                                    |op| crate::Wrap(*op).into_py_any(py),
557                                )?,
558                            )
559                                .into_py_any(py)?
560                        },
561                        // This is a cross join fused with a predicate. Shown in the IR::explain as
562                        // NESTED LOOP JOIN
563                        JoinType::Cross if options.options.is_some() => {
564                            return Err(PyNotImplementedError::new_err("nested loop join"));
565                        },
566                        _ => name.into_any().unbind(),
567                    },
568                    options.args.nulls_equal,
569                    options.args.slice,
570                    options.args.suffix().as_str(),
571                    options.args.coalesce.coalesce(how),
572                    Into::<&str>::into(options.args.maintain_order),
573                )
574                    .into_py_any(py)?
575            },
576        }
577        .into_py_any(py),
578        IR::HStack {
579            input,
580            exprs,
581            schema: _,
582            options,
583        } => HStack {
584            input: input.0,
585            exprs: exprs.iter().map(|e| e.into()).collect(),
586            should_broadcast: options.should_broadcast,
587        }
588        .into_py_any(py),
589        IR::Distinct { input, options } => Distinct {
590            input: input.0,
591            options: (
592                Into::<&str>::into(options.keep_strategy),
593                options.subset.as_ref().map_or_else(
594                    || Ok(py.None()),
595                    |f| {
596                        f.iter()
597                            .map(|s| s.as_ref())
598                            .collect::<Vec<&str>>()
599                            .into_py_any(py)
600                    },
601                )?,
602                options.maintain_order,
603                options.slice,
604            )
605                .into_py_any(py)?,
606        }
607        .into_py_any(py),
608        IR::MapFunction { input, function } => MapFunction {
609            input: input.0,
610            function: match function {
611                FunctionIR::OpaquePython(_) => {
612                    return Err(PyNotImplementedError::new_err("opaque python mapfunction"));
613                },
614                FunctionIR::Opaque {
615                    function: _,
616                    schema: _,
617                    predicate_pd: _,
618                    projection_pd: _,
619                    streamable: _,
620                    fmt_str: _,
621                } => return Err(PyNotImplementedError::new_err("opaque rust mapfunction")),
622                FunctionIR::Unnest { columns, separator } => (
623                    "unnest",
624                    columns.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
625                    separator.as_ref().map(|s| s.to_string()),
626                )
627                    .into_py_any(py)?,
628                FunctionIR::Rechunk => ("rechunk",).into_py_any(py)?,
629                FunctionIR::Explode {
630                    columns,
631                    options,
632                    schema: _,
633                } => (
634                    "explode",
635                    columns.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
636                    options.empty_as_null,
637                    options.keep_nulls,
638                )
639                    .into_py_any(py)?,
640                #[cfg(feature = "pivot")]
641                FunctionIR::Unpivot { args, schema: _ } => (
642                    "unpivot",
643                    args.index.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
644                    args.on.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
645                    args.variable_name.as_str().into_py_any(py)?,
646                    args.value_name.as_str().into_py_any(py)?,
647                )
648                    .into_py_any(py)?,
649                FunctionIR::RowIndex {
650                    name,
651                    schema: _,
652                    offset,
653                } => ("row_index", name.to_string(), offset.unwrap_or(0)).into_py_any(py)?,
654                FunctionIR::FastCount {
655                    sources,
656                    scan_type,
657                    alias,
658                } => {
659                    let sources = sources
660                        .into_paths()
661                        .ok_or_else(|| {
662                            PyNotImplementedError::new_err("FastCount with BytesIO sources")
663                        })?
664                        .iter()
665                        .map(|p| p.as_str())
666                        .collect::<Vec<_>>()
667                        .into_py_any(py)?;
668
669                    // FastCount does not support cloud options.
670                    let cloud_options = None;
671
672                    let scan_type = scan_type_to_pyobject(py, scan_type, &cloud_options)?;
673
674                    let alias = alias
675                        .as_ref()
676                        .map(|a| a.as_str())
677                        .map_or_else(|| Ok(py.None()), |s| s.into_py_any(py))?;
678
679                    ("fast_count", sources, scan_type, alias).into_py_any(py)?
680                },
681                FunctionIR::Hint(hint) => match hint {
682                    HintIR::Sorted(sorted_vec) => {
683                        let sorted_info: Vec<_> = sorted_vec
684                            .iter()
685                            .map(|s| (s.column.as_str(), s.descending, s.nulls_last))
686                            .collect();
687                        ("hint_sorted", sorted_info).into_py_any(py)?
688                    },
689                },
690            },
691        }
692        .into_py_any(py),
693        IR::Union { inputs, options } => Union {
694            inputs: inputs.iter().map(|n| n.0).collect(),
695            // TODO: rest of options
696            options: options.slice,
697        }
698        .into_py_any(py),
699        IR::HConcat {
700            inputs,
701            schema: _,
702            options: _,
703        } => HConcat {
704            inputs: inputs.iter().map(|n| n.0).collect(),
705            options: (),
706        }
707        .into_py_any(py),
708        IR::ExtContext {
709            input,
710            contexts,
711            schema: _,
712        } => ExtContext {
713            input: input.0,
714            contexts: contexts.iter().map(|n| n.0).collect(),
715        }
716        .into_py_any(py),
717        IR::Sink { input, payload } => Sink {
718            input: input.0,
719            payload: PyString::new(
720                py,
721                &serde_json::to_string(payload)
722                    .map_err(|err| PyValueError::new_err(format!("{err:?}")))?,
723            )
724            .into(),
725        }
726        .into_py_any(py),
727        IR::SinkMultiple { .. } => Err(PyNotImplementedError::new_err(
728            "Not expecting to see a SinkMultiple node",
729        )),
730        #[cfg(feature = "merge_sorted")]
731        IR::MergeSorted {
732            input_left,
733            input_right,
734            key,
735        } => MergeSorted {
736            input_left: input_left.0,
737            input_right: input_right.0,
738            key: key.to_string(),
739        }
740        .into_py_any(py),
741        IR::Invalid => Err(PyNotImplementedError::new_err("Invalid")),
742    }
743}