Skip to main content

polars_python/lazyframe/visitor/
nodes.rs

1#[cfg(feature = "iejoin")]
2use polars::prelude::JoinTypeOptionsIR;
3use polars::prelude::deletion::DeletionFilesList;
4use polars::prelude::python_dsl::PythonScanSource;
5use polars::prelude::{ColumnMapping, PredicateFileSkip};
6use polars_core::prelude::IdxSize;
7use polars_io::cloud::CloudOptions;
8use polars_ops::prelude::JoinType;
9use polars_plan::plans::{HintIR, IR};
10use polars_plan::prelude::{FileScanIR, FunctionIR, PythonPredicate, UnifiedScanArgs};
11use pyo3::IntoPyObjectExt;
12use pyo3::exceptions::{PyNotImplementedError, PyValueError};
13use pyo3::prelude::*;
14use pyo3::types::{PyDict, PyList, PyString};
15
16use super::expr_nodes::PyGroupbyOptions;
17use crate::PyDataFrame;
18use crate::lazyframe::visit::PyExprIR;
19
20fn scan_type_to_pyobject(
21    py: Python<'_>,
22    scan_type: &FileScanIR,
23    cloud_options: &Option<CloudOptions>,
24) -> PyResult<Py<PyAny>> {
25    match scan_type {
26        #[cfg(feature = "csv")]
27        FileScanIR::Csv { options } => {
28            let options = serde_json::to_string(options)
29                .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
30            let cloud_options = serde_json::to_string(cloud_options)
31                .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
32            Ok(("csv", options, cloud_options).into_py_any(py)?)
33        },
34        #[cfg(feature = "parquet")]
35        FileScanIR::Parquet { options, .. } => {
36            let options = serde_json::to_string(options)
37                .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
38            let cloud_options = serde_json::to_string(cloud_options)
39                .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
40            Ok(("parquet", options, cloud_options).into_py_any(py)?)
41        },
42        #[cfg(feature = "ipc")]
43        FileScanIR::Ipc { .. } => Err(PyNotImplementedError::new_err("ipc scan")),
44        #[cfg(feature = "json")]
45        FileScanIR::NDJson { options, .. } => {
46            let options = serde_json::to_string(options)
47                .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
48            Ok(("ndjson", options).into_py_any(py)?)
49        },
50        #[cfg(feature = "scan_lines")]
51        FileScanIR::Lines { name } => Ok(("lines", name.as_str()).into_py_any(py)?),
52        FileScanIR::ExpandedPaths { name } => {
53            Ok(("expanded-paths", name.as_str()).into_py_any(py)?)
54        },
55        FileScanIR::PythonDataset { .. } => {
56            Err(PyNotImplementedError::new_err("python dataset scan"))
57        },
58        FileScanIR::Anonymous { .. } => Err(PyNotImplementedError::new_err("anonymous scan")),
59    }
60}
61
62#[pyclass(frozen)]
63/// Scan a table with an optional predicate from a python function
64pub struct PythonScan {
65    #[pyo3(get)]
66    options: Py<PyAny>,
67}
68
69#[pyclass(frozen)]
70/// Slice the table
71pub struct Slice {
72    #[pyo3(get)]
73    input: usize,
74    #[pyo3(get)]
75    offset: i64,
76    #[pyo3(get)]
77    len: IdxSize,
78}
79
80#[pyclass(frozen)]
81/// Filter the table with a boolean expression
82pub struct Filter {
83    #[pyo3(get)]
84    input: usize,
85    #[pyo3(get)]
86    predicate: PyExprIR,
87}
88
89#[pyclass(frozen, skip_from_py_object)]
90#[derive(Clone)]
91pub struct PyFileOptions {
92    inner: UnifiedScanArgs,
93}
94
95#[pymethods]
96impl PyFileOptions {
97    #[getter]
98    fn n_rows(&self) -> Option<(i64, IdxSize)> {
99        self.inner
100            .pre_slice
101            .clone()
102            .map(|slice| slice.to_signed_offset_len())
103    }
104    #[getter]
105    fn with_columns(&self) -> Option<Vec<&str>> {
106        self.inner
107            .projection
108            .as_ref()?
109            .iter()
110            .map(|x| x.as_str())
111            .collect::<Vec<_>>()
112            .into()
113    }
114    #[getter]
115    fn cache(&self, _py: Python<'_>) -> bool {
116        self.inner.cache
117    }
118    #[getter]
119    fn row_index(&self) -> Option<(&str, IdxSize)> {
120        self.inner
121            .row_index
122            .as_ref()
123            .map(|n| (n.name.as_str(), n.offset))
124    }
125    #[getter]
126    fn rechunk(&self, _py: Python<'_>) -> bool {
127        self.inner.rechunk
128    }
129    #[getter]
130    fn hive_options(&self, _py: Python<'_>) -> PyResult<Py<PyAny>> {
131        Err(PyNotImplementedError::new_err("hive options"))
132    }
133    #[getter]
134    fn include_file_paths(&self, _py: Python<'_>) -> Option<&str> {
135        self.inner.include_file_paths.as_deref()
136    }
137
138    /// One of:
139    /// * None
140    /// * ("iceberg-position-delete", dict[int, list[str]])
141    #[getter]
142    fn deletion_files(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
143        Ok(match &self.inner.deletion_files {
144            None => py.None().into_any(),
145            Some(DeletionFilesList::IcebergPositionDelete(paths)) => {
146                let out = PyDict::new(py);
147                for (k, v) in paths.iter() {
148                    out.set_item(*k, v.as_ref())?;
149                }
150                ("iceberg-position-delete", out)
151                    .into_pyobject(py)?
152                    .into_any()
153                    .unbind()
154            },
155            Some(DeletionFilesList::Delta(provider)) => {
156                ("delta-deletion-vector", provider.callback().0.clone_ref(py))
157                    .into_pyobject(py)?
158                    .into_any()
159                    .unbind()
160            },
161        })
162    }
163
164    /// One of:
165    /// * None
166    /// * ("iceberg-column-mapping", <unimplemented>)
167    #[getter]
168    fn column_mapping(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
169        Ok(match &self.inner.column_mapping {
170            None => py.None().into_any(),
171
172            Some(ColumnMapping::Iceberg { .. }) => unimplemented!(),
173        })
174    }
175}
176
177#[pyclass(frozen)]
178/// Scan a table from file
179pub struct Scan {
180    #[pyo3(get)]
181    paths: Py<PyAny>,
182    #[pyo3(get)]
183    file_info: Py<PyAny>,
184    #[pyo3(get)]
185    predicate: Option<PyExprIR>,
186    #[pyo3(get)]
187    file_options: PyFileOptions,
188    #[pyo3(get)]
189    scan_type: Py<PyAny>,
190}
191
192#[pyclass(frozen)]
193/// Scan a table from an existing dataframe
194pub struct DataFrameScan {
195    #[pyo3(get)]
196    df: PyDataFrame,
197    #[pyo3(get)]
198    projection: Py<PyAny>,
199    #[pyo3(get)]
200    selection: Option<PyExprIR>,
201}
202
203#[pyclass(frozen)]
204/// Project out columns from a table
205pub struct SimpleProjection {
206    #[pyo3(get)]
207    input: usize,
208}
209
210#[pyclass(frozen)]
211/// Column selection
212pub struct Select {
213    #[pyo3(get)]
214    input: usize,
215    #[pyo3(get)]
216    expr: Vec<PyExprIR>,
217    #[pyo3(get)]
218    should_broadcast: bool,
219}
220
221#[pyclass(frozen)]
222/// Sort the table
223pub struct Sort {
224    #[pyo3(get)]
225    input: usize,
226    #[pyo3(get)]
227    by_column: Vec<PyExprIR>,
228    #[pyo3(get)]
229    sort_options: (bool, Vec<bool>, Vec<bool>),
230    #[pyo3(get)]
231    slice: Option<(i64, usize)>,
232}
233
234#[pyclass(frozen)]
235/// Cache the input at this point in the LP
236pub struct Cache {
237    #[pyo3(get)]
238    input: usize,
239    #[pyo3(get)]
240    id_: u128,
241}
242
243#[pyclass(frozen)]
244/// Groupby aggregation
245pub struct GroupBy {
246    #[pyo3(get)]
247    input: usize,
248    #[pyo3(get)]
249    keys: Vec<PyExprIR>,
250    #[pyo3(get)]
251    aggs: Vec<PyExprIR>,
252    #[pyo3(get)]
253    apply: (),
254    #[pyo3(get)]
255    maintain_order: bool,
256    #[pyo3(get)]
257    options: Py<PyAny>,
258}
259
260#[pyclass(frozen)]
261/// Join operation
262pub struct Join {
263    #[pyo3(get)]
264    input_left: usize,
265    #[pyo3(get)]
266    input_right: usize,
267    #[pyo3(get)]
268    left_on: Vec<PyExprIR>,
269    #[pyo3(get)]
270    right_on: Vec<PyExprIR>,
271    #[pyo3(get)]
272    options: Py<PyAny>,
273}
274
275#[pyclass(frozen)]
276/// Join operation
277pub struct Gather {
278    #[pyo3(get)]
279    input: usize,
280    #[pyo3(get)]
281    idxs: usize,
282    #[pyo3(get)]
283    null_on_oob: bool,
284}
285
286#[pyclass(frozen)]
287/// Merge sorted operation
288pub struct MergeSorted {
289    #[pyo3(get)]
290    input_left: usize,
291    #[pyo3(get)]
292    input_right: usize,
293    #[pyo3(get)]
294    key: String,
295    #[pyo3(get)]
296    maintain_order: bool,
297}
298
299#[pyclass(frozen)]
300/// Adding columns to the table without a Join
301pub struct HStack {
302    #[pyo3(get)]
303    input: usize,
304    #[pyo3(get)]
305    exprs: Vec<PyExprIR>,
306    #[pyo3(get)]
307    should_broadcast: bool,
308}
309
310#[pyclass(frozen)]
311/// Like Select, but all operations produce a single row.
312pub struct Reduce {
313    #[pyo3(get)]
314    input: usize,
315    #[pyo3(get)]
316    exprs: Vec<PyExprIR>,
317}
318
319#[pyclass(frozen)]
320/// Remove duplicates from the table
321pub struct Distinct {
322    #[pyo3(get)]
323    input: usize,
324    #[pyo3(get)]
325    options: Py<PyAny>,
326}
327#[pyclass(frozen)]
328/// A (User Defined) Function
329pub struct MapFunction {
330    #[pyo3(get)]
331    input: usize,
332    #[pyo3(get)]
333    function: Py<PyAny>,
334}
335#[pyclass(frozen)]
336pub struct Union {
337    #[pyo3(get)]
338    inputs: Vec<usize>,
339    #[pyo3(get)]
340    slice: Option<(i64, usize)>,
341    #[pyo3(get)]
342    rows: (Option<usize>, usize),
343    #[pyo3(get)]
344    maintain_order: bool,
345}
346#[pyclass(frozen)]
347/// Horizontal concatenation of multiple plans
348pub struct HConcat {
349    #[pyo3(get)]
350    inputs: Vec<usize>,
351    #[pyo3(get)]
352    options: Py<PyAny>,
353}
354#[pyclass(frozen)]
355/// This allows expressions to access other tables
356pub struct ExtContext {
357    #[pyo3(get)]
358    input: usize,
359    #[pyo3(get)]
360    contexts: Vec<usize>,
361}
362
363#[pyclass(frozen)]
364pub struct Sink {
365    #[pyo3(get)]
366    input: usize,
367    #[pyo3(get)]
368    payload: Py<PyAny>,
369}
370
371pub(crate) fn into_py(py: Python<'_>, plan: &IR) -> PyResult<Py<PyAny>> {
372    match plan {
373        IR::PythonScan { options } => {
374            let python_src = match options.python_source {
375                PythonScanSource::Pyarrow => "pyarrow",
376                PythonScanSource::Cuda => "cuda",
377                PythonScanSource::IOPlugin => "io_plugin",
378            };
379
380            PythonScan {
381                options: (
382                    options
383                        .scan_fn
384                        .as_ref()
385                        .map_or_else(|| py.None(), |s| s.0.clone_ref(py)),
386                    options.with_columns.as_ref().map_or_else(
387                        || Ok(py.None()),
388                        |cols| {
389                            cols.iter()
390                                .map(|x| x.as_str())
391                                .collect::<Vec<_>>()
392                                .into_py_any(py)
393                        },
394                    )?,
395                    python_src,
396                    match &options.predicate {
397                        PythonPredicate::None => py.None(),
398                        PythonPredicate::PyArrow {
399                            predicate,
400                            has_residual,
401                        } => {
402                            ("pyarrow", predicate, "has_residual", has_residual).into_py_any(py)?
403                        },
404                        PythonPredicate::Polars(e) => ("polars", e.node().0).into_py_any(py)?,
405                    },
406                    options
407                        .n_rows
408                        .map_or_else(|| Ok(py.None()), |s| s.into_py_any(py))?,
409                )
410                    .into_py_any(py)?,
411            }
412            .into_py_any(py)
413        },
414        IR::Slice { input, offset, len } => Slice {
415            input: input.0,
416            offset: *offset,
417            len: *len,
418        }
419        .into_py_any(py),
420        IR::Filter { input, predicate } => Filter {
421            input: input.0,
422            predicate: predicate.into(),
423        }
424        .into_py_any(py),
425        IR::Scan {
426            hive_parts: Some(_),
427            ..
428        } => Err(PyNotImplementedError::new_err(
429            "scan with hive partitioning",
430        )),
431        IR::Scan {
432            sources,
433            file_info: _,
434            hive_parts: _,
435            predicate,
436            predicate_file_skip_applied,
437            output_schema: _,
438            scan_type,
439            unified_scan_args,
440        } => {
441            Scan {
442                paths: {
443                    let paths = sources
444                        .into_paths()
445                        .ok_or_else(|| PyNotImplementedError::new_err("scan with BytesIO"))?;
446
447                    let out = PyList::new(py, [] as [(); 0])?;
448
449                    // Manual conversion to preserve `uri://...` - converting Rust `Path` to `PosixPath`
450                    // will corrupt to `uri:/...`
451                    for path in paths.iter() {
452                        out.append(path.as_str())?;
453                    }
454
455                    out.into_py_any(py)?
456                },
457                // TODO: file info
458                file_info: py.None(),
459                predicate: predicate
460                    .as_ref()
461                    .filter(|_| {
462                        !matches!(
463                            predicate_file_skip_applied,
464                            Some(PredicateFileSkip {
465                                no_residual_predicate: true,
466                                original_len: _,
467                            })
468                        )
469                    })
470                    .map(|e| e.into()),
471                file_options: PyFileOptions {
472                    inner: (**unified_scan_args).clone(),
473                },
474                scan_type: scan_type_to_pyobject(py, scan_type, &unified_scan_args.cloud_options)?,
475            }
476        }
477        .into_py_any(py),
478        IR::DataFrameScan {
479            df,
480            schema: _,
481            output_schema,
482        } => DataFrameScan {
483            df: PyDataFrame::new((**df).clone()),
484            projection: output_schema.as_ref().map_or_else(
485                || Ok(py.None()),
486                |s| {
487                    s.iter_names()
488                        .map(|s| s.as_str())
489                        .collect::<Vec<_>>()
490                        .into_py_any(py)
491                },
492            )?,
493            selection: None,
494        }
495        .into_py_any(py),
496        IR::SimpleProjection { input, columns: _ } => {
497            SimpleProjection { input: input.0 }.into_py_any(py)
498        },
499        IR::Select {
500            input,
501            expr,
502            schema: _,
503            options,
504        } => Select {
505            expr: expr.iter().map(|e| e.into()).collect(),
506            input: input.0,
507            should_broadcast: options.should_broadcast,
508        }
509        .into_py_any(py),
510        IR::Sort {
511            input,
512            by_column,
513            slice,
514            sort_options,
515        } => Sort {
516            input: input.0,
517            by_column: by_column.iter().map(|e| e.into()).collect(),
518            sort_options: (
519                sort_options.maintain_order,
520                sort_options.nulls_last.clone(),
521                sort_options.descending.clone(),
522            ),
523            slice: slice.as_ref().map(|t| (t.0, t.1)),
524        }
525        .into_py_any(py),
526        IR::Cache { input, id } => Cache {
527            input: input.0,
528            id_: id.as_u128(),
529        }
530        .into_py_any(py),
531        IR::GroupBy {
532            input,
533            keys,
534            aggs,
535            schema: _,
536            apply,
537            maintain_order,
538            options,
539        } => GroupBy {
540            input: input.0,
541            keys: keys.iter().map(|e| e.into()).collect(),
542            aggs: aggs.iter().map(|e| e.into()).collect(),
543            apply: apply.as_ref().map_or(Ok(()), |_| {
544                Err(PyNotImplementedError::new_err(format!(
545                    "apply inside GroupBy {plan:?}"
546                )))
547            })?,
548            maintain_order: *maintain_order,
549            options: PyGroupbyOptions::new(options.as_ref().clone()).into_py_any(py)?,
550        }
551        .into_py_any(py),
552        IR::Join {
553            input_left,
554            input_right,
555            schema: _,
556            left_on,
557            right_on,
558            options,
559        } => Join {
560            input_left: input_left.0,
561            input_right: input_right.0,
562            left_on: left_on.iter().map(|e| e.into()).collect(),
563            right_on: right_on.iter().map(|e| e.into()).collect(),
564            options: {
565                let how = &options.args.how;
566                let name = Into::<&str>::into(how).into_pyobject(py)?;
567                (
568                    match how {
569                        #[cfg(feature = "asof_join")]
570                        JoinType::AsOf(_) => {
571                            return Err(PyNotImplementedError::new_err("asof join"));
572                        },
573                        #[cfg(feature = "iejoin")]
574                        JoinType::IEJoin => {
575                            let Some(JoinTypeOptionsIR::IEJoin(ie_options)) = &options.options
576                            else {
577                                unreachable!()
578                            };
579                            (
580                                name,
581                                crate::Wrap(ie_options.operator1).into_py_any(py)?,
582                                ie_options.operator2.as_ref().map_or_else(
583                                    || Ok(py.None()),
584                                    |op| crate::Wrap(*op).into_py_any(py),
585                                )?,
586                            )
587                                .into_py_any(py)?
588                        },
589                        // This is a cross join fused with a predicate. Shown in the IR::explain as
590                        // NESTED LOOP JOIN
591                        JoinType::Cross if options.options.is_some() => {
592                            return Err(PyNotImplementedError::new_err("nested loop join"));
593                        },
594                        _ => name.into_any().unbind(),
595                    },
596                    options.args.nulls_equal,
597                    options.args.slice,
598                    options.args.suffix().as_str(),
599                    options.args.coalesce.coalesce(how),
600                    Into::<&str>::into(options.args.maintain_order),
601                )
602                    .into_py_any(py)?
603            },
604        }
605        .into_py_any(py),
606        IR::Gather {
607            input,
608            idxs,
609            null_on_oob,
610        } => Gather {
611            input: input.0,
612            idxs: idxs.0,
613            null_on_oob: *null_on_oob,
614        }
615        .into_py_any(py),
616        IR::HStack {
617            input,
618            exprs,
619            schema: _,
620            options,
621        } => HStack {
622            input: input.0,
623            exprs: exprs.iter().map(|e| e.into()).collect(),
624            should_broadcast: options.should_broadcast,
625        }
626        .into_py_any(py),
627        IR::Distinct { input, options } => Distinct {
628            input: input.0,
629            options: (
630                Into::<&str>::into(options.keep_strategy),
631                options.subset.as_ref().map_or_else(
632                    || Ok(py.None()),
633                    |f| {
634                        f.iter()
635                            .map(|s| s.as_ref())
636                            .collect::<Vec<&str>>()
637                            .into_py_any(py)
638                    },
639                )?,
640                options.maintain_order,
641                options.slice,
642            )
643                .into_py_any(py)?,
644        }
645        .into_py_any(py),
646        IR::MapFunction { input, function } => MapFunction {
647            input: input.0,
648            function: match function {
649                FunctionIR::OpaquePython(_) => {
650                    return Err(PyNotImplementedError::new_err("opaque python mapfunction"));
651                },
652                FunctionIR::Opaque {
653                    function: _,
654                    schema: _,
655                    predicate_pd: _,
656                    projection_pd: _,
657                    streamable: _,
658                    fmt_str: _,
659                } => return Err(PyNotImplementedError::new_err("opaque rust mapfunction")),
660                FunctionIR::Unnest { columns, separator } => (
661                    "unnest",
662                    columns.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
663                    separator.as_ref().map(|s| s.to_string()),
664                )
665                    .into_py_any(py)?,
666                FunctionIR::Rechunk => ("rechunk",).into_py_any(py)?,
667                FunctionIR::Explode {
668                    columns,
669                    options,
670                    schema: _,
671                } => (
672                    "explode",
673                    columns.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
674                    options.empty_as_null,
675                    options.keep_nulls,
676                )
677                    .into_py_any(py)?,
678                #[cfg(feature = "pivot")]
679                FunctionIR::Unpivot { args, schema: _ } => (
680                    "unpivot",
681                    args.index.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
682                    args.on.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
683                    args.variable_name.as_str().into_py_any(py)?,
684                    args.value_name.as_str().into_py_any(py)?,
685                )
686                    .into_py_any(py)?,
687                FunctionIR::RowIndex {
688                    name,
689                    schema: _,
690                    offset,
691                } => ("row_index", name.to_string(), offset.unwrap_or(0)).into_py_any(py)?,
692                FunctionIR::FastCount {
693                    sources,
694                    scan_type,
695                    alias,
696                    ..
697                } => {
698                    let sources = sources
699                        .into_paths()
700                        .ok_or_else(|| {
701                            PyNotImplementedError::new_err("FastCount with BytesIO sources")
702                        })?
703                        .iter()
704                        .map(|p| p.as_str())
705                        .collect::<Vec<_>>()
706                        .into_py_any(py)?;
707
708                    // FastCount does not support cloud options.
709                    let cloud_options = None;
710
711                    let scan_type = scan_type_to_pyobject(py, scan_type, &cloud_options)?;
712
713                    let alias = alias
714                        .as_ref()
715                        .map(|a| a.as_str())
716                        .map_or_else(|| Ok(py.None()), |s| s.into_py_any(py))?;
717
718                    ("fast_count", sources, scan_type, alias).into_py_any(py)?
719                },
720                FunctionIR::Hint(hint) => match hint {
721                    HintIR::Sorted(sorted_vec) => {
722                        let sorted_info: Vec<_> = sorted_vec
723                            .iter()
724                            .map(|s| (s.column.as_str(), s.descending, s.nulls_last))
725                            .collect();
726                        ("hint_sorted", sorted_info).into_py_any(py)?
727                    },
728                },
729            },
730        }
731        .into_py_any(py),
732        IR::Union { inputs, options } => Union {
733            inputs: inputs.iter().map(|n| n.0).collect(),
734            // Remaining options are implementation detail, not logical
735            slice: options.slice,
736            rows: options.rows,
737            maintain_order: options.maintain_order,
738        }
739        .into_py_any(py),
740        IR::HConcat {
741            inputs,
742            schema: _,
743            options,
744        } => HConcat {
745            inputs: inputs.iter().map(|n| n.0).collect(),
746            options: (
747                options.parallel,
748                options.strict,
749                options.broadcast_unit_length,
750            )
751                .into_py_any(py)?,
752        }
753        .into_py_any(py),
754        IR::ExtContext {
755            input,
756            contexts,
757            schema: _,
758        } => ExtContext {
759            input: input.0,
760            contexts: contexts.iter().map(|n| n.0).collect(),
761        }
762        .into_py_any(py),
763        IR::Sink { input, payload } => Sink {
764            input: input.0,
765            payload: PyString::new(
766                py,
767                &serde_json::to_string(payload)
768                    .map_err(|err| PyValueError::new_err(format!("{err:?}")))?,
769            )
770            .into(),
771        }
772        .into_py_any(py),
773        IR::SinkMultiple { .. } => Err(PyNotImplementedError::new_err(
774            "Not expecting to see a SinkMultiple node",
775        )),
776        #[cfg(feature = "merge_sorted")]
777        IR::MergeSorted {
778            input_left,
779            input_right,
780            key,
781            maintain_order,
782        } => MergeSorted {
783            input_left: input_left.0,
784            input_right: input_right.0,
785            key: key.to_string(),
786            maintain_order: *maintain_order,
787        }
788        .into_py_any(py),
789        IR::UnoptimizedDispatch { .. } => Err(PyNotImplementedError::new_err(
790            "Not expecting to see a UnoptimizedDispatch node",
791        )),
792        IR::Invalid => Err(PyNotImplementedError::new_err("Invalid")),
793    }
794}