polars_python/lazyframe/visitor/
nodes.rs

1use polars::prelude::ColumnMapping;
2#[cfg(feature = "iejoin")]
3use polars::prelude::JoinTypeOptionsIR;
4use polars::prelude::deletion::DeletionFilesList;
5use polars::prelude::python_dsl::PythonScanSource;
6use polars_core::prelude::IdxSize;
7use polars_io::cloud::CloudOptions;
8use polars_ops::prelude::JoinType;
9use polars_plan::plans::IR;
10use polars_plan::prelude::{FileScanIR, FunctionIR, PythonPredicate, UnifiedScanArgs};
11use pyo3::IntoPyObjectExt;
12use pyo3::exceptions::{PyNotImplementedError, PyValueError};
13use pyo3::prelude::*;
14use pyo3::types::{PyDict, PyList, PyString};
15
16use super::expr_nodes::PyGroupbyOptions;
17use crate::PyDataFrame;
18use crate::lazyframe::visit::PyExprIR;
19
20fn scan_type_to_pyobject(
21    py: Python<'_>,
22    scan_type: &FileScanIR,
23    cloud_options: &Option<CloudOptions>,
24) -> PyResult<PyObject> {
25    match scan_type {
26        #[cfg(feature = "csv")]
27        FileScanIR::Csv { options } => {
28            let options = serde_json::to_string(options)
29                .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
30            let cloud_options = serde_json::to_string(cloud_options)
31                .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
32            Ok(("csv", options, cloud_options).into_py_any(py)?)
33        },
34        #[cfg(feature = "parquet")]
35        FileScanIR::Parquet { options, .. } => {
36            let options = serde_json::to_string(options)
37                .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
38            let cloud_options = serde_json::to_string(cloud_options)
39                .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
40            Ok(("parquet", options, cloud_options).into_py_any(py)?)
41        },
42        #[cfg(feature = "ipc")]
43        FileScanIR::Ipc { .. } => Err(PyNotImplementedError::new_err("ipc scan")),
44        #[cfg(feature = "json")]
45        FileScanIR::NDJson { options, .. } => {
46            let options = serde_json::to_string(options)
47                .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
48            Ok(("ndjson", options).into_py_any(py)?)
49        },
50        FileScanIR::PythonDataset { .. } => {
51            Err(PyNotImplementedError::new_err("python dataset scan"))
52        },
53        FileScanIR::Anonymous { .. } => Err(PyNotImplementedError::new_err("anonymous scan")),
54    }
55}
56
57#[pyclass(frozen)]
58/// Scan a table with an optional predicate from a python function
59pub struct PythonScan {
60    #[pyo3(get)]
61    options: PyObject,
62}
63
64#[pyclass(frozen)]
65/// Slice the table
66pub struct Slice {
67    #[pyo3(get)]
68    input: usize,
69    #[pyo3(get)]
70    offset: i64,
71    #[pyo3(get)]
72    len: IdxSize,
73}
74
75#[pyclass(frozen)]
76/// Filter the table with a boolean expression
77pub struct Filter {
78    #[pyo3(get)]
79    input: usize,
80    #[pyo3(get)]
81    predicate: PyExprIR,
82}
83
84#[pyclass(frozen)]
85#[derive(Clone)]
86pub struct PyFileOptions {
87    inner: UnifiedScanArgs,
88}
89
90#[pymethods]
91impl PyFileOptions {
92    #[getter]
93    fn n_rows(&self) -> Option<(i64, usize)> {
94        self.inner
95            .pre_slice
96            .clone()
97            .map(|slice| <(i64, usize)>::try_from(slice).unwrap())
98    }
99    #[getter]
100    fn with_columns(&self) -> Option<Vec<&str>> {
101        self.inner
102            .projection
103            .as_ref()?
104            .iter()
105            .map(|x| x.as_str())
106            .collect::<Vec<_>>()
107            .into()
108    }
109    #[getter]
110    fn cache(&self, _py: Python<'_>) -> bool {
111        self.inner.cache
112    }
113    #[getter]
114    fn row_index(&self) -> Option<(&str, IdxSize)> {
115        self.inner
116            .row_index
117            .as_ref()
118            .map(|n| (n.name.as_str(), n.offset))
119    }
120    #[getter]
121    fn rechunk(&self, _py: Python<'_>) -> bool {
122        self.inner.rechunk
123    }
124    #[getter]
125    fn hive_options(&self, _py: Python<'_>) -> PyResult<PyObject> {
126        Err(PyNotImplementedError::new_err("hive options"))
127    }
128    #[getter]
129    fn include_file_paths(&self, _py: Python<'_>) -> Option<&str> {
130        self.inner.include_file_paths.as_deref()
131    }
132
133    /// One of:
134    /// * None
135    /// * ("iceberg-position-delete", dict[int, list[str]])
136    #[getter]
137    fn deletion_files(&self, py: Python<'_>) -> PyResult<PyObject> {
138        Ok(match &self.inner.deletion_files {
139            None => py.None().into_any(),
140
141            Some(DeletionFilesList::IcebergPositionDelete(paths)) => {
142                let out = PyDict::new(py);
143
144                for (k, v) in paths.iter() {
145                    out.set_item(*k, v.as_ref())?;
146                }
147
148                ("iceberg-position-delete", out)
149                    .into_pyobject(py)?
150                    .into_any()
151                    .unbind()
152            },
153        })
154    }
155
156    /// One of:
157    /// * None
158    /// * ("iceberg-column-mapping", <unimplemented>)
159    #[getter]
160    fn column_mapping(&self, py: Python<'_>) -> PyResult<PyObject> {
161        Ok(match &self.inner.column_mapping {
162            None => py.None().into_any(),
163
164            Some(ColumnMapping::Iceberg { .. }) => unimplemented!(),
165        })
166    }
167}
168
169#[pyclass(frozen)]
170/// Scan a table from file
171pub struct Scan {
172    #[pyo3(get)]
173    paths: PyObject,
174    #[pyo3(get)]
175    file_info: PyObject,
176    #[pyo3(get)]
177    predicate: Option<PyExprIR>,
178    #[pyo3(get)]
179    file_options: PyFileOptions,
180    #[pyo3(get)]
181    scan_type: PyObject,
182}
183
184#[pyclass(frozen)]
185/// Scan a table from an existing dataframe
186pub struct DataFrameScan {
187    #[pyo3(get)]
188    df: PyDataFrame,
189    #[pyo3(get)]
190    projection: PyObject,
191    #[pyo3(get)]
192    selection: Option<PyExprIR>,
193}
194
195#[pyclass(frozen)]
196/// Project out columns from a table
197pub struct SimpleProjection {
198    #[pyo3(get)]
199    input: usize,
200}
201
202#[pyclass(frozen)]
203/// Column selection
204pub struct Select {
205    #[pyo3(get)]
206    input: usize,
207    #[pyo3(get)]
208    expr: Vec<PyExprIR>,
209    #[pyo3(get)]
210    should_broadcast: bool,
211}
212
213#[pyclass(frozen)]
214/// Sort the table
215pub struct Sort {
216    #[pyo3(get)]
217    input: usize,
218    #[pyo3(get)]
219    by_column: Vec<PyExprIR>,
220    #[pyo3(get)]
221    sort_options: (bool, Vec<bool>, Vec<bool>),
222    #[pyo3(get)]
223    slice: Option<(i64, usize)>,
224}
225
226#[pyclass(frozen)]
227/// Cache the input at this point in the LP
228pub struct Cache {
229    #[pyo3(get)]
230    input: usize,
231    #[pyo3(get)]
232    id_: u128,
233}
234
235#[pyclass(frozen)]
236/// Groupby aggregation
237pub struct GroupBy {
238    #[pyo3(get)]
239    input: usize,
240    #[pyo3(get)]
241    keys: Vec<PyExprIR>,
242    #[pyo3(get)]
243    aggs: Vec<PyExprIR>,
244    #[pyo3(get)]
245    apply: (),
246    #[pyo3(get)]
247    maintain_order: bool,
248    #[pyo3(get)]
249    options: PyObject,
250}
251
252#[pyclass(frozen)]
253/// Join operation
254pub struct Join {
255    #[pyo3(get)]
256    input_left: usize,
257    #[pyo3(get)]
258    input_right: usize,
259    #[pyo3(get)]
260    left_on: Vec<PyExprIR>,
261    #[pyo3(get)]
262    right_on: Vec<PyExprIR>,
263    #[pyo3(get)]
264    options: PyObject,
265}
266
267#[pyclass(frozen)]
268/// Merge sorted operation
269pub struct MergeSorted {
270    #[pyo3(get)]
271    input_left: usize,
272    #[pyo3(get)]
273    input_right: usize,
274    #[pyo3(get)]
275    key: String,
276}
277
278#[pyclass(frozen)]
279/// Adding columns to the table without a Join
280pub struct HStack {
281    #[pyo3(get)]
282    input: usize,
283    #[pyo3(get)]
284    exprs: Vec<PyExprIR>,
285    #[pyo3(get)]
286    should_broadcast: bool,
287}
288
289#[pyclass(frozen)]
290/// Like Select, but all operations produce a single row.
291pub struct Reduce {
292    #[pyo3(get)]
293    input: usize,
294    #[pyo3(get)]
295    exprs: Vec<PyExprIR>,
296}
297
298#[pyclass(frozen)]
299/// Remove duplicates from the table
300pub struct Distinct {
301    #[pyo3(get)]
302    input: usize,
303    #[pyo3(get)]
304    options: PyObject,
305}
306#[pyclass(frozen)]
307/// A (User Defined) Function
308pub struct MapFunction {
309    #[pyo3(get)]
310    input: usize,
311    #[pyo3(get)]
312    function: PyObject,
313}
314#[pyclass(frozen)]
315pub struct Union {
316    #[pyo3(get)]
317    inputs: Vec<usize>,
318    #[pyo3(get)]
319    options: Option<(i64, usize)>,
320}
321#[pyclass(frozen)]
322/// Horizontal concatenation of multiple plans
323pub struct HConcat {
324    #[pyo3(get)]
325    inputs: Vec<usize>,
326    #[pyo3(get)]
327    options: (),
328}
329#[pyclass(frozen)]
330/// This allows expressions to access other tables
331pub struct ExtContext {
332    #[pyo3(get)]
333    input: usize,
334    #[pyo3(get)]
335    contexts: Vec<usize>,
336}
337
338#[pyclass(frozen)]
339pub struct Sink {
340    #[pyo3(get)]
341    input: usize,
342    #[pyo3(get)]
343    payload: PyObject,
344}
345
346pub(crate) fn into_py(py: Python<'_>, plan: &IR) -> PyResult<PyObject> {
347    match plan {
348        IR::PythonScan { options } => {
349            let python_src = match options.python_source {
350                PythonScanSource::Pyarrow => "pyarrow",
351                PythonScanSource::Cuda => "cuda",
352                PythonScanSource::IOPlugin => "io_plugin",
353            };
354
355            PythonScan {
356                options: (
357                    options
358                        .scan_fn
359                        .as_ref()
360                        .map_or_else(|| py.None(), |s| s.0.clone_ref(py)),
361                    options.with_columns.as_ref().map_or_else(
362                        || Ok(py.None()),
363                        |cols| {
364                            cols.iter()
365                                .map(|x| x.as_str())
366                                .collect::<Vec<_>>()
367                                .into_py_any(py)
368                        },
369                    )?,
370                    python_src,
371                    match &options.predicate {
372                        PythonPredicate::None => py.None(),
373                        PythonPredicate::PyArrow(s) => ("pyarrow", s).into_py_any(py)?,
374                        PythonPredicate::Polars(e) => ("polars", e.node().0).into_py_any(py)?,
375                    },
376                    options
377                        .n_rows
378                        .map_or_else(|| Ok(py.None()), |s| s.into_py_any(py))?,
379                )
380                    .into_py_any(py)?,
381            }
382            .into_py_any(py)
383        },
384        IR::Slice { input, offset, len } => Slice {
385            input: input.0,
386            offset: *offset,
387            len: *len,
388        }
389        .into_py_any(py),
390        IR::Filter { input, predicate } => Filter {
391            input: input.0,
392            predicate: predicate.into(),
393        }
394        .into_py_any(py),
395        IR::Scan {
396            hive_parts: Some(_),
397            ..
398        } => Err(PyNotImplementedError::new_err(
399            "scan with hive partitioning",
400        )),
401        IR::Scan {
402            sources,
403            file_info: _,
404            hive_parts: _,
405            predicate,
406            output_schema: _,
407            scan_type,
408            unified_scan_args,
409        } => {
410            Scan {
411                paths: {
412                    let paths = sources
413                        .into_paths()
414                        .ok_or_else(|| PyNotImplementedError::new_err("scan with BytesIO"))?;
415
416                    let out = PyList::new(py, [] as [(); 0])?;
417
418                    // Manual conversion to preserve `uri://...` - converting Rust `Path` to `PosixPath`
419                    // will corrupt to `uri:/...`
420                    for path in paths.iter() {
421                        out.append(path.to_str())?;
422                    }
423
424                    out.into_py_any(py)?
425                },
426                // TODO: file info
427                file_info: py.None(),
428                predicate: predicate.as_ref().map(|e| e.into()),
429                file_options: PyFileOptions {
430                    inner: (**unified_scan_args).clone(),
431                },
432                scan_type: scan_type_to_pyobject(py, scan_type, &unified_scan_args.cloud_options)?,
433            }
434        }
435        .into_py_any(py),
436        IR::DataFrameScan {
437            df,
438            schema: _,
439            output_schema,
440        } => DataFrameScan {
441            df: PyDataFrame::new((**df).clone()),
442            projection: output_schema.as_ref().map_or_else(
443                || Ok(py.None()),
444                |s| {
445                    s.iter_names()
446                        .map(|s| s.as_str())
447                        .collect::<Vec<_>>()
448                        .into_py_any(py)
449                },
450            )?,
451            selection: None,
452        }
453        .into_py_any(py),
454        IR::SimpleProjection { input, columns: _ } => {
455            SimpleProjection { input: input.0 }.into_py_any(py)
456        },
457        IR::Select {
458            input,
459            expr,
460            schema: _,
461            options,
462        } => Select {
463            expr: expr.iter().map(|e| e.into()).collect(),
464            input: input.0,
465            should_broadcast: options.should_broadcast,
466        }
467        .into_py_any(py),
468        IR::Sort {
469            input,
470            by_column,
471            slice,
472            sort_options,
473        } => Sort {
474            input: input.0,
475            by_column: by_column.iter().map(|e| e.into()).collect(),
476            sort_options: (
477                sort_options.maintain_order,
478                sort_options.nulls_last.clone(),
479                sort_options.descending.clone(),
480            ),
481            slice: *slice,
482        }
483        .into_py_any(py),
484        IR::Cache { input, id } => Cache {
485            input: input.0,
486            id_: id.as_u128(),
487        }
488        .into_py_any(py),
489        IR::GroupBy {
490            input,
491            keys,
492            aggs,
493            schema: _,
494            apply,
495            maintain_order,
496            options,
497        } => GroupBy {
498            input: input.0,
499            keys: keys.iter().map(|e| e.into()).collect(),
500            aggs: aggs.iter().map(|e| e.into()).collect(),
501            apply: apply.as_ref().map_or(Ok(()), |_| {
502                Err(PyNotImplementedError::new_err(format!(
503                    "apply inside GroupBy {plan:?}"
504                )))
505            })?,
506            maintain_order: *maintain_order,
507            options: PyGroupbyOptions::new(options.as_ref().clone()).into_py_any(py)?,
508        }
509        .into_py_any(py),
510        IR::Join {
511            input_left,
512            input_right,
513            schema: _,
514            left_on,
515            right_on,
516            options,
517        } => Join {
518            input_left: input_left.0,
519            input_right: input_right.0,
520            left_on: left_on.iter().map(|e| e.into()).collect(),
521            right_on: right_on.iter().map(|e| e.into()).collect(),
522            options: {
523                let how = &options.args.how;
524                let name = Into::<&str>::into(how).into_pyobject(py)?;
525                (
526                    match how {
527                        #[cfg(feature = "asof_join")]
528                        JoinType::AsOf(_) => {
529                            return Err(PyNotImplementedError::new_err("asof join"));
530                        },
531                        #[cfg(feature = "iejoin")]
532                        JoinType::IEJoin => {
533                            let Some(JoinTypeOptionsIR::IEJoin(ie_options)) = &options.options
534                            else {
535                                unreachable!()
536                            };
537                            (
538                                name,
539                                crate::Wrap(ie_options.operator1).into_py_any(py)?,
540                                ie_options.operator2.as_ref().map_or_else(
541                                    || Ok(py.None()),
542                                    |op| crate::Wrap(*op).into_py_any(py),
543                                )?,
544                            )
545                                .into_py_any(py)?
546                        },
547                        // This is a cross join fused with a predicate. Shown in the IR::explain as
548                        // NESTED LOOP JOIN
549                        JoinType::Cross if options.options.is_some() => {
550                            return Err(PyNotImplementedError::new_err("nested loop join"));
551                        },
552                        _ => name.into_any().unbind(),
553                    },
554                    options.args.nulls_equal,
555                    options.args.slice,
556                    options.args.suffix().as_str(),
557                    options.args.coalesce.coalesce(how),
558                    Into::<&str>::into(options.args.maintain_order),
559                )
560                    .into_py_any(py)?
561            },
562        }
563        .into_py_any(py),
564        IR::HStack {
565            input,
566            exprs,
567            schema: _,
568            options,
569        } => HStack {
570            input: input.0,
571            exprs: exprs.iter().map(|e| e.into()).collect(),
572            should_broadcast: options.should_broadcast,
573        }
574        .into_py_any(py),
575        IR::Distinct { input, options } => Distinct {
576            input: input.0,
577            options: (
578                Into::<&str>::into(options.keep_strategy),
579                options.subset.as_ref().map_or_else(
580                    || Ok(py.None()),
581                    |f| {
582                        f.iter()
583                            .map(|s| s.as_ref())
584                            .collect::<Vec<&str>>()
585                            .into_py_any(py)
586                    },
587                )?,
588                options.maintain_order,
589                options.slice,
590            )
591                .into_py_any(py)?,
592        }
593        .into_py_any(py),
594        IR::MapFunction { input, function } => MapFunction {
595            input: input.0,
596            function: match function {
597                FunctionIR::OpaquePython(_) => {
598                    return Err(PyNotImplementedError::new_err("opaque python mapfunction"));
599                },
600                FunctionIR::Opaque {
601                    function: _,
602                    schema: _,
603                    predicate_pd: _,
604                    projection_pd: _,
605                    streamable: _,
606                    fmt_str: _,
607                } => return Err(PyNotImplementedError::new_err("opaque rust mapfunction")),
608                FunctionIR::Unnest { columns } => (
609                    "unnest",
610                    columns.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
611                )
612                    .into_py_any(py)?,
613                FunctionIR::Rechunk => ("rechunk",).into_py_any(py)?,
614                FunctionIR::Explode { columns, schema: _ } => (
615                    "explode",
616                    columns.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
617                )
618                    .into_py_any(py)?,
619                #[cfg(feature = "pivot")]
620                FunctionIR::Unpivot { args, schema: _ } => (
621                    "unpivot",
622                    args.index.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
623                    args.on.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
624                    args.variable_name
625                        .as_ref()
626                        .map_or_else(|| Ok(py.None()), |s| s.as_str().into_py_any(py))?,
627                    args.value_name
628                        .as_ref()
629                        .map_or_else(|| Ok(py.None()), |s| s.as_str().into_py_any(py))?,
630                )
631                    .into_py_any(py)?,
632                FunctionIR::RowIndex {
633                    name,
634                    schema: _,
635                    offset,
636                } => ("row_index", name.to_string(), offset.unwrap_or(0)).into_py_any(py)?,
637                FunctionIR::FastCount {
638                    sources,
639                    scan_type,
640                    cloud_options,
641                    alias,
642                } => {
643                    let sources = sources
644                        .into_paths()
645                        .ok_or_else(|| {
646                            PyNotImplementedError::new_err("FastCount with BytesIO sources")
647                        })?
648                        .iter()
649                        .map(|p| p.to_str())
650                        .collect::<Vec<_>>()
651                        .into_py_any(py)?;
652
653                    let scan_type = scan_type_to_pyobject(py, scan_type, cloud_options)?;
654
655                    let alias = alias
656                        .as_ref()
657                        .map(|a| a.as_str())
658                        .map_or_else(|| Ok(py.None()), |s| s.into_py_any(py))?;
659
660                    ("fast_count", sources, scan_type, alias).into_py_any(py)?
661                },
662            },
663        }
664        .into_py_any(py),
665        IR::Union { inputs, options } => Union {
666            inputs: inputs.iter().map(|n| n.0).collect(),
667            // TODO: rest of options
668            options: options.slice,
669        }
670        .into_py_any(py),
671        IR::HConcat {
672            inputs,
673            schema: _,
674            options: _,
675        } => HConcat {
676            inputs: inputs.iter().map(|n| n.0).collect(),
677            options: (),
678        }
679        .into_py_any(py),
680        IR::ExtContext {
681            input,
682            contexts,
683            schema: _,
684        } => ExtContext {
685            input: input.0,
686            contexts: contexts.iter().map(|n| n.0).collect(),
687        }
688        .into_py_any(py),
689        IR::Sink { input, payload } => Sink {
690            input: input.0,
691            payload: PyString::new(
692                py,
693                &serde_json::to_string(payload)
694                    .map_err(|err| PyValueError::new_err(format!("{err:?}")))?,
695            )
696            .into(),
697        }
698        .into_py_any(py),
699        IR::SinkMultiple { .. } => Err(PyNotImplementedError::new_err(
700            "Not expecting to see a SinkMultiple node",
701        )),
702        #[cfg(feature = "merge_sorted")]
703        IR::MergeSorted {
704            input_left,
705            input_right,
706            key,
707        } => MergeSorted {
708            input_left: input_left.0,
709            input_right: input_right.0,
710            key: key.to_string(),
711        }
712        .into_py_any(py),
713        IR::Invalid => Err(PyNotImplementedError::new_err("Invalid")),
714    }
715}