1use polars::prelude::ColumnMapping;
2#[cfg(feature = "iejoin")]
3use polars::prelude::JoinTypeOptionsIR;
4use polars::prelude::deletion::DeletionFilesList;
5use polars::prelude::python_dsl::PythonScanSource;
6use polars_core::prelude::IdxSize;
7use polars_io::cloud::CloudOptions;
8use polars_ops::prelude::JoinType;
9use polars_plan::plans::IR;
10use polars_plan::prelude::{FileScanIR, FunctionIR, PythonPredicate, UnifiedScanArgs};
11use pyo3::IntoPyObjectExt;
12use pyo3::exceptions::{PyNotImplementedError, PyValueError};
13use pyo3::prelude::*;
14use pyo3::types::{PyDict, PyList, PyString};
15
16use super::expr_nodes::PyGroupbyOptions;
17use crate::PyDataFrame;
18use crate::lazyframe::visit::PyExprIR;
19
20fn scan_type_to_pyobject(
21 py: Python<'_>,
22 scan_type: &FileScanIR,
23 cloud_options: &Option<CloudOptions>,
24) -> PyResult<PyObject> {
25 match scan_type {
26 #[cfg(feature = "csv")]
27 FileScanIR::Csv { options } => {
28 let options = serde_json::to_string(options)
29 .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
30 let cloud_options = serde_json::to_string(cloud_options)
31 .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
32 Ok(("csv", options, cloud_options).into_py_any(py)?)
33 },
34 #[cfg(feature = "parquet")]
35 FileScanIR::Parquet { options, .. } => {
36 let options = serde_json::to_string(options)
37 .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
38 let cloud_options = serde_json::to_string(cloud_options)
39 .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
40 Ok(("parquet", options, cloud_options).into_py_any(py)?)
41 },
42 #[cfg(feature = "ipc")]
43 FileScanIR::Ipc { .. } => Err(PyNotImplementedError::new_err("ipc scan")),
44 #[cfg(feature = "json")]
45 FileScanIR::NDJson { options, .. } => {
46 let options = serde_json::to_string(options)
47 .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
48 Ok(("ndjson", options).into_py_any(py)?)
49 },
50 FileScanIR::PythonDataset { .. } => {
51 Err(PyNotImplementedError::new_err("python dataset scan"))
52 },
53 FileScanIR::Anonymous { .. } => Err(PyNotImplementedError::new_err("anonymous scan")),
54 }
55}
56
/// Python-visible node that `into_py` produces for `IR::PythonScan`.
#[pyclass]
pub struct PythonScan {
    /// Tuple assembled in `into_py`:
    /// `(scan_fn or None, with_columns names or None, python source name, predicate, n_rows or None)`.
    #[pyo3(get)]
    options: PyObject,
}
63
/// Python-visible node that `into_py` produces for `IR::Slice`.
#[pyclass]
pub struct Slice {
    /// `.0` value of the input node handle.
    #[pyo3(get)]
    input: usize,
    /// Offset copied from the IR node.
    #[pyo3(get)]
    offset: i64,
    /// Length copied from the IR node.
    #[pyo3(get)]
    len: IdxSize,
}
74
/// Python-visible node that `into_py` produces for `IR::Filter`.
#[pyclass]
pub struct Filter {
    /// `.0` value of the input node handle.
    #[pyo3(get)]
    input: usize,
    /// The filter predicate, converted to `PyExprIR`.
    #[pyo3(get)]
    predicate: PyExprIR,
}
83
/// Python-visible wrapper around a scan's `UnifiedScanArgs`.
///
/// The wrapped value is private; Python reads it through the getters defined in
/// the `#[pymethods]` block for this type.
#[pyclass]
#[derive(Clone)]
pub struct PyFileOptions {
    // Owned copy of the scan arguments (cloned from the IR node in `into_py`).
    inner: UnifiedScanArgs,
}
89
90#[pymethods]
91impl PyFileOptions {
92 #[getter]
93 fn n_rows(&self) -> Option<(i64, usize)> {
94 self.inner
95 .pre_slice
96 .clone()
97 .map(|slice| <(i64, usize)>::try_from(slice).unwrap())
98 }
99 #[getter]
100 fn with_columns(&self) -> Option<Vec<&str>> {
101 self.inner
102 .projection
103 .as_ref()?
104 .iter()
105 .map(|x| x.as_str())
106 .collect::<Vec<_>>()
107 .into()
108 }
109 #[getter]
110 fn cache(&self, _py: Python<'_>) -> bool {
111 self.inner.cache
112 }
113 #[getter]
114 fn row_index(&self) -> Option<(&str, IdxSize)> {
115 self.inner
116 .row_index
117 .as_ref()
118 .map(|n| (n.name.as_str(), n.offset))
119 }
120 #[getter]
121 fn rechunk(&self, _py: Python<'_>) -> bool {
122 self.inner.rechunk
123 }
124 #[getter]
125 fn hive_options(&self, _py: Python<'_>) -> PyResult<PyObject> {
126 Err(PyNotImplementedError::new_err("hive options"))
127 }
128 #[getter]
129 fn include_file_paths(&self, _py: Python<'_>) -> Option<&str> {
130 self.inner.include_file_paths.as_deref()
131 }
132
133 #[getter]
137 fn deletion_files(&self, py: Python<'_>) -> PyResult<PyObject> {
138 Ok(match &self.inner.deletion_files {
139 None => py.None().into_any(),
140
141 Some(DeletionFilesList::IcebergPositionDelete(paths)) => {
142 let out = PyDict::new(py);
143
144 for (k, v) in paths.iter() {
145 out.set_item(*k, v.as_ref())?;
146 }
147
148 ("iceberg-position-delete", out)
149 .into_pyobject(py)?
150 .into_any()
151 .unbind()
152 },
153 })
154 }
155
156 #[getter]
160 fn column_mapping(&self, py: Python<'_>) -> PyResult<PyObject> {
161 Ok(match &self.inner.column_mapping {
162 None => py.None().into_any(),
163
164 Some(ColumnMapping::Iceberg { .. }) => unimplemented!(),
165 })
166 }
167}
168
/// Python-visible node that `into_py` produces for `IR::Scan`.
#[pyclass]
pub struct Scan {
    /// Python list holding the source paths as strings.
    #[pyo3(get)]
    paths: PyObject,
    /// Always Python `None` as populated by `into_py`.
    #[pyo3(get)]
    file_info: PyObject,
    /// Optional scan predicate, converted to `PyExprIR`.
    #[pyo3(get)]
    predicate: Option<PyExprIR>,
    /// Copy of the scan's unified scan arguments.
    #[pyo3(get)]
    file_options: PyFileOptions,
    /// Tagged tuple produced by `scan_type_to_pyobject`.
    #[pyo3(get)]
    scan_type: PyObject,
}
183
/// Python-visible node that `into_py` produces for `IR::DataFrameScan`.
#[pyclass]
pub struct DataFrameScan {
    /// Clone of the scanned in-memory frame.
    #[pyo3(get)]
    df: PyDataFrame,
    /// Column names of the node's `output_schema`, or Python `None` when absent.
    #[pyo3(get)]
    projection: PyObject,
    /// Always `None` as populated by `into_py`.
    #[pyo3(get)]
    selection: Option<PyExprIR>,
}
194
/// Python-visible node that `into_py` produces for `IR::SimpleProjection`.
///
/// Only the input is exposed; the IR node's `columns` are not carried over.
#[pyclass]
pub struct SimpleProjection {
    /// `.0` value of the input node handle.
    #[pyo3(get)]
    input: usize,
}
201
/// Python-visible node that `into_py` produces for `IR::Select`.
#[pyclass]
pub struct Select {
    /// `.0` value of the input node handle.
    #[pyo3(get)]
    input: usize,
    /// Selected expressions, each converted to `PyExprIR`.
    #[pyo3(get)]
    expr: Vec<PyExprIR>,
    /// Copied from the node's `options.should_broadcast`.
    #[pyo3(get)]
    should_broadcast: bool,
}
212
/// Python-visible node that `into_py` produces for `IR::Sort`.
#[pyclass]
pub struct Sort {
    /// `.0` value of the input node handle.
    #[pyo3(get)]
    input: usize,
    /// Sort key expressions, each converted to `PyExprIR`.
    #[pyo3(get)]
    by_column: Vec<PyExprIR>,
    /// `(maintain_order, nulls_last, descending)` as filled in by `into_py`.
    #[pyo3(get)]
    sort_options: (bool, Vec<bool>, Vec<bool>),
    /// Optional slice copied from the IR node.
    #[pyo3(get)]
    slice: Option<(i64, usize)>,
}
225
/// Python-visible node that `into_py` produces for `IR::Cache`.
#[pyclass]
pub struct Cache {
    /// `.0` value of the input node handle.
    #[pyo3(get)]
    input: usize,
    /// Cache identifier, obtained via `id.as_u128()`.
    #[pyo3(get)]
    id_: u128,
    /// `cache_hits` value copied from the IR node.
    #[pyo3(get)]
    cache_hits: u32,
}
236
/// Python-visible node that `into_py` produces for `IR::GroupBy`.
#[pyclass]
pub struct GroupBy {
    /// `.0` value of the input node handle.
    #[pyo3(get)]
    input: usize,
    /// Grouping key expressions, each converted to `PyExprIR`.
    #[pyo3(get)]
    keys: Vec<PyExprIR>,
    /// Aggregation expressions, each converted to `PyExprIR`.
    #[pyo3(get)]
    aggs: Vec<PyExprIR>,
    /// Always unit: `into_py` rejects group-bys that carry an `apply` function.
    #[pyo3(get)]
    apply: (),
    /// `maintain_order` flag copied from the IR node.
    #[pyo3(get)]
    maintain_order: bool,
    /// A `PyGroupbyOptions` object built from the node's options.
    #[pyo3(get)]
    options: PyObject,
}
253
/// Python-visible node that `into_py` produces for `IR::Join`.
#[pyclass]
pub struct Join {
    /// `.0` value of the left input node handle.
    #[pyo3(get)]
    input_left: usize,
    /// `.0` value of the right input node handle.
    #[pyo3(get)]
    input_right: usize,
    /// Left join key expressions, each converted to `PyExprIR`.
    #[pyo3(get)]
    left_on: Vec<PyExprIR>,
    /// Right join key expressions, each converted to `PyExprIR`.
    #[pyo3(get)]
    right_on: Vec<PyExprIR>,
    /// Tuple assembled in `into_py`:
    /// `(how, nulls_equal, slice, suffix, coalesce, maintain_order)`.
    /// `how` is the join-type name, or for IE joins a tuple
    /// `(name, operator1, operator2 or None)`.
    #[pyo3(get)]
    options: PyObject,
}
268
/// Python-visible node that `into_py` produces for `IR::MergeSorted`.
#[pyclass]
pub struct MergeSorted {
    /// `.0` value of the left input node handle.
    #[pyo3(get)]
    input_left: usize,
    /// `.0` value of the right input node handle.
    #[pyo3(get)]
    input_right: usize,
    /// The node's `key`, converted to an owned string.
    #[pyo3(get)]
    key: String,
}
279
/// Python-visible node that `into_py` produces for `IR::HStack`.
#[pyclass]
pub struct HStack {
    /// `.0` value of the input node handle.
    #[pyo3(get)]
    input: usize,
    /// The node's expressions, each converted to `PyExprIR`.
    #[pyo3(get)]
    exprs: Vec<PyExprIR>,
    /// Copied from the node's `options.should_broadcast`.
    #[pyo3(get)]
    should_broadcast: bool,
}
290
/// Python-visible node holding an input reference and a list of expressions.
// NOTE(review): no arm of the visible `into_py` constructs this type — confirm
// whether it is still used.
#[pyclass]
pub struct Reduce {
    /// Input node reference (a `usize`, like the other node structs in this file).
    #[pyo3(get)]
    input: usize,
    /// Expressions attached to the node.
    #[pyo3(get)]
    exprs: Vec<PyExprIR>,
}
299
/// Python-visible node that `into_py` produces for `IR::Distinct`.
#[pyclass]
pub struct Distinct {
    /// `.0` value of the input node handle.
    #[pyo3(get)]
    input: usize,
    /// Tuple assembled in `into_py`:
    /// `(keep_strategy name, subset names or None, maintain_order, slice)`.
    #[pyo3(get)]
    options: PyObject,
}
/// Python-visible node that `into_py` produces for `IR::MapFunction`.
#[pyclass]
pub struct MapFunction {
    /// `.0` value of the input node handle.
    #[pyo3(get)]
    input: usize,
    /// Tuple whose first element names the function (`"unnest"`, `"rechunk"`,
    /// `"explode"`, `"unpivot"`, `"row_index"`, `"fast_count"`), followed by
    /// that function's arguments.
    #[pyo3(get)]
    function: PyObject,
}
/// Python-visible node that `into_py` produces for `IR::Union`.
#[pyclass]
pub struct Union {
    /// `.0` values of the input node handles.
    #[pyo3(get)]
    inputs: Vec<usize>,
    /// The `slice` field of the union options; other option fields are not exposed.
    #[pyo3(get)]
    options: Option<(i64, usize)>,
}
/// Python-visible node that `into_py` produces for `IR::HConcat`.
#[pyclass]
pub struct HConcat {
    /// `.0` values of the input node handles.
    #[pyo3(get)]
    inputs: Vec<usize>,
    /// Always unit: the IR node's options are not carried over.
    #[pyo3(get)]
    options: (),
}
/// Python-visible node that `into_py` produces for `IR::ExtContext`.
#[pyclass]
pub struct ExtContext {
    /// `.0` value of the input node handle.
    #[pyo3(get)]
    input: usize,
    /// `.0` values of the context node handles.
    #[pyo3(get)]
    contexts: Vec<usize>,
}
339
/// Python-visible node that `into_py` produces for `IR::Sink`.
#[pyclass]
pub struct Sink {
    /// `.0` value of the input node handle.
    #[pyo3(get)]
    input: usize,
    /// Python string holding the sink payload serialized as JSON.
    #[pyo3(get)]
    payload: PyObject,
}
347
/// Convert a single [`IR`] plan node into the matching Python-visible node object.
///
/// Each supported variant is copied into one of the `#[pyclass]` structs defined
/// in this file and returned as a `PyObject`. Child plans are not converted here;
/// they are referenced by the `.0` value of their node handle.
///
/// # Errors
///
/// * `PyNotImplementedError` for inputs that have no Python representation here:
///   hive-partitioned scans, scan sources for which `into_paths()` returns `None`,
///   scan types rejected by `scan_type_to_pyobject`, group-bys with an `apply`
///   function, asof joins, cross joins that carry extra options, opaque map
///   functions, `SinkMultiple` and `Invalid`.
/// * `PyValueError` when a value cannot be serialized to JSON.
/// * Any error raised while building the Python objects is propagated.
///
/// # Panics
///
/// Hits `unreachable!()` if an IE join node does not carry IE-join options.
pub(crate) fn into_py(py: Python<'_>, plan: &IR) -> PyResult<PyObject> {
    match plan {
        IR::PythonScan { options } => {
            let python_src = match options.python_source {
                PythonScanSource::Pyarrow => "pyarrow",
                PythonScanSource::Cuda => "cuda",
                PythonScanSource::IOPlugin => "io_plugin",
            };

            // Tuple layout: (scan_fn, with_columns, python_src, predicate, n_rows);
            // absent values are passed as Python `None`.
            PythonScan {
                options: (
                    options
                        .scan_fn
                        .as_ref()
                        .map_or_else(|| py.None(), |s| s.0.clone_ref(py)),
                    options.with_columns.as_ref().map_or_else(
                        || Ok(py.None()),
                        |cols| {
                            cols.iter()
                                .map(|x| x.as_str())
                                .collect::<Vec<_>>()
                                .into_py_any(py)
                        },
                    )?,
                    python_src,
                    // The predicate is tagged with its flavour; a polars predicate is
                    // passed as the `.0` value of its expression node.
                    match &options.predicate {
                        PythonPredicate::None => py.None(),
                        PythonPredicate::PyArrow(s) => ("pyarrow", s).into_py_any(py)?,
                        PythonPredicate::Polars(e) => ("polars", e.node().0).into_py_any(py)?,
                    },
                    options
                        .n_rows
                        .map_or_else(|| Ok(py.None()), |s| s.into_py_any(py))?,
                )
                    .into_py_any(py)?,
            }
            .into_py_any(py)
        },
        IR::Slice { input, offset, len } => Slice {
            input: input.0,
            offset: *offset,
            len: *len,
        }
        .into_py_any(py),
        IR::Filter { input, predicate } => Filter {
            input: input.0,
            predicate: predicate.into(),
        }
        .into_py_any(py),
        // This arm must stay ahead of the general `IR::Scan` arm so that scans with
        // hive parts are rejected before any conversion is attempted.
        IR::Scan {
            hive_parts: Some(_),
            ..
        } => Err(PyNotImplementedError::new_err(
            "scan with hive partitioning",
        )),
        IR::Scan {
            sources,
            file_info: _,
            hive_parts: _,
            predicate,
            output_schema: _,
            scan_type,
            unified_scan_args,
        } => {
            Scan {
                paths: {
                    let paths = sources
                        .into_paths()
                        .ok_or_else(|| PyNotImplementedError::new_err("scan with BytesIO"))?;

                    // Start from an empty Python list and append each path as a string.
                    let out = PyList::new(py, [] as [(); 0])?;

                    for path in paths.iter() {
                        out.append(path.to_str())?;
                    }

                    out.into_py_any(py)?
                },
                // File info is not exposed to Python.
                file_info: py.None(),
                predicate: predicate.as_ref().map(|e| e.into()),
                file_options: PyFileOptions {
                    inner: (**unified_scan_args).clone(),
                },
                scan_type: scan_type_to_pyobject(py, scan_type, &unified_scan_args.cloud_options)?,
            }
        }
        .into_py_any(py),
        IR::DataFrameScan {
            df,
            schema: _,
            output_schema,
        } => DataFrameScan {
            df: PyDataFrame::new((**df).clone()),
            // The projection is the list of output-schema column names, or `None`.
            projection: output_schema.as_ref().map_or_else(
                || Ok(py.None()),
                |s| {
                    s.iter_names()
                        .map(|s| s.as_str())
                        .collect::<Vec<_>>()
                        .into_py_any(py)
                },
            )?,
            selection: None,
        }
        .into_py_any(py),
        IR::SimpleProjection { input, columns: _ } => {
            SimpleProjection { input: input.0 }.into_py_any(py)
        },
        IR::Select {
            input,
            expr,
            schema: _,
            options,
        } => Select {
            expr: expr.iter().map(|e| e.into()).collect(),
            input: input.0,
            should_broadcast: options.should_broadcast,
        }
        .into_py_any(py),
        IR::Sort {
            input,
            by_column,
            slice,
            sort_options,
        } => Sort {
            input: input.0,
            by_column: by_column.iter().map(|e| e.into()).collect(),
            // Tuple order: (maintain_order, nulls_last, descending).
            sort_options: (
                sort_options.maintain_order,
                sort_options.nulls_last.clone(),
                sort_options.descending.clone(),
            ),
            slice: *slice,
        }
        .into_py_any(py),
        IR::Cache {
            input,
            id,
            cache_hits,
        } => Cache {
            input: input.0,
            id_: id.as_u128(),
            cache_hits: *cache_hits,
        }
        .into_py_any(py),
        IR::GroupBy {
            input,
            keys,
            aggs,
            schema: _,
            apply,
            maintain_order,
            options,
        } => GroupBy {
            input: input.0,
            keys: keys.iter().map(|e| e.into()).collect(),
            aggs: aggs.iter().map(|e| e.into()).collect(),
            // A group-by carrying an `apply` function cannot be represented; fail
            // with the debug rendering of the whole plan node.
            apply: apply.as_ref().map_or(Ok(()), |_| {
                Err(PyNotImplementedError::new_err(format!(
                    "apply inside GroupBy {plan:?}"
                )))
            })?,
            maintain_order: *maintain_order,
            options: PyGroupbyOptions::new(options.as_ref().clone()).into_py_any(py)?,
        }
        .into_py_any(py),
        IR::Join {
            input_left,
            input_right,
            schema: _,
            left_on,
            right_on,
            options,
        } => Join {
            input_left: input_left.0,
            input_right: input_right.0,
            left_on: left_on.iter().map(|e| e.into()).collect(),
            right_on: right_on.iter().map(|e| e.into()).collect(),
            // Tuple layout: (how, nulls_equal, slice, suffix, coalesce, maintain_order).
            options: {
                let how = &options.args.how;
                let name = Into::<&str>::into(how).into_pyobject(py)?;
                (
                    match how {
                        #[cfg(feature = "asof_join")]
                        JoinType::AsOf(_) => {
                            return Err(PyNotImplementedError::new_err("asof join"));
                        },
                        #[cfg(feature = "iejoin")]
                        JoinType::IEJoin => {
                            let Some(JoinTypeOptionsIR::IEJoin(ie_options)) = &options.options
                            else {
                                unreachable!()
                            };
                            // IE joins expose (name, operator1, operator2 or None).
                            (
                                name,
                                crate::Wrap(ie_options.operator1).into_py_any(py)?,
                                ie_options.operator2.as_ref().map_or_else(
                                    || Ok(py.None()),
                                    |op| crate::Wrap(*op).into_py_any(py),
                                )?,
                            )
                                .into_py_any(py)?
                        },
                        // A cross join that carries extra options is reported as a
                        // nested loop join, which is not supported.
                        JoinType::Cross if options.options.is_some() => {
                            return Err(PyNotImplementedError::new_err("nested loop join"));
                        },
                        // Every other join type is exposed by name only.
                        _ => name.into_any().unbind(),
                    },
                    options.args.nulls_equal,
                    options.args.slice,
                    options.args.suffix().as_str(),
                    options.args.coalesce.coalesce(how),
                    Into::<&str>::into(options.args.maintain_order),
                )
                    .into_py_any(py)?
            },
        }
        .into_py_any(py),
        IR::HStack {
            input,
            exprs,
            schema: _,
            options,
        } => HStack {
            input: input.0,
            exprs: exprs.iter().map(|e| e.into()).collect(),
            should_broadcast: options.should_broadcast,
        }
        .into_py_any(py),
        IR::Distinct { input, options } => Distinct {
            input: input.0,
            // Tuple layout: (keep_strategy, subset or None, maintain_order, slice).
            options: (
                Into::<&str>::into(options.keep_strategy),
                options.subset.as_ref().map_or_else(
                    || Ok(py.None()),
                    |f| {
                        f.iter()
                            .map(|s| s.as_ref())
                            .collect::<Vec<&str>>()
                            .into_py_any(py)
                    },
                )?,
                options.maintain_order,
                options.slice,
            )
                .into_py_any(py)?,
        }
        .into_py_any(py),
        IR::MapFunction { input, function } => MapFunction {
            input: input.0,
            // Each supported function becomes a tuple whose first element is its name.
            function: match function {
                FunctionIR::OpaquePython(_) => {
                    return Err(PyNotImplementedError::new_err("opaque python mapfunction"));
                },
                FunctionIR::Opaque {
                    function: _,
                    schema: _,
                    predicate_pd: _,
                    projection_pd: _,
                    streamable: _,
                    fmt_str: _,
                } => return Err(PyNotImplementedError::new_err("opaque rust mapfunction")),
                FunctionIR::Unnest { columns } => (
                    "unnest",
                    columns.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
                )
                    .into_py_any(py)?,
                FunctionIR::Rechunk => ("rechunk",).into_py_any(py)?,
                FunctionIR::Explode { columns, schema: _ } => (
                    "explode",
                    columns.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
                )
                    .into_py_any(py)?,
                #[cfg(feature = "pivot")]
                FunctionIR::Unpivot { args, schema: _ } => (
                    "unpivot",
                    args.index.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
                    args.on.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
                    args.variable_name
                        .as_ref()
                        .map_or_else(|| Ok(py.None()), |s| s.as_str().into_py_any(py))?,
                    args.value_name
                        .as_ref()
                        .map_or_else(|| Ok(py.None()), |s| s.as_str().into_py_any(py))?,
                )
                    .into_py_any(py)?,
                // A missing offset is exposed as 0.
                FunctionIR::RowIndex {
                    name,
                    schema: _,
                    offset,
                } => ("row_index", name.to_string(), offset.unwrap_or(0)).into_py_any(py)?,
                FunctionIR::FastCount {
                    sources,
                    scan_type,
                    cloud_options,
                    alias,
                } => {
                    let sources = sources
                        .into_paths()
                        .ok_or_else(|| {
                            PyNotImplementedError::new_err("FastCount with BytesIO sources")
                        })?
                        .iter()
                        .map(|p| p.to_str())
                        .collect::<Vec<_>>()
                        .into_py_any(py)?;

                    let scan_type = scan_type_to_pyobject(py, scan_type, cloud_options)?;

                    let alias = alias
                        .as_ref()
                        .map(|a| a.as_str())
                        .map_or_else(|| Ok(py.None()), |s| s.into_py_any(py))?;

                    ("fast_count", sources, scan_type, alias).into_py_any(py)?
                },
            },
        }
        .into_py_any(py),
        IR::Union { inputs, options } => Union {
            inputs: inputs.iter().map(|n| n.0).collect(),
            // Only the slice of the union options is exposed.
            options: options.slice,
        }
        .into_py_any(py),
        IR::HConcat {
            inputs,
            schema: _,
            options: _,
        } => HConcat {
            inputs: inputs.iter().map(|n| n.0).collect(),
            options: (),
        }
        .into_py_any(py),
        IR::ExtContext {
            input,
            contexts,
            schema: _,
        } => ExtContext {
            input: input.0,
            contexts: contexts.iter().map(|n| n.0).collect(),
        }
        .into_py_any(py),
        IR::Sink { input, payload } => Sink {
            input: input.0,
            // The payload is handed to Python as a JSON string.
            payload: PyString::new(
                py,
                &serde_json::to_string(payload)
                    .map_err(|err| PyValueError::new_err(format!("{err:?}")))?,
            )
            .into(),
        }
        .into_py_any(py),
        IR::SinkMultiple { .. } => Err(PyNotImplementedError::new_err(
            "Not expecting to see a SinkMultiple node",
        )),
        #[cfg(feature = "merge_sorted")]
        IR::MergeSorted {
            input_left,
            input_right,
            key,
        } => MergeSorted {
            input_left: input_left.0,
            input_right: input_right.0,
            key: key.to_string(),
        }
        .into_py_any(py),
        IR::Invalid => Err(PyNotImplementedError::new_err("Invalid")),
    }
}