1#[cfg(feature = "iejoin")]
2use polars::prelude::JoinTypeOptionsIR;
3use polars::prelude::deletion::DeletionFilesList;
4use polars::prelude::python_dsl::PythonScanSource;
5use polars::prelude::{ColumnMapping, PredicateFileSkip};
6use polars_core::prelude::IdxSize;
7use polars_io::cloud::CloudOptions;
8use polars_ops::prelude::JoinType;
9use polars_plan::plans::{HintIR, IR};
10use polars_plan::prelude::{FileScanIR, FunctionIR, PythonPredicate, UnifiedScanArgs};
11use pyo3::IntoPyObjectExt;
12use pyo3::exceptions::{PyNotImplementedError, PyValueError};
13use pyo3::prelude::*;
14use pyo3::types::{PyDict, PyList, PyString};
15
16use super::expr_nodes::PyGroupbyOptions;
17use crate::PyDataFrame;
18use crate::lazyframe::visit::PyExprIR;
19
20fn scan_type_to_pyobject(
21 py: Python<'_>,
22 scan_type: &FileScanIR,
23 cloud_options: &Option<CloudOptions>,
24) -> PyResult<Py<PyAny>> {
25 match scan_type {
26 #[cfg(feature = "csv")]
27 FileScanIR::Csv { options } => {
28 let options = serde_json::to_string(options)
29 .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
30 let cloud_options = serde_json::to_string(cloud_options)
31 .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
32 Ok(("csv", options, cloud_options).into_py_any(py)?)
33 },
34 #[cfg(feature = "parquet")]
35 FileScanIR::Parquet { options, .. } => {
36 let options = serde_json::to_string(options)
37 .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
38 let cloud_options = serde_json::to_string(cloud_options)
39 .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
40 Ok(("parquet", options, cloud_options).into_py_any(py)?)
41 },
42 #[cfg(feature = "ipc")]
43 FileScanIR::Ipc { .. } => Err(PyNotImplementedError::new_err("ipc scan")),
44 #[cfg(feature = "json")]
45 FileScanIR::NDJson { options, .. } => {
46 let options = serde_json::to_string(options)
47 .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
48 Ok(("ndjson", options).into_py_any(py)?)
49 },
50 #[cfg(feature = "scan_lines")]
51 FileScanIR::Lines { name } => Ok(("lines", name.as_str()).into_py_any(py)?),
52 FileScanIR::ExpandedPaths { name } => {
53 Ok(("expanded-paths", name.as_str()).into_py_any(py)?)
54 },
55 FileScanIR::PythonDataset { .. } => {
56 Err(PyNotImplementedError::new_err("python dataset scan"))
57 },
58 FileScanIR::Anonymous { .. } => Err(PyNotImplementedError::new_err("anonymous scan")),
59 }
60}
61
/// Python-visible node for `IR::PythonScan`.
#[pyclass(frozen)]
pub struct PythonScan {
    /// Tuple built in `into_py`:
    /// `(scan_fn | None, with_columns | None, python_source, predicate, n_rows | None)`,
    /// where `python_source` is one of `"pyarrow"`, `"cuda"`, `"io_plugin"`.
    #[pyo3(get)]
    options: Py<PyAny>,
}
68
/// Python-visible node for `IR::Slice`.
#[pyclass(frozen)]
pub struct Slice {
    /// `.0` value of the input node handle.
    #[pyo3(get)]
    input: usize,
    /// Offset copied from the IR node.
    #[pyo3(get)]
    offset: i64,
    /// Length copied from the IR node.
    #[pyo3(get)]
    len: IdxSize,
}
79
/// Python-visible node for `IR::Filter`.
#[pyclass(frozen)]
pub struct Filter {
    /// `.0` value of the input node handle.
    #[pyo3(get)]
    input: usize,
    /// The filter predicate, converted from the IR expression.
    #[pyo3(get)]
    predicate: PyExprIR,
}
88
/// Python-visible wrapper around a clone of the scan's `UnifiedScanArgs`.
///
/// The wrapped value is not exposed directly; individual settings are read
/// through the getters defined in the `#[pymethods]` block.
#[pyclass(frozen, skip_from_py_object)]
#[derive(Clone)]
pub struct PyFileOptions {
    // Cloned from the IR scan node in `into_py`.
    inner: UnifiedScanArgs,
}
94
95#[pymethods]
96impl PyFileOptions {
97 #[getter]
98 fn n_rows(&self) -> Option<(i64, IdxSize)> {
99 self.inner
100 .pre_slice
101 .clone()
102 .map(|slice| slice.to_signed_offset_len())
103 }
104 #[getter]
105 fn with_columns(&self) -> Option<Vec<&str>> {
106 self.inner
107 .projection
108 .as_ref()?
109 .iter()
110 .map(|x| x.as_str())
111 .collect::<Vec<_>>()
112 .into()
113 }
114 #[getter]
115 fn cache(&self, _py: Python<'_>) -> bool {
116 self.inner.cache
117 }
118 #[getter]
119 fn row_index(&self) -> Option<(&str, IdxSize)> {
120 self.inner
121 .row_index
122 .as_ref()
123 .map(|n| (n.name.as_str(), n.offset))
124 }
125 #[getter]
126 fn rechunk(&self, _py: Python<'_>) -> bool {
127 self.inner.rechunk
128 }
129 #[getter]
130 fn hive_options(&self, _py: Python<'_>) -> PyResult<Py<PyAny>> {
131 Err(PyNotImplementedError::new_err("hive options"))
132 }
133 #[getter]
134 fn include_file_paths(&self, _py: Python<'_>) -> Option<&str> {
135 self.inner.include_file_paths.as_deref()
136 }
137
138 #[getter]
142 fn deletion_files(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
143 Ok(match &self.inner.deletion_files {
144 None => py.None().into_any(),
145 Some(DeletionFilesList::IcebergPositionDelete(paths)) => {
146 let out = PyDict::new(py);
147 for (k, v) in paths.iter() {
148 out.set_item(*k, v.as_ref())?;
149 }
150 ("iceberg-position-delete", out)
151 .into_pyobject(py)?
152 .into_any()
153 .unbind()
154 },
155 Some(DeletionFilesList::Delta(provider)) => {
156 ("delta-deletion-vector", provider.callback().0.clone_ref(py))
157 .into_pyobject(py)?
158 .into_any()
159 .unbind()
160 },
161 })
162 }
163
164 #[getter]
168 fn column_mapping(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
169 Ok(match &self.inner.column_mapping {
170 None => py.None().into_any(),
171
172 Some(ColumnMapping::Iceberg { .. }) => unimplemented!(),
173 })
174 }
175}
176
/// Python-visible node for `IR::Scan`.
#[pyclass(frozen)]
pub struct Scan {
    /// Python list of the source paths as strings.
    #[pyo3(get)]
    paths: Py<PyAny>,
    /// Set to Python `None` by `into_py`.
    #[pyo3(get)]
    file_info: Py<PyAny>,
    /// Scan predicate; `None` when the IR has no predicate or when file
    /// skipping was already applied with `no_residual_predicate: true`.
    #[pyo3(get)]
    predicate: Option<PyExprIR>,
    /// Wrapper around a clone of the scan's unified scan arguments.
    #[pyo3(get)]
    file_options: PyFileOptions,
    /// Object produced by `scan_type_to_pyobject` for the scan type.
    #[pyo3(get)]
    scan_type: Py<PyAny>,
}
191
/// Python-visible node for `IR::DataFrameScan`.
#[pyclass(frozen)]
pub struct DataFrameScan {
    /// Clone of the scanned dataframe.
    #[pyo3(get)]
    df: PyDataFrame,
    /// List of the column names in the output schema, or Python `None` when
    /// the IR node has no output schema.
    #[pyo3(get)]
    projection: Py<PyAny>,
    /// Always `None` as constructed by `into_py`.
    #[pyo3(get)]
    selection: Option<PyExprIR>,
}
202
/// Python-visible node for `IR::SimpleProjection`.
///
/// Only the input is exposed; the projected columns of the IR node are not.
#[pyclass(frozen)]
pub struct SimpleProjection {
    /// `.0` value of the input node handle.
    #[pyo3(get)]
    input: usize,
}
209
/// Python-visible node for `IR::Select`.
#[pyclass(frozen)]
pub struct Select {
    /// `.0` value of the input node handle.
    #[pyo3(get)]
    input: usize,
    /// The selected expressions, converted from the IR expressions.
    #[pyo3(get)]
    expr: Vec<PyExprIR>,
    /// `should_broadcast` flag copied from the node's options.
    #[pyo3(get)]
    should_broadcast: bool,
}
220
/// Python-visible node for `IR::Sort`.
#[pyclass(frozen)]
pub struct Sort {
    /// `.0` value of the input node handle.
    #[pyo3(get)]
    input: usize,
    /// The sort-key expressions, converted from the IR expressions.
    #[pyo3(get)]
    by_column: Vec<PyExprIR>,
    /// `(maintain_order, nulls_last, descending)` as filled in by `into_py`.
    #[pyo3(get)]
    sort_options: (bool, Vec<bool>, Vec<bool>),
    /// Optional slice, built from the two elements of the IR slice tuple.
    #[pyo3(get)]
    slice: Option<(i64, usize)>,
}
233
/// Python-visible node for `IR::Cache`.
#[pyclass(frozen)]
pub struct Cache {
    /// `.0` value of the input node handle.
    #[pyo3(get)]
    input: usize,
    /// Cache id, obtained via `id.as_u128()`.
    #[pyo3(get)]
    id_: u128,
}
242
/// Python-visible node for `IR::GroupBy`.
#[pyclass(frozen)]
pub struct GroupBy {
    /// `.0` value of the input node handle.
    #[pyo3(get)]
    input: usize,
    /// Group-by key expressions.
    #[pyo3(get)]
    keys: Vec<PyExprIR>,
    /// Aggregation expressions.
    #[pyo3(get)]
    aggs: Vec<PyExprIR>,
    /// Always the unit value: `into_py` raises `NotImplementedError` when the
    /// IR node carries an `apply`.
    #[pyo3(get)]
    apply: (),
    /// `maintain_order` flag copied from the IR node.
    #[pyo3(get)]
    maintain_order: bool,
    /// A `PyGroupbyOptions` built from the node's options, as a Python object.
    #[pyo3(get)]
    options: Py<PyAny>,
}
259
/// Python-visible node for `IR::Join`.
#[pyclass(frozen)]
pub struct Join {
    /// `.0` value of the left input node handle.
    #[pyo3(get)]
    input_left: usize,
    /// `.0` value of the right input node handle.
    #[pyo3(get)]
    input_right: usize,
    /// Join key expressions for the left input.
    #[pyo3(get)]
    left_on: Vec<PyExprIR>,
    /// Join key expressions for the right input.
    #[pyo3(get)]
    right_on: Vec<PyExprIR>,
    /// Tuple built in `into_py`:
    /// `(how, nulls_equal, slice, suffix, coalesce, maintain_order)`.
    /// `how` is the join type's string name, or `(name, operator1, operator2 | None)`
    /// for an IEJoin.
    #[pyo3(get)]
    options: Py<PyAny>,
}
274
/// Python-visible node for `IR::Gather`.
#[pyclass(frozen)]
pub struct Gather {
    /// `.0` value of the input node handle.
    #[pyo3(get)]
    input: usize,
    /// `.0` value of the `idxs` node handle.
    #[pyo3(get)]
    idxs: usize,
    /// `null_on_oob` flag copied from the IR node.
    #[pyo3(get)]
    null_on_oob: bool,
}
285
/// Python-visible node for `IR::MergeSorted`.
#[pyclass(frozen)]
pub struct MergeSorted {
    /// `.0` value of the left input node handle.
    #[pyo3(get)]
    input_left: usize,
    /// `.0` value of the right input node handle.
    #[pyo3(get)]
    input_right: usize,
    /// The IR node's `key`, converted to an owned string.
    #[pyo3(get)]
    key: String,
    /// `maintain_order` flag copied from the IR node.
    #[pyo3(get)]
    maintain_order: bool,
}
298
/// Python-visible node for `IR::HStack`.
#[pyclass(frozen)]
pub struct HStack {
    /// `.0` value of the input node handle.
    #[pyo3(get)]
    input: usize,
    /// The node's expressions, converted from the IR expressions.
    #[pyo3(get)]
    exprs: Vec<PyExprIR>,
    /// `should_broadcast` flag copied from the node's options.
    #[pyo3(get)]
    should_broadcast: bool,
}
309
/// Python-visible `Reduce` node.
///
/// NOTE(review): no code in the visible part of this file constructs this
/// type; `into_py` has no arm that produces it.
#[pyclass(frozen)]
pub struct Reduce {
    /// Input node identifier.
    #[pyo3(get)]
    input: usize,
    /// The node's expressions.
    #[pyo3(get)]
    exprs: Vec<PyExprIR>,
}
318
/// Python-visible node for `IR::Distinct`.
#[pyclass(frozen)]
pub struct Distinct {
    /// `.0` value of the input node handle.
    #[pyo3(get)]
    input: usize,
    /// Tuple built in `into_py`:
    /// `(keep_strategy, subset | None, maintain_order, slice)`, where
    /// `keep_strategy` is a string and `subset` is a list of strings.
    #[pyo3(get)]
    options: Py<PyAny>,
}
/// Python-visible node for `IR::MapFunction`.
#[pyclass(frozen)]
pub struct MapFunction {
    /// `.0` value of the input node handle.
    #[pyo3(get)]
    input: usize,
    /// Tuple describing the function; its first element is a tag string such
    /// as `"unnest"`, `"rechunk"`, `"explode"`, `"unpivot"`, `"row_index"`,
    /// `"fast_count"` or `"hint_sorted"`, followed by that function's arguments.
    #[pyo3(get)]
    function: Py<PyAny>,
}
/// Python-visible node for `IR::Union`.
#[pyclass(frozen)]
pub struct Union {
    /// `.0` values of the input node handles.
    #[pyo3(get)]
    inputs: Vec<usize>,
    /// `slice` copied from the node's options.
    #[pyo3(get)]
    slice: Option<(i64, usize)>,
    /// `rows` copied from the node's options.
    #[pyo3(get)]
    rows: (Option<usize>, usize),
    /// `maintain_order` flag copied from the node's options.
    #[pyo3(get)]
    maintain_order: bool,
}
/// Python-visible node for `IR::HConcat`.
#[pyclass(frozen)]
pub struct HConcat {
    /// `.0` values of the input node handles.
    #[pyo3(get)]
    inputs: Vec<usize>,
    /// Tuple built in `into_py`: `(parallel, strict, broadcast_unit_length)`.
    #[pyo3(get)]
    options: Py<PyAny>,
}
/// Python-visible node for `IR::ExtContext`.
#[pyclass(frozen)]
pub struct ExtContext {
    /// `.0` value of the input node handle.
    #[pyo3(get)]
    input: usize,
    /// `.0` values of the context node handles.
    #[pyo3(get)]
    contexts: Vec<usize>,
}
362
/// Python-visible node for `IR::Sink`.
#[pyclass(frozen)]
pub struct Sink {
    /// `.0` value of the input node handle.
    #[pyo3(get)]
    input: usize,
    /// The sink payload serialized to a JSON string with `serde_json`.
    #[pyo3(get)]
    payload: Py<PyAny>,
}
370
/// Converts a single [`IR`] plan node into the matching Python-visible node
/// object defined in this module.
///
/// The conversion is not recursive: child nodes are exposed through the `.0`
/// value of their node handles, and expressions are converted to `PyExprIR`.
///
/// # Errors
/// * `PyNotImplementedError` for unsupported nodes or options: scans with
///   hive partitioning, scans / fast counts without path sources, `apply`
///   inside a group-by, asof joins, cross joins carrying extra options
///   ("nested loop join"), opaque map functions, `SinkMultiple`,
///   `UnoptimizedDispatch` and `Invalid`.
/// * `PyValueError` when the sink payload cannot be serialized to JSON.
/// * Any error returned by `scan_type_to_pyobject` or by the Python
///   conversions.
///
/// # Panics
/// Hits `unreachable!()` if an IEJoin node does not carry IEJoin options.
pub(crate) fn into_py(py: Python<'_>, plan: &IR) -> PyResult<Py<PyAny>> {
    match plan {
        IR::PythonScan { options } => {
            let python_src = match options.python_source {
                PythonScanSource::Pyarrow => "pyarrow",
                PythonScanSource::Cuda => "cuda",
                PythonScanSource::IOPlugin => "io_plugin",
            };

            // The options tuple is:
            // (scan_fn | None, with_columns | None, python_src, predicate, n_rows | None)
            PythonScan {
                options: (
                    options
                        .scan_fn
                        .as_ref()
                        .map_or_else(|| py.None(), |s| s.0.clone_ref(py)),
                    options.with_columns.as_ref().map_or_else(
                        || Ok(py.None()),
                        |cols| {
                            cols.iter()
                                .map(|x| x.as_str())
                                .collect::<Vec<_>>()
                                .into_py_any(py)
                        },
                    )?,
                    python_src,
                    // The predicate is None or a tuple tagged with its flavour.
                    match &options.predicate {
                        PythonPredicate::None => py.None(),
                        PythonPredicate::PyArrow {
                            predicate,
                            has_residual,
                        } => {
                            ("pyarrow", predicate, "has_residual", has_residual).into_py_any(py)?
                        },
                        PythonPredicate::Polars(e) => ("polars", e.node().0).into_py_any(py)?,
                    },
                    options
                        .n_rows
                        .map_or_else(|| Ok(py.None()), |s| s.into_py_any(py))?,
                )
                    .into_py_any(py)?,
            }
            .into_py_any(py)
        },
        IR::Slice { input, offset, len } => Slice {
            input: input.0,
            offset: *offset,
            len: *len,
        }
        .into_py_any(py),
        IR::Filter { input, predicate } => Filter {
            input: input.0,
            predicate: predicate.into(),
        }
        .into_py_any(py),
        // This arm comes first, so the general scan arm below only handles
        // scans whose `hive_parts` is `None`.
        IR::Scan {
            hive_parts: Some(_),
            ..
        } => Err(PyNotImplementedError::new_err(
            "scan with hive partitioning",
        )),
        IR::Scan {
            sources,
            file_info: _,
            hive_parts: _,
            predicate,
            predicate_file_skip_applied,
            output_schema: _,
            scan_type,
            unified_scan_args,
        } => {
            Scan {
                paths: {
                    // Only path-based sources are supported.
                    let paths = sources
                        .into_paths()
                        .ok_or_else(|| PyNotImplementedError::new_err("scan with BytesIO"))?;

                    // Start from an empty Python list and append each path string.
                    let out = PyList::new(py, [] as [(); 0])?;

                    for path in paths.iter() {
                        out.append(path.as_str())?;
                    }

                    out.into_py_any(py)?
                },
                file_info: py.None(),
                // The predicate is dropped when file skipping was applied and
                // reported `no_residual_predicate: true`.
                predicate: predicate
                    .as_ref()
                    .filter(|_| {
                        !matches!(
                            predicate_file_skip_applied,
                            Some(PredicateFileSkip {
                                no_residual_predicate: true,
                                original_len: _,
                            })
                        )
                    })
                    .map(|e| e.into()),
                file_options: PyFileOptions {
                    inner: (**unified_scan_args).clone(),
                },
                scan_type: scan_type_to_pyobject(py, scan_type, &unified_scan_args.cloud_options)?,
            }
        }
        .into_py_any(py),
        IR::DataFrameScan {
            df,
            schema: _,
            output_schema,
        } => DataFrameScan {
            df: PyDataFrame::new((**df).clone()),
            // Column names of the output schema, or None when there is none.
            projection: output_schema.as_ref().map_or_else(
                || Ok(py.None()),
                |s| {
                    s.iter_names()
                        .map(|s| s.as_str())
                        .collect::<Vec<_>>()
                        .into_py_any(py)
                },
            )?,
            selection: None,
        }
        .into_py_any(py),
        IR::SimpleProjection { input, columns: _ } => {
            SimpleProjection { input: input.0 }.into_py_any(py)
        },
        IR::Select {
            input,
            expr,
            schema: _,
            options,
        } => Select {
            expr: expr.iter().map(|e| e.into()).collect(),
            input: input.0,
            should_broadcast: options.should_broadcast,
        }
        .into_py_any(py),
        IR::Sort {
            input,
            by_column,
            slice,
            sort_options,
        } => Sort {
            input: input.0,
            by_column: by_column.iter().map(|e| e.into()).collect(),
            // Tuple order is (maintain_order, nulls_last, descending).
            sort_options: (
                sort_options.maintain_order,
                sort_options.nulls_last.clone(),
                sort_options.descending.clone(),
            ),
            slice: slice.as_ref().map(|t| (t.0, t.1)),
        }
        .into_py_any(py),
        IR::Cache { input, id } => Cache {
            input: input.0,
            id_: id.as_u128(),
        }
        .into_py_any(py),
        IR::GroupBy {
            input,
            keys,
            aggs,
            schema: _,
            apply,
            maintain_order,
            options,
        } => GroupBy {
            input: input.0,
            keys: keys.iter().map(|e| e.into()).collect(),
            aggs: aggs.iter().map(|e| e.into()).collect(),
            // A group-by carrying an `apply` is rejected; otherwise the field is `()`.
            apply: apply.as_ref().map_or(Ok(()), |_| {
                Err(PyNotImplementedError::new_err(format!(
                    "apply inside GroupBy {plan:?}"
                )))
            })?,
            maintain_order: *maintain_order,
            options: PyGroupbyOptions::new(options.as_ref().clone()).into_py_any(py)?,
        }
        .into_py_any(py),
        IR::Join {
            input_left,
            input_right,
            schema: _,
            left_on,
            right_on,
            options,
        } => Join {
            input_left: input_left.0,
            input_right: input_right.0,
            left_on: left_on.iter().map(|e| e.into()).collect(),
            right_on: right_on.iter().map(|e| e.into()).collect(),
            // The options tuple is:
            // (how, nulls_equal, slice, suffix, coalesce, maintain_order)
            options: {
                let how = &options.args.how;
                // String form of the join type.
                let name = Into::<&str>::into(how).into_pyobject(py)?;
                (
                    match how {
                        #[cfg(feature = "asof_join")]
                        JoinType::AsOf(_) => {
                            return Err(PyNotImplementedError::new_err("asof join"));
                        },
                        #[cfg(feature = "iejoin")]
                        JoinType::IEJoin => {
                            let Some(JoinTypeOptionsIR::IEJoin(ie_options)) = &options.options
                            else {
                                unreachable!()
                            };
                            // For an IEJoin, `how` is (name, operator1, operator2 | None).
                            (
                                name,
                                crate::Wrap(ie_options.operator1).into_py_any(py)?,
                                ie_options.operator2.as_ref().map_or_else(
                                    || Ok(py.None()),
                                    |op| crate::Wrap(*op).into_py_any(py),
                                )?,
                            )
                                .into_py_any(py)?
                        },
                        // A cross join that carries extra join options is
                        // reported as an unsupported nested loop join.
                        JoinType::Cross if options.options.is_some() => {
                            return Err(PyNotImplementedError::new_err("nested loop join"));
                        },
                        _ => name.into_any().unbind(),
                    },
                    options.args.nulls_equal,
                    options.args.slice,
                    options.args.suffix().as_str(),
                    options.args.coalesce.coalesce(how),
                    Into::<&str>::into(options.args.maintain_order),
                )
                    .into_py_any(py)?
            },
        }
        .into_py_any(py),
        IR::Gather {
            input,
            idxs,
            null_on_oob,
        } => Gather {
            input: input.0,
            idxs: idxs.0,
            null_on_oob: *null_on_oob,
        }
        .into_py_any(py),
        IR::HStack {
            input,
            exprs,
            schema: _,
            options,
        } => HStack {
            input: input.0,
            exprs: exprs.iter().map(|e| e.into()).collect(),
            should_broadcast: options.should_broadcast,
        }
        .into_py_any(py),
        IR::Distinct { input, options } => Distinct {
            input: input.0,
            // The options tuple is: (keep_strategy, subset | None, maintain_order, slice)
            options: (
                Into::<&str>::into(options.keep_strategy),
                options.subset.as_ref().map_or_else(
                    || Ok(py.None()),
                    |f| {
                        f.iter()
                            .map(|s| s.as_ref())
                            .collect::<Vec<&str>>()
                            .into_py_any(py)
                    },
                )?,
                options.maintain_order,
                options.slice,
            )
                .into_py_any(py)?,
        }
        .into_py_any(py),
        IR::MapFunction { input, function } => MapFunction {
            input: input.0,
            // Each supported function becomes a tuple whose first element is a
            // tag string; opaque functions are rejected.
            function: match function {
                FunctionIR::OpaquePython(_) => {
                    return Err(PyNotImplementedError::new_err("opaque python mapfunction"));
                },
                FunctionIR::Opaque {
                    function: _,
                    schema: _,
                    predicate_pd: _,
                    projection_pd: _,
                    streamable: _,
                    fmt_str: _,
                } => return Err(PyNotImplementedError::new_err("opaque rust mapfunction")),
                FunctionIR::Unnest { columns, separator } => (
                    "unnest",
                    columns.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
                    separator.as_ref().map(|s| s.to_string()),
                )
                    .into_py_any(py)?,
                FunctionIR::Rechunk => ("rechunk",).into_py_any(py)?,
                FunctionIR::Explode {
                    columns,
                    options,
                    schema: _,
                } => (
                    "explode",
                    columns.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
                    options.empty_as_null,
                    options.keep_nulls,
                )
                    .into_py_any(py)?,
                #[cfg(feature = "pivot")]
                FunctionIR::Unpivot { args, schema: _ } => (
                    "unpivot",
                    args.index.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
                    args.on.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
                    args.variable_name.as_str().into_py_any(py)?,
                    args.value_name.as_str().into_py_any(py)?,
                )
                    .into_py_any(py)?,
                FunctionIR::RowIndex {
                    name,
                    schema: _,
                    offset,
                    // A missing offset is exposed as 0.
                } => ("row_index", name.to_string(), offset.unwrap_or(0)).into_py_any(py)?,
                FunctionIR::FastCount {
                    sources,
                    scan_type,
                    alias,
                    ..
                } => {
                    // Only path-based sources are supported.
                    let sources = sources
                        .into_paths()
                        .ok_or_else(|| {
                            PyNotImplementedError::new_err("FastCount with BytesIO sources")
                        })?
                        .iter()
                        .map(|p| p.as_str())
                        .collect::<Vec<_>>()
                        .into_py_any(py)?;

                    // No cloud options are passed on for a fast count.
                    let cloud_options = None;

                    let scan_type = scan_type_to_pyobject(py, scan_type, &cloud_options)?;

                    let alias = alias
                        .as_ref()
                        .map(|a| a.as_str())
                        .map_or_else(|| Ok(py.None()), |s| s.into_py_any(py))?;

                    ("fast_count", sources, scan_type, alias).into_py_any(py)?
                },
                FunctionIR::Hint(hint) => match hint {
                    HintIR::Sorted(sorted_vec) => {
                        // One (column, descending, nulls_last) triple per entry.
                        let sorted_info: Vec<_> = sorted_vec
                            .iter()
                            .map(|s| (s.column.as_str(), s.descending, s.nulls_last))
                            .collect();
                        ("hint_sorted", sorted_info).into_py_any(py)?
                    },
                },
            },
        }
        .into_py_any(py),
        IR::Union { inputs, options } => Union {
            inputs: inputs.iter().map(|n| n.0).collect(),
            slice: options.slice,
            rows: options.rows,
            maintain_order: options.maintain_order,
        }
        .into_py_any(py),
        IR::HConcat {
            inputs,
            schema: _,
            options,
        } => HConcat {
            inputs: inputs.iter().map(|n| n.0).collect(),
            options: (
                options.parallel,
                options.strict,
                options.broadcast_unit_length,
            )
                .into_py_any(py)?,
        }
        .into_py_any(py),
        IR::ExtContext {
            input,
            contexts,
            schema: _,
        } => ExtContext {
            input: input.0,
            contexts: contexts.iter().map(|n| n.0).collect(),
        }
        .into_py_any(py),
        IR::Sink { input, payload } => Sink {
            input: input.0,
            // The payload is handed to Python as a JSON string.
            payload: PyString::new(
                py,
                &serde_json::to_string(payload)
                    .map_err(|err| PyValueError::new_err(format!("{err:?}")))?,
            )
            .into(),
        }
        .into_py_any(py),
        IR::SinkMultiple { .. } => Err(PyNotImplementedError::new_err(
            "Not expecting to see a SinkMultiple node",
        )),
        #[cfg(feature = "merge_sorted")]
        IR::MergeSorted {
            input_left,
            input_right,
            key,
            maintain_order,
        } => MergeSorted {
            input_left: input_left.0,
            input_right: input_right.0,
            key: key.to_string(),
            maintain_order: *maintain_order,
        }
        .into_py_any(py),
        IR::UnoptimizedDispatch { .. } => Err(PyNotImplementedError::new_err(
            "Not expecting to see a UnoptimizedDispatch node",
        )),
        IR::Invalid => Err(PyNotImplementedError::new_err("Invalid")),
    }
}