polars_python/lazyframe/visitor/
nodes.rs

1#[cfg(feature = "iejoin")]
2use polars::prelude::JoinTypeOptionsIR;
3use polars::prelude::python_dsl::PythonScanSource;
4use polars_core::prelude::IdxSize;
5use polars_io::cloud::CloudOptions;
6use polars_ops::prelude::JoinType;
7use polars_plan::plans::IR;
8use polars_plan::prelude::{FileScan, FunctionIR, PythonPredicate, UnifiedScanArgs};
9use pyo3::IntoPyObjectExt;
10use pyo3::exceptions::{PyNotImplementedError, PyValueError};
11use pyo3::prelude::*;
12use pyo3::types::PyString;
13
14use super::expr_nodes::PyGroupbyOptions;
15use crate::PyDataFrame;
16use crate::lazyframe::visit::PyExprIR;
17
18fn scan_type_to_pyobject(
19    py: Python,
20    scan_type: &FileScan,
21    cloud_options: &Option<CloudOptions>,
22) -> PyResult<PyObject> {
23    match scan_type {
24        #[cfg(feature = "csv")]
25        FileScan::Csv { options } => {
26            let options = serde_json::to_string(options)
27                .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
28            let cloud_options = serde_json::to_string(cloud_options)
29                .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
30            Ok(("csv", options, cloud_options).into_py_any(py)?)
31        },
32        #[cfg(feature = "parquet")]
33        FileScan::Parquet { options, .. } => {
34            let options = serde_json::to_string(options)
35                .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
36            let cloud_options = serde_json::to_string(cloud_options)
37                .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
38            Ok(("parquet", options, cloud_options).into_py_any(py)?)
39        },
40        #[cfg(feature = "ipc")]
41        FileScan::Ipc { .. } => Err(PyNotImplementedError::new_err("ipc scan")),
42        #[cfg(feature = "json")]
43        FileScan::NDJson { options, .. } => {
44            let options = serde_json::to_string(options)
45                .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
46            Ok(("ndjson", options).into_py_any(py)?)
47        },
48        FileScan::PythonDataset { .. } => {
49            Err(PyNotImplementedError::new_err("python dataset scan"))
50        },
51        FileScan::Anonymous { .. } => Err(PyNotImplementedError::new_err("anonymous scan")),
52    }
53}
54
55#[pyclass]
56/// Scan a table with an optional predicate from a python function
57pub struct PythonScan {
58    #[pyo3(get)]
59    options: PyObject,
60}
61
62#[pyclass]
63/// Slice the table
64pub struct Slice {
65    #[pyo3(get)]
66    input: usize,
67    #[pyo3(get)]
68    offset: i64,
69    #[pyo3(get)]
70    len: IdxSize,
71}
72
73#[pyclass]
74/// Filter the table with a boolean expression
75pub struct Filter {
76    #[pyo3(get)]
77    input: usize,
78    #[pyo3(get)]
79    predicate: PyExprIR,
80}
81
82#[pyclass]
83#[derive(Clone)]
84pub struct PyFileOptions {
85    inner: UnifiedScanArgs,
86}
87
88#[pymethods]
89impl PyFileOptions {
90    #[getter]
91    fn n_rows(&self) -> Option<(i64, usize)> {
92        self.inner
93            .pre_slice
94            .clone()
95            .map(|slice| <(i64, usize)>::try_from(slice).unwrap())
96    }
97    #[getter]
98    fn with_columns(&self) -> Option<Vec<&str>> {
99        self.inner
100            .projection
101            .as_ref()?
102            .iter()
103            .map(|x| x.as_str())
104            .collect::<Vec<_>>()
105            .into()
106    }
107    #[getter]
108    fn cache(&self, _py: Python<'_>) -> bool {
109        self.inner.cache
110    }
111    #[getter]
112    fn row_index(&self) -> Option<(&str, IdxSize)> {
113        self.inner
114            .row_index
115            .as_ref()
116            .map(|n| (n.name.as_str(), n.offset))
117    }
118    #[getter]
119    fn rechunk(&self, _py: Python<'_>) -> bool {
120        self.inner.rechunk
121    }
122    #[getter]
123    fn hive_options(&self, _py: Python<'_>) -> PyResult<PyObject> {
124        Err(PyNotImplementedError::new_err("hive options"))
125    }
126    #[getter]
127    fn include_file_paths(&self, _py: Python<'_>) -> Option<&str> {
128        self.inner.include_file_paths.as_deref()
129    }
130}
131
132#[pyclass]
133/// Scan a table from file
134pub struct Scan {
135    #[pyo3(get)]
136    paths: PyObject,
137    #[pyo3(get)]
138    file_info: PyObject,
139    #[pyo3(get)]
140    predicate: Option<PyExprIR>,
141    #[pyo3(get)]
142    file_options: PyFileOptions,
143    #[pyo3(get)]
144    scan_type: PyObject,
145}
146
147#[pyclass]
148/// Scan a table from an existing dataframe
149pub struct DataFrameScan {
150    #[pyo3(get)]
151    df: PyDataFrame,
152    #[pyo3(get)]
153    projection: PyObject,
154    #[pyo3(get)]
155    selection: Option<PyExprIR>,
156}
157
158#[pyclass]
159/// Project out columns from a table
160pub struct SimpleProjection {
161    #[pyo3(get)]
162    input: usize,
163}
164
165#[pyclass]
166/// Column selection
167pub struct Select {
168    #[pyo3(get)]
169    input: usize,
170    #[pyo3(get)]
171    expr: Vec<PyExprIR>,
172    #[pyo3(get)]
173    should_broadcast: bool,
174}
175
176#[pyclass]
177/// Sort the table
178pub struct Sort {
179    #[pyo3(get)]
180    input: usize,
181    #[pyo3(get)]
182    by_column: Vec<PyExprIR>,
183    #[pyo3(get)]
184    sort_options: (bool, Vec<bool>, Vec<bool>),
185    #[pyo3(get)]
186    slice: Option<(i64, usize)>,
187}
188
189#[pyclass]
190/// Cache the input at this point in the LP
191pub struct Cache {
192    #[pyo3(get)]
193    input: usize,
194    #[pyo3(get)]
195    id_: usize,
196    #[pyo3(get)]
197    cache_hits: u32,
198}
199
200#[pyclass]
201/// Groupby aggregation
202pub struct GroupBy {
203    #[pyo3(get)]
204    input: usize,
205    #[pyo3(get)]
206    keys: Vec<PyExprIR>,
207    #[pyo3(get)]
208    aggs: Vec<PyExprIR>,
209    #[pyo3(get)]
210    apply: (),
211    #[pyo3(get)]
212    maintain_order: bool,
213    #[pyo3(get)]
214    options: PyObject,
215}
216
217#[pyclass]
218/// Join operation
219pub struct Join {
220    #[pyo3(get)]
221    input_left: usize,
222    #[pyo3(get)]
223    input_right: usize,
224    #[pyo3(get)]
225    left_on: Vec<PyExprIR>,
226    #[pyo3(get)]
227    right_on: Vec<PyExprIR>,
228    #[pyo3(get)]
229    options: PyObject,
230}
231
232#[pyclass]
233/// Merge sorted operation
234pub struct MergeSorted {
235    #[pyo3(get)]
236    input_left: usize,
237    #[pyo3(get)]
238    input_right: usize,
239    #[pyo3(get)]
240    key: String,
241}
242
243#[pyclass]
244/// Adding columns to the table without a Join
245pub struct HStack {
246    #[pyo3(get)]
247    input: usize,
248    #[pyo3(get)]
249    exprs: Vec<PyExprIR>,
250    #[pyo3(get)]
251    should_broadcast: bool,
252}
253
254#[pyclass]
255/// Like Select, but all operations produce a single row.
256pub struct Reduce {
257    #[pyo3(get)]
258    input: usize,
259    #[pyo3(get)]
260    exprs: Vec<PyExprIR>,
261}
262
263#[pyclass]
264/// Remove duplicates from the table
265pub struct Distinct {
266    #[pyo3(get)]
267    input: usize,
268    #[pyo3(get)]
269    options: PyObject,
270}
271#[pyclass]
272/// A (User Defined) Function
273pub struct MapFunction {
274    #[pyo3(get)]
275    input: usize,
276    #[pyo3(get)]
277    function: PyObject,
278}
279#[pyclass]
280pub struct Union {
281    #[pyo3(get)]
282    inputs: Vec<usize>,
283    #[pyo3(get)]
284    options: Option<(i64, usize)>,
285}
286#[pyclass]
287/// Horizontal concatenation of multiple plans
288pub struct HConcat {
289    #[pyo3(get)]
290    inputs: Vec<usize>,
291    #[pyo3(get)]
292    options: (),
293}
294#[pyclass]
295/// This allows expressions to access other tables
296pub struct ExtContext {
297    #[pyo3(get)]
298    input: usize,
299    #[pyo3(get)]
300    contexts: Vec<usize>,
301}
302
303#[pyclass]
304pub struct Sink {
305    #[pyo3(get)]
306    input: usize,
307    #[pyo3(get)]
308    payload: PyObject,
309}
310
311pub(crate) fn into_py(py: Python<'_>, plan: &IR) -> PyResult<PyObject> {
312    match plan {
313        IR::PythonScan { options } => {
314            let python_src = match options.python_source {
315                PythonScanSource::Pyarrow => "pyarrow",
316                PythonScanSource::Cuda => "cuda",
317                PythonScanSource::IOPlugin => "io_plugin",
318            };
319
320            PythonScan {
321                options: (
322                    options
323                        .scan_fn
324                        .as_ref()
325                        .map_or_else(|| py.None(), |s| s.0.clone_ref(py)),
326                    options.with_columns.as_ref().map_or_else(
327                        || Ok(py.None()),
328                        |cols| {
329                            cols.iter()
330                                .map(|x| x.as_str())
331                                .collect::<Vec<_>>()
332                                .into_py_any(py)
333                        },
334                    )?,
335                    python_src,
336                    match &options.predicate {
337                        PythonPredicate::None => py.None(),
338                        PythonPredicate::PyArrow(s) => ("pyarrow", s).into_py_any(py)?,
339                        PythonPredicate::Polars(e) => ("polars", e.node().0).into_py_any(py)?,
340                    },
341                    options
342                        .n_rows
343                        .map_or_else(|| Ok(py.None()), |s| s.into_py_any(py))?,
344                )
345                    .into_py_any(py)?,
346            }
347            .into_py_any(py)
348        },
349        IR::Slice { input, offset, len } => Slice {
350            input: input.0,
351            offset: *offset,
352            len: *len,
353        }
354        .into_py_any(py),
355        IR::Filter { input, predicate } => Filter {
356            input: input.0,
357            predicate: predicate.into(),
358        }
359        .into_py_any(py),
360        IR::Scan {
361            hive_parts: Some(_),
362            ..
363        } => Err(PyNotImplementedError::new_err(
364            "scan with hive partitioning",
365        )),
366        IR::Scan {
367            sources,
368            file_info: _,
369            hive_parts: _,
370            predicate,
371            output_schema: _,
372            scan_type,
373            unified_scan_args,
374        } => Scan {
375            paths: sources
376                .into_paths()
377                .ok_or_else(|| PyNotImplementedError::new_err("scan with BytesIO"))?
378                .into_py_any(py)?,
379            // TODO: file info
380            file_info: py.None(),
381            predicate: predicate.as_ref().map(|e| e.into()),
382            file_options: PyFileOptions {
383                inner: (**unified_scan_args).clone(),
384            },
385            scan_type: scan_type_to_pyobject(py, scan_type, &unified_scan_args.cloud_options)?,
386        }
387        .into_py_any(py),
388        IR::DataFrameScan {
389            df,
390            schema: _,
391            output_schema,
392        } => DataFrameScan {
393            df: PyDataFrame::new((**df).clone()),
394            projection: output_schema.as_ref().map_or_else(
395                || Ok(py.None()),
396                |s| {
397                    s.iter_names()
398                        .map(|s| s.as_str())
399                        .collect::<Vec<_>>()
400                        .into_py_any(py)
401                },
402            )?,
403            selection: None,
404        }
405        .into_py_any(py),
406        IR::SimpleProjection { input, columns: _ } => {
407            SimpleProjection { input: input.0 }.into_py_any(py)
408        },
409        IR::Select {
410            input,
411            expr,
412            schema: _,
413            options,
414        } => Select {
415            expr: expr.iter().map(|e| e.into()).collect(),
416            input: input.0,
417            should_broadcast: options.should_broadcast,
418        }
419        .into_py_any(py),
420        IR::Sort {
421            input,
422            by_column,
423            slice,
424            sort_options,
425        } => Sort {
426            input: input.0,
427            by_column: by_column.iter().map(|e| e.into()).collect(),
428            sort_options: (
429                sort_options.maintain_order,
430                sort_options.nulls_last.clone(),
431                sort_options.descending.clone(),
432            ),
433            slice: *slice,
434        }
435        .into_py_any(py),
436        IR::Cache {
437            input,
438            id,
439            cache_hits,
440        } => Cache {
441            input: input.0,
442            id_: *id,
443            cache_hits: *cache_hits,
444        }
445        .into_py_any(py),
446        IR::GroupBy {
447            input,
448            keys,
449            aggs,
450            schema: _,
451            apply,
452            maintain_order,
453            options,
454        } => GroupBy {
455            input: input.0,
456            keys: keys.iter().map(|e| e.into()).collect(),
457            aggs: aggs.iter().map(|e| e.into()).collect(),
458            apply: apply.as_ref().map_or(Ok(()), |_| {
459                Err(PyNotImplementedError::new_err(format!(
460                    "apply inside GroupBy {:?}",
461                    plan
462                )))
463            })?,
464            maintain_order: *maintain_order,
465            options: PyGroupbyOptions::new(options.as_ref().clone()).into_py_any(py)?,
466        }
467        .into_py_any(py),
468        IR::Join {
469            input_left,
470            input_right,
471            schema: _,
472            left_on,
473            right_on,
474            options,
475        } => Join {
476            input_left: input_left.0,
477            input_right: input_right.0,
478            left_on: left_on.iter().map(|e| e.into()).collect(),
479            right_on: right_on.iter().map(|e| e.into()).collect(),
480            options: {
481                let how = &options.args.how;
482                let name = Into::<&str>::into(how).into_pyobject(py)?;
483                (
484                    match how {
485                        #[cfg(feature = "asof_join")]
486                        JoinType::AsOf(_) => {
487                            return Err(PyNotImplementedError::new_err("asof join"));
488                        },
489                        #[cfg(feature = "iejoin")]
490                        JoinType::IEJoin => {
491                            let Some(JoinTypeOptionsIR::IEJoin(ie_options)) = &options.options
492                            else {
493                                unreachable!()
494                            };
495                            (
496                                name,
497                                crate::Wrap(ie_options.operator1).into_py_any(py)?,
498                                ie_options.operator2.as_ref().map_or_else(
499                                    || Ok(py.None()),
500                                    |op| crate::Wrap(*op).into_py_any(py),
501                                )?,
502                            )
503                                .into_py_any(py)?
504                        },
505                        // This is a cross join fused with a predicate. Shown in the IR::explain as
506                        // NESTED LOOP JOIN
507                        JoinType::Cross if options.options.is_some() => {
508                            return Err(PyNotImplementedError::new_err("nested loop join"));
509                        },
510                        _ => name.into_any().unbind(),
511                    },
512                    options.args.nulls_equal,
513                    options.args.slice,
514                    options.args.suffix().as_str(),
515                    options.args.coalesce.coalesce(how),
516                    Into::<&str>::into(options.args.maintain_order),
517                )
518                    .into_py_any(py)?
519            },
520        }
521        .into_py_any(py),
522        IR::HStack {
523            input,
524            exprs,
525            schema: _,
526            options,
527        } => HStack {
528            input: input.0,
529            exprs: exprs.iter().map(|e| e.into()).collect(),
530            should_broadcast: options.should_broadcast,
531        }
532        .into_py_any(py),
533        IR::Distinct { input, options } => Distinct {
534            input: input.0,
535            options: (
536                Into::<&str>::into(options.keep_strategy),
537                options.subset.as_ref().map_or_else(
538                    || Ok(py.None()),
539                    |f| {
540                        f.iter()
541                            .map(|s| s.as_ref())
542                            .collect::<Vec<&str>>()
543                            .into_py_any(py)
544                    },
545                )?,
546                options.maintain_order,
547                options.slice,
548            )
549                .into_py_any(py)?,
550        }
551        .into_py_any(py),
552        IR::MapFunction { input, function } => MapFunction {
553            input: input.0,
554            function: match function {
555                FunctionIR::OpaquePython(_) => {
556                    return Err(PyNotImplementedError::new_err("opaque python mapfunction"));
557                },
558                FunctionIR::Opaque {
559                    function: _,
560                    schema: _,
561                    predicate_pd: _,
562                    projection_pd: _,
563                    streamable: _,
564                    fmt_str: _,
565                } => return Err(PyNotImplementedError::new_err("opaque rust mapfunction")),
566                FunctionIR::Pipeline {
567                    function: _,
568                    schema: _,
569                    original: _,
570                } => return Err(PyNotImplementedError::new_err("pipeline mapfunction")),
571                FunctionIR::Unnest { columns } => (
572                    "unnest",
573                    columns.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
574                )
575                    .into_py_any(py)?,
576                FunctionIR::Rechunk => ("rechunk",).into_py_any(py)?,
577                FunctionIR::Rename {
578                    existing,
579                    new,
580                    swapping,
581                    schema: _,
582                } => (
583                    "rename",
584                    existing.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
585                    new.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
586                    *swapping,
587                )
588                    .into_py_any(py)?,
589                FunctionIR::Explode { columns, schema: _ } => (
590                    "explode",
591                    columns.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
592                )
593                    .into_py_any(py)?,
594                #[cfg(feature = "pivot")]
595                FunctionIR::Unpivot { args, schema: _ } => (
596                    "unpivot",
597                    args.index.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
598                    args.on.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
599                    args.variable_name
600                        .as_ref()
601                        .map_or_else(|| Ok(py.None()), |s| s.as_str().into_py_any(py))?,
602                    args.value_name
603                        .as_ref()
604                        .map_or_else(|| Ok(py.None()), |s| s.as_str().into_py_any(py))?,
605                )
606                    .into_py_any(py)?,
607                FunctionIR::RowIndex {
608                    name,
609                    schema: _,
610                    offset,
611                } => ("row_index", name.to_string(), offset.unwrap_or(0)).into_py_any(py)?,
612                FunctionIR::FastCount {
613                    sources,
614                    scan_type,
615                    cloud_options,
616                    alias,
617                } => {
618                    let sources = sources
619                        .into_paths()
620                        .ok_or_else(|| {
621                            PyNotImplementedError::new_err("FastCount with BytesIO sources")
622                        })?
623                        .into_py_any(py)?;
624
625                    let scan_type = scan_type_to_pyobject(py, scan_type, cloud_options)?;
626
627                    let alias = alias
628                        .as_ref()
629                        .map(|a| a.as_str())
630                        .map_or_else(|| Ok(py.None()), |s| s.into_py_any(py))?;
631
632                    ("fast_count", sources, scan_type, alias).into_py_any(py)?
633                },
634            },
635        }
636        .into_py_any(py),
637        IR::Union { inputs, options } => Union {
638            inputs: inputs.iter().map(|n| n.0).collect(),
639            // TODO: rest of options
640            options: options.slice,
641        }
642        .into_py_any(py),
643        IR::HConcat {
644            inputs,
645            schema: _,
646            options: _,
647        } => HConcat {
648            inputs: inputs.iter().map(|n| n.0).collect(),
649            options: (),
650        }
651        .into_py_any(py),
652        IR::ExtContext {
653            input,
654            contexts,
655            schema: _,
656        } => ExtContext {
657            input: input.0,
658            contexts: contexts.iter().map(|n| n.0).collect(),
659        }
660        .into_py_any(py),
661        IR::Sink { input, payload } => Sink {
662            input: input.0,
663            payload: PyString::new(
664                py,
665                &serde_json::to_string(payload)
666                    .map_err(|err| PyValueError::new_err(format!("{err:?}")))?,
667            )
668            .into(),
669        }
670        .into_py_any(py),
671        IR::SinkMultiple { .. } => Err(PyNotImplementedError::new_err(
672            "Not expecting to see a SinkMultiple node",
673        )),
674        #[cfg(feature = "merge_sorted")]
675        IR::MergeSorted {
676            input_left,
677            input_right,
678            key,
679        } => MergeSorted {
680            input_left: input_left.0,
681            input_right: input_right.0,
682            key: key.to_string(),
683        }
684        .into_py_any(py),
685        IR::Invalid => Err(PyNotImplementedError::new_err("Invalid")),
686    }
687}