polars_python/lazyframe/visitor/nodes.rs

1#[cfg(feature = "iejoin")]
2use polars::prelude::JoinTypeOptionsIR;
3use polars::prelude::python_dsl::PythonScanSource;
4use polars_core::prelude::IdxSize;
5use polars_io::cloud::CloudOptions;
6use polars_ops::prelude::JoinType;
7use polars_plan::plans::IR;
8use polars_plan::prelude::{FileScan, FunctionIR, PythonPredicate, UnifiedScanArgs};
9use pyo3::IntoPyObjectExt;
10use pyo3::exceptions::{PyNotImplementedError, PyValueError};
11use pyo3::prelude::*;
12use pyo3::types::PyString;
13
14use super::expr_nodes::PyGroupbyOptions;
15use crate::PyDataFrame;
16use crate::lazyframe::visit::PyExprIR;
17
18fn scan_type_to_pyobject(
19    py: Python<'_>,
20    scan_type: &FileScan,
21    cloud_options: &Option<CloudOptions>,
22) -> PyResult<PyObject> {
23    match scan_type {
24        #[cfg(feature = "csv")]
25        FileScan::Csv { options } => {
26            let options = serde_json::to_string(options)
27                .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
28            let cloud_options = serde_json::to_string(cloud_options)
29                .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
30            Ok(("csv", options, cloud_options).into_py_any(py)?)
31        },
32        #[cfg(feature = "parquet")]
33        FileScan::Parquet { options, .. } => {
34            let options = serde_json::to_string(options)
35                .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
36            let cloud_options = serde_json::to_string(cloud_options)
37                .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
38            Ok(("parquet", options, cloud_options).into_py_any(py)?)
39        },
40        #[cfg(feature = "ipc")]
41        FileScan::Ipc { .. } => Err(PyNotImplementedError::new_err("ipc scan")),
42        #[cfg(feature = "json")]
43        FileScan::NDJson { options, .. } => {
44            let options = serde_json::to_string(options)
45                .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
46            Ok(("ndjson", options).into_py_any(py)?)
47        },
48        FileScan::PythonDataset { .. } => {
49            Err(PyNotImplementedError::new_err("python dataset scan"))
50        },
51        FileScan::Anonymous { .. } => Err(PyNotImplementedError::new_err("anonymous scan")),
52    }
53}
54
55#[pyclass]
56/// Scan a table with an optional predicate from a python function
57pub struct PythonScan {
58    #[pyo3(get)]
59    options: PyObject,
60}
61
62#[pyclass]
63/// Slice the table
64pub struct Slice {
65    #[pyo3(get)]
66    input: usize,
67    #[pyo3(get)]
68    offset: i64,
69    #[pyo3(get)]
70    len: IdxSize,
71}
72
73#[pyclass]
74/// Filter the table with a boolean expression
75pub struct Filter {
76    #[pyo3(get)]
77    input: usize,
78    #[pyo3(get)]
79    predicate: PyExprIR,
80}
81
82#[pyclass]
83#[derive(Clone)]
84pub struct PyFileOptions {
85    inner: UnifiedScanArgs,
86}
87
88#[pymethods]
89impl PyFileOptions {
90    #[getter]
91    fn n_rows(&self) -> Option<(i64, usize)> {
92        self.inner
93            .pre_slice
94            .clone()
95            .map(|slice| <(i64, usize)>::try_from(slice).unwrap())
96    }
97    #[getter]
98    fn with_columns(&self) -> Option<Vec<&str>> {
99        self.inner
100            .projection
101            .as_ref()?
102            .iter()
103            .map(|x| x.as_str())
104            .collect::<Vec<_>>()
105            .into()
106    }
107    #[getter]
108    fn cache(&self, _py: Python<'_>) -> bool {
109        self.inner.cache
110    }
111    #[getter]
112    fn row_index(&self) -> Option<(&str, IdxSize)> {
113        self.inner
114            .row_index
115            .as_ref()
116            .map(|n| (n.name.as_str(), n.offset))
117    }
118    #[getter]
119    fn rechunk(&self, _py: Python<'_>) -> bool {
120        self.inner.rechunk
121    }
122    #[getter]
123    fn hive_options(&self, _py: Python<'_>) -> PyResult<PyObject> {
124        Err(PyNotImplementedError::new_err("hive options"))
125    }
126    #[getter]
127    fn include_file_paths(&self, _py: Python<'_>) -> Option<&str> {
128        self.inner.include_file_paths.as_deref()
129    }
130}
131
132#[pyclass]
133/// Scan a table from file
134pub struct Scan {
135    #[pyo3(get)]
136    paths: PyObject,
137    #[pyo3(get)]
138    file_info: PyObject,
139    #[pyo3(get)]
140    predicate: Option<PyExprIR>,
141    #[pyo3(get)]
142    file_options: PyFileOptions,
143    #[pyo3(get)]
144    scan_type: PyObject,
145}
146
147#[pyclass]
148/// Scan a table from an existing dataframe
149pub struct DataFrameScan {
150    #[pyo3(get)]
151    df: PyDataFrame,
152    #[pyo3(get)]
153    projection: PyObject,
154    #[pyo3(get)]
155    selection: Option<PyExprIR>,
156}
157
158#[pyclass]
159/// Project out columns from a table
160pub struct SimpleProjection {
161    #[pyo3(get)]
162    input: usize,
163}
164
165#[pyclass]
166/// Column selection
167pub struct Select {
168    #[pyo3(get)]
169    input: usize,
170    #[pyo3(get)]
171    expr: Vec<PyExprIR>,
172    #[pyo3(get)]
173    should_broadcast: bool,
174}
175
176#[pyclass]
177/// Sort the table
178pub struct Sort {
179    #[pyo3(get)]
180    input: usize,
181    #[pyo3(get)]
182    by_column: Vec<PyExprIR>,
183    #[pyo3(get)]
184    sort_options: (bool, Vec<bool>, Vec<bool>),
185    #[pyo3(get)]
186    slice: Option<(i64, usize)>,
187}
188
189#[pyclass]
190/// Cache the input at this point in the LP
191pub struct Cache {
192    #[pyo3(get)]
193    input: usize,
194    #[pyo3(get)]
195    id_: usize,
196    #[pyo3(get)]
197    cache_hits: u32,
198}
199
200#[pyclass]
201/// Groupby aggregation
202pub struct GroupBy {
203    #[pyo3(get)]
204    input: usize,
205    #[pyo3(get)]
206    keys: Vec<PyExprIR>,
207    #[pyo3(get)]
208    aggs: Vec<PyExprIR>,
209    #[pyo3(get)]
210    apply: (),
211    #[pyo3(get)]
212    maintain_order: bool,
213    #[pyo3(get)]
214    options: PyObject,
215}
216
217#[pyclass]
218/// Join operation
219pub struct Join {
220    #[pyo3(get)]
221    input_left: usize,
222    #[pyo3(get)]
223    input_right: usize,
224    #[pyo3(get)]
225    left_on: Vec<PyExprIR>,
226    #[pyo3(get)]
227    right_on: Vec<PyExprIR>,
228    #[pyo3(get)]
229    options: PyObject,
230}
231
232#[pyclass]
233/// Merge sorted operation
234pub struct MergeSorted {
235    #[pyo3(get)]
236    input_left: usize,
237    #[pyo3(get)]
238    input_right: usize,
239    #[pyo3(get)]
240    key: String,
241}
242
243#[pyclass]
244/// Adding columns to the table without a Join
245pub struct HStack {
246    #[pyo3(get)]
247    input: usize,
248    #[pyo3(get)]
249    exprs: Vec<PyExprIR>,
250    #[pyo3(get)]
251    should_broadcast: bool,
252}
253
254#[pyclass]
255/// Like Select, but all operations produce a single row.
256pub struct Reduce {
257    #[pyo3(get)]
258    input: usize,
259    #[pyo3(get)]
260    exprs: Vec<PyExprIR>,
261}
262
263#[pyclass]
264/// Remove duplicates from the table
265pub struct Distinct {
266    #[pyo3(get)]
267    input: usize,
268    #[pyo3(get)]
269    options: PyObject,
270}
271#[pyclass]
272/// A (User Defined) Function
273pub struct MapFunction {
274    #[pyo3(get)]
275    input: usize,
276    #[pyo3(get)]
277    function: PyObject,
278}
279#[pyclass]
280pub struct Union {
281    #[pyo3(get)]
282    inputs: Vec<usize>,
283    #[pyo3(get)]
284    options: Option<(i64, usize)>,
285}
286#[pyclass]
287/// Horizontal concatenation of multiple plans
288pub struct HConcat {
289    #[pyo3(get)]
290    inputs: Vec<usize>,
291    #[pyo3(get)]
292    options: (),
293}
294#[pyclass]
295/// This allows expressions to access other tables
296pub struct ExtContext {
297    #[pyo3(get)]
298    input: usize,
299    #[pyo3(get)]
300    contexts: Vec<usize>,
301}
302
303#[pyclass]
304pub struct Sink {
305    #[pyo3(get)]
306    input: usize,
307    #[pyo3(get)]
308    payload: PyObject,
309}
310
311pub(crate) fn into_py(py: Python<'_>, plan: &IR) -> PyResult<PyObject> {
312    match plan {
313        IR::PythonScan { options } => {
314            let python_src = match options.python_source {
315                PythonScanSource::Pyarrow => "pyarrow",
316                PythonScanSource::Cuda => "cuda",
317                PythonScanSource::IOPlugin => "io_plugin",
318            };
319
320            PythonScan {
321                options: (
322                    options
323                        .scan_fn
324                        .as_ref()
325                        .map_or_else(|| py.None(), |s| s.0.clone_ref(py)),
326                    options.with_columns.as_ref().map_or_else(
327                        || Ok(py.None()),
328                        |cols| {
329                            cols.iter()
330                                .map(|x| x.as_str())
331                                .collect::<Vec<_>>()
332                                .into_py_any(py)
333                        },
334                    )?,
335                    python_src,
336                    match &options.predicate {
337                        PythonPredicate::None => py.None(),
338                        PythonPredicate::PyArrow(s) => ("pyarrow", s).into_py_any(py)?,
339                        PythonPredicate::Polars(e) => ("polars", e.node().0).into_py_any(py)?,
340                    },
341                    options
342                        .n_rows
343                        .map_or_else(|| Ok(py.None()), |s| s.into_py_any(py))?,
344                )
345                    .into_py_any(py)?,
346            }
347            .into_py_any(py)
348        },
349        IR::Slice { input, offset, len } => Slice {
350            input: input.0,
351            offset: *offset,
352            len: *len,
353        }
354        .into_py_any(py),
355        IR::Filter { input, predicate } => Filter {
356            input: input.0,
357            predicate: predicate.into(),
358        }
359        .into_py_any(py),
360        IR::Scan {
361            hive_parts: Some(_),
362            ..
363        } => Err(PyNotImplementedError::new_err(
364            "scan with hive partitioning",
365        )),
366        IR::Scan {
367            sources,
368            file_info: _,
369            hive_parts: _,
370            predicate,
371            output_schema: _,
372            scan_type,
373            unified_scan_args,
374            id: _,
375        } => Scan {
376            paths: sources
377                .into_paths()
378                .ok_or_else(|| PyNotImplementedError::new_err("scan with BytesIO"))?
379                .into_py_any(py)?,
380            // TODO: file info
381            file_info: py.None(),
382            predicate: predicate.as_ref().map(|e| e.into()),
383            file_options: PyFileOptions {
384                inner: (**unified_scan_args).clone(),
385            },
386            scan_type: scan_type_to_pyobject(py, scan_type, &unified_scan_args.cloud_options)?,
387        }
388        .into_py_any(py),
389        IR::DataFrameScan {
390            df,
391            schema: _,
392            output_schema,
393        } => DataFrameScan {
394            df: PyDataFrame::new((**df).clone()),
395            projection: output_schema.as_ref().map_or_else(
396                || Ok(py.None()),
397                |s| {
398                    s.iter_names()
399                        .map(|s| s.as_str())
400                        .collect::<Vec<_>>()
401                        .into_py_any(py)
402                },
403            )?,
404            selection: None,
405        }
406        .into_py_any(py),
407        IR::SimpleProjection { input, columns: _ } => {
408            SimpleProjection { input: input.0 }.into_py_any(py)
409        },
410        IR::Select {
411            input,
412            expr,
413            schema: _,
414            options,
415        } => Select {
416            expr: expr.iter().map(|e| e.into()).collect(),
417            input: input.0,
418            should_broadcast: options.should_broadcast,
419        }
420        .into_py_any(py),
421        IR::Sort {
422            input,
423            by_column,
424            slice,
425            sort_options,
426        } => Sort {
427            input: input.0,
428            by_column: by_column.iter().map(|e| e.into()).collect(),
429            sort_options: (
430                sort_options.maintain_order,
431                sort_options.nulls_last.clone(),
432                sort_options.descending.clone(),
433            ),
434            slice: *slice,
435        }
436        .into_py_any(py),
437        IR::Cache {
438            input,
439            id,
440            cache_hits,
441        } => Cache {
442            input: input.0,
443            id_: *id,
444            cache_hits: *cache_hits,
445        }
446        .into_py_any(py),
447        IR::GroupBy {
448            input,
449            keys,
450            aggs,
451            schema: _,
452            apply,
453            maintain_order,
454            options,
455        } => GroupBy {
456            input: input.0,
457            keys: keys.iter().map(|e| e.into()).collect(),
458            aggs: aggs.iter().map(|e| e.into()).collect(),
459            apply: apply.as_ref().map_or(Ok(()), |_| {
460                Err(PyNotImplementedError::new_err(format!(
461                    "apply inside GroupBy {:?}",
462                    plan
463                )))
464            })?,
465            maintain_order: *maintain_order,
466            options: PyGroupbyOptions::new(options.as_ref().clone()).into_py_any(py)?,
467        }
468        .into_py_any(py),
469        IR::Join {
470            input_left,
471            input_right,
472            schema: _,
473            left_on,
474            right_on,
475            options,
476        } => Join {
477            input_left: input_left.0,
478            input_right: input_right.0,
479            left_on: left_on.iter().map(|e| e.into()).collect(),
480            right_on: right_on.iter().map(|e| e.into()).collect(),
481            options: {
482                let how = &options.args.how;
483                let name = Into::<&str>::into(how).into_pyobject(py)?;
484                (
485                    match how {
486                        #[cfg(feature = "asof_join")]
487                        JoinType::AsOf(_) => {
488                            return Err(PyNotImplementedError::new_err("asof join"));
489                        },
490                        #[cfg(feature = "iejoin")]
491                        JoinType::IEJoin => {
492                            let Some(JoinTypeOptionsIR::IEJoin(ie_options)) = &options.options
493                            else {
494                                unreachable!()
495                            };
496                            (
497                                name,
498                                crate::Wrap(ie_options.operator1).into_py_any(py)?,
499                                ie_options.operator2.as_ref().map_or_else(
500                                    || Ok(py.None()),
501                                    |op| crate::Wrap(*op).into_py_any(py),
502                                )?,
503                            )
504                                .into_py_any(py)?
505                        },
506                        // This is a cross join fused with a predicate. Shown in the IR::explain as
507                        // NESTED LOOP JOIN
508                        JoinType::Cross if options.options.is_some() => {
509                            return Err(PyNotImplementedError::new_err("nested loop join"));
510                        },
511                        _ => name.into_any().unbind(),
512                    },
513                    options.args.nulls_equal,
514                    options.args.slice,
515                    options.args.suffix().as_str(),
516                    options.args.coalesce.coalesce(how),
517                    Into::<&str>::into(options.args.maintain_order),
518                )
519                    .into_py_any(py)?
520            },
521        }
522        .into_py_any(py),
523        IR::HStack {
524            input,
525            exprs,
526            schema: _,
527            options,
528        } => HStack {
529            input: input.0,
530            exprs: exprs.iter().map(|e| e.into()).collect(),
531            should_broadcast: options.should_broadcast,
532        }
533        .into_py_any(py),
534        IR::Distinct { input, options } => Distinct {
535            input: input.0,
536            options: (
537                Into::<&str>::into(options.keep_strategy),
538                options.subset.as_ref().map_or_else(
539                    || Ok(py.None()),
540                    |f| {
541                        f.iter()
542                            .map(|s| s.as_ref())
543                            .collect::<Vec<&str>>()
544                            .into_py_any(py)
545                    },
546                )?,
547                options.maintain_order,
548                options.slice,
549            )
550                .into_py_any(py)?,
551        }
552        .into_py_any(py),
553        IR::MapFunction { input, function } => MapFunction {
554            input: input.0,
555            function: match function {
556                FunctionIR::OpaquePython(_) => {
557                    return Err(PyNotImplementedError::new_err("opaque python mapfunction"));
558                },
559                FunctionIR::Opaque {
560                    function: _,
561                    schema: _,
562                    predicate_pd: _,
563                    projection_pd: _,
564                    streamable: _,
565                    fmt_str: _,
566                } => return Err(PyNotImplementedError::new_err("opaque rust mapfunction")),
567                FunctionIR::Pipeline {
568                    function: _,
569                    schema: _,
570                    original: _,
571                } => return Err(PyNotImplementedError::new_err("pipeline mapfunction")),
572                FunctionIR::Unnest { columns } => (
573                    "unnest",
574                    columns.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
575                )
576                    .into_py_any(py)?,
577                FunctionIR::Rechunk => ("rechunk",).into_py_any(py)?,
578                FunctionIR::Rename {
579                    existing,
580                    new,
581                    swapping,
582                    schema: _,
583                } => (
584                    "rename",
585                    existing.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
586                    new.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
587                    *swapping,
588                )
589                    .into_py_any(py)?,
590                FunctionIR::Explode { columns, schema: _ } => (
591                    "explode",
592                    columns.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
593                )
594                    .into_py_any(py)?,
595                #[cfg(feature = "pivot")]
596                FunctionIR::Unpivot { args, schema: _ } => (
597                    "unpivot",
598                    args.index.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
599                    args.on.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
600                    args.variable_name
601                        .as_ref()
602                        .map_or_else(|| Ok(py.None()), |s| s.as_str().into_py_any(py))?,
603                    args.value_name
604                        .as_ref()
605                        .map_or_else(|| Ok(py.None()), |s| s.as_str().into_py_any(py))?,
606                )
607                    .into_py_any(py)?,
608                FunctionIR::RowIndex {
609                    name,
610                    schema: _,
611                    offset,
612                } => ("row_index", name.to_string(), offset.unwrap_or(0)).into_py_any(py)?,
613                FunctionIR::FastCount {
614                    sources,
615                    scan_type,
616                    cloud_options,
617                    alias,
618                } => {
619                    let sources = sources
620                        .into_paths()
621                        .ok_or_else(|| {
622                            PyNotImplementedError::new_err("FastCount with BytesIO sources")
623                        })?
624                        .into_py_any(py)?;
625
626                    let scan_type = scan_type_to_pyobject(py, scan_type, cloud_options)?;
627
628                    let alias = alias
629                        .as_ref()
630                        .map(|a| a.as_str())
631                        .map_or_else(|| Ok(py.None()), |s| s.into_py_any(py))?;
632
633                    ("fast_count", sources, scan_type, alias).into_py_any(py)?
634                },
635            },
636        }
637        .into_py_any(py),
638        IR::Union { inputs, options } => Union {
639            inputs: inputs.iter().map(|n| n.0).collect(),
640            // TODO: rest of options
641            options: options.slice,
642        }
643        .into_py_any(py),
644        IR::HConcat {
645            inputs,
646            schema: _,
647            options: _,
648        } => HConcat {
649            inputs: inputs.iter().map(|n| n.0).collect(),
650            options: (),
651        }
652        .into_py_any(py),
653        IR::ExtContext {
654            input,
655            contexts,
656            schema: _,
657        } => ExtContext {
658            input: input.0,
659            contexts: contexts.iter().map(|n| n.0).collect(),
660        }
661        .into_py_any(py),
662        IR::Sink { input, payload } => Sink {
663            input: input.0,
664            payload: PyString::new(
665                py,
666                &serde_json::to_string(payload)
667                    .map_err(|err| PyValueError::new_err(format!("{err:?}")))?,
668            )
669            .into(),
670        }
671        .into_py_any(py),
672        IR::SinkMultiple { .. } => Err(PyNotImplementedError::new_err(
673            "Not expecting to see a SinkMultiple node",
674        )),
675        #[cfg(feature = "merge_sorted")]
676        IR::MergeSorted {
677            input_left,
678            input_right,
679            key,
680        } => MergeSorted {
681            input_left: input_left.0,
682            input_right: input_right.0,
683            key: key.to_string(),
684        }
685        .into_py_any(py),
686        IR::Invalid => Err(PyNotImplementedError::new_err("Invalid")),
687    }
688}