// polars_python/lazyframe/visitor/nodes.rs

1#[cfg(feature = "iejoin")]
2use polars::prelude::JoinTypeOptionsIR;
3use polars_core::prelude::IdxSize;
4use polars_ops::prelude::JoinType;
5use polars_plan::plans::IR;
6use polars_plan::prelude::{
7    FileCount, FileScan, FileScanOptions, FunctionIR, PythonPredicate, PythonScanSource,
8};
9use pyo3::exceptions::{PyNotImplementedError, PyValueError};
10use pyo3::prelude::*;
11use pyo3::IntoPyObjectExt;
12
13use super::expr_nodes::PyGroupbyOptions;
14use crate::lazyframe::visit::PyExprIR;
15use crate::PyDataFrame;
16
17#[pyclass]
18/// Scan a table with an optional predicate from a python function
19pub struct PythonScan {
20    #[pyo3(get)]
21    options: PyObject,
22}
23
24#[pyclass]
25/// Slice the table
26pub struct Slice {
27    #[pyo3(get)]
28    input: usize,
29    #[pyo3(get)]
30    offset: i64,
31    #[pyo3(get)]
32    len: IdxSize,
33}
34
35#[pyclass]
36/// Filter the table with a boolean expression
37pub struct Filter {
38    #[pyo3(get)]
39    input: usize,
40    #[pyo3(get)]
41    predicate: PyExprIR,
42}
43
#[pyclass]
#[derive(Clone)]
// Python-visible wrapper around a scan's `FileScanOptions`; the inner options are
// read through the getters in the `#[pymethods]` block. `Clone` is presumably needed
// because `Scan.file_options` is returned by value through a generated getter.
pub struct PyFileOptions {
    inner: FileScanOptions,
}
49
50#[pymethods]
51impl PyFileOptions {
52    #[getter]
53    fn n_rows(&self) -> Option<(i64, usize)> {
54        self.inner.slice
55    }
56    #[getter]
57    fn with_columns(&self) -> Option<Vec<&str>> {
58        self.inner
59            .with_columns
60            .as_ref()?
61            .iter()
62            .map(|x| x.as_str())
63            .collect::<Vec<_>>()
64            .into()
65    }
66    #[getter]
67    fn cache(&self, _py: Python<'_>) -> bool {
68        self.inner.cache
69    }
70    #[getter]
71    fn row_index(&self) -> Option<(&str, IdxSize)> {
72        self.inner
73            .row_index
74            .as_ref()
75            .map(|n| (n.name.as_str(), n.offset))
76    }
77    #[getter]
78    fn rechunk(&self, _py: Python<'_>) -> bool {
79        self.inner.rechunk
80    }
81    #[getter]
82    fn file_counter(&self, _py: Python<'_>) -> FileCount {
83        self.inner.file_counter
84    }
85    #[getter]
86    fn hive_options(&self, _py: Python<'_>) -> PyResult<PyObject> {
87        Err(PyNotImplementedError::new_err("hive options"))
88    }
89}
90
91#[pyclass]
92/// Scan a table from file
93pub struct Scan {
94    #[pyo3(get)]
95    paths: PyObject,
96    #[pyo3(get)]
97    file_info: PyObject,
98    #[pyo3(get)]
99    predicate: Option<PyExprIR>,
100    #[pyo3(get)]
101    file_options: PyFileOptions,
102    #[pyo3(get)]
103    scan_type: PyObject,
104}
105
106#[pyclass]
107/// Scan a table from an existing dataframe
108pub struct DataFrameScan {
109    #[pyo3(get)]
110    df: PyDataFrame,
111    #[pyo3(get)]
112    projection: PyObject,
113    #[pyo3(get)]
114    selection: Option<PyExprIR>,
115}
116
117#[pyclass]
118/// Project out columns from a table
119pub struct SimpleProjection {
120    #[pyo3(get)]
121    input: usize,
122}
123
124#[pyclass]
125/// Column selection
126pub struct Select {
127    #[pyo3(get)]
128    input: usize,
129    #[pyo3(get)]
130    expr: Vec<PyExprIR>,
131    #[pyo3(get)]
132    should_broadcast: bool,
133}
134
135#[pyclass]
136/// Sort the table
137pub struct Sort {
138    #[pyo3(get)]
139    input: usize,
140    #[pyo3(get)]
141    by_column: Vec<PyExprIR>,
142    #[pyo3(get)]
143    sort_options: (bool, Vec<bool>, Vec<bool>),
144    #[pyo3(get)]
145    slice: Option<(i64, usize)>,
146}
147
148#[pyclass]
149/// Cache the input at this point in the LP
150pub struct Cache {
151    #[pyo3(get)]
152    input: usize,
153    #[pyo3(get)]
154    id_: usize,
155    #[pyo3(get)]
156    cache_hits: u32,
157}
158
159#[pyclass]
160/// Groupby aggregation
161pub struct GroupBy {
162    #[pyo3(get)]
163    input: usize,
164    #[pyo3(get)]
165    keys: Vec<PyExprIR>,
166    #[pyo3(get)]
167    aggs: Vec<PyExprIR>,
168    #[pyo3(get)]
169    apply: (),
170    #[pyo3(get)]
171    maintain_order: bool,
172    #[pyo3(get)]
173    options: PyObject,
174}
175
176#[pyclass]
177/// Join operation
178pub struct Join {
179    #[pyo3(get)]
180    input_left: usize,
181    #[pyo3(get)]
182    input_right: usize,
183    #[pyo3(get)]
184    left_on: Vec<PyExprIR>,
185    #[pyo3(get)]
186    right_on: Vec<PyExprIR>,
187    #[pyo3(get)]
188    options: PyObject,
189}
190
191#[pyclass]
192/// Adding columns to the table without a Join
193pub struct HStack {
194    #[pyo3(get)]
195    input: usize,
196    #[pyo3(get)]
197    exprs: Vec<PyExprIR>,
198    #[pyo3(get)]
199    should_broadcast: bool,
200}
201
202#[pyclass]
203/// Like Select, but all operations produce a single row.
204pub struct Reduce {
205    #[pyo3(get)]
206    input: usize,
207    #[pyo3(get)]
208    exprs: Vec<PyExprIR>,
209}
210
211#[pyclass]
212/// Remove duplicates from the table
213pub struct Distinct {
214    #[pyo3(get)]
215    input: usize,
216    #[pyo3(get)]
217    options: PyObject,
218}
219#[pyclass]
220/// A (User Defined) Function
221pub struct MapFunction {
222    #[pyo3(get)]
223    input: usize,
224    #[pyo3(get)]
225    function: PyObject,
226}
227#[pyclass]
228pub struct Union {
229    #[pyo3(get)]
230    inputs: Vec<usize>,
231    #[pyo3(get)]
232    options: Option<(i64, usize)>,
233}
234#[pyclass]
235/// Horizontal concatenation of multiple plans
236pub struct HConcat {
237    #[pyo3(get)]
238    inputs: Vec<usize>,
239    #[pyo3(get)]
240    options: (),
241}
242#[pyclass]
243/// This allows expressions to access other tables
244pub struct ExtContext {
245    #[pyo3(get)]
246    input: usize,
247    #[pyo3(get)]
248    contexts: Vec<usize>,
249}
250
251#[pyclass]
252pub struct Sink {
253    #[pyo3(get)]
254    input: usize,
255    #[pyo3(get)]
256    payload: PyObject,
257}
258
/// Convert a single logical-plan node (`IR`) into the matching Python-visible node
/// object defined in this module.
///
/// Children are not converted recursively: each input is reported as the raw
/// `usize` inside its node handle (`input.0`), and expressions are wrapped as
/// `PyExprIR`. Options that have no dedicated class are exposed as plain Python
/// tuples whose layout is described at each arm.
///
/// # Errors
/// Returns `PyNotImplementedError` for plan shapes this visitor does not support
/// (hive-partitioned or non-path scans, IPC/anonymous scans, group-by with `apply`,
/// asof and nested-loop joins, opaque/pipeline/fast-count map functions, `Sink`,
/// `Invalid`). Returns `PyValueError` if serializing scan options to JSON fails.
/// Errors raised while converting values into Python objects are propagated.
///
/// # Panics
/// Panics (`unreachable!`) if an `IEJoin` join does not carry
/// `JoinTypeOptionsIR::IEJoin` options.
pub(crate) fn into_py(py: Python<'_>, plan: &IR) -> PyResult<PyObject> {
    match plan {
        IR::PythonScan { options } => {
            // Name of the python-side source that produced this scan.
            let python_src = match options.python_source {
                PythonScanSource::Pyarrow => "pyarrow",
                PythonScanSource::Cuda => "cuda",
                PythonScanSource::IOPlugin => "io_plugin",
            };

            // Options tuple layout:
            // (scan_fn | None, with_columns | None, python_src, predicate | None, n_rows | None)
            // where predicate is ("pyarrow", s) or ("polars", expression node index).
            PythonScan {
                options: (
                    options
                        .scan_fn
                        .as_ref()
                        .map_or_else(|| py.None(), |s| s.0.clone_ref(py)),
                    options.with_columns.as_ref().map_or_else(
                        || Ok(py.None()),
                        |cols| {
                            cols.iter()
                                .map(|x| x.as_str())
                                .collect::<Vec<_>>()
                                .into_py_any(py)
                        },
                    )?,
                    python_src,
                    match &options.predicate {
                        PythonPredicate::None => py.None(),
                        PythonPredicate::PyArrow(s) => ("pyarrow", s).into_py_any(py)?,
                        PythonPredicate::Polars(e) => ("polars", e.node().0).into_py_any(py)?,
                    },
                    options
                        .n_rows
                        .map_or_else(|| Ok(py.None()), |s| s.into_py_any(py))?,
                )
                    .into_py_any(py)?,
            }
            .into_py_any(py)
        },
        IR::Slice { input, offset, len } => Slice {
            input: input.0,
            offset: *offset,
            len: *len,
        }
        .into_py_any(py),
        IR::Filter { input, predicate } => Filter {
            input: input.0,
            predicate: predicate.into(),
        }
        .into_py_any(py),
        // Hive-partitioned scans are rejected here, so the general `Scan` arm below
        // only ever sees `hive_parts: None`.
        IR::Scan {
            hive_parts: Some(_),
            ..
        } => Err(PyNotImplementedError::new_err(
            "scan with hive partitioning",
        )),
        IR::Scan {
            sources,
            file_info: _,
            hive_parts: _,
            predicate,
            output_schema: _,
            scan_type,
            file_options,
        } => Scan {
            // Only path-based sources are supported; when `into_paths` yields `None`
            // (presumably in-memory sources) the scan is reported as unsupported.
            paths: sources
                .into_paths()
                .ok_or_else(|| PyNotImplementedError::new_err("scan with BytesIO"))?
                .into_py_any(py)?,
            // TODO: file info
            file_info: py.None(),
            predicate: predicate.as_ref().map(|e| e.into()),
            file_options: PyFileOptions {
                inner: file_options.clone(),
            },
            // Encoded as (format name, options JSON[, cloud options JSON]); formats that
            // are not handled below are rejected with NotImplementedError.
            scan_type: match scan_type {
                #[cfg(feature = "csv")]
                FileScan::Csv {
                    options,
                    cloud_options,
                } => {
                    // Since these options structs are serializable,
                    // we just use the serde json representation
                    let options = serde_json::to_string(options)
                        .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
                    let cloud_options = serde_json::to_string(cloud_options)
                        .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
                    ("csv", options, cloud_options).into_py_any(py)?
                },
                #[cfg(feature = "parquet")]
                FileScan::Parquet {
                    options,
                    cloud_options,
                    ..
                } => {
                    let options = serde_json::to_string(options)
                        .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
                    let cloud_options = serde_json::to_string(cloud_options)
                        .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
                    ("parquet", options, cloud_options).into_py_any(py)?
                },
                #[cfg(feature = "ipc")]
                FileScan::Ipc { .. } => return Err(PyNotImplementedError::new_err("ipc scan")),
                #[cfg(feature = "json")]
                FileScan::NDJson { options, .. } => {
                    // TODO: Also pass cloud_options
                    let options = serde_json::to_string(options)
                        .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
                    ("ndjson", options).into_py_any(py)?
                },
                FileScan::Anonymous { .. } => {
                    return Err(PyNotImplementedError::new_err("anonymous scan"))
                },
            },
        }
        .into_py_any(py),
        IR::DataFrameScan {
            df,
            schema: _,
            output_schema,
        } => DataFrameScan {
            // The frame is cloned out of the plan so Python gets its own handle.
            df: PyDataFrame::new((**df).clone()),
            // Output column names, or None when there is no output schema.
            projection: output_schema.as_ref().map_or_else(
                || Ok(py.None()),
                |s| {
                    s.iter_names()
                        .map(|s| s.as_str())
                        .collect::<Vec<_>>()
                        .into_py_any(py)
                },
            )?,
            selection: None,
        }
        .into_py_any(py),
        // The projected columns are not exposed; only the input node is reported.
        IR::SimpleProjection { input, columns: _ } => {
            SimpleProjection { input: input.0 }.into_py_any(py)
        },
        IR::Select {
            input,
            expr,
            schema: _,
            options,
        } => Select {
            expr: expr.iter().map(|e| e.into()).collect(),
            input: input.0,
            should_broadcast: options.should_broadcast,
        }
        .into_py_any(py),
        IR::Sort {
            input,
            by_column,
            slice,
            sort_options,
        } => Sort {
            input: input.0,
            by_column: by_column.iter().map(|e| e.into()).collect(),
            // Exposed as (maintain_order, nulls_last, descending).
            sort_options: (
                sort_options.maintain_order,
                sort_options.nulls_last.clone(),
                sort_options.descending.clone(),
            ),
            slice: *slice,
        }
        .into_py_any(py),
        IR::Cache {
            input,
            id,
            cache_hits,
        } => Cache {
            input: input.0,
            id_: *id,
            cache_hits: *cache_hits,
        }
        .into_py_any(py),
        IR::GroupBy {
            input,
            keys,
            aggs,
            schema: _,
            apply,
            maintain_order,
            options,
        } => GroupBy {
            input: input.0,
            keys: keys.iter().map(|e| e.into()).collect(),
            aggs: aggs.iter().map(|e| e.into()).collect(),
            // A custom `apply` cannot be represented in Python; the error message
            // includes the Debug rendering of the whole plan node.
            apply: apply.as_ref().map_or(Ok(()), |_| {
                Err(PyNotImplementedError::new_err(format!(
                    "apply inside GroupBy {:?}",
                    plan
                )))
            })?,
            maintain_order: *maintain_order,
            options: PyGroupbyOptions::new(options.as_ref().clone()).into_py_any(py)?,
        }
        .into_py_any(py),
        IR::Join {
            input_left,
            input_right,
            schema: _,
            left_on,
            right_on,
            options,
        } => Join {
            input_left: input_left.0,
            input_right: input_right.0,
            left_on: left_on.iter().map(|e| e.into()).collect(),
            right_on: right_on.iter().map(|e| e.into()).collect(),
            // Options tuple layout:
            // (how, join_nulls, slice, suffix, coalesce, maintain_order)
            // where `how` is the join-type name, or (name, operator1, operator2 | None)
            // for an IEJoin, and `coalesce` is resolved for this particular join type.
            options: {
                let how = &options.args.how;
                let name = Into::<&str>::into(how).into_pyobject(py)?;
                (
                    match how {
                        #[cfg(feature = "asof_join")]
                        JoinType::AsOf(_) => {
                            return Err(PyNotImplementedError::new_err("asof join"))
                        },
                        #[cfg(feature = "iejoin")]
                        JoinType::IEJoin => {
                            // An IEJoin is expected to always carry IEJoin options; anything
                            // else is treated as an internal invariant violation (panics).
                            let Some(JoinTypeOptionsIR::IEJoin(ie_options)) = &options.options
                            else {
                                unreachable!()
                            };
                            (
                                name,
                                crate::Wrap(ie_options.operator1).into_py_any(py)?,
                                ie_options.operator2.as_ref().map_or_else(
                                    || Ok(py.None()),
                                    |op| crate::Wrap(*op).into_py_any(py),
                                )?,
                            )
                                .into_py_any(py)?
                        },
                        // This is a cross join fused with a predicate. Shown in the IR::explain as
                        // NESTED LOOP JOIN
                        JoinType::Cross if options.options.is_some() => {
                            return Err(PyNotImplementedError::new_err("nested loop join"))
                        },
                        _ => name.into_any().unbind(),
                    },
                    options.args.join_nulls,
                    options.args.slice,
                    options.args.suffix().as_str(),
                    options.args.coalesce.coalesce(how),
                    Into::<&str>::into(options.args.maintain_order),
                )
                    .into_py_any(py)?
            },
        }
        .into_py_any(py),
        IR::HStack {
            input,
            exprs,
            schema: _,
            options,
        } => HStack {
            input: input.0,
            exprs: exprs.iter().map(|e| e.into()).collect(),
            should_broadcast: options.should_broadcast,
        }
        .into_py_any(py),
        IR::Distinct { input, options } => Distinct {
            input: input.0,
            // Options tuple layout: (keep_strategy, subset | None, maintain_order, slice).
            options: (
                Into::<&str>::into(options.keep_strategy),
                options.subset.as_ref().map_or_else(
                    || Ok(py.None()),
                    |f| {
                        f.iter()
                            .map(|s| s.as_ref())
                            .collect::<Vec<&str>>()
                            .into_py_any(py)
                    },
                )?,
                options.maintain_order,
                options.slice,
            )
                .into_py_any(py)?,
        }
        .into_py_any(py),
        IR::MapFunction { input, function } => MapFunction {
            input: input.0,
            // Supported functions are encoded as a tuple whose first element is the
            // function name, followed by its arguments; opaque/pipeline/fast-count
            // functions are rejected.
            function: match function {
                FunctionIR::OpaquePython(_) => {
                    return Err(PyNotImplementedError::new_err("opaque python mapfunction"))
                },
                FunctionIR::Opaque {
                    function: _,
                    schema: _,
                    predicate_pd: _,
                    projection_pd: _,
                    streamable: _,
                    fmt_str: _,
                } => return Err(PyNotImplementedError::new_err("opaque rust mapfunction")),
                FunctionIR::Pipeline {
                    function: _,
                    schema: _,
                    original: _,
                } => return Err(PyNotImplementedError::new_err("pipeline mapfunction")),
                FunctionIR::Unnest { columns } => (
                    "unnest",
                    columns.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
                )
                    .into_py_any(py)?,
                FunctionIR::Rechunk => ("rechunk",).into_py_any(py)?,
                #[cfg(feature = "merge_sorted")]
                FunctionIR::MergeSorted { column } => {
                    ("merge_sorted", column.to_string()).into_py_any(py)?
                },
                FunctionIR::Rename {
                    existing,
                    new,
                    swapping,
                    schema: _,
                } => (
                    "rename",
                    existing.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
                    new.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
                    *swapping,
                )
                    .into_py_any(py)?,
                FunctionIR::Explode { columns, schema: _ } => (
                    "explode",
                    columns.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
                )
                    .into_py_any(py)?,
                #[cfg(feature = "pivot")]
                FunctionIR::Unpivot { args, schema: _ } => (
                    "unpivot",
                    args.index.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
                    args.on.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
                    args.variable_name
                        .as_ref()
                        .map_or_else(|| Ok(py.None()), |s| s.as_str().into_py_any(py))?,
                    args.value_name
                        .as_ref()
                        .map_or_else(|| Ok(py.None()), |s| s.as_str().into_py_any(py))?,
                )
                    .into_py_any(py)?,
                // A missing offset is reported as 0.
                FunctionIR::RowIndex {
                    name,
                    schema: _,
                    offset,
                } => ("row_index", name.to_string(), offset.unwrap_or(0)).into_py_any(py)?,
                FunctionIR::FastCount {
                    sources: _,
                    scan_type: _,
                    alias: _,
                } => return Err(PyNotImplementedError::new_err("function count")),
            },
        }
        .into_py_any(py),
        IR::Union { inputs, options } => Union {
            inputs: inputs.iter().map(|n| n.0).collect(),
            // TODO: rest of options
            // Only the slice is currently exposed.
            options: options.slice,
        }
        .into_py_any(py),
        // HConcat options are not exposed; the Python node gets a `()` placeholder.
        IR::HConcat {
            inputs,
            schema: _,
            options: _,
        } => HConcat {
            inputs: inputs.iter().map(|n| n.0).collect(),
            options: (),
        }
        .into_py_any(py),
        IR::ExtContext {
            input,
            contexts,
            schema: _,
        } => ExtContext {
            input: input.0,
            contexts: contexts.iter().map(|n| n.0).collect(),
        }
        .into_py_any(py),
        IR::Sink {
            input: _,
            payload: _,
        } => Err(PyNotImplementedError::new_err(
            "Not expecting to see a Sink node",
        )),
        IR::Invalid => Err(PyNotImplementedError::new_err("Invalid")),
    }
}