1#[cfg(feature = "iejoin")]
2use polars::prelude::JoinTypeOptionsIR;
3use polars::prelude::deletion::DeletionFilesList;
4use polars::prelude::python_dsl::PythonScanSource;
5use polars::prelude::{ColumnMapping, PredicateFileSkip};
6use polars_core::prelude::IdxSize;
7use polars_io::cloud::CloudOptions;
8use polars_ops::prelude::JoinType;
9use polars_plan::plans::{HintIR, IR};
10use polars_plan::prelude::{FileScanIR, FunctionIR, PythonPredicate, UnifiedScanArgs};
11use pyo3::IntoPyObjectExt;
12use pyo3::exceptions::{PyNotImplementedError, PyValueError};
13use pyo3::prelude::*;
14use pyo3::types::{PyDict, PyList, PyString};
15
16use super::expr_nodes::PyGroupbyOptions;
17use crate::PyDataFrame;
18use crate::lazyframe::visit::PyExprIR;
19
20fn scan_type_to_pyobject(
21 py: Python<'_>,
22 scan_type: &FileScanIR,
23 cloud_options: &Option<CloudOptions>,
24) -> PyResult<Py<PyAny>> {
25 match scan_type {
26 #[cfg(feature = "csv")]
27 FileScanIR::Csv { options } => {
28 let options = serde_json::to_string(options)
29 .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
30 let cloud_options = serde_json::to_string(cloud_options)
31 .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
32 Ok(("csv", options, cloud_options).into_py_any(py)?)
33 },
34 #[cfg(feature = "parquet")]
35 FileScanIR::Parquet { options, .. } => {
36 let options = serde_json::to_string(options)
37 .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
38 let cloud_options = serde_json::to_string(cloud_options)
39 .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
40 Ok(("parquet", options, cloud_options).into_py_any(py)?)
41 },
42 #[cfg(feature = "ipc")]
43 FileScanIR::Ipc { .. } => Err(PyNotImplementedError::new_err("ipc scan")),
44 #[cfg(feature = "json")]
45 FileScanIR::NDJson { options, .. } => {
46 let options = serde_json::to_string(options)
47 .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
48 Ok(("ndjson", options).into_py_any(py)?)
49 },
50 #[cfg(feature = "scan_lines")]
51 FileScanIR::Lines { name } => Ok(("lines", name.as_str()).into_py_any(py)?),
52 FileScanIR::PythonDataset { .. } => {
53 Err(PyNotImplementedError::new_err("python dataset scan"))
54 },
55 FileScanIR::Anonymous { .. } => Err(PyNotImplementedError::new_err("anonymous scan")),
56 }
57}
58
/// Python-visible node produced by `into_py` for `IR::PythonScan`.
#[pyclass(frozen)]
pub struct PythonScan {
    /// Tuple assembled in `into_py`, in order: the scan function (or `None`),
    /// the projected column names (or `None`), the source tag
    /// (`"pyarrow"`, `"cuda"` or `"io_plugin"`), the predicate (or `None`),
    /// and the row limit (or `None`).
    #[pyo3(get)]
    options: Py<PyAny>,
}
65
/// Python-visible node produced by `into_py` for `IR::Slice`.
#[pyclass(frozen)]
pub struct Slice {
    /// Index of the input node (the `.0` of the IR node handle).
    #[pyo3(get)]
    input: usize,
    /// Slice offset, copied from the IR node.
    #[pyo3(get)]
    offset: i64,
    /// Slice length, copied from the IR node.
    #[pyo3(get)]
    len: IdxSize,
}
76
/// Python-visible node produced by `into_py` for `IR::Filter`.
#[pyclass(frozen)]
pub struct Filter {
    /// Index of the input node (the `.0` of the IR node handle).
    #[pyo3(get)]
    input: usize,
    /// The filter predicate, converted from the IR expression.
    #[pyo3(get)]
    predicate: PyExprIR,
}
85
/// Python-visible wrapper around a scan's [`UnifiedScanArgs`].
///
/// The wrapped value is not exposed directly; individual settings are read
/// through the getters in the `#[pymethods]` block of this type.
#[pyclass(frozen)]
#[derive(Clone)]
pub struct PyFileOptions {
    // Cloned from the IR scan node when `into_py` builds a `Scan`.
    inner: UnifiedScanArgs,
}
91
92#[pymethods]
93impl PyFileOptions {
94 #[getter]
95 fn n_rows(&self) -> Option<(i64, usize)> {
96 self.inner
97 .pre_slice
98 .clone()
99 .map(|slice| <(i64, usize)>::try_from(slice).unwrap())
100 }
101 #[getter]
102 fn with_columns(&self) -> Option<Vec<&str>> {
103 self.inner
104 .projection
105 .as_ref()?
106 .iter()
107 .map(|x| x.as_str())
108 .collect::<Vec<_>>()
109 .into()
110 }
111 #[getter]
112 fn cache(&self, _py: Python<'_>) -> bool {
113 self.inner.cache
114 }
115 #[getter]
116 fn row_index(&self) -> Option<(&str, IdxSize)> {
117 self.inner
118 .row_index
119 .as_ref()
120 .map(|n| (n.name.as_str(), n.offset))
121 }
122 #[getter]
123 fn rechunk(&self, _py: Python<'_>) -> bool {
124 self.inner.rechunk
125 }
126 #[getter]
127 fn hive_options(&self, _py: Python<'_>) -> PyResult<Py<PyAny>> {
128 Err(PyNotImplementedError::new_err("hive options"))
129 }
130 #[getter]
131 fn include_file_paths(&self, _py: Python<'_>) -> Option<&str> {
132 self.inner.include_file_paths.as_deref()
133 }
134
135 #[getter]
139 fn deletion_files(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
140 Ok(match &self.inner.deletion_files {
141 None => py.None().into_any(),
142
143 Some(DeletionFilesList::IcebergPositionDelete(paths)) => {
144 let out = PyDict::new(py);
145
146 for (k, v) in paths.iter() {
147 out.set_item(*k, v.as_ref())?;
148 }
149
150 ("iceberg-position-delete", out)
151 .into_pyobject(py)?
152 .into_any()
153 .unbind()
154 },
155 })
156 }
157
158 #[getter]
162 fn column_mapping(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
163 Ok(match &self.inner.column_mapping {
164 None => py.None().into_any(),
165
166 Some(ColumnMapping::Iceberg { .. }) => unimplemented!(),
167 })
168 }
169}
170
/// Python-visible node produced by `into_py` for `IR::Scan`.
#[pyclass(frozen)]
pub struct Scan {
    /// Python list of the source paths, each stored as a string.
    #[pyo3(get)]
    paths: Py<PyAny>,
    /// File info; `into_py` always sets this to `None`.
    #[pyo3(get)]
    file_info: Py<PyAny>,
    /// The scan predicate, if one is exposed (see the filtering in `into_py`).
    #[pyo3(get)]
    predicate: Option<PyExprIR>,
    /// Wrapper around the scan's unified scan arguments.
    #[pyo3(get)]
    file_options: PyFileOptions,
    /// Scan description as built by `scan_type_to_pyobject`.
    #[pyo3(get)]
    scan_type: Py<PyAny>,
}
185
/// Python-visible node produced by `into_py` for `IR::DataFrameScan`.
#[pyclass(frozen)]
pub struct DataFrameScan {
    /// A clone of the scanned data frame.
    #[pyo3(get)]
    df: PyDataFrame,
    /// Column names of the node's output schema, or `None` when it has none.
    #[pyo3(get)]
    projection: Py<PyAny>,
    /// Selection expression; `into_py` always sets this to `None`.
    #[pyo3(get)]
    selection: Option<PyExprIR>,
}
196
/// Python-visible node produced by `into_py` for `IR::SimpleProjection`.
///
/// Only the input is exposed; the projected columns of the IR node are not.
#[pyclass(frozen)]
pub struct SimpleProjection {
    /// Index of the input node (the `.0` of the IR node handle).
    #[pyo3(get)]
    input: usize,
}
203
/// Python-visible node produced by `into_py` for `IR::Select`.
#[pyclass(frozen)]
pub struct Select {
    /// Index of the input node (the `.0` of the IR node handle).
    #[pyo3(get)]
    input: usize,
    /// The selected expressions.
    #[pyo3(get)]
    expr: Vec<PyExprIR>,
    /// The `should_broadcast` flag of the node's options.
    #[pyo3(get)]
    should_broadcast: bool,
}
214
/// Python-visible node produced by `into_py` for `IR::Sort`.
#[pyclass(frozen)]
pub struct Sort {
    /// Index of the input node (the `.0` of the IR node handle).
    #[pyo3(get)]
    input: usize,
    /// The expressions to sort by.
    #[pyo3(get)]
    by_column: Vec<PyExprIR>,
    /// `(maintain_order, nulls_last, descending)` — note that `nulls_last`
    /// comes before `descending` in this tuple.
    #[pyo3(get)]
    sort_options: (bool, Vec<bool>, Vec<bool>),
    /// Optional slice, copied from the IR node.
    #[pyo3(get)]
    slice: Option<(i64, usize)>,
}
227
/// Python-visible node produced by `into_py` for `IR::Cache`.
#[pyclass(frozen)]
pub struct Cache {
    /// Index of the input node (the `.0` of the IR node handle).
    #[pyo3(get)]
    input: usize,
    /// The cache id, obtained via `as_u128()` on the IR node's id.
    #[pyo3(get)]
    id_: u128,
}
236
/// Python-visible node produced by `into_py` for `IR::GroupBy`.
#[pyclass(frozen)]
pub struct GroupBy {
    /// Index of the input node (the `.0` of the IR node handle).
    #[pyo3(get)]
    input: usize,
    /// The group-by key expressions.
    #[pyo3(get)]
    keys: Vec<PyExprIR>,
    /// The aggregation expressions.
    #[pyo3(get)]
    aggs: Vec<PyExprIR>,
    /// Always the unit value: `into_py` returns an error instead of building
    /// this node when the IR node carries an `apply`.
    #[pyo3(get)]
    apply: (),
    /// The `maintain_order` flag of the IR node.
    #[pyo3(get)]
    maintain_order: bool,
    /// The group-by options, wrapped in a `PyGroupbyOptions` object.
    #[pyo3(get)]
    options: Py<PyAny>,
}
253
/// Python-visible node produced by `into_py` for `IR::Join`.
#[pyclass(frozen)]
pub struct Join {
    /// Index of the left input node (the `.0` of the IR node handle).
    #[pyo3(get)]
    input_left: usize,
    /// Index of the right input node (the `.0` of the IR node handle).
    #[pyo3(get)]
    input_right: usize,
    /// Join key expressions for the left input.
    #[pyo3(get)]
    left_on: Vec<PyExprIR>,
    /// Join key expressions for the right input.
    #[pyo3(get)]
    right_on: Vec<PyExprIR>,
    /// Tuple assembled in `into_py`, in order: the join type (its name, or a
    /// `(name, operator1, operator2)` tuple for an IE join), `nulls_equal`,
    /// `slice`, the suffix, the coalesce flag, and the maintain-order setting
    /// as a string.
    #[pyo3(get)]
    options: Py<PyAny>,
}
268
/// Python-visible node produced by `into_py` for `IR::MergeSorted`.
#[pyclass(frozen)]
pub struct MergeSorted {
    /// Index of the left input node (the `.0` of the IR node handle).
    #[pyo3(get)]
    input_left: usize,
    /// Index of the right input node (the `.0` of the IR node handle).
    #[pyo3(get)]
    input_right: usize,
    /// The IR node's `key`, converted to an owned string.
    #[pyo3(get)]
    key: String,
}
279
/// Python-visible node produced by `into_py` for `IR::HStack`.
#[pyclass(frozen)]
pub struct HStack {
    /// Index of the input node (the `.0` of the IR node handle).
    #[pyo3(get)]
    input: usize,
    /// The expressions of the IR node.
    #[pyo3(get)]
    exprs: Vec<PyExprIR>,
    /// The `should_broadcast` flag of the node's options.
    #[pyo3(get)]
    should_broadcast: bool,
}
290
/// Python-visible reduce node.
///
/// NOTE(review): no arm of `into_py` in this file constructs this type —
/// confirm whether it is still used elsewhere.
#[pyclass(frozen)]
pub struct Reduce {
    /// Index of the input node.
    #[pyo3(get)]
    input: usize,
    /// The expressions of the node.
    #[pyo3(get)]
    exprs: Vec<PyExprIR>,
}
299
/// Python-visible node produced by `into_py` for `IR::Distinct`.
#[pyclass(frozen)]
pub struct Distinct {
    /// Index of the input node (the `.0` of the IR node handle).
    #[pyo3(get)]
    input: usize,
    /// Tuple assembled in `into_py`, in order: the keep strategy as a string,
    /// the subset column names (or `None`), `maintain_order`, and `slice`.
    #[pyo3(get)]
    options: Py<PyAny>,
}
/// Python-visible node produced by `into_py` for `IR::MapFunction`.
#[pyclass(frozen)]
pub struct MapFunction {
    /// Index of the input node (the `.0` of the IR node handle).
    #[pyo3(get)]
    input: usize,
    /// Tuple whose first element names the function (e.g. `"unnest"`,
    /// `"rechunk"`, `"explode"`, `"row_index"`, `"fast_count"`,
    /// `"hint_sorted"`), followed by that function's arguments.
    #[pyo3(get)]
    function: Py<PyAny>,
}
/// Python-visible node produced by `into_py` for `IR::Union`.
#[pyclass(frozen)]
pub struct Union {
    /// Indices of the input nodes (the `.0` of each IR node handle).
    #[pyo3(get)]
    inputs: Vec<usize>,
    /// The `slice` of the union options; no other option is exposed.
    #[pyo3(get)]
    options: Option<(i64, usize)>,
}
/// Python-visible node produced by `into_py` for `IR::HConcat`.
#[pyclass(frozen)]
pub struct HConcat {
    /// Indices of the input nodes (the `.0` of each IR node handle).
    #[pyo3(get)]
    inputs: Vec<usize>,
    /// Always the unit value; the IR node's options are not exposed.
    #[pyo3(get)]
    options: (),
}
/// Python-visible node produced by `into_py` for `IR::ExtContext`.
#[pyclass(frozen)]
pub struct ExtContext {
    /// Index of the input node (the `.0` of the IR node handle).
    #[pyo3(get)]
    input: usize,
    /// Indices of the context nodes (the `.0` of each IR node handle).
    #[pyo3(get)]
    contexts: Vec<usize>,
}
339
/// Python-visible node produced by `into_py` for `IR::Sink`.
#[pyclass(frozen)]
pub struct Sink {
    /// Index of the input node (the `.0` of the IR node handle).
    #[pyo3(get)]
    input: usize,
    /// The sink payload serialized to JSON, as a Python string.
    #[pyo3(get)]
    payload: Py<PyAny>,
}
347
/// Converts one IR plan node into the matching `#[pyclass]` object of this
/// module.
///
/// Child nodes are not converted recursively; they are exposed by index (the
/// `.0` of the node handle). Expressions are converted into `PyExprIR`.
///
/// # Errors
///
/// * `PyNotImplementedError` for unsupported nodes or options: scans with hive
///   partitioning or non-path sources, `apply` inside a group-by, as-of joins,
///   cross joins carrying extra options, opaque map functions, `SinkMultiple`
///   and `Invalid`.
/// * `PyValueError` when JSON serialization of a sink payload fails.
/// * Any error raised while converting values into Python objects, or returned
///   by `scan_type_to_pyobject`.
///
/// # Panics
///
/// Reaches `unreachable!()` if an IE join node does not carry
/// `JoinTypeOptionsIR::IEJoin` options.
pub(crate) fn into_py(py: Python<'_>, plan: &IR) -> PyResult<Py<PyAny>> {
    match plan {
        IR::PythonScan { options } => {
            // String tag identifying where the Python scan comes from.
            let python_src = match options.python_source {
                PythonScanSource::Pyarrow => "pyarrow",
                PythonScanSource::Cuda => "cuda",
                PythonScanSource::IOPlugin => "io_plugin",
            };

            PythonScan {
                // (scan_fn, with_columns, python_source, predicate, n_rows);
                // absent values are passed as Python `None`.
                options: (
                    options
                        .scan_fn
                        .as_ref()
                        .map_or_else(|| py.None(), |s| s.0.clone_ref(py)),
                    options.with_columns.as_ref().map_or_else(
                        || Ok(py.None()),
                        |cols| {
                            cols.iter()
                                .map(|x| x.as_str())
                                .collect::<Vec<_>>()
                                .into_py_any(py)
                        },
                    )?,
                    python_src,
                    // The predicate is tagged with its kind; a polars
                    // predicate is exposed as the index of its expression node.
                    match &options.predicate {
                        PythonPredicate::None => py.None(),
                        PythonPredicate::PyArrow(s) => ("pyarrow", s).into_py_any(py)?,
                        PythonPredicate::Polars(e) => ("polars", e.node().0).into_py_any(py)?,
                    },
                    options
                        .n_rows
                        .map_or_else(|| Ok(py.None()), |s| s.into_py_any(py))?,
                )
                    .into_py_any(py)?,
            }
            .into_py_any(py)
        },
        IR::Slice { input, offset, len } => Slice {
            input: input.0,
            offset: *offset,
            len: *len,
        }
        .into_py_any(py),
        IR::Filter { input, predicate } => Filter {
            input: input.0,
            predicate: predicate.into(),
        }
        .into_py_any(py),
        // Must come before the general `IR::Scan` arm: any scan that carries
        // hive parts is rejected here.
        IR::Scan {
            hive_parts: Some(_),
            ..
        } => Err(PyNotImplementedError::new_err(
            "scan with hive partitioning",
        )),
        IR::Scan {
            sources,
            file_info: _,
            hive_parts: _,
            predicate,
            predicate_file_skip_applied,
            output_schema: _,
            scan_type,
            unified_scan_args,
        } => {
            Scan {
                paths: {
                    // Only path-based sources are supported.
                    let paths = sources
                        .into_paths()
                        .ok_or_else(|| PyNotImplementedError::new_err("scan with BytesIO"))?;

                    // Start from an empty Python list and append each path as
                    // a plain string.
                    let out = PyList::new(py, [] as [(); 0])?;

                    for path in paths.iter() {
                        out.append(path.as_str())?;
                    }

                    out.into_py_any(py)?
                },
                // File info is not exposed to Python.
                file_info: py.None(),
                // Hide the predicate when file skipping has been applied and
                // reports `no_residual_predicate: true`.
                predicate: predicate
                    .as_ref()
                    .filter(|_| {
                        !matches!(
                            predicate_file_skip_applied,
                            Some(PredicateFileSkip {
                                no_residual_predicate: true,
                                original_len: _,
                            })
                        )
                    })
                    .map(|e| e.into()),
                file_options: PyFileOptions {
                    inner: (**unified_scan_args).clone(),
                },
                scan_type: scan_type_to_pyobject(py, scan_type, &unified_scan_args.cloud_options)?,
            }
        }
        .into_py_any(py),
        IR::DataFrameScan {
            df,
            schema: _,
            output_schema,
        } => DataFrameScan {
            df: PyDataFrame::new((**df).clone()),
            // Column names of the output schema, or `None` if there is none.
            projection: output_schema.as_ref().map_or_else(
                || Ok(py.None()),
                |s| {
                    s.iter_names()
                        .map(|s| s.as_str())
                        .collect::<Vec<_>>()
                        .into_py_any(py)
                },
            )?,
            selection: None,
        }
        .into_py_any(py),
        IR::SimpleProjection { input, columns: _ } => {
            SimpleProjection { input: input.0 }.into_py_any(py)
        },
        IR::Select {
            input,
            expr,
            schema: _,
            options,
        } => Select {
            expr: expr.iter().map(|e| e.into()).collect(),
            input: input.0,
            should_broadcast: options.should_broadcast,
        }
        .into_py_any(py),
        IR::Sort {
            input,
            by_column,
            slice,
            sort_options,
        } => Sort {
            input: input.0,
            by_column: by_column.iter().map(|e| e.into()).collect(),
            // Tuple order is (maintain_order, nulls_last, descending).
            sort_options: (
                sort_options.maintain_order,
                sort_options.nulls_last.clone(),
                sort_options.descending.clone(),
            ),
            slice: *slice,
        }
        .into_py_any(py),
        IR::Cache { input, id } => Cache {
            input: input.0,
            id_: id.as_u128(),
        }
        .into_py_any(py),
        IR::GroupBy {
            input,
            keys,
            aggs,
            schema: _,
            apply,
            maintain_order,
            options,
        } => GroupBy {
            input: input.0,
            keys: keys.iter().map(|e| e.into()).collect(),
            aggs: aggs.iter().map(|e| e.into()).collect(),
            // A group-by with an `apply` cannot be represented: bail out with
            // an error that includes the debug rendering of the plan.
            apply: apply.as_ref().map_or(Ok(()), |_| {
                Err(PyNotImplementedError::new_err(format!(
                    "apply inside GroupBy {plan:?}"
                )))
            })?,
            maintain_order: *maintain_order,
            options: PyGroupbyOptions::new(options.as_ref().clone()).into_py_any(py)?,
        }
        .into_py_any(py),
        IR::Join {
            input_left,
            input_right,
            schema: _,
            left_on,
            right_on,
            options,
        } => Join {
            input_left: input_left.0,
            input_right: input_right.0,
            left_on: left_on.iter().map(|e| e.into()).collect(),
            right_on: right_on.iter().map(|e| e.into()).collect(),
            // (how, nulls_equal, slice, suffix, coalesce, maintain_order)
            options: {
                let how = &options.args.how;
                let name = Into::<&str>::into(how).into_pyobject(py)?;
                (
                    // First element: the join type. Usually just its name; an
                    // IE join also carries its one or two operators.
                    match how {
                        #[cfg(feature = "asof_join")]
                        JoinType::AsOf(_) => {
                            return Err(PyNotImplementedError::new_err("asof join"));
                        },
                        #[cfg(feature = "iejoin")]
                        JoinType::IEJoin => {
                            let Some(JoinTypeOptionsIR::IEJoin(ie_options)) = &options.options
                            else {
                                unreachable!()
                            };
                            (
                                name,
                                crate::Wrap(ie_options.operator1).into_py_any(py)?,
                                ie_options.operator2.as_ref().map_or_else(
                                    || Ok(py.None()),
                                    |op| crate::Wrap(*op).into_py_any(py),
                                )?,
                            )
                                .into_py_any(py)?
                        },
                        // A cross join that carries extra options is reported
                        // as an unsupported nested loop join.
                        JoinType::Cross if options.options.is_some() => {
                            return Err(PyNotImplementedError::new_err("nested loop join"));
                        },
                        _ => name.into_any().unbind(),
                    },
                    options.args.nulls_equal,
                    options.args.slice,
                    options.args.suffix().as_str(),
                    options.args.coalesce.coalesce(how),
                    Into::<&str>::into(options.args.maintain_order),
                )
                    .into_py_any(py)?
            },
        }
        .into_py_any(py),
        IR::HStack {
            input,
            exprs,
            schema: _,
            options,
        } => HStack {
            input: input.0,
            exprs: exprs.iter().map(|e| e.into()).collect(),
            should_broadcast: options.should_broadcast,
        }
        .into_py_any(py),
        IR::Distinct { input, options } => Distinct {
            input: input.0,
            // (keep_strategy, subset, maintain_order, slice)
            options: (
                Into::<&str>::into(options.keep_strategy),
                options.subset.as_ref().map_or_else(
                    || Ok(py.None()),
                    |f| {
                        f.iter()
                            .map(|s| s.as_ref())
                            .collect::<Vec<&str>>()
                            .into_py_any(py)
                    },
                )?,
                options.maintain_order,
                options.slice,
            )
                .into_py_any(py)?,
        }
        .into_py_any(py),
        IR::MapFunction { input, function } => MapFunction {
            input: input.0,
            // Each supported function becomes a tuple whose first element is
            // its name; opaque functions are rejected.
            function: match function {
                FunctionIR::OpaquePython(_) => {
                    return Err(PyNotImplementedError::new_err("opaque python mapfunction"));
                },
                FunctionIR::Opaque {
                    function: _,
                    schema: _,
                    predicate_pd: _,
                    projection_pd: _,
                    streamable: _,
                    fmt_str: _,
                } => return Err(PyNotImplementedError::new_err("opaque rust mapfunction")),
                FunctionIR::Unnest { columns, separator } => (
                    "unnest",
                    columns.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
                    separator.as_ref().map(|s| s.to_string()),
                )
                    .into_py_any(py)?,
                FunctionIR::Rechunk => ("rechunk",).into_py_any(py)?,
                FunctionIR::Explode {
                    columns,
                    options,
                    schema: _,
                } => (
                    "explode",
                    columns.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
                    options.empty_as_null,
                    options.keep_nulls,
                )
                    .into_py_any(py)?,
                #[cfg(feature = "pivot")]
                FunctionIR::Unpivot { args, schema: _ } => (
                    "unpivot",
                    args.index.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
                    args.on.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
                    args.variable_name.as_str().into_py_any(py)?,
                    args.value_name.as_str().into_py_any(py)?,
                )
                    .into_py_any(py)?,
                FunctionIR::RowIndex {
                    name,
                    schema: _,
                    offset,
                    // A missing offset is reported as 0.
                } => ("row_index", name.to_string(), offset.unwrap_or(0)).into_py_any(py)?,
                FunctionIR::FastCount {
                    sources,
                    scan_type,
                    alias,
                } => {
                    let sources = sources
                        .into_paths()
                        .ok_or_else(|| {
                            PyNotImplementedError::new_err("FastCount with BytesIO sources")
                        })?
                        .iter()
                        .map(|p| p.as_str())
                        .collect::<Vec<_>>()
                        .into_py_any(py)?;

                    // This variant carries no cloud options, so `None` is
                    // passed on to the scan-type conversion.
                    let cloud_options = None;

                    let scan_type = scan_type_to_pyobject(py, scan_type, &cloud_options)?;

                    let alias = alias
                        .as_ref()
                        .map(|a| a.as_str())
                        .map_or_else(|| Ok(py.None()), |s| s.into_py_any(py))?;

                    ("fast_count", sources, scan_type, alias).into_py_any(py)?
                },
                FunctionIR::Hint(hint) => match hint {
                    HintIR::Sorted(sorted_vec) => {
                        // One (column, descending, nulls_last) triple per entry.
                        let sorted_info: Vec<_> = sorted_vec
                            .iter()
                            .map(|s| (s.column.as_str(), s.descending, s.nulls_last))
                            .collect();
                        ("hint_sorted", sorted_info).into_py_any(py)?
                    },
                },
            },
        }
        .into_py_any(py),
        IR::Union { inputs, options } => Union {
            inputs: inputs.iter().map(|n| n.0).collect(),
            // Only the slice of the union options is exposed.
            options: options.slice,
        }
        .into_py_any(py),
        IR::HConcat {
            inputs,
            schema: _,
            options: _,
        } => HConcat {
            inputs: inputs.iter().map(|n| n.0).collect(),
            options: (),
        }
        .into_py_any(py),
        IR::ExtContext {
            input,
            contexts,
            schema: _,
        } => ExtContext {
            input: input.0,
            contexts: contexts.iter().map(|n| n.0).collect(),
        }
        .into_py_any(py),
        IR::Sink { input, payload } => Sink {
            input: input.0,
            // The payload is handed to Python as a JSON string.
            payload: PyString::new(
                py,
                &serde_json::to_string(payload)
                    .map_err(|err| PyValueError::new_err(format!("{err:?}")))?,
            )
            .into(),
        }
        .into_py_any(py),
        IR::SinkMultiple { .. } => Err(PyNotImplementedError::new_err(
            "Not expecting to see a SinkMultiple node",
        )),
        #[cfg(feature = "merge_sorted")]
        IR::MergeSorted {
            input_left,
            input_right,
            key,
        } => MergeSorted {
            input_left: input_left.0,
            input_right: input_right.0,
            key: key.to_string(),
        }
        .into_py_any(py),
        IR::Invalid => Err(PyNotImplementedError::new_err("Invalid")),
    }
}