1#[cfg(feature = "iejoin")]
2use polars::prelude::JoinTypeOptionsIR;
3use polars::prelude::deletion::DeletionFilesList;
4use polars::prelude::python_dsl::PythonScanSource;
5use polars::prelude::{ColumnMapping, PredicateFileSkip};
6use polars_core::prelude::IdxSize;
7use polars_io::cloud::CloudOptions;
8use polars_ops::prelude::JoinType;
9use polars_plan::plans::{HintIR, IR};
10use polars_plan::prelude::{FileScanIR, FunctionIR, PythonPredicate, UnifiedScanArgs};
11use pyo3::IntoPyObjectExt;
12use pyo3::exceptions::{PyNotImplementedError, PyValueError};
13use pyo3::prelude::*;
14use pyo3::types::{PyDict, PyList, PyString};
15
16use super::expr_nodes::PyGroupbyOptions;
17use crate::PyDataFrame;
18use crate::lazyframe::visit::PyExprIR;
19
20fn scan_type_to_pyobject(
21 py: Python<'_>,
22 scan_type: &FileScanIR,
23 cloud_options: &Option<CloudOptions>,
24) -> PyResult<Py<PyAny>> {
25 match scan_type {
26 #[cfg(feature = "csv")]
27 FileScanIR::Csv { options } => {
28 let options = serde_json::to_string(options)
29 .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
30 let cloud_options = serde_json::to_string(cloud_options)
31 .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
32 Ok(("csv", options, cloud_options).into_py_any(py)?)
33 },
34 #[cfg(feature = "parquet")]
35 FileScanIR::Parquet { options, .. } => {
36 let options = serde_json::to_string(options)
37 .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
38 let cloud_options = serde_json::to_string(cloud_options)
39 .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
40 Ok(("parquet", options, cloud_options).into_py_any(py)?)
41 },
42 #[cfg(feature = "ipc")]
43 FileScanIR::Ipc { .. } => Err(PyNotImplementedError::new_err("ipc scan")),
44 #[cfg(feature = "json")]
45 FileScanIR::NDJson { options, .. } => {
46 let options = serde_json::to_string(options)
47 .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
48 Ok(("ndjson", options).into_py_any(py)?)
49 },
50 #[cfg(feature = "scan_lines")]
51 FileScanIR::Lines { name } => Ok(("lines", name.as_str()).into_py_any(py)?),
52 FileScanIR::ExpandedPaths { name } => {
53 Ok(("expanded-paths", name.as_str()).into_py_any(py)?)
54 },
55 FileScanIR::PythonDataset { .. } => {
56 Err(PyNotImplementedError::new_err("python dataset scan"))
57 },
58 FileScanIR::Anonymous { .. } => Err(PyNotImplementedError::new_err("anonymous scan")),
59 }
60}
61
#[pyclass(frozen)]
/// Python-visible node built by `into_py` for `IR::PythonScan`.
pub struct PythonScan {
    /// Five-element tuple assembled in `into_py`:
    /// `(scan_fn or None, with_columns names or None, source tag, predicate, n_rows or None)`.
    /// The source tag is one of `"pyarrow"`, `"cuda"` or `"io_plugin"`.
    #[pyo3(get)]
    options: Py<PyAny>,
}
68
#[pyclass(frozen)]
/// Python-visible node built by `into_py` for `IR::Slice`.
pub struct Slice {
    /// Raw index (`input.0`) of the input plan node.
    #[pyo3(get)]
    input: usize,
    /// Offset copied from the IR node (signed).
    #[pyo3(get)]
    offset: i64,
    /// Length copied from the IR node.
    #[pyo3(get)]
    len: IdxSize,
}
79
#[pyclass(frozen)]
/// Python-visible node built by `into_py` for `IR::Filter`.
pub struct Filter {
    /// Raw index (`input.0`) of the input plan node.
    #[pyo3(get)]
    input: usize,
    /// Filter predicate, converted from the IR expression via `.into()`.
    #[pyo3(get)]
    predicate: PyExprIR,
}
88
#[pyclass(frozen, skip_from_py_object)]
#[derive(Clone)]
/// Wrapper around `UnifiedScanArgs`; individual settings are exposed to Python
/// through the getters in the `#[pymethods]` impl.
pub struct PyFileOptions {
    // Owned clone of the scan arguments (see the `IR::Scan` arm of `into_py`).
    inner: UnifiedScanArgs,
}
94
95#[pymethods]
96impl PyFileOptions {
97 #[getter]
98 fn n_rows(&self) -> Option<(i64, IdxSize)> {
99 self.inner
100 .pre_slice
101 .clone()
102 .map(|slice| slice.to_signed_offset_len())
103 }
104 #[getter]
105 fn with_columns(&self) -> Option<Vec<&str>> {
106 self.inner
107 .projection
108 .as_ref()?
109 .iter()
110 .map(|x| x.as_str())
111 .collect::<Vec<_>>()
112 .into()
113 }
114 #[getter]
115 fn cache(&self, _py: Python<'_>) -> bool {
116 self.inner.cache
117 }
118 #[getter]
119 fn row_index(&self) -> Option<(&str, IdxSize)> {
120 self.inner
121 .row_index
122 .as_ref()
123 .map(|n| (n.name.as_str(), n.offset))
124 }
125 #[getter]
126 fn rechunk(&self, _py: Python<'_>) -> bool {
127 self.inner.rechunk
128 }
129 #[getter]
130 fn hive_options(&self, _py: Python<'_>) -> PyResult<Py<PyAny>> {
131 Err(PyNotImplementedError::new_err("hive options"))
132 }
133 #[getter]
134 fn include_file_paths(&self, _py: Python<'_>) -> Option<&str> {
135 self.inner.include_file_paths.as_deref()
136 }
137
138 #[getter]
142 fn deletion_files(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
143 Ok(match &self.inner.deletion_files {
144 None => py.None().into_any(),
145 Some(DeletionFilesList::IcebergPositionDelete(paths)) => {
146 let out = PyDict::new(py);
147 for (k, v) in paths.iter() {
148 out.set_item(*k, v.as_ref())?;
149 }
150 ("iceberg-position-delete", out)
151 .into_pyobject(py)?
152 .into_any()
153 .unbind()
154 },
155 Some(DeletionFilesList::Delta(provider)) => {
156 ("delta-deletion-vector", provider.callback().0.clone_ref(py))
157 .into_pyobject(py)?
158 .into_any()
159 .unbind()
160 },
161 })
162 }
163
164 #[getter]
168 fn column_mapping(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
169 Ok(match &self.inner.column_mapping {
170 None => py.None().into_any(),
171
172 Some(ColumnMapping::Iceberg { .. }) => unimplemented!(),
173 })
174 }
175}
176
#[pyclass(frozen)]
/// Python-visible node built by `into_py` for `IR::Scan`.
pub struct Scan {
    /// Python list of the source paths as strings.
    #[pyo3(get)]
    paths: Py<PyAny>,
    /// Always Python `None` as populated by `into_py`.
    #[pyo3(get)]
    file_info: Py<PyAny>,
    /// Clone of the hive partition frame, when the IR node has one.
    #[pyo3(get)]
    hive_parts: Option<PyDataFrame>,
    /// Scan predicate; `into_py` leaves it `None` when file skipping was
    /// applied with `no_residual_predicate: true`.
    #[pyo3(get)]
    predicate: Option<PyExprIR>,
    /// Clone of the unified scan arguments, wrapped for Python access.
    #[pyo3(get)]
    file_options: PyFileOptions,
    /// Tagged tuple produced by `scan_type_to_pyobject`.
    #[pyo3(get)]
    scan_type: Py<PyAny>,
}
193
#[pyclass(frozen)]
/// Python-visible node built by `into_py` for `IR::DataFrameScan`.
pub struct DataFrameScan {
    /// Clone of the in-memory frame held by the IR node.
    #[pyo3(get)]
    df: PyDataFrame,
    /// Column names of the node's `output_schema`, or Python `None` when the
    /// node has no output schema.
    #[pyo3(get)]
    projection: Py<PyAny>,
    /// Always `None` as populated by `into_py`.
    #[pyo3(get)]
    selection: Option<PyExprIR>,
}
204
#[pyclass(frozen)]
/// Python-visible node built by `into_py` for `IR::SimpleProjection`.
/// The projected columns of the IR node are not exposed here.
pub struct SimpleProjection {
    /// Raw index (`input.0`) of the input plan node.
    #[pyo3(get)]
    input: usize,
}
211
#[pyclass(frozen)]
/// Python-visible node built by `into_py` for `IR::Select`.
pub struct Select {
    /// Raw index (`input.0`) of the input plan node.
    #[pyo3(get)]
    input: usize,
    /// Selected expressions, each converted from the IR expression.
    #[pyo3(get)]
    expr: Vec<PyExprIR>,
    /// Copied from the IR node's `options.should_broadcast`.
    #[pyo3(get)]
    should_broadcast: bool,
}
222
#[pyclass(frozen)]
/// Python-visible node built by `into_py` for `IR::Sort`.
pub struct Sort {
    /// Raw index (`input.0`) of the input plan node.
    #[pyo3(get)]
    input: usize,
    /// Sort key expressions.
    #[pyo3(get)]
    by_column: Vec<PyExprIR>,
    /// `(maintain_order, nulls_last, descending)` — note that `nulls_last`
    /// comes before `descending` in this tuple.
    #[pyo3(get)]
    sort_options: (bool, Vec<bool>, Vec<bool>),
    /// Optional slice: the first two entries of the IR slice tuple
    /// (presumably offset and length — confirm against the IR definition),
    /// followed by the `u128` id of its optional third entry.
    #[pyo3(get)]
    slice: Option<(i64, usize, Option<u128>)>,
}
235
#[pyclass(frozen)]
/// Python-visible node built by `into_py` for `IR::Cache`.
pub struct Cache {
    /// Raw index (`input.0`) of the input plan node.
    #[pyo3(get)]
    input: usize,
    /// Cache id of the IR node, widened with `as_u128()`.
    #[pyo3(get)]
    id_: u128,
}
244
#[pyclass(frozen)]
/// Python-visible node built by `into_py` for `IR::GroupBy`.
pub struct GroupBy {
    /// Raw index (`input.0`) of the input plan node.
    #[pyo3(get)]
    input: usize,
    /// Grouping key expressions.
    #[pyo3(get)]
    keys: Vec<PyExprIR>,
    /// Aggregation expressions.
    #[pyo3(get)]
    aggs: Vec<PyExprIR>,
    /// Always the unit value: `into_py` raises `NotImplementedError` when the
    /// IR node carries an `apply` function.
    #[pyo3(get)]
    apply: (),
    /// Copied from the IR node's `maintain_order`.
    #[pyo3(get)]
    maintain_order: bool,
    /// A `PyGroupbyOptions` object built from a clone of the IR options.
    #[pyo3(get)]
    options: Py<PyAny>,
}
261
#[pyclass(frozen)]
/// Python-visible node built by `into_py` for `IR::Join`.
pub struct Join {
    /// Raw index (`input_left.0`) of the left input plan node.
    #[pyo3(get)]
    input_left: usize,
    /// Raw index (`input_right.0`) of the right input plan node.
    #[pyo3(get)]
    input_right: usize,
    /// Join key expressions for the left side.
    #[pyo3(get)]
    left_on: Vec<PyExprIR>,
    /// Join key expressions for the right side.
    #[pyo3(get)]
    right_on: Vec<PyExprIR>,
    /// Six-element tuple assembled in `into_py`:
    /// `(how, nulls_equal, slice, suffix, coalesce, maintain_order)`.
    /// `how` is the join-type name, or for an IEJoin a nested tuple
    /// `(name, operator1, operator2 or None)`.
    #[pyo3(get)]
    options: Py<PyAny>,
}
276
#[pyclass(frozen)]
/// Python-visible node built by `into_py` for `IR::Gather`.
pub struct Gather {
    /// Raw index (`input.0`) of the input plan node.
    #[pyo3(get)]
    input: usize,
    /// Raw index (`idxs.0`) of the plan node that supplies the indices.
    #[pyo3(get)]
    idxs: usize,
    /// Copied from the IR node's `null_on_oob`.
    #[pyo3(get)]
    null_on_oob: bool,
}
287
#[pyclass(frozen)]
/// Python-visible node built by `into_py` for `IR::MergeSorted`
/// (that arm is compiled only with the `merge_sorted` feature).
pub struct MergeSorted {
    /// Raw index (`input_left.0`) of the left input plan node.
    #[pyo3(get)]
    input_left: usize,
    /// Raw index (`input_right.0`) of the right input plan node.
    #[pyo3(get)]
    input_right: usize,
    /// The IR node's `key`, converted with `to_string()`.
    #[pyo3(get)]
    key: String,
    /// Copied from the IR node's `maintain_order`.
    #[pyo3(get)]
    maintain_order: bool,
}
300
#[pyclass(frozen)]
/// Python-visible node built by `into_py` for `IR::HStack`.
pub struct HStack {
    /// Raw index (`input.0`) of the input plan node.
    #[pyo3(get)]
    input: usize,
    /// Expressions of the IR node, each converted from the IR expression.
    #[pyo3(get)]
    exprs: Vec<PyExprIR>,
    /// Copied from the IR node's `options.should_broadcast`.
    #[pyo3(get)]
    should_broadcast: bool,
}
311
#[pyclass(frozen)]
/// Python-visible node holding an input index and a list of expressions.
///
/// NOTE(review): no arm of `into_py` in this file constructs `Reduce`;
/// confirm whether it is still produced anywhere.
pub struct Reduce {
    /// Index of the input plan node (by analogy with the sibling node
    /// structs — confirm).
    #[pyo3(get)]
    input: usize,
    /// Expressions attached to the node.
    #[pyo3(get)]
    exprs: Vec<PyExprIR>,
}
320
#[pyclass(frozen)]
/// Python-visible node built by `into_py` for `IR::Distinct`.
pub struct Distinct {
    /// Raw index (`input.0`) of the input plan node.
    #[pyo3(get)]
    input: usize,
    /// Four-element tuple assembled in `into_py`:
    /// `(keep_strategy name, subset column names or None, maintain_order, slice)`.
    #[pyo3(get)]
    options: Py<PyAny>,
}
#[pyclass(frozen)]
/// Python-visible node built by `into_py` for `IR::MapFunction`.
pub struct MapFunction {
    /// Raw index (`input.0`) of the input plan node.
    #[pyo3(get)]
    input: usize,
    /// Tagged tuple describing the function. The first element is one of
    /// `"unnest"`, `"rechunk"`, `"explode"`, `"unpivot"`, `"row_index"`,
    /// `"fast_count"` or `"hint_sorted"`; the remaining elements depend on
    /// the tag (see `into_py`).
    #[pyo3(get)]
    function: Py<PyAny>,
}
#[pyclass(frozen)]
/// Python-visible node built by `into_py` for `IR::Union`.
pub struct Union {
    /// Raw indices of the input plan nodes.
    #[pyo3(get)]
    inputs: Vec<usize>,
    /// Copied from the IR node's `options.slice`.
    #[pyo3(get)]
    slice: Option<(i64, usize)>,
    /// Copied verbatim from the IR node's `options.rows`.
    /// NOTE(review): the meaning of the two entries is not visible in this
    /// file — confirm against the union options definition.
    #[pyo3(get)]
    rows: (Option<usize>, usize),
    /// Copied from the IR node's `options.maintain_order`.
    #[pyo3(get)]
    maintain_order: bool,
}
#[pyclass(frozen)]
/// Python-visible node built by `into_py` for `IR::HConcat`.
pub struct HConcat {
    /// Raw indices of the input plan nodes.
    #[pyo3(get)]
    inputs: Vec<usize>,
    /// Three-element tuple `(parallel, strict, broadcast_unit_length)` taken
    /// from the IR node's options.
    #[pyo3(get)]
    options: Py<PyAny>,
}
#[pyclass(frozen)]
/// Python-visible node built by `into_py` for `IR::ExtContext`.
pub struct ExtContext {
    /// Raw index (`input.0`) of the input plan node.
    #[pyo3(get)]
    input: usize,
    /// Raw indices of the context plan nodes.
    #[pyo3(get)]
    contexts: Vec<usize>,
}
364
#[pyclass(frozen)]
/// Python-visible node built by `into_py` for `IR::Sink`.
pub struct Sink {
    /// Raw index (`input.0`) of the input plan node.
    #[pyo3(get)]
    input: usize,
    /// The sink payload serialized to JSON, as a Python string.
    #[pyo3(get)]
    payload: Py<PyAny>,
}
372
/// Convert one [`IR`] plan node into the matching Python-visible node object.
///
/// Every supported variant is mapped to one of the `#[pyclass]` structs in this
/// module. Child nodes are exposed by their raw index (`node.0`), not converted
/// recursively.
///
/// # Errors
///
/// * `NotImplementedError` for constructs that cannot be represented: scans or
///   fast counts whose sources are not paths, group-bys with an `apply`
///   function, asof joins, cross joins that carry extra join options, opaque
///   map functions, and the `SinkMultiple`, `UnoptimizedDispatch` and
///   `Invalid` nodes. Errors from `scan_type_to_pyobject` are propagated.
/// * `ValueError` when a sink payload cannot be serialized to JSON.
///
/// # Panics
///
/// The IEJoin arm hits `unreachable!()` if the join type is `IEJoin` but the
/// join options are not the IEJoin variant.
pub(crate) fn into_py(py: Python<'_>, plan: &IR) -> PyResult<Py<PyAny>> {
    match plan {
        IR::PythonScan { options } => {
            let python_src = match options.python_source {
                PythonScanSource::Pyarrow => "pyarrow",
                PythonScanSource::Cuda => "cuda",
                PythonScanSource::IOPlugin => "io_plugin",
            };

            PythonScan {
                // (scan_fn | None, with_columns | None, source tag, predicate, n_rows | None)
                options: (
                    options
                        .scan_fn
                        .as_ref()
                        .map_or_else(|| py.None(), |s| s.0.clone_ref(py)),
                    options.with_columns.as_ref().map_or_else(
                        || Ok(py.None()),
                        |cols| {
                            cols.iter()
                                .map(|x| x.as_str())
                                .collect::<Vec<_>>()
                                .into_py_any(py)
                        },
                    )?,
                    python_src,
                    // Predicate is None, or a tagged tuple naming its flavour.
                    match &options.predicate {
                        PythonPredicate::None => py.None(),
                        PythonPredicate::PyArrow {
                            predicate,
                            has_residual,
                        } => {
                            ("pyarrow", predicate, "has_residual", has_residual).into_py_any(py)?
                        },
                        PythonPredicate::Polars(e) => ("polars", e.node().0).into_py_any(py)?,
                    },
                    options
                        .n_rows
                        .map_or_else(|| Ok(py.None()), |s| s.into_py_any(py))?,
                )
                    .into_py_any(py)?,
            }
            .into_py_any(py)
        },
        IR::Slice { input, offset, len } => Slice {
            input: input.0,
            offset: *offset,
            len: *len,
        }
        .into_py_any(py),
        IR::Filter { input, predicate } => Filter {
            input: input.0,
            predicate: predicate.into(),
        }
        .into_py_any(py),
        IR::Scan {
            sources,
            file_info: _,
            hive_parts,
            predicate,
            predicate_file_skip_applied,
            output_schema: _,
            scan_type,
            unified_scan_args,
        } => {
            Scan {
                paths: {
                    // Only path-backed sources can be exposed; anything else
                    // is rejected with NotImplementedError.
                    let paths = sources
                        .into_paths()
                        .ok_or_else(|| PyNotImplementedError::new_err("scan with BytesIO"))?;

                    // Start from an empty Python list and append each path as a string.
                    let out = PyList::new(py, [] as [(); 0])?;

                    for path in paths.iter() {
                        out.append(path.as_str())?;
                    }

                    out.into_py_any(py)?
                },
                file_info: py.None(),
                hive_parts: hive_parts
                    .as_ref()
                    .map(|h| PyDataFrame::new(h.df().clone())),
                // Drop the predicate when file skipping was applied and
                // reported that no residual predicate remains.
                predicate: predicate
                    .as_ref()
                    .filter(|_| {
                        !matches!(
                            predicate_file_skip_applied,
                            Some(PredicateFileSkip {
                                no_residual_predicate: true,
                                original_len: _,
                            })
                        )
                    })
                    .map(|e| e.into()),
                file_options: PyFileOptions {
                    inner: (**unified_scan_args).clone(),
                },
                scan_type: scan_type_to_pyobject(py, scan_type, &unified_scan_args.cloud_options)?,
            }
        }
        .into_py_any(py),
        IR::DataFrameScan {
            df,
            schema: _,
            output_schema,
        } => DataFrameScan {
            df: PyDataFrame::new((**df).clone()),
            // Names of the output schema, or None when there is no output schema.
            projection: output_schema.as_ref().map_or_else(
                || Ok(py.None()),
                |s| {
                    s.iter_names()
                        .map(|s| s.as_str())
                        .collect::<Vec<_>>()
                        .into_py_any(py)
                },
            )?,
            selection: None,
        }
        .into_py_any(py),
        IR::SimpleProjection { input, columns: _ } => {
            SimpleProjection { input: input.0 }.into_py_any(py)
        },
        IR::Select {
            input,
            expr,
            schema: _,
            options,
        } => Select {
            expr: expr.iter().map(|e| e.into()).collect(),
            input: input.0,
            should_broadcast: options.should_broadcast,
        }
        .into_py_any(py),
        IR::Sort {
            input,
            by_column,
            slice,
            sort_options,
        } => Sort {
            input: input.0,
            by_column: by_column.iter().map(|e| e.into()).collect(),
            // Tuple order is (maintain_order, nulls_last, descending).
            sort_options: (
                sort_options.maintain_order,
                sort_options.nulls_last.clone(),
                sort_options.descending.clone(),
            ),
            // The optional third slice entry is exposed by its u128 id only.
            slice: slice
                .as_ref()
                .map(|t| (t.0, t.1, t.2.as_ref().map(|p| p.id().as_u128()))),
        }
        .into_py_any(py),
        IR::Cache { input, id } => Cache {
            input: input.0,
            id_: id.as_u128(),
        }
        .into_py_any(py),
        IR::GroupBy {
            input,
            keys,
            aggs,
            schema: _,
            apply,
            maintain_order,
            options,
        } => GroupBy {
            input: input.0,
            keys: keys.iter().map(|e| e.into()).collect(),
            aggs: aggs.iter().map(|e| e.into()).collect(),
            // A group-by carrying an `apply` function is rejected; otherwise
            // the field is just the unit value.
            apply: apply.as_ref().map_or(Ok(()), |_| {
                Err(PyNotImplementedError::new_err(format!(
                    "apply inside GroupBy {plan:?}"
                )))
            })?,
            maintain_order: *maintain_order,
            options: PyGroupbyOptions::new(options.as_ref().clone()).into_py_any(py)?,
        }
        .into_py_any(py),
        IR::Join {
            input_left,
            input_right,
            schema: _,
            left_on,
            right_on,
            options,
        } => Join {
            input_left: input_left.0,
            input_right: input_right.0,
            left_on: left_on.iter().map(|e| e.into()).collect(),
            right_on: right_on.iter().map(|e| e.into()).collect(),
            // (how, nulls_equal, slice, suffix, coalesce, maintain_order)
            options: {
                let how = &options.args.how;
                let name = Into::<&str>::into(how).into_pyobject(py)?;
                (
                    match how {
                        #[cfg(feature = "asof_join")]
                        JoinType::AsOf(_) => {
                            return Err(PyNotImplementedError::new_err("asof join"));
                        },
                        #[cfg(feature = "iejoin")]
                        JoinType::IEJoin => {
                            let Some(JoinTypeOptionsIR::IEJoin(ie_options)) = &options.options
                            else {
                                unreachable!()
                            };
                            // IEJoin exposes (name, operator1, operator2 | None).
                            (
                                name,
                                crate::Wrap(ie_options.operator1).into_py_any(py)?,
                                ie_options.operator2.as_ref().map_or_else(
                                    || Ok(py.None()),
                                    |op| crate::Wrap(*op).into_py_any(py),
                                )?,
                            )
                                .into_py_any(py)?
                        },
                        // A cross join that carries extra join options is
                        // reported as an unsupported nested loop join.
                        JoinType::Cross if options.options.is_some() => {
                            return Err(PyNotImplementedError::new_err("nested loop join"));
                        },
                        // Every other join type is exposed by name only.
                        _ => name.into_any().unbind(),
                    },
                    options.args.nulls_equal,
                    options.args.slice,
                    options.args.suffix().as_str(),
                    options.args.coalesce.coalesce(how),
                    Into::<&str>::into(options.args.maintain_order),
                )
                    .into_py_any(py)?
            },
        }
        .into_py_any(py),
        IR::Gather {
            input,
            idxs,
            null_on_oob,
        } => Gather {
            input: input.0,
            idxs: idxs.0,
            null_on_oob: *null_on_oob,
        }
        .into_py_any(py),
        IR::HStack {
            input,
            exprs,
            schema: _,
            options,
        } => HStack {
            input: input.0,
            exprs: exprs.iter().map(|e| e.into()).collect(),
            should_broadcast: options.should_broadcast,
        }
        .into_py_any(py),
        IR::Distinct { input, options } => Distinct {
            input: input.0,
            // (keep_strategy, subset | None, maintain_order, slice)
            options: (
                Into::<&str>::into(options.keep_strategy),
                options.subset.as_ref().map_or_else(
                    || Ok(py.None()),
                    |f| {
                        f.iter()
                            .map(|s| s.as_ref())
                            .collect::<Vec<&str>>()
                            .into_py_any(py)
                    },
                )?,
                options.maintain_order,
                options.slice,
            )
                .into_py_any(py)?,
        }
        .into_py_any(py),
        IR::MapFunction { input, function } => MapFunction {
            input: input.0,
            // Each supported function becomes a tuple whose first element is
            // a tag string; opaque functions are rejected.
            function: match function {
                FunctionIR::OpaquePython(_) => {
                    return Err(PyNotImplementedError::new_err("opaque python mapfunction"));
                },
                FunctionIR::Opaque {
                    function: _,
                    schema: _,
                    predicate_pd: _,
                    projection_pd: _,
                    streamable: _,
                    fmt_str: _,
                } => return Err(PyNotImplementedError::new_err("opaque rust mapfunction")),
                FunctionIR::Unnest { columns, separator } => (
                    "unnest",
                    columns.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
                    separator.as_ref().map(|s| s.to_string()),
                )
                    .into_py_any(py)?,
                FunctionIR::Rechunk => ("rechunk",).into_py_any(py)?,
                FunctionIR::Explode {
                    columns,
                    options,
                    schema: _,
                } => (
                    "explode",
                    columns.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
                    options.empty_as_null,
                    options.keep_nulls,
                )
                    .into_py_any(py)?,
                #[cfg(feature = "pivot")]
                FunctionIR::Unpivot { args, schema: _ } => (
                    "unpivot",
                    args.index.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
                    args.on.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
                    args.variable_name.as_str().into_py_any(py)?,
                    args.value_name.as_str().into_py_any(py)?,
                )
                    .into_py_any(py)?,
                FunctionIR::RowIndex {
                    name,
                    schema: _,
                    offset,
                    // A missing offset is reported as 0.
                } => ("row_index", name.to_string(), offset.unwrap_or(0)).into_py_any(py)?,
                FunctionIR::FastCount {
                    sources,
                    scan_type,
                    alias,
                    ..
                } => {
                    let sources = sources
                        .into_paths()
                        .ok_or_else(|| {
                            PyNotImplementedError::new_err("FastCount with BytesIO sources")
                        })?
                        .iter()
                        .map(|p| p.as_str())
                        .collect::<Vec<_>>()
                        .into_py_any(py)?;

                    // No cloud options are passed along for a fast count.
                    let cloud_options = None;

                    let scan_type = scan_type_to_pyobject(py, scan_type, &cloud_options)?;

                    let alias = alias
                        .as_ref()
                        .map(|a| a.as_str())
                        .map_or_else(|| Ok(py.None()), |s| s.into_py_any(py))?;

                    ("fast_count", sources, scan_type, alias).into_py_any(py)?
                },
                FunctionIR::Hint(hint) => match hint {
                    HintIR::Sorted(sorted_vec) => {
                        // One (column, descending, nulls_last) triple per entry.
                        let sorted_info: Vec<_> = sorted_vec
                            .iter()
                            .map(|s| (s.column.as_str(), s.descending, s.nulls_last))
                            .collect();
                        ("hint_sorted", sorted_info).into_py_any(py)?
                    },
                },
            },
        }
        .into_py_any(py),
        IR::Union { inputs, options } => Union {
            inputs: inputs.iter().map(|n| n.0).collect(),
            slice: options.slice,
            rows: options.rows,
            maintain_order: options.maintain_order,
        }
        .into_py_any(py),
        IR::HConcat {
            inputs,
            schema: _,
            options,
        } => HConcat {
            inputs: inputs.iter().map(|n| n.0).collect(),
            // (parallel, strict, broadcast_unit_length)
            options: (
                options.parallel,
                options.strict,
                options.broadcast_unit_length,
            )
                .into_py_any(py)?,
        }
        .into_py_any(py),
        IR::ExtContext {
            input,
            contexts,
            schema: _,
        } => ExtContext {
            input: input.0,
            contexts: contexts.iter().map(|n| n.0).collect(),
        }
        .into_py_any(py),
        IR::Sink { input, payload } => Sink {
            input: input.0,
            // The payload is handed to Python as its JSON serialization.
            payload: PyString::new(
                py,
                &serde_json::to_string(payload)
                    .map_err(|err| PyValueError::new_err(format!("{err:?}")))?,
            )
            .into(),
        }
        .into_py_any(py),
        IR::SinkMultiple { .. } => Err(PyNotImplementedError::new_err(
            "Not expecting to see a SinkMultiple node",
        )),
        #[cfg(feature = "merge_sorted")]
        IR::MergeSorted {
            input_left,
            input_right,
            key,
            maintain_order,
        } => MergeSorted {
            input_left: input_left.0,
            input_right: input_right.0,
            key: key.to_string(),
            maintain_order: *maintain_order,
        }
        .into_py_any(py),
        IR::UnoptimizedDispatch { .. } => Err(PyNotImplementedError::new_err(
            "Not expecting to see a UnoptimizedDispatch node",
        )),
        IR::Invalid => Err(PyNotImplementedError::new_err("Invalid")),
    }
}