1use polars::prelude::ColumnMapping;
2#[cfg(feature = "iejoin")]
3use polars::prelude::JoinTypeOptionsIR;
4use polars::prelude::deletion::DeletionFilesList;
5use polars::prelude::python_dsl::PythonScanSource;
6use polars_core::prelude::IdxSize;
7use polars_io::cloud::CloudOptions;
8use polars_ops::prelude::JoinType;
9use polars_plan::plans::IR;
10use polars_plan::prelude::{FileScanIR, FunctionIR, PythonPredicate, UnifiedScanArgs};
11use pyo3::IntoPyObjectExt;
12use pyo3::exceptions::{PyNotImplementedError, PyValueError};
13use pyo3::prelude::*;
14use pyo3::types::{PyDict, PyList, PyString};
15
16use super::expr_nodes::PyGroupbyOptions;
17use crate::PyDataFrame;
18use crate::lazyframe::visit::PyExprIR;
19
20fn scan_type_to_pyobject(
21 py: Python<'_>,
22 scan_type: &FileScanIR,
23 cloud_options: &Option<CloudOptions>,
24) -> PyResult<PyObject> {
25 match scan_type {
26 #[cfg(feature = "csv")]
27 FileScanIR::Csv { options } => {
28 let options = serde_json::to_string(options)
29 .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
30 let cloud_options = serde_json::to_string(cloud_options)
31 .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
32 Ok(("csv", options, cloud_options).into_py_any(py)?)
33 },
34 #[cfg(feature = "parquet")]
35 FileScanIR::Parquet { options, .. } => {
36 let options = serde_json::to_string(options)
37 .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
38 let cloud_options = serde_json::to_string(cloud_options)
39 .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
40 Ok(("parquet", options, cloud_options).into_py_any(py)?)
41 },
42 #[cfg(feature = "ipc")]
43 FileScanIR::Ipc { .. } => Err(PyNotImplementedError::new_err("ipc scan")),
44 #[cfg(feature = "json")]
45 FileScanIR::NDJson { options, .. } => {
46 let options = serde_json::to_string(options)
47 .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
48 Ok(("ndjson", options).into_py_any(py)?)
49 },
50 FileScanIR::PythonDataset { .. } => {
51 Err(PyNotImplementedError::new_err("python dataset scan"))
52 },
53 FileScanIR::Anonymous { .. } => Err(PyNotImplementedError::new_err("anonymous scan")),
54 }
55}
56
57#[pyclass(frozen)]
58pub struct PythonScan {
60 #[pyo3(get)]
61 options: PyObject,
62}
63
64#[pyclass(frozen)]
65pub struct Slice {
67 #[pyo3(get)]
68 input: usize,
69 #[pyo3(get)]
70 offset: i64,
71 #[pyo3(get)]
72 len: IdxSize,
73}
74
75#[pyclass(frozen)]
76pub struct Filter {
78 #[pyo3(get)]
79 input: usize,
80 #[pyo3(get)]
81 predicate: PyExprIR,
82}
83
/// Python view of the unified scan arguments attached to an `IR::Scan` node.
///
/// The arguments are not exposed as a whole; individual fields of `inner` are
/// read through the `#[getter]` methods in the `#[pymethods]` block.
#[pyclass(frozen)]
#[derive(Clone)]
pub struct PyFileOptions {
    /// Owned copy of the scan's `UnifiedScanArgs` (cloned by `into_py`).
    inner: UnifiedScanArgs,
}
89
90#[pymethods]
91impl PyFileOptions {
92 #[getter]
93 fn n_rows(&self) -> Option<(i64, usize)> {
94 self.inner
95 .pre_slice
96 .clone()
97 .map(|slice| <(i64, usize)>::try_from(slice).unwrap())
98 }
99 #[getter]
100 fn with_columns(&self) -> Option<Vec<&str>> {
101 self.inner
102 .projection
103 .as_ref()?
104 .iter()
105 .map(|x| x.as_str())
106 .collect::<Vec<_>>()
107 .into()
108 }
109 #[getter]
110 fn cache(&self, _py: Python<'_>) -> bool {
111 self.inner.cache
112 }
113 #[getter]
114 fn row_index(&self) -> Option<(&str, IdxSize)> {
115 self.inner
116 .row_index
117 .as_ref()
118 .map(|n| (n.name.as_str(), n.offset))
119 }
120 #[getter]
121 fn rechunk(&self, _py: Python<'_>) -> bool {
122 self.inner.rechunk
123 }
124 #[getter]
125 fn hive_options(&self, _py: Python<'_>) -> PyResult<PyObject> {
126 Err(PyNotImplementedError::new_err("hive options"))
127 }
128 #[getter]
129 fn include_file_paths(&self, _py: Python<'_>) -> Option<&str> {
130 self.inner.include_file_paths.as_deref()
131 }
132
133 #[getter]
137 fn deletion_files(&self, py: Python<'_>) -> PyResult<PyObject> {
138 Ok(match &self.inner.deletion_files {
139 None => py.None().into_any(),
140
141 Some(DeletionFilesList::IcebergPositionDelete(paths)) => {
142 let out = PyDict::new(py);
143
144 for (k, v) in paths.iter() {
145 out.set_item(*k, v.as_ref())?;
146 }
147
148 ("iceberg-position-delete", out)
149 .into_pyobject(py)?
150 .into_any()
151 .unbind()
152 },
153 })
154 }
155
156 #[getter]
160 fn column_mapping(&self, py: Python<'_>) -> PyResult<PyObject> {
161 Ok(match &self.inner.column_mapping {
162 None => py.None().into_any(),
163
164 Some(ColumnMapping::Iceberg { .. }) => unimplemented!(),
165 })
166 }
167}
168
169#[pyclass(frozen)]
170pub struct Scan {
172 #[pyo3(get)]
173 paths: PyObject,
174 #[pyo3(get)]
175 file_info: PyObject,
176 #[pyo3(get)]
177 predicate: Option<PyExprIR>,
178 #[pyo3(get)]
179 file_options: PyFileOptions,
180 #[pyo3(get)]
181 scan_type: PyObject,
182}
183
184#[pyclass(frozen)]
185pub struct DataFrameScan {
187 #[pyo3(get)]
188 df: PyDataFrame,
189 #[pyo3(get)]
190 projection: PyObject,
191 #[pyo3(get)]
192 selection: Option<PyExprIR>,
193}
194
195#[pyclass(frozen)]
196pub struct SimpleProjection {
198 #[pyo3(get)]
199 input: usize,
200}
201
202#[pyclass(frozen)]
203pub struct Select {
205 #[pyo3(get)]
206 input: usize,
207 #[pyo3(get)]
208 expr: Vec<PyExprIR>,
209 #[pyo3(get)]
210 should_broadcast: bool,
211}
212
213#[pyclass(frozen)]
214pub struct Sort {
216 #[pyo3(get)]
217 input: usize,
218 #[pyo3(get)]
219 by_column: Vec<PyExprIR>,
220 #[pyo3(get)]
221 sort_options: (bool, Vec<bool>, Vec<bool>),
222 #[pyo3(get)]
223 slice: Option<(i64, usize)>,
224}
225
226#[pyclass(frozen)]
227pub struct Cache {
229 #[pyo3(get)]
230 input: usize,
231 #[pyo3(get)]
232 id_: u128,
233}
234
235#[pyclass(frozen)]
236pub struct GroupBy {
238 #[pyo3(get)]
239 input: usize,
240 #[pyo3(get)]
241 keys: Vec<PyExprIR>,
242 #[pyo3(get)]
243 aggs: Vec<PyExprIR>,
244 #[pyo3(get)]
245 apply: (),
246 #[pyo3(get)]
247 maintain_order: bool,
248 #[pyo3(get)]
249 options: PyObject,
250}
251
252#[pyclass(frozen)]
253pub struct Join {
255 #[pyo3(get)]
256 input_left: usize,
257 #[pyo3(get)]
258 input_right: usize,
259 #[pyo3(get)]
260 left_on: Vec<PyExprIR>,
261 #[pyo3(get)]
262 right_on: Vec<PyExprIR>,
263 #[pyo3(get)]
264 options: PyObject,
265}
266
267#[pyclass(frozen)]
268pub struct MergeSorted {
270 #[pyo3(get)]
271 input_left: usize,
272 #[pyo3(get)]
273 input_right: usize,
274 #[pyo3(get)]
275 key: String,
276}
277
278#[pyclass(frozen)]
279pub struct HStack {
281 #[pyo3(get)]
282 input: usize,
283 #[pyo3(get)]
284 exprs: Vec<PyExprIR>,
285 #[pyo3(get)]
286 should_broadcast: bool,
287}
288
289#[pyclass(frozen)]
290pub struct Reduce {
292 #[pyo3(get)]
293 input: usize,
294 #[pyo3(get)]
295 exprs: Vec<PyExprIR>,
296}
297
298#[pyclass(frozen)]
299pub struct Distinct {
301 #[pyo3(get)]
302 input: usize,
303 #[pyo3(get)]
304 options: PyObject,
305}
306#[pyclass(frozen)]
307pub struct MapFunction {
309 #[pyo3(get)]
310 input: usize,
311 #[pyo3(get)]
312 function: PyObject,
313}
314#[pyclass(frozen)]
315pub struct Union {
316 #[pyo3(get)]
317 inputs: Vec<usize>,
318 #[pyo3(get)]
319 options: Option<(i64, usize)>,
320}
321#[pyclass(frozen)]
322pub struct HConcat {
324 #[pyo3(get)]
325 inputs: Vec<usize>,
326 #[pyo3(get)]
327 options: (),
328}
329#[pyclass(frozen)]
330pub struct ExtContext {
332 #[pyo3(get)]
333 input: usize,
334 #[pyo3(get)]
335 contexts: Vec<usize>,
336}
337
338#[pyclass(frozen)]
339pub struct Sink {
340 #[pyo3(get)]
341 input: usize,
342 #[pyo3(get)]
343 payload: PyObject,
344}
345
/// Convert a single IR plan node into its Python-side node object.
///
/// Each supported `IR` variant is mapped to the matching `#[pyclass]` in this file
/// (e.g. `IR::Slice` -> `Slice`). Child plan nodes are referenced by their raw node
/// index (`input.0`) rather than converted recursively; expressions are converted to
/// `PyExprIR` via `Into`.
///
/// # Errors
/// - `NotImplementedError` for constructs the Python side does not support:
///   hive-partitioned scans, scan sources that are not paths ("scan with BytesIO"),
///   `apply` inside a group-by, as-of joins, cross joins with extra options
///   ("nested loop join"), opaque map functions, `SinkMultiple`, `Invalid`, and the
///   scan types rejected by `scan_type_to_pyobject`.
/// - `ValueError` if JSON serialization of scan options or a sink payload fails.
/// - Any error raised while building the Python objects.
///
/// # Panics
/// Panics (`unreachable!`) if an `IEJoin` join type is not accompanied by
/// `JoinTypeOptionsIR::IEJoin` options.
pub(crate) fn into_py(py: Python<'_>, plan: &IR) -> PyResult<PyObject> {
    match plan {
        IR::PythonScan { options } => {
            // Name of the Python-side source that produces the data.
            let python_src = match options.python_source {
                PythonScanSource::Pyarrow => "pyarrow",
                PythonScanSource::Cuda => "cuda",
                PythonScanSource::IOPlugin => "io_plugin",
            };

            // `options` is a 5-tuple:
            // (scan_fn | None, with_columns | None, python_src, predicate, n_rows | None).
            PythonScan {
                options: (
                    options
                        .scan_fn
                        .as_ref()
                        .map_or_else(|| py.None(), |s| s.0.clone_ref(py)),
                    options.with_columns.as_ref().map_or_else(
                        || Ok(py.None()),
                        |cols| {
                            cols.iter()
                                .map(|x| x.as_str())
                                .collect::<Vec<_>>()
                                .into_py_any(py)
                        },
                    )?,
                    python_src,
                    // Predicate is None, ("pyarrow", s), or ("polars", expression node index).
                    match &options.predicate {
                        PythonPredicate::None => py.None(),
                        PythonPredicate::PyArrow(s) => ("pyarrow", s).into_py_any(py)?,
                        PythonPredicate::Polars(e) => ("polars", e.node().0).into_py_any(py)?,
                    },
                    options
                        .n_rows
                        .map_or_else(|| Ok(py.None()), |s| s.into_py_any(py))?,
                )
                    .into_py_any(py)?,
            }
            .into_py_any(py)
        },
        IR::Slice { input, offset, len } => Slice {
            input: input.0,
            offset: *offset,
            len: *len,
        }
        .into_py_any(py),
        IR::Filter { input, predicate } => Filter {
            input: input.0,
            predicate: predicate.into(),
        }
        .into_py_any(py),
        // Hive-partitioned scans are rejected before the general scan arm below.
        IR::Scan {
            hive_parts: Some(_),
            ..
        } => Err(PyNotImplementedError::new_err(
            "scan with hive partitioning",
        )),
        IR::Scan {
            sources,
            file_info: _,
            hive_parts: _,
            predicate,
            output_schema: _,
            scan_type,
            unified_scan_args,
        } => {
            Scan {
                // Only path-based sources are supported; build a list of `to_str()` paths.
                paths: {
                    let paths = sources
                        .into_paths()
                        .ok_or_else(|| PyNotImplementedError::new_err("scan with BytesIO"))?;

                    // Start from an empty list and append each path.
                    let out = PyList::new(py, [] as [(); 0])?;

                    for path in paths.iter() {
                        out.append(path.to_str())?;
                    }

                    out.into_py_any(py)?
                },
                // File info is not exported.
                file_info: py.None(),
                predicate: predicate.as_ref().map(|e| e.into()),
                file_options: PyFileOptions {
                    inner: (**unified_scan_args).clone(),
                },
                scan_type: scan_type_to_pyobject(py, scan_type, &unified_scan_args.cloud_options)?,
            }
        }
        .into_py_any(py),
        IR::DataFrameScan {
            df,
            schema: _,
            output_schema,
        } => DataFrameScan {
            df: PyDataFrame::new((**df).clone()),
            // Output column names, or None when `output_schema` is not set.
            projection: output_schema.as_ref().map_or_else(
                || Ok(py.None()),
                |s| {
                    s.iter_names()
                        .map(|s| s.as_str())
                        .collect::<Vec<_>>()
                        .into_py_any(py)
                },
            )?,
            selection: None,
        }
        .into_py_any(py),
        // The projected column list is not exported, only the input node.
        IR::SimpleProjection { input, columns: _ } => {
            SimpleProjection { input: input.0 }.into_py_any(py)
        },
        IR::Select {
            input,
            expr,
            schema: _,
            options,
        } => Select {
            expr: expr.iter().map(|e| e.into()).collect(),
            input: input.0,
            should_broadcast: options.should_broadcast,
        }
        .into_py_any(py),
        IR::Sort {
            input,
            by_column,
            slice,
            sort_options,
        } => Sort {
            input: input.0,
            by_column: by_column.iter().map(|e| e.into()).collect(),
            // (maintain_order, nulls_last, descending)
            sort_options: (
                sort_options.maintain_order,
                sort_options.nulls_last.clone(),
                sort_options.descending.clone(),
            ),
            slice: *slice,
        }
        .into_py_any(py),
        IR::Cache { input, id } => Cache {
            input: input.0,
            id_: id.as_u128(),
        }
        .into_py_any(py),
        IR::GroupBy {
            input,
            keys,
            aggs,
            schema: _,
            apply,
            maintain_order,
            options,
        } => GroupBy {
            input: input.0,
            keys: keys.iter().map(|e| e.into()).collect(),
            aggs: aggs.iter().map(|e| e.into()).collect(),
            // A custom `apply` is unsupported: `()` when absent, NotImplementedError otherwise.
            apply: apply.as_ref().map_or(Ok(()), |_| {
                Err(PyNotImplementedError::new_err(format!(
                    "apply inside GroupBy {plan:?}"
                )))
            })?,
            maintain_order: *maintain_order,
            options: PyGroupbyOptions::new(options.as_ref().clone()).into_py_any(py)?,
        }
        .into_py_any(py),
        IR::Join {
            input_left,
            input_right,
            schema: _,
            left_on,
            right_on,
            options,
        } => Join {
            input_left: input_left.0,
            input_right: input_right.0,
            left_on: left_on.iter().map(|e| e.into()).collect(),
            right_on: right_on.iter().map(|e| e.into()).collect(),
            // (how, nulls_equal, slice, suffix, coalesce, maintain_order), where `how` is the
            // join-type name, or (name, operator1, operator2 | None) for IEJoin.
            options: {
                let how = &options.args.how;
                let name = Into::<&str>::into(how).into_pyobject(py)?;
                (
                    match how {
                        #[cfg(feature = "asof_join")]
                        JoinType::AsOf(_) => {
                            return Err(PyNotImplementedError::new_err("asof join"));
                        },
                        #[cfg(feature = "iejoin")]
                        JoinType::IEJoin => {
                            // Invariant: an IEJoin always carries IEJoin-specific options.
                            let Some(JoinTypeOptionsIR::IEJoin(ie_options)) = &options.options
                            else {
                                unreachable!()
                            };
                            (
                                name,
                                crate::Wrap(ie_options.operator1).into_py_any(py)?,
                                ie_options.operator2.as_ref().map_or_else(
                                    || Ok(py.None()),
                                    |op| crate::Wrap(*op).into_py_any(py),
                                )?,
                            )
                                .into_py_any(py)?
                        },
                        // Cross joins with extra options are rejected as "nested loop join".
                        JoinType::Cross if options.options.is_some() => {
                            return Err(PyNotImplementedError::new_err("nested loop join"));
                        },
                        _ => name.into_any().unbind(),
                    },
                    options.args.nulls_equal,
                    options.args.slice,
                    options.args.suffix().as_str(),
                    options.args.coalesce.coalesce(how),
                    Into::<&str>::into(options.args.maintain_order),
                )
                    .into_py_any(py)?
            },
        }
        .into_py_any(py),
        IR::HStack {
            input,
            exprs,
            schema: _,
            options,
        } => HStack {
            input: input.0,
            exprs: exprs.iter().map(|e| e.into()).collect(),
            should_broadcast: options.should_broadcast,
        }
        .into_py_any(py),
        IR::Distinct { input, options } => Distinct {
            input: input.0,
            // (keep_strategy, subset | None, maintain_order, slice)
            options: (
                Into::<&str>::into(options.keep_strategy),
                options.subset.as_ref().map_or_else(
                    || Ok(py.None()),
                    |f| {
                        f.iter()
                            .map(|s| s.as_ref())
                            .collect::<Vec<&str>>()
                            .into_py_any(py)
                    },
                )?,
                options.maintain_order,
                options.slice,
            )
                .into_py_any(py)?,
        }
        .into_py_any(py),
        IR::MapFunction { input, function } => MapFunction {
            input: input.0,
            // Each supported function becomes a tuple headed by its name.
            function: match function {
                FunctionIR::OpaquePython(_) => {
                    return Err(PyNotImplementedError::new_err("opaque python mapfunction"));
                },
                FunctionIR::Opaque {
                    function: _,
                    schema: _,
                    predicate_pd: _,
                    projection_pd: _,
                    streamable: _,
                    fmt_str: _,
                } => return Err(PyNotImplementedError::new_err("opaque rust mapfunction")),
                FunctionIR::Unnest { columns } => (
                    "unnest",
                    columns.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
                )
                    .into_py_any(py)?,
                FunctionIR::Rechunk => ("rechunk",).into_py_any(py)?,
                FunctionIR::Explode { columns, schema: _ } => (
                    "explode",
                    columns.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
                )
                    .into_py_any(py)?,
                #[cfg(feature = "pivot")]
                FunctionIR::Unpivot { args, schema: _ } => (
                    "unpivot",
                    args.index.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
                    args.on.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
                    args.variable_name
                        .as_ref()
                        .map_or_else(|| Ok(py.None()), |s| s.as_str().into_py_any(py))?,
                    args.value_name
                        .as_ref()
                        .map_or_else(|| Ok(py.None()), |s| s.as_str().into_py_any(py))?,
                )
                    .into_py_any(py)?,
                // A missing row-index offset is exported as 0.
                FunctionIR::RowIndex {
                    name,
                    schema: _,
                    offset,
                } => ("row_index", name.to_string(), offset.unwrap_or(0)).into_py_any(py)?,
                FunctionIR::FastCount {
                    sources,
                    scan_type,
                    cloud_options,
                    alias,
                } => {
                    // Like `IR::Scan`, only path-based sources are supported.
                    let sources = sources
                        .into_paths()
                        .ok_or_else(|| {
                            PyNotImplementedError::new_err("FastCount with BytesIO sources")
                        })?
                        .iter()
                        .map(|p| p.to_str())
                        .collect::<Vec<_>>()
                        .into_py_any(py)?;

                    let scan_type = scan_type_to_pyobject(py, scan_type, cloud_options)?;

                    let alias = alias
                        .as_ref()
                        .map(|a| a.as_str())
                        .map_or_else(|| Ok(py.None()), |s| s.into_py_any(py))?;

                    ("fast_count", sources, scan_type, alias).into_py_any(py)?
                },
            },
        }
        .into_py_any(py),
        // Only the union's slice is exported as `options`.
        IR::Union { inputs, options } => Union {
            inputs: inputs.iter().map(|n| n.0).collect(),
            options: options.slice,
        }
        .into_py_any(py),
        IR::HConcat {
            inputs,
            schema: _,
            options: _,
        } => HConcat {
            inputs: inputs.iter().map(|n| n.0).collect(),
            options: (),
        }
        .into_py_any(py),
        IR::ExtContext {
            input,
            contexts,
            schema: _,
        } => ExtContext {
            input: input.0,
            contexts: contexts.iter().map(|n| n.0).collect(),
        }
        .into_py_any(py),
        IR::Sink { input, payload } => Sink {
            input: input.0,
            // The sink payload is exported as its JSON serialization.
            payload: PyString::new(
                py,
                &serde_json::to_string(payload)
                    .map_err(|err| PyValueError::new_err(format!("{err:?}")))?,
            )
            .into(),
        }
        .into_py_any(py),
        IR::SinkMultiple { .. } => Err(PyNotImplementedError::new_err(
            "Not expecting to see a SinkMultiple node",
        )),
        #[cfg(feature = "merge_sorted")]
        IR::MergeSorted {
            input_left,
            input_right,
            key,
        } => MergeSorted {
            input_left: input_left.0,
            input_right: input_right.0,
            key: key.to_string(),
        }
        .into_py_any(py),
        IR::Invalid => Err(PyNotImplementedError::new_err("Invalid")),
    }
}