Skip to main content

polars_python/lazyframe/visitor/
nodes.rs

1#[cfg(feature = "iejoin")]
2use polars::prelude::JoinTypeOptionsIR;
3use polars::prelude::deletion::DeletionFilesList;
4use polars::prelude::python_dsl::PythonScanSource;
5use polars::prelude::{ColumnMapping, PredicateFileSkip};
6use polars_core::prelude::IdxSize;
7use polars_io::cloud::CloudOptions;
8use polars_ops::prelude::JoinType;
9use polars_plan::plans::{HintIR, IR};
10use polars_plan::prelude::{FileScanIR, FunctionIR, PythonPredicate, UnifiedScanArgs};
11use pyo3::IntoPyObjectExt;
12use pyo3::exceptions::{PyNotImplementedError, PyValueError};
13use pyo3::prelude::*;
14use pyo3::types::{PyDict, PyList, PyString};
15
16use super::expr_nodes::PyGroupbyOptions;
17use crate::PyDataFrame;
18use crate::lazyframe::visit::PyExprIR;
19
20fn scan_type_to_pyobject(
21    py: Python<'_>,
22    scan_type: &FileScanIR,
23    cloud_options: &Option<CloudOptions>,
24) -> PyResult<Py<PyAny>> {
25    match scan_type {
26        #[cfg(feature = "csv")]
27        FileScanIR::Csv { options } => {
28            let options = serde_json::to_string(options)
29                .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
30            let cloud_options = serde_json::to_string(cloud_options)
31                .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
32            Ok(("csv", options, cloud_options).into_py_any(py)?)
33        },
34        #[cfg(feature = "parquet")]
35        FileScanIR::Parquet { options, .. } => {
36            let options = serde_json::to_string(options)
37                .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
38            let cloud_options = serde_json::to_string(cloud_options)
39                .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
40            Ok(("parquet", options, cloud_options).into_py_any(py)?)
41        },
42        #[cfg(feature = "ipc")]
43        FileScanIR::Ipc { .. } => Err(PyNotImplementedError::new_err("ipc scan")),
44        #[cfg(feature = "json")]
45        FileScanIR::NDJson { options, .. } => {
46            let options = serde_json::to_string(options)
47                .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
48            Ok(("ndjson", options).into_py_any(py)?)
49        },
50        #[cfg(feature = "scan_lines")]
51        FileScanIR::Lines { name } => Ok(("lines", name.as_str()).into_py_any(py)?),
52        FileScanIR::ExpandedPaths { name } => {
53            Ok(("expanded-paths", name.as_str()).into_py_any(py)?)
54        },
55        FileScanIR::PythonDataset { .. } => {
56            Err(PyNotImplementedError::new_err("python dataset scan"))
57        },
58        FileScanIR::Anonymous { .. } => Err(PyNotImplementedError::new_err("anonymous scan")),
59    }
60}
61
#[pyclass(frozen)]
/// Scan a table with an optional predicate from a python function
pub struct PythonScan {
    /// Python tuple built by `into_py`:
    /// `(scan_fn | None, with_columns | None, source, predicate, n_rows | None)`,
    /// where `source` is one of `"pyarrow"`, `"cuda"` or `"io_plugin"`.
    #[pyo3(get)]
    options: Py<PyAny>,
}
68
#[pyclass(frozen)]
/// Slice the table
pub struct Slice {
    /// Index of the input node (the inner `usize` of the IR node handle).
    #[pyo3(get)]
    input: usize,
    /// Slice offset, copied from `IR::Slice`; signed.
    #[pyo3(get)]
    offset: i64,
    /// Slice length, copied from `IR::Slice`.
    #[pyo3(get)]
    len: IdxSize,
}
79
#[pyclass(frozen)]
/// Filter the table with a boolean expression
pub struct Filter {
    /// Index of the input node (the inner `usize` of the IR node handle).
    #[pyo3(get)]
    input: usize,
    /// The filter predicate, converted from the IR expression.
    #[pyo3(get)]
    predicate: PyExprIR,
}
88
#[pyclass(frozen, skip_from_py_object)]
#[derive(Clone)]
/// Python-visible wrapper around the unified scan arguments of a file scan.
///
/// The wrapped value is not exposed directly; individual settings are
/// surfaced through the getters in the `#[pymethods]` impl.
pub struct PyFileOptions {
    // Cloned from the `unified_scan_args` of `IR::Scan` in `into_py`.
    inner: UnifiedScanArgs,
}
94
95#[pymethods]
96impl PyFileOptions {
97    #[getter]
98    fn n_rows(&self) -> Option<(i64, IdxSize)> {
99        self.inner
100            .pre_slice
101            .clone()
102            .map(|slice| slice.to_signed_offset_len())
103    }
104    #[getter]
105    fn with_columns(&self) -> Option<Vec<&str>> {
106        self.inner
107            .projection
108            .as_ref()?
109            .iter()
110            .map(|x| x.as_str())
111            .collect::<Vec<_>>()
112            .into()
113    }
114    #[getter]
115    fn cache(&self, _py: Python<'_>) -> bool {
116        self.inner.cache
117    }
118    #[getter]
119    fn row_index(&self) -> Option<(&str, IdxSize)> {
120        self.inner
121            .row_index
122            .as_ref()
123            .map(|n| (n.name.as_str(), n.offset))
124    }
125    #[getter]
126    fn rechunk(&self, _py: Python<'_>) -> bool {
127        self.inner.rechunk
128    }
129    #[getter]
130    fn hive_options(&self, _py: Python<'_>) -> PyResult<Py<PyAny>> {
131        Err(PyNotImplementedError::new_err("hive options"))
132    }
133    #[getter]
134    fn include_file_paths(&self, _py: Python<'_>) -> Option<&str> {
135        self.inner.include_file_paths.as_deref()
136    }
137
138    /// One of:
139    /// * None
140    /// * ("iceberg-position-delete", dict[int, list[str]])
141    #[getter]
142    fn deletion_files(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
143        Ok(match &self.inner.deletion_files {
144            None => py.None().into_any(),
145            Some(DeletionFilesList::IcebergPositionDelete(paths)) => {
146                let out = PyDict::new(py);
147                for (k, v) in paths.iter() {
148                    out.set_item(*k, v.as_ref())?;
149                }
150                ("iceberg-position-delete", out)
151                    .into_pyobject(py)?
152                    .into_any()
153                    .unbind()
154            },
155            Some(DeletionFilesList::Delta(provider)) => {
156                ("delta-deletion-vector", provider.callback().0.clone_ref(py))
157                    .into_pyobject(py)?
158                    .into_any()
159                    .unbind()
160            },
161        })
162    }
163
164    /// One of:
165    /// * None
166    /// * ("iceberg-column-mapping", <unimplemented>)
167    #[getter]
168    fn column_mapping(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
169        Ok(match &self.inner.column_mapping {
170            None => py.None().into_any(),
171
172            Some(ColumnMapping::Iceberg { .. }) => unimplemented!(),
173        })
174    }
175}
176
#[pyclass(frozen)]
/// Scan a table from file
pub struct Scan {
    /// Python list of the source paths as strings.
    #[pyo3(get)]
    paths: Py<PyAny>,
    /// Currently always `None` (`into_py` marks file info as a TODO).
    #[pyo3(get)]
    file_info: Py<PyAny>,
    /// DataFrame taken from the scan's hive partitions, when present.
    #[pyo3(get)]
    hive_parts: Option<PyDataFrame>,
    /// Scan predicate; `into_py` omits it when predicate-based file skipping
    /// was applied and reports no residual predicate.
    #[pyo3(get)]
    predicate: Option<PyExprIR>,
    /// Wrapper around the unified scan arguments.
    #[pyo3(get)]
    file_options: PyFileOptions,
    /// Tagged tuple describing the scan kind, built by `scan_type_to_pyobject`.
    #[pyo3(get)]
    scan_type: Py<PyAny>,
}
193
#[pyclass(frozen)]
/// Scan a table from an existing dataframe
pub struct DataFrameScan {
    /// Clone of the in-memory dataframe being scanned.
    #[pyo3(get)]
    df: PyDataFrame,
    /// List of column names from the node's output schema, or `None` when the
    /// node has no output schema.
    #[pyo3(get)]
    projection: Py<PyAny>,
    /// Always `None` as constructed by `into_py`.
    #[pyo3(get)]
    selection: Option<PyExprIR>,
}
204
#[pyclass(frozen)]
/// Project out columns from a table
pub struct SimpleProjection {
    /// Index of the input node. The projected columns themselves are not
    /// exposed on this object.
    #[pyo3(get)]
    input: usize,
}
211
#[pyclass(frozen)]
/// Column selection
pub struct Select {
    /// Index of the input node (the inner `usize` of the IR node handle).
    #[pyo3(get)]
    input: usize,
    /// The selected expressions, converted from the IR expressions.
    #[pyo3(get)]
    expr: Vec<PyExprIR>,
    /// Copied from the projection options of the IR node.
    #[pyo3(get)]
    should_broadcast: bool,
}
222
#[pyclass(frozen)]
/// Sort the table
pub struct Sort {
    /// Index of the input node (the inner `usize` of the IR node handle).
    #[pyo3(get)]
    input: usize,
    /// Expressions to sort by.
    #[pyo3(get)]
    by_column: Vec<PyExprIR>,
    /// `(maintain_order, nulls_last, descending)`; the two vectors are copied
    /// from the IR sort options.
    #[pyo3(get)]
    sort_options: (bool, Vec<bool>, Vec<bool>),
    /// Optional slice: the first two elements of the IR slice tuple
    /// (presumably offset and length — TODO confirm), followed by the id of
    /// its optional third element as a `u128`.
    #[pyo3(get)]
    slice: Option<(i64, usize, Option<u128>)>,
}
235
#[pyclass(frozen)]
/// Cache the input at this point in the LP
pub struct Cache {
    /// Index of the input node (the inner `usize` of the IR node handle).
    #[pyo3(get)]
    input: usize,
    /// The cache id of the IR node, widened to `u128`.
    #[pyo3(get)]
    id_: u128,
}
244
#[pyclass(frozen)]
/// Groupby aggregation
pub struct GroupBy {
    /// Index of the input node (the inner `usize` of the IR node handle).
    #[pyo3(get)]
    input: usize,
    /// Grouping key expressions.
    #[pyo3(get)]
    keys: Vec<PyExprIR>,
    /// Aggregation expressions.
    #[pyo3(get)]
    aggs: Vec<PyExprIR>,
    /// Always the unit value: `into_py` raises `NotImplementedError` for plans
    /// that carry an apply function, so none is ever exposed here.
    #[pyo3(get)]
    apply: (),
    /// Copied from the IR node.
    #[pyo3(get)]
    maintain_order: bool,
    /// A `PyGroupbyOptions` object wrapping the IR group-by options.
    #[pyo3(get)]
    options: Py<PyAny>,
}
261
#[pyclass(frozen)]
/// Join operation
pub struct Join {
    /// Index of the left input node.
    #[pyo3(get)]
    input_left: usize,
    /// Index of the right input node.
    #[pyo3(get)]
    input_right: usize,
    /// Join key expressions for the left input.
    #[pyo3(get)]
    left_on: Vec<PyExprIR>,
    /// Join key expressions for the right input.
    #[pyo3(get)]
    right_on: Vec<PyExprIR>,
    /// Python tuple built by `into_py`:
    /// `(how, nulls_equal, slice, suffix, coalesce, maintain_order)`.
    /// `how` is the join type name, or `(name, operator1, operator2 | None)`
    /// for an IEJoin.
    #[pyo3(get)]
    options: Py<PyAny>,
}
276
#[pyclass(frozen)]
/// Gather operation
pub struct Gather {
    /// Index of the input node (the inner `usize` of the IR node handle).
    #[pyo3(get)]
    input: usize,
    /// Index of the node referenced by the IR node's `idxs` field.
    #[pyo3(get)]
    idxs: usize,
    /// Copied from the IR node.
    #[pyo3(get)]
    null_on_oob: bool,
}
287
#[pyclass(frozen)]
/// Merge sorted operation
pub struct MergeSorted {
    /// Index of the left input node.
    #[pyo3(get)]
    input_left: usize,
    /// Index of the right input node.
    #[pyo3(get)]
    input_right: usize,
    /// The IR node's `key`, as a string.
    #[pyo3(get)]
    key: String,
    /// Copied from the IR node.
    #[pyo3(get)]
    maintain_order: bool,
}
300
#[pyclass(frozen)]
/// Adding columns to the table without a Join
pub struct HStack {
    /// Index of the input node (the inner `usize` of the IR node handle).
    #[pyo3(get)]
    input: usize,
    /// Expressions producing the added columns.
    #[pyo3(get)]
    exprs: Vec<PyExprIR>,
    /// Copied from the projection options of the IR node.
    #[pyo3(get)]
    should_broadcast: bool,
}
311
#[pyclass(frozen)]
/// Like Select, but all operations produce a single row.
pub struct Reduce {
    /// Index of the input node.
    #[pyo3(get)]
    input: usize,
    /// The reduction expressions.
    #[pyo3(get)]
    exprs: Vec<PyExprIR>,
}
320
#[pyclass(frozen)]
/// Remove duplicates from the table
pub struct Distinct {
    /// Index of the input node (the inner `usize` of the IR node handle).
    #[pyo3(get)]
    input: usize,
    /// Python tuple built by `into_py`:
    /// `(keep_strategy, subset | None, maintain_order, slice)`, where
    /// `keep_strategy` is a string and `subset` a list of strings.
    #[pyo3(get)]
    options: Py<PyAny>,
}
#[pyclass(frozen)]
/// A (User Defined) Function
pub struct MapFunction {
    /// Index of the input node (the inner `usize` of the IR node handle).
    #[pyo3(get)]
    input: usize,
    /// Python tuple built by `into_py`; the first element is a string tag
    /// (e.g. `"unnest"`, `"rechunk"`, `"explode"`, `"row_index"`,
    /// `"fast_count"`, `"hint_sorted"`) and the rest are its arguments.
    #[pyo3(get)]
    function: Py<PyAny>,
}
#[pyclass(frozen)]
/// Union of multiple plans
pub struct Union {
    /// Indices of the input nodes.
    #[pyo3(get)]
    inputs: Vec<usize>,
    /// Copied from the union options' `slice`.
    #[pyo3(get)]
    slice: Option<(i64, usize)>,
    /// Copied from the union options' `rows`.
    #[pyo3(get)]
    rows: (Option<usize>, usize),
    /// Copied from the union options' `maintain_order`.
    #[pyo3(get)]
    maintain_order: bool,
}
#[pyclass(frozen)]
/// Horizontal concatenation of multiple plans
pub struct HConcat {
    /// Indices of the input nodes.
    #[pyo3(get)]
    inputs: Vec<usize>,
    /// Python tuple built by `into_py`:
    /// `(parallel, strict, broadcast_unit_length)`.
    #[pyo3(get)]
    options: Py<PyAny>,
}
#[pyclass(frozen)]
/// This allows expressions to access other tables
pub struct ExtContext {
    /// Index of the input node (the inner `usize` of the IR node handle).
    #[pyo3(get)]
    input: usize,
    /// Indices of the context nodes.
    #[pyo3(get)]
    contexts: Vec<usize>,
}
364
#[pyclass(frozen)]
/// Sink node of a plan
pub struct Sink {
    /// Index of the input node (the inner `usize` of the IR node handle).
    #[pyo3(get)]
    input: usize,
    /// The sink payload serialized to a JSON string (a Python `str`).
    #[pyo3(get)]
    payload: Py<PyAny>,
}
372
/// Convert one [`IR`] plan node into its Python-visible node object.
///
/// Each supported variant is mapped to the matching `#[pyclass]` struct in
/// this module. Child nodes are exposed by index (the inner `usize` of the
/// node handle) and are not converted recursively. Settings that have no
/// dedicated Python class are packed into Python tuples, usually led by a
/// string tag.
///
/// # Errors
///
/// * `PyNotImplementedError` for nodes or options the visitor does not
///   expose: scans over non-path sources, group-by with an apply function,
///   asof and nested-loop joins, opaque map functions, `SinkMultiple`,
///   `UnoptimizedDispatch`, `Invalid`, and the scan kinds rejected by
///   `scan_type_to_pyobject`.
/// * `PyValueError` when JSON serialization of a payload fails.
///
/// # Panics
///
/// Hits `unreachable!()` if an IEJoin node does not carry IEJoin options.
pub(crate) fn into_py(py: Python<'_>, plan: &IR) -> PyResult<Py<PyAny>> {
    match plan {
        IR::PythonScan { options } => {
            let python_src = match options.python_source {
                PythonScanSource::Pyarrow => "pyarrow",
                PythonScanSource::Cuda => "cuda",
                PythonScanSource::IOPlugin => "io_plugin",
            };

            // Tuple layout: (scan_fn, with_columns, source, predicate, n_rows);
            // absent values become Python `None`.
            PythonScan {
                options: (
                    options
                        .scan_fn
                        .as_ref()
                        .map_or_else(|| py.None(), |s| s.0.clone_ref(py)),
                    options.with_columns.as_ref().map_or_else(
                        || Ok(py.None()),
                        |cols| {
                            cols.iter()
                                .map(|x| x.as_str())
                                .collect::<Vec<_>>()
                                .into_py_any(py)
                        },
                    )?,
                    python_src,
                    match &options.predicate {
                        PythonPredicate::None => py.None(),
                        PythonPredicate::PyArrow {
                            predicate,
                            has_residual,
                        } => {
                            ("pyarrow", predicate, "has_residual", has_residual).into_py_any(py)?
                        },
                        PythonPredicate::Polars(e) => ("polars", e.node().0).into_py_any(py)?,
                    },
                    options
                        .n_rows
                        .map_or_else(|| Ok(py.None()), |s| s.into_py_any(py))?,
                )
                    .into_py_any(py)?,
            }
            .into_py_any(py)
        },
        IR::Slice { input, offset, len } => Slice {
            input: input.0,
            offset: *offset,
            len: *len,
        }
        .into_py_any(py),
        IR::Filter { input, predicate } => Filter {
            input: input.0,
            predicate: predicate.into(),
        }
        .into_py_any(py),
        IR::Scan {
            sources,
            file_info: _,
            hive_parts,
            predicate,
            predicate_file_skip_applied,
            output_schema: _,
            scan_type,
            unified_scan_args,
        } => {
            Scan {
                paths: {
                    // Only path-based sources are supported.
                    let paths = sources
                        .into_paths()
                        .ok_or_else(|| PyNotImplementedError::new_err("scan with BytesIO"))?;

                    let out = PyList::new(py, [] as [(); 0])?;

                    // Manual conversion to preserve `uri://...` - converting Rust `Path` to `PosixPath`
                    // will corrupt to `uri:/...`
                    for path in paths.iter() {
                        out.append(path.as_str())?;
                    }

                    out.into_py_any(py)?
                },
                // TODO: file info
                file_info: py.None(),
                hive_parts: hive_parts
                    .as_ref()
                    .map(|h| PyDataFrame::new(h.df().clone())),
                // Drop the predicate when file skipping was applied and it
                // reports that no residual predicate remains.
                predicate: predicate
                    .as_ref()
                    .filter(|_| {
                        !matches!(
                            predicate_file_skip_applied,
                            Some(PredicateFileSkip {
                                no_residual_predicate: true,
                                original_len: _,
                            })
                        )
                    })
                    .map(|e| e.into()),
                file_options: PyFileOptions {
                    inner: (**unified_scan_args).clone(),
                },
                scan_type: scan_type_to_pyobject(py, scan_type, &unified_scan_args.cloud_options)?,
            }
        }
        .into_py_any(py),
        IR::DataFrameScan {
            df,
            schema: _,
            output_schema,
        } => DataFrameScan {
            df: PyDataFrame::new((**df).clone()),
            // Column names of the output schema, or `None` without one.
            projection: output_schema.as_ref().map_or_else(
                || Ok(py.None()),
                |s| {
                    s.iter_names()
                        .map(|s| s.as_str())
                        .collect::<Vec<_>>()
                        .into_py_any(py)
                },
            )?,
            selection: None,
        }
        .into_py_any(py),
        IR::SimpleProjection { input, columns: _ } => {
            SimpleProjection { input: input.0 }.into_py_any(py)
        },
        IR::Select {
            input,
            expr,
            schema: _,
            options,
        } => Select {
            expr: expr.iter().map(|e| e.into()).collect(),
            input: input.0,
            should_broadcast: options.should_broadcast,
        }
        .into_py_any(py),
        IR::Sort {
            input,
            by_column,
            slice,
            sort_options,
        } => Sort {
            input: input.0,
            by_column: by_column.iter().map(|e| e.into()).collect(),
            sort_options: (
                sort_options.maintain_order,
                sort_options.nulls_last.clone(),
                sort_options.descending.clone(),
            ),
            // The optional third slice element is exposed by its id only.
            slice: slice
                .as_ref()
                .map(|t| (t.0, t.1, t.2.as_ref().map(|p| p.id().as_u128()))),
        }
        .into_py_any(py),
        IR::Cache { input, id } => Cache {
            input: input.0,
            id_: id.as_u128(),
        }
        .into_py_any(py),
        IR::GroupBy {
            input,
            keys,
            aggs,
            schema: _,
            apply,
            maintain_order,
            options,
        } => GroupBy {
            input: input.0,
            keys: keys.iter().map(|e| e.into()).collect(),
            aggs: aggs.iter().map(|e| e.into()).collect(),
            // A plan carrying an apply function is rejected; otherwise the
            // field is the unit value.
            apply: apply.as_ref().map_or(Ok(()), |_| {
                Err(PyNotImplementedError::new_err(format!(
                    "apply inside GroupBy {plan:?}"
                )))
            })?,
            maintain_order: *maintain_order,
            options: PyGroupbyOptions::new(options.as_ref().clone()).into_py_any(py)?,
        }
        .into_py_any(py),
        IR::Join {
            input_left,
            input_right,
            schema: _,
            left_on,
            right_on,
            options,
        } => Join {
            input_left: input_left.0,
            input_right: input_right.0,
            left_on: left_on.iter().map(|e| e.into()).collect(),
            right_on: right_on.iter().map(|e| e.into()).collect(),
            // Tuple layout:
            // (how, nulls_equal, slice, suffix, coalesce, maintain_order).
            options: {
                let how = &options.args.how;
                let name = Into::<&str>::into(how).into_pyobject(py)?;
                (
                    match how {
                        #[cfg(feature = "asof_join")]
                        JoinType::AsOf(_) => {
                            return Err(PyNotImplementedError::new_err("asof join"));
                        },
                        #[cfg(feature = "iejoin")]
                        JoinType::IEJoin => {
                            let Some(JoinTypeOptionsIR::IEJoin(ie_options)) = &options.options
                            else {
                                unreachable!()
                            };
                            // IEJoin exposes (name, operator1, operator2 | None)
                            // instead of the bare name.
                            (
                                name,
                                crate::Wrap(ie_options.operator1).into_py_any(py)?,
                                ie_options.operator2.as_ref().map_or_else(
                                    || Ok(py.None()),
                                    |op| crate::Wrap(*op).into_py_any(py),
                                )?,
                            )
                                .into_py_any(py)?
                        },
                        // This is a cross join fused with a predicate. Shown in the IR::explain as
                        // NESTED LOOP JOIN
                        JoinType::Cross if options.options.is_some() => {
                            return Err(PyNotImplementedError::new_err("nested loop join"));
                        },
                        _ => name.into_any().unbind(),
                    },
                    options.args.nulls_equal,
                    options.args.slice,
                    options.args.suffix().as_str(),
                    options.args.coalesce.coalesce(how),
                    Into::<&str>::into(options.args.maintain_order),
                )
                    .into_py_any(py)?
            },
        }
        .into_py_any(py),
        IR::Gather {
            input,
            idxs,
            null_on_oob,
        } => Gather {
            input: input.0,
            idxs: idxs.0,
            null_on_oob: *null_on_oob,
        }
        .into_py_any(py),
        IR::HStack {
            input,
            exprs,
            schema: _,
            options,
        } => HStack {
            input: input.0,
            exprs: exprs.iter().map(|e| e.into()).collect(),
            should_broadcast: options.should_broadcast,
        }
        .into_py_any(py),
        IR::Distinct { input, options } => Distinct {
            input: input.0,
            // Tuple layout: (keep_strategy, subset | None, maintain_order, slice).
            options: (
                Into::<&str>::into(options.keep_strategy),
                options.subset.as_ref().map_or_else(
                    || Ok(py.None()),
                    |f| {
                        f.iter()
                            .map(|s| s.as_ref())
                            .collect::<Vec<&str>>()
                            .into_py_any(py)
                    },
                )?,
                options.maintain_order,
                options.slice,
            )
                .into_py_any(py)?,
        }
        .into_py_any(py),
        IR::MapFunction { input, function } => MapFunction {
            input: input.0,
            // Each supported function becomes a tuple led by a string tag.
            function: match function {
                FunctionIR::OpaquePython(_) => {
                    return Err(PyNotImplementedError::new_err("opaque python mapfunction"));
                },
                FunctionIR::Opaque {
                    function: _,
                    schema: _,
                    predicate_pd: _,
                    projection_pd: _,
                    streamable: _,
                    fmt_str: _,
                } => return Err(PyNotImplementedError::new_err("opaque rust mapfunction")),
                FunctionIR::Unnest { columns, separator } => (
                    "unnest",
                    columns.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
                    separator.as_ref().map(|s| s.to_string()),
                )
                    .into_py_any(py)?,
                FunctionIR::Rechunk => ("rechunk",).into_py_any(py)?,
                FunctionIR::Explode {
                    columns,
                    options,
                    schema: _,
                } => (
                    "explode",
                    columns.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
                    options.empty_as_null,
                    options.keep_nulls,
                )
                    .into_py_any(py)?,
                #[cfg(feature = "pivot")]
                FunctionIR::Unpivot { args, schema: _ } => (
                    "unpivot",
                    args.index.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
                    args.on.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
                    args.variable_name.as_str().into_py_any(py)?,
                    args.value_name.as_str().into_py_any(py)?,
                )
                    .into_py_any(py)?,
                FunctionIR::RowIndex {
                    name,
                    schema: _,
                    offset,
                    // A missing offset is reported as 0.
                } => ("row_index", name.to_string(), offset.unwrap_or(0)).into_py_any(py)?,
                FunctionIR::FastCount {
                    sources,
                    scan_type,
                    alias,
                    ..
                } => {
                    let sources = sources
                        .into_paths()
                        .ok_or_else(|| {
                            PyNotImplementedError::new_err("FastCount with BytesIO sources")
                        })?
                        .iter()
                        .map(|p| p.as_str())
                        .collect::<Vec<_>>()
                        .into_py_any(py)?;

                    // FastCount does not support cloud options.
                    let cloud_options = None;

                    let scan_type = scan_type_to_pyobject(py, scan_type, &cloud_options)?;

                    let alias = alias
                        .as_ref()
                        .map(|a| a.as_str())
                        .map_or_else(|| Ok(py.None()), |s| s.into_py_any(py))?;

                    ("fast_count", sources, scan_type, alias).into_py_any(py)?
                },
                FunctionIR::Hint(hint) => match hint {
                    HintIR::Sorted(sorted_vec) => {
                        // One (column, descending, nulls_last) triple per entry.
                        let sorted_info: Vec<_> = sorted_vec
                            .iter()
                            .map(|s| (s.column.as_str(), s.descending, s.nulls_last))
                            .collect();
                        ("hint_sorted", sorted_info).into_py_any(py)?
                    },
                },
            },
        }
        .into_py_any(py),
        IR::Union { inputs, options } => Union {
            inputs: inputs.iter().map(|n| n.0).collect(),
            // Remaining options are implementation detail, not logical
            slice: options.slice,
            rows: options.rows,
            maintain_order: options.maintain_order,
        }
        .into_py_any(py),
        IR::HConcat {
            inputs,
            schema: _,
            options,
        } => HConcat {
            inputs: inputs.iter().map(|n| n.0).collect(),
            options: (
                options.parallel,
                options.strict,
                options.broadcast_unit_length,
            )
                .into_py_any(py)?,
        }
        .into_py_any(py),
        IR::ExtContext {
            input,
            contexts,
            schema: _,
        } => ExtContext {
            input: input.0,
            contexts: contexts.iter().map(|n| n.0).collect(),
        }
        .into_py_any(py),
        IR::Sink { input, payload } => Sink {
            input: input.0,
            // The payload is handed to Python as a JSON string.
            payload: PyString::new(
                py,
                &serde_json::to_string(payload)
                    .map_err(|err| PyValueError::new_err(format!("{err:?}")))?,
            )
            .into(),
        }
        .into_py_any(py),
        IR::SinkMultiple { .. } => Err(PyNotImplementedError::new_err(
            "Not expecting to see a SinkMultiple node",
        )),
        #[cfg(feature = "merge_sorted")]
        IR::MergeSorted {
            input_left,
            input_right,
            key,
            maintain_order,
        } => MergeSorted {
            input_left: input_left.0,
            input_right: input_right.0,
            key: key.to_string(),
            maintain_order: *maintain_order,
        }
        .into_py_any(py),
        IR::UnoptimizedDispatch { .. } => Err(PyNotImplementedError::new_err(
            "Not expecting to see a UnoptimizedDispatch node",
        )),
        IR::Invalid => Err(PyNotImplementedError::new_err("Invalid")),
    }
}