// polars_python/lazyframe/visitor/nodes.rs

1#[cfg(feature = "iejoin")]
2use polars::prelude::JoinTypeOptionsIR;
3use polars::prelude::deletion::DeletionFilesList;
4use polars::prelude::python_dsl::PythonScanSource;
5use polars_core::prelude::IdxSize;
6use polars_io::cloud::CloudOptions;
7use polars_ops::prelude::JoinType;
8use polars_plan::plans::IR;
9use polars_plan::prelude::{FileScan, FunctionIR, PythonPredicate, UnifiedScanArgs};
10use pyo3::IntoPyObjectExt;
11use pyo3::exceptions::{PyNotImplementedError, PyValueError};
12use pyo3::prelude::*;
13use pyo3::types::{PyDict, PyList, PyString};
14
15use super::expr_nodes::PyGroupbyOptions;
16use crate::PyDataFrame;
17use crate::lazyframe::visit::PyExprIR;
18
19fn scan_type_to_pyobject(
20    py: Python<'_>,
21    scan_type: &FileScan,
22    cloud_options: &Option<CloudOptions>,
23) -> PyResult<PyObject> {
24    match scan_type {
25        #[cfg(feature = "csv")]
26        FileScan::Csv { options } => {
27            let options = serde_json::to_string(options)
28                .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
29            let cloud_options = serde_json::to_string(cloud_options)
30                .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
31            Ok(("csv", options, cloud_options).into_py_any(py)?)
32        },
33        #[cfg(feature = "parquet")]
34        FileScan::Parquet { options, .. } => {
35            let options = serde_json::to_string(options)
36                .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
37            let cloud_options = serde_json::to_string(cloud_options)
38                .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
39            Ok(("parquet", options, cloud_options).into_py_any(py)?)
40        },
41        #[cfg(feature = "ipc")]
42        FileScan::Ipc { .. } => Err(PyNotImplementedError::new_err("ipc scan")),
43        #[cfg(feature = "json")]
44        FileScan::NDJson { options, .. } => {
45            let options = serde_json::to_string(options)
46                .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
47            Ok(("ndjson", options).into_py_any(py)?)
48        },
49        FileScan::PythonDataset { .. } => {
50            Err(PyNotImplementedError::new_err("python dataset scan"))
51        },
52        FileScan::Anonymous { .. } => Err(PyNotImplementedError::new_err("anonymous scan")),
53    }
54}
55
56#[pyclass]
57/// Scan a table with an optional predicate from a python function
58pub struct PythonScan {
59    #[pyo3(get)]
60    options: PyObject,
61}
62
63#[pyclass]
64/// Slice the table
65pub struct Slice {
66    #[pyo3(get)]
67    input: usize,
68    #[pyo3(get)]
69    offset: i64,
70    #[pyo3(get)]
71    len: IdxSize,
72}
73
74#[pyclass]
75/// Filter the table with a boolean expression
76pub struct Filter {
77    #[pyo3(get)]
78    input: usize,
79    #[pyo3(get)]
80    predicate: PyExprIR,
81}
82
83#[pyclass]
84#[derive(Clone)]
85pub struct PyFileOptions {
86    inner: UnifiedScanArgs,
87}
88
89#[pymethods]
90impl PyFileOptions {
91    #[getter]
92    fn n_rows(&self) -> Option<(i64, usize)> {
93        self.inner
94            .pre_slice
95            .clone()
96            .map(|slice| <(i64, usize)>::try_from(slice).unwrap())
97    }
98    #[getter]
99    fn with_columns(&self) -> Option<Vec<&str>> {
100        self.inner
101            .projection
102            .as_ref()?
103            .iter()
104            .map(|x| x.as_str())
105            .collect::<Vec<_>>()
106            .into()
107    }
108    #[getter]
109    fn cache(&self, _py: Python<'_>) -> bool {
110        self.inner.cache
111    }
112    #[getter]
113    fn row_index(&self) -> Option<(&str, IdxSize)> {
114        self.inner
115            .row_index
116            .as_ref()
117            .map(|n| (n.name.as_str(), n.offset))
118    }
119    #[getter]
120    fn rechunk(&self, _py: Python<'_>) -> bool {
121        self.inner.rechunk
122    }
123    #[getter]
124    fn hive_options(&self, _py: Python<'_>) -> PyResult<PyObject> {
125        Err(PyNotImplementedError::new_err("hive options"))
126    }
127    #[getter]
128    fn include_file_paths(&self, _py: Python<'_>) -> Option<&str> {
129        self.inner.include_file_paths.as_deref()
130    }
131
132    /// One of:
133    /// * None
134    /// * ("iceberg-position-delete", dict[int, list[str]])
135    #[getter]
136    fn deletion_files(&self, py: Python<'_>) -> PyResult<PyObject> {
137        Ok(match &self.inner.deletion_files {
138            None => py.None().into_any(),
139
140            Some(DeletionFilesList::IcebergPositionDelete(paths)) => {
141                let out = PyDict::new(py);
142
143                for (k, v) in paths.iter() {
144                    out.set_item(*k, v.as_ref())?;
145                }
146
147                ("iceberg-position-delete", out)
148                    .into_pyobject(py)?
149                    .into_any()
150                    .unbind()
151            },
152        })
153    }
154}
155
156#[pyclass]
157/// Scan a table from file
158pub struct Scan {
159    #[pyo3(get)]
160    paths: PyObject,
161    #[pyo3(get)]
162    file_info: PyObject,
163    #[pyo3(get)]
164    predicate: Option<PyExprIR>,
165    #[pyo3(get)]
166    file_options: PyFileOptions,
167    #[pyo3(get)]
168    scan_type: PyObject,
169}
170
171#[pyclass]
172/// Scan a table from an existing dataframe
173pub struct DataFrameScan {
174    #[pyo3(get)]
175    df: PyDataFrame,
176    #[pyo3(get)]
177    projection: PyObject,
178    #[pyo3(get)]
179    selection: Option<PyExprIR>,
180}
181
182#[pyclass]
183/// Project out columns from a table
184pub struct SimpleProjection {
185    #[pyo3(get)]
186    input: usize,
187}
188
189#[pyclass]
190/// Column selection
191pub struct Select {
192    #[pyo3(get)]
193    input: usize,
194    #[pyo3(get)]
195    expr: Vec<PyExprIR>,
196    #[pyo3(get)]
197    should_broadcast: bool,
198}
199
200#[pyclass]
201/// Sort the table
202pub struct Sort {
203    #[pyo3(get)]
204    input: usize,
205    #[pyo3(get)]
206    by_column: Vec<PyExprIR>,
207    #[pyo3(get)]
208    sort_options: (bool, Vec<bool>, Vec<bool>),
209    #[pyo3(get)]
210    slice: Option<(i64, usize)>,
211}
212
213#[pyclass]
214/// Cache the input at this point in the LP
215pub struct Cache {
216    #[pyo3(get)]
217    input: usize,
218    #[pyo3(get)]
219    id_: usize,
220    #[pyo3(get)]
221    cache_hits: u32,
222}
223
224#[pyclass]
225/// Groupby aggregation
226pub struct GroupBy {
227    #[pyo3(get)]
228    input: usize,
229    #[pyo3(get)]
230    keys: Vec<PyExprIR>,
231    #[pyo3(get)]
232    aggs: Vec<PyExprIR>,
233    #[pyo3(get)]
234    apply: (),
235    #[pyo3(get)]
236    maintain_order: bool,
237    #[pyo3(get)]
238    options: PyObject,
239}
240
241#[pyclass]
242/// Join operation
243pub struct Join {
244    #[pyo3(get)]
245    input_left: usize,
246    #[pyo3(get)]
247    input_right: usize,
248    #[pyo3(get)]
249    left_on: Vec<PyExprIR>,
250    #[pyo3(get)]
251    right_on: Vec<PyExprIR>,
252    #[pyo3(get)]
253    options: PyObject,
254}
255
256#[pyclass]
257/// Merge sorted operation
258pub struct MergeSorted {
259    #[pyo3(get)]
260    input_left: usize,
261    #[pyo3(get)]
262    input_right: usize,
263    #[pyo3(get)]
264    key: String,
265}
266
267#[pyclass]
268/// Adding columns to the table without a Join
269pub struct HStack {
270    #[pyo3(get)]
271    input: usize,
272    #[pyo3(get)]
273    exprs: Vec<PyExprIR>,
274    #[pyo3(get)]
275    should_broadcast: bool,
276}
277
278#[pyclass]
279/// Like Select, but all operations produce a single row.
280pub struct Reduce {
281    #[pyo3(get)]
282    input: usize,
283    #[pyo3(get)]
284    exprs: Vec<PyExprIR>,
285}
286
287#[pyclass]
288/// Remove duplicates from the table
289pub struct Distinct {
290    #[pyo3(get)]
291    input: usize,
292    #[pyo3(get)]
293    options: PyObject,
294}
295#[pyclass]
296/// A (User Defined) Function
297pub struct MapFunction {
298    #[pyo3(get)]
299    input: usize,
300    #[pyo3(get)]
301    function: PyObject,
302}
303#[pyclass]
304pub struct Union {
305    #[pyo3(get)]
306    inputs: Vec<usize>,
307    #[pyo3(get)]
308    options: Option<(i64, usize)>,
309}
310#[pyclass]
311/// Horizontal concatenation of multiple plans
312pub struct HConcat {
313    #[pyo3(get)]
314    inputs: Vec<usize>,
315    #[pyo3(get)]
316    options: (),
317}
318#[pyclass]
319/// This allows expressions to access other tables
320pub struct ExtContext {
321    #[pyo3(get)]
322    input: usize,
323    #[pyo3(get)]
324    contexts: Vec<usize>,
325}
326
327#[pyclass]
328pub struct Sink {
329    #[pyo3(get)]
330    input: usize,
331    #[pyo3(get)]
332    payload: PyObject,
333}
334
335pub(crate) fn into_py(py: Python<'_>, plan: &IR) -> PyResult<PyObject> {
336    match plan {
337        IR::PythonScan { options } => {
338            let python_src = match options.python_source {
339                PythonScanSource::Pyarrow => "pyarrow",
340                PythonScanSource::Cuda => "cuda",
341                PythonScanSource::IOPlugin => "io_plugin",
342            };
343
344            PythonScan {
345                options: (
346                    options
347                        .scan_fn
348                        .as_ref()
349                        .map_or_else(|| py.None(), |s| s.0.clone_ref(py)),
350                    options.with_columns.as_ref().map_or_else(
351                        || Ok(py.None()),
352                        |cols| {
353                            cols.iter()
354                                .map(|x| x.as_str())
355                                .collect::<Vec<_>>()
356                                .into_py_any(py)
357                        },
358                    )?,
359                    python_src,
360                    match &options.predicate {
361                        PythonPredicate::None => py.None(),
362                        PythonPredicate::PyArrow(s) => ("pyarrow", s).into_py_any(py)?,
363                        PythonPredicate::Polars(e) => ("polars", e.node().0).into_py_any(py)?,
364                    },
365                    options
366                        .n_rows
367                        .map_or_else(|| Ok(py.None()), |s| s.into_py_any(py))?,
368                )
369                    .into_py_any(py)?,
370            }
371            .into_py_any(py)
372        },
373        IR::Slice { input, offset, len } => Slice {
374            input: input.0,
375            offset: *offset,
376            len: *len,
377        }
378        .into_py_any(py),
379        IR::Filter { input, predicate } => Filter {
380            input: input.0,
381            predicate: predicate.into(),
382        }
383        .into_py_any(py),
384        IR::Scan {
385            hive_parts: Some(_),
386            ..
387        } => Err(PyNotImplementedError::new_err(
388            "scan with hive partitioning",
389        )),
390        IR::Scan {
391            sources,
392            file_info: _,
393            hive_parts: _,
394            predicate,
395            output_schema: _,
396            scan_type,
397            unified_scan_args,
398            id: _,
399        } => {
400            Scan {
401                paths: {
402                    let paths = sources
403                        .into_paths()
404                        .ok_or_else(|| PyNotImplementedError::new_err("scan with BytesIO"))?;
405
406                    let out = PyList::new(py, [] as [(); 0])?;
407
408                    // Manual conversion to preserve `uri://...` - converting Rust `Path` to `PosixPath`
409                    // will corrupt to `uri:/...`
410                    for path in paths.iter() {
411                        if let Some(path) = path.to_str() {
412                            out.append(path)?
413                        } else {
414                            out.append(path)?
415                        }
416                    }
417
418                    out.into_py_any(py)?
419                },
420                // TODO: file info
421                file_info: py.None(),
422                predicate: predicate.as_ref().map(|e| e.into()),
423                file_options: PyFileOptions {
424                    inner: (**unified_scan_args).clone(),
425                },
426                scan_type: scan_type_to_pyobject(py, scan_type, &unified_scan_args.cloud_options)?,
427            }
428        }
429        .into_py_any(py),
430        IR::DataFrameScan {
431            df,
432            schema: _,
433            output_schema,
434        } => DataFrameScan {
435            df: PyDataFrame::new((**df).clone()),
436            projection: output_schema.as_ref().map_or_else(
437                || Ok(py.None()),
438                |s| {
439                    s.iter_names()
440                        .map(|s| s.as_str())
441                        .collect::<Vec<_>>()
442                        .into_py_any(py)
443                },
444            )?,
445            selection: None,
446        }
447        .into_py_any(py),
448        IR::SimpleProjection { input, columns: _ } => {
449            SimpleProjection { input: input.0 }.into_py_any(py)
450        },
451        IR::Select {
452            input,
453            expr,
454            schema: _,
455            options,
456        } => Select {
457            expr: expr.iter().map(|e| e.into()).collect(),
458            input: input.0,
459            should_broadcast: options.should_broadcast,
460        }
461        .into_py_any(py),
462        IR::Sort {
463            input,
464            by_column,
465            slice,
466            sort_options,
467        } => Sort {
468            input: input.0,
469            by_column: by_column.iter().map(|e| e.into()).collect(),
470            sort_options: (
471                sort_options.maintain_order,
472                sort_options.nulls_last.clone(),
473                sort_options.descending.clone(),
474            ),
475            slice: *slice,
476        }
477        .into_py_any(py),
478        IR::Cache {
479            input,
480            id,
481            cache_hits,
482        } => Cache {
483            input: input.0,
484            id_: id.to_usize(),
485            cache_hits: *cache_hits,
486        }
487        .into_py_any(py),
488        IR::GroupBy {
489            input,
490            keys,
491            aggs,
492            schema: _,
493            apply,
494            maintain_order,
495            options,
496        } => GroupBy {
497            input: input.0,
498            keys: keys.iter().map(|e| e.into()).collect(),
499            aggs: aggs.iter().map(|e| e.into()).collect(),
500            apply: apply.as_ref().map_or(Ok(()), |_| {
501                Err(PyNotImplementedError::new_err(format!(
502                    "apply inside GroupBy {plan:?}"
503                )))
504            })?,
505            maintain_order: *maintain_order,
506            options: PyGroupbyOptions::new(options.as_ref().clone()).into_py_any(py)?,
507        }
508        .into_py_any(py),
509        IR::Join {
510            input_left,
511            input_right,
512            schema: _,
513            left_on,
514            right_on,
515            options,
516        } => Join {
517            input_left: input_left.0,
518            input_right: input_right.0,
519            left_on: left_on.iter().map(|e| e.into()).collect(),
520            right_on: right_on.iter().map(|e| e.into()).collect(),
521            options: {
522                let how = &options.args.how;
523                let name = Into::<&str>::into(how).into_pyobject(py)?;
524                (
525                    match how {
526                        #[cfg(feature = "asof_join")]
527                        JoinType::AsOf(_) => {
528                            return Err(PyNotImplementedError::new_err("asof join"));
529                        },
530                        #[cfg(feature = "iejoin")]
531                        JoinType::IEJoin => {
532                            let Some(JoinTypeOptionsIR::IEJoin(ie_options)) = &options.options
533                            else {
534                                unreachable!()
535                            };
536                            (
537                                name,
538                                crate::Wrap(ie_options.operator1).into_py_any(py)?,
539                                ie_options.operator2.as_ref().map_or_else(
540                                    || Ok(py.None()),
541                                    |op| crate::Wrap(*op).into_py_any(py),
542                                )?,
543                            )
544                                .into_py_any(py)?
545                        },
546                        // This is a cross join fused with a predicate. Shown in the IR::explain as
547                        // NESTED LOOP JOIN
548                        JoinType::Cross if options.options.is_some() => {
549                            return Err(PyNotImplementedError::new_err("nested loop join"));
550                        },
551                        _ => name.into_any().unbind(),
552                    },
553                    options.args.nulls_equal,
554                    options.args.slice,
555                    options.args.suffix().as_str(),
556                    options.args.coalesce.coalesce(how),
557                    Into::<&str>::into(options.args.maintain_order),
558                )
559                    .into_py_any(py)?
560            },
561        }
562        .into_py_any(py),
563        IR::HStack {
564            input,
565            exprs,
566            schema: _,
567            options,
568        } => HStack {
569            input: input.0,
570            exprs: exprs.iter().map(|e| e.into()).collect(),
571            should_broadcast: options.should_broadcast,
572        }
573        .into_py_any(py),
574        IR::Distinct { input, options } => Distinct {
575            input: input.0,
576            options: (
577                Into::<&str>::into(options.keep_strategy),
578                options.subset.as_ref().map_or_else(
579                    || Ok(py.None()),
580                    |f| {
581                        f.iter()
582                            .map(|s| s.as_ref())
583                            .collect::<Vec<&str>>()
584                            .into_py_any(py)
585                    },
586                )?,
587                options.maintain_order,
588                options.slice,
589            )
590                .into_py_any(py)?,
591        }
592        .into_py_any(py),
593        IR::MapFunction { input, function } => MapFunction {
594            input: input.0,
595            function: match function {
596                FunctionIR::OpaquePython(_) => {
597                    return Err(PyNotImplementedError::new_err("opaque python mapfunction"));
598                },
599                FunctionIR::Opaque {
600                    function: _,
601                    schema: _,
602                    predicate_pd: _,
603                    projection_pd: _,
604                    streamable: _,
605                    fmt_str: _,
606                } => return Err(PyNotImplementedError::new_err("opaque rust mapfunction")),
607                FunctionIR::Unnest { columns } => (
608                    "unnest",
609                    columns.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
610                )
611                    .into_py_any(py)?,
612                FunctionIR::Rechunk => ("rechunk",).into_py_any(py)?,
613                FunctionIR::Explode { columns, schema: _ } => (
614                    "explode",
615                    columns.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
616                )
617                    .into_py_any(py)?,
618                #[cfg(feature = "pivot")]
619                FunctionIR::Unpivot { args, schema: _ } => (
620                    "unpivot",
621                    args.index.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
622                    args.on.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
623                    args.variable_name
624                        .as_ref()
625                        .map_or_else(|| Ok(py.None()), |s| s.as_str().into_py_any(py))?,
626                    args.value_name
627                        .as_ref()
628                        .map_or_else(|| Ok(py.None()), |s| s.as_str().into_py_any(py))?,
629                )
630                    .into_py_any(py)?,
631                FunctionIR::RowIndex {
632                    name,
633                    schema: _,
634                    offset,
635                } => ("row_index", name.to_string(), offset.unwrap_or(0)).into_py_any(py)?,
636                FunctionIR::FastCount {
637                    sources,
638                    scan_type,
639                    cloud_options,
640                    alias,
641                } => {
642                    let sources = sources
643                        .into_paths()
644                        .ok_or_else(|| {
645                            PyNotImplementedError::new_err("FastCount with BytesIO sources")
646                        })?
647                        .into_py_any(py)?;
648
649                    let scan_type = scan_type_to_pyobject(py, scan_type, cloud_options)?;
650
651                    let alias = alias
652                        .as_ref()
653                        .map(|a| a.as_str())
654                        .map_or_else(|| Ok(py.None()), |s| s.into_py_any(py))?;
655
656                    ("fast_count", sources, scan_type, alias).into_py_any(py)?
657                },
658            },
659        }
660        .into_py_any(py),
661        IR::Union { inputs, options } => Union {
662            inputs: inputs.iter().map(|n| n.0).collect(),
663            // TODO: rest of options
664            options: options.slice,
665        }
666        .into_py_any(py),
667        IR::HConcat {
668            inputs,
669            schema: _,
670            options: _,
671        } => HConcat {
672            inputs: inputs.iter().map(|n| n.0).collect(),
673            options: (),
674        }
675        .into_py_any(py),
676        IR::ExtContext {
677            input,
678            contexts,
679            schema: _,
680        } => ExtContext {
681            input: input.0,
682            contexts: contexts.iter().map(|n| n.0).collect(),
683        }
684        .into_py_any(py),
685        IR::Sink { input, payload } => Sink {
686            input: input.0,
687            payload: PyString::new(
688                py,
689                &serde_json::to_string(payload)
690                    .map_err(|err| PyValueError::new_err(format!("{err:?}")))?,
691            )
692            .into(),
693        }
694        .into_py_any(py),
695        IR::SinkMultiple { .. } => Err(PyNotImplementedError::new_err(
696            "Not expecting to see a SinkMultiple node",
697        )),
698        #[cfg(feature = "merge_sorted")]
699        IR::MergeSorted {
700            input_left,
701            input_right,
702            key,
703        } => MergeSorted {
704            input_left: input_left.0,
705            input_right: input_right.0,
706            key: key.to_string(),
707        }
708        .into_py_any(py),
709        IR::Invalid => Err(PyNotImplementedError::new_err("Invalid")),
710    }
711}