1use polars::prelude::ColumnMapping;
2#[cfg(feature = "iejoin")]
3use polars::prelude::JoinTypeOptionsIR;
4use polars::prelude::deletion::DeletionFilesList;
5use polars::prelude::python_dsl::PythonScanSource;
6use polars_core::prelude::IdxSize;
7use polars_io::cloud::CloudOptions;
8use polars_ops::prelude::JoinType;
9use polars_plan::plans::IR;
10use polars_plan::prelude::{FileScanIR, FunctionIR, PythonPredicate, UnifiedScanArgs};
11use pyo3::IntoPyObjectExt;
12use pyo3::exceptions::{PyNotImplementedError, PyValueError};
13use pyo3::prelude::*;
14use pyo3::types::{PyDict, PyList, PyString};
15
16use super::expr_nodes::PyGroupbyOptions;
17use crate::PyDataFrame;
18use crate::lazyframe::visit::PyExprIR;
19
20fn scan_type_to_pyobject(
21 py: Python<'_>,
22 scan_type: &FileScanIR,
23 cloud_options: &Option<CloudOptions>,
24) -> PyResult<Py<PyAny>> {
25 match scan_type {
26 #[cfg(feature = "csv")]
27 FileScanIR::Csv { options } => {
28 let options = serde_json::to_string(options)
29 .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
30 let cloud_options = serde_json::to_string(cloud_options)
31 .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
32 Ok(("csv", options, cloud_options).into_py_any(py)?)
33 },
34 #[cfg(feature = "parquet")]
35 FileScanIR::Parquet { options, .. } => {
36 let options = serde_json::to_string(options)
37 .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
38 let cloud_options = serde_json::to_string(cloud_options)
39 .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
40 Ok(("parquet", options, cloud_options).into_py_any(py)?)
41 },
42 #[cfg(feature = "ipc")]
43 FileScanIR::Ipc { .. } => Err(PyNotImplementedError::new_err("ipc scan")),
44 #[cfg(feature = "json")]
45 FileScanIR::NDJson { options, .. } => {
46 let options = serde_json::to_string(options)
47 .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
48 Ok(("ndjson", options).into_py_any(py)?)
49 },
50 #[cfg(feature = "scan_lines")]
51 FileScanIR::Lines { name } => Ok(("lines", name.as_str()).into_py_any(py)?),
52 FileScanIR::PythonDataset { .. } => {
53 Err(PyNotImplementedError::new_err("python dataset scan"))
54 },
55 FileScanIR::Anonymous { .. } => Err(PyNotImplementedError::new_err("anonymous scan")),
56 }
57}
58
/// Python-visible node that `into_py` builds for `IR::PythonScan`.
#[pyclass(frozen)]
pub struct PythonScan {
    /// Tuple `(scan_fn, with_columns, python_source, predicate, n_rows)`.
    ///
    /// `python_source` is one of `"pyarrow"`, `"cuda"` or `"io_plugin"`; the
    /// other entries are `None` when the corresponding option is unset.
    #[pyo3(get)]
    options: Py<PyAny>,
}
65
/// Python-visible node that `into_py` builds for `IR::Slice`.
#[pyclass(frozen)]
pub struct Slice {
    /// Index of the input node (the `.0` of the IR node handle).
    #[pyo3(get)]
    input: usize,
    /// Slice offset, copied from the IR node.
    #[pyo3(get)]
    offset: i64,
    /// Slice length, copied from the IR node.
    #[pyo3(get)]
    len: IdxSize,
}
76
/// Python-visible node that `into_py` builds for `IR::Filter`.
#[pyclass(frozen)]
pub struct Filter {
    /// Index of the input node (the `.0` of the IR node handle).
    #[pyo3(get)]
    input: usize,
    /// The filter predicate, converted from the IR expression.
    #[pyo3(get)]
    predicate: PyExprIR,
}
85
/// Wrapper around a cloned `UnifiedScanArgs`; individual settings are exposed
/// to Python through the getters in the `#[pymethods]` block.
#[pyclass(frozen)]
#[derive(Clone)]
pub struct PyFileOptions {
    // Not exposed directly: Python only sees the getter-derived views.
    inner: UnifiedScanArgs,
}
91
92#[pymethods]
93impl PyFileOptions {
94 #[getter]
95 fn n_rows(&self) -> Option<(i64, usize)> {
96 self.inner
97 .pre_slice
98 .clone()
99 .map(|slice| <(i64, usize)>::try_from(slice).unwrap())
100 }
101 #[getter]
102 fn with_columns(&self) -> Option<Vec<&str>> {
103 self.inner
104 .projection
105 .as_ref()?
106 .iter()
107 .map(|x| x.as_str())
108 .collect::<Vec<_>>()
109 .into()
110 }
111 #[getter]
112 fn cache(&self, _py: Python<'_>) -> bool {
113 self.inner.cache
114 }
115 #[getter]
116 fn row_index(&self) -> Option<(&str, IdxSize)> {
117 self.inner
118 .row_index
119 .as_ref()
120 .map(|n| (n.name.as_str(), n.offset))
121 }
122 #[getter]
123 fn rechunk(&self, _py: Python<'_>) -> bool {
124 self.inner.rechunk
125 }
126 #[getter]
127 fn hive_options(&self, _py: Python<'_>) -> PyResult<Py<PyAny>> {
128 Err(PyNotImplementedError::new_err("hive options"))
129 }
130 #[getter]
131 fn include_file_paths(&self, _py: Python<'_>) -> Option<&str> {
132 self.inner.include_file_paths.as_deref()
133 }
134
135 #[getter]
139 fn deletion_files(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
140 Ok(match &self.inner.deletion_files {
141 None => py.None().into_any(),
142
143 Some(DeletionFilesList::IcebergPositionDelete(paths)) => {
144 let out = PyDict::new(py);
145
146 for (k, v) in paths.iter() {
147 out.set_item(*k, v.as_ref())?;
148 }
149
150 ("iceberg-position-delete", out)
151 .into_pyobject(py)?
152 .into_any()
153 .unbind()
154 },
155 })
156 }
157
158 #[getter]
162 fn column_mapping(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
163 Ok(match &self.inner.column_mapping {
164 None => py.None().into_any(),
165
166 Some(ColumnMapping::Iceberg { .. }) => unimplemented!(),
167 })
168 }
169}
170
/// Python-visible node that `into_py` builds for `IR::Scan` (file scans without
/// hive partitioning).
#[pyclass(frozen)]
pub struct Scan {
    /// Python list with one entry per source path (`path.to_str()` of each path).
    #[pyo3(get)]
    paths: Py<PyAny>,
    /// Always `None` as constructed by `into_py`.
    #[pyo3(get)]
    file_info: Py<PyAny>,
    /// Scan predicate, if any. `into_py` leaves this empty when the IR node
    /// reports `predicate_file_skip_applied == Some(true)`.
    #[pyo3(get)]
    predicate: Option<PyExprIR>,
    /// Clone of the scan's unified scan arguments, exposed through getters.
    #[pyo3(get)]
    file_options: PyFileOptions,
    /// Tagged tuple describing the file format, as produced by
    /// `scan_type_to_pyobject`.
    #[pyo3(get)]
    scan_type: Py<PyAny>,
}
185
/// Python-visible node that `into_py` builds for `IR::DataFrameScan`.
#[pyclass(frozen)]
pub struct DataFrameScan {
    /// Clone of the scanned dataframe.
    #[pyo3(get)]
    df: PyDataFrame,
    /// List of column names taken from the node's output schema, or `None`
    /// when the node has no output schema.
    #[pyo3(get)]
    projection: Py<PyAny>,
    /// Always `None` as constructed by `into_py`.
    #[pyo3(get)]
    selection: Option<PyExprIR>,
}
196
/// Python-visible node that `into_py` builds for `IR::SimpleProjection`.
///
/// Only the input is exported; the IR node's `columns` are not carried over.
#[pyclass(frozen)]
pub struct SimpleProjection {
    /// Index of the input node (the `.0` of the IR node handle).
    #[pyo3(get)]
    input: usize,
}
203
/// Python-visible node that `into_py` builds for `IR::Select`.
#[pyclass(frozen)]
pub struct Select {
    /// Index of the input node (the `.0` of the IR node handle).
    #[pyo3(get)]
    input: usize,
    /// The selected expressions, converted from the IR expressions.
    #[pyo3(get)]
    expr: Vec<PyExprIR>,
    /// Copied from the node's projection options.
    #[pyo3(get)]
    should_broadcast: bool,
}
214
/// Python-visible node that `into_py` builds for `IR::Sort`.
#[pyclass(frozen)]
pub struct Sort {
    /// Index of the input node (the `.0` of the IR node handle).
    #[pyo3(get)]
    input: usize,
    /// Expressions to sort by, converted from the IR expressions.
    #[pyo3(get)]
    by_column: Vec<PyExprIR>,
    /// Tuple `(maintain_order, nulls_last, descending)` taken from the sort options.
    #[pyo3(get)]
    sort_options: (bool, Vec<bool>, Vec<bool>),
    /// Optional slice, copied from the IR node.
    #[pyo3(get)]
    slice: Option<(i64, usize)>,
}
227
/// Python-visible node that `into_py` builds for `IR::Cache`.
#[pyclass(frozen)]
pub struct Cache {
    /// Index of the input node (the `.0` of the IR node handle).
    #[pyo3(get)]
    input: usize,
    /// Cache identifier, obtained via `id.as_u128()`.
    #[pyo3(get)]
    id_: u128,
}
236
/// Python-visible node that `into_py` builds for `IR::GroupBy`.
#[pyclass(frozen)]
pub struct GroupBy {
    /// Index of the input node (the `.0` of the IR node handle).
    #[pyo3(get)]
    input: usize,
    /// Grouping key expressions.
    #[pyo3(get)]
    keys: Vec<PyExprIR>,
    /// Aggregation expressions.
    #[pyo3(get)]
    aggs: Vec<PyExprIR>,
    /// Unit placeholder: `into_py` raises `NotImplementedError` when the IR
    /// node carries an `apply` function, so no function is ever stored here.
    #[pyo3(get)]
    apply: (),
    /// Copied from the IR node.
    #[pyo3(get)]
    maintain_order: bool,
    /// A `PyGroupbyOptions` object built from the node's group-by options.
    #[pyo3(get)]
    options: Py<PyAny>,
}
253
/// Python-visible node that `into_py` builds for `IR::Join`.
#[pyclass(frozen)]
pub struct Join {
    /// Index of the left input node.
    #[pyo3(get)]
    input_left: usize,
    /// Index of the right input node.
    #[pyo3(get)]
    input_right: usize,
    /// Join key expressions for the left input.
    #[pyo3(get)]
    left_on: Vec<PyExprIR>,
    /// Join key expressions for the right input.
    #[pyo3(get)]
    right_on: Vec<PyExprIR>,
    /// Tuple `(how, nulls_equal, slice, suffix, coalesce, maintain_order)`.
    ///
    /// `how` is the join type name, except for IEJoin where it is the tuple
    /// `(name, operator1, operator2)` with `operator2` possibly `None`.
    #[pyo3(get)]
    options: Py<PyAny>,
}
268
/// Python-visible node that `into_py` builds for `IR::MergeSorted`
/// (only when the `merge_sorted` feature is enabled).
#[pyclass(frozen)]
pub struct MergeSorted {
    /// Index of the left input node.
    #[pyo3(get)]
    input_left: usize,
    /// Index of the right input node.
    #[pyo3(get)]
    input_right: usize,
    /// The merge key, converted to an owned string.
    #[pyo3(get)]
    key: String,
}
279
/// Python-visible node that `into_py` builds for `IR::HStack`.
#[pyclass(frozen)]
pub struct HStack {
    /// Index of the input node (the `.0` of the IR node handle).
    #[pyo3(get)]
    input: usize,
    /// The node's expressions, converted from the IR expressions.
    #[pyo3(get)]
    exprs: Vec<PyExprIR>,
    /// Copied from the node's projection options.
    #[pyo3(get)]
    should_broadcast: bool,
}
290
/// Python-visible node holding an input and a list of expressions.
///
/// NOTE(review): no arm of `into_py` in this file constructs this type;
/// confirm whether it is still used elsewhere.
#[pyclass(frozen)]
pub struct Reduce {
    /// Index of the input node.
    #[pyo3(get)]
    input: usize,
    /// Expressions attached to the node.
    #[pyo3(get)]
    exprs: Vec<PyExprIR>,
}
299
/// Python-visible node that `into_py` builds for `IR::Distinct`.
#[pyclass(frozen)]
pub struct Distinct {
    /// Index of the input node (the `.0` of the IR node handle).
    #[pyo3(get)]
    input: usize,
    /// Tuple `(keep_strategy, subset, maintain_order, slice)`; `subset` is a
    /// list of names or `None`.
    #[pyo3(get)]
    options: Py<PyAny>,
}
/// Python-visible node that `into_py` builds for `IR::MapFunction`.
#[pyclass(frozen)]
pub struct MapFunction {
    /// Index of the input node (the `.0` of the IR node handle).
    #[pyo3(get)]
    input: usize,
    /// Tuple whose first element names the function (`"unnest"`, `"rechunk"`,
    /// `"explode"`, `"unpivot"`, `"row_index"` or `"fast_count"`), followed by
    /// that function's arguments.
    #[pyo3(get)]
    function: Py<PyAny>,
}
/// Python-visible node that `into_py` builds for `IR::Union`.
#[pyclass(frozen)]
pub struct Union {
    /// Indices of the input nodes.
    #[pyo3(get)]
    inputs: Vec<usize>,
    /// The `slice` of the union options; other union options are not exported.
    #[pyo3(get)]
    options: Option<(i64, usize)>,
}
/// Python-visible node that `into_py` builds for `IR::HConcat`.
#[pyclass(frozen)]
pub struct HConcat {
    /// Indices of the input nodes.
    #[pyo3(get)]
    inputs: Vec<usize>,
    /// Unit placeholder: the IR node's options are not exported.
    #[pyo3(get)]
    options: (),
}
/// Python-visible node that `into_py` builds for `IR::ExtContext`.
#[pyclass(frozen)]
pub struct ExtContext {
    /// Index of the input node (the `.0` of the IR node handle).
    #[pyo3(get)]
    input: usize,
    /// Indices of the context nodes.
    #[pyo3(get)]
    contexts: Vec<usize>,
}
339
/// Python-visible node that `into_py` builds for `IR::Sink`.
#[pyclass(frozen)]
pub struct Sink {
    /// Index of the input node (the `.0` of the IR node handle).
    #[pyo3(get)]
    input: usize,
    /// The sink payload serialized to a JSON string.
    #[pyo3(get)]
    payload: Py<PyAny>,
}
347
/// Converts one IR plan node into the matching Python-visible node object
/// defined in this module.
///
/// Child nodes are not converted recursively: inputs are exported as their
/// node indices (the `.0` of each node handle).
///
/// # Errors
/// - `PyNotImplementedError` for nodes or options that are not exported:
///   scans with hive partitioning, sources that are not paths, `GroupBy` with
///   an `apply` function, asof joins, cross joins carrying join-type options,
///   opaque map functions, hints, `SinkMultiple` and `Invalid`. Unsupported
///   scan types reported by `scan_type_to_pyobject` are propagated as well.
/// - `PyValueError` when the sink payload cannot be serialized to JSON.
///
/// # Panics
/// Hits `unreachable!()` if an IEJoin node does not carry IEJoin options.
pub(crate) fn into_py(py: Python<'_>, plan: &IR) -> PyResult<Py<PyAny>> {
    match plan {
        IR::PythonScan { options } => {
            let python_src = match options.python_source {
                PythonScanSource::Pyarrow => "pyarrow",
                PythonScanSource::Cuda => "cuda",
                PythonScanSource::IOPlugin => "io_plugin",
            };

            // Exported as the tuple
            // (scan_fn, with_columns, python_source, predicate, n_rows).
            PythonScan {
                options: (
                    options
                        .scan_fn
                        .as_ref()
                        .map_or_else(|| py.None(), |s| s.0.clone_ref(py)),
                    options.with_columns.as_ref().map_or_else(
                        || Ok(py.None()),
                        |cols| {
                            cols.iter()
                                .map(|x| x.as_str())
                                .collect::<Vec<_>>()
                                .into_py_any(py)
                        },
                    )?,
                    python_src,
                    // The predicate is tagged with its flavour; a polars
                    // predicate is exported as an expression node index.
                    match &options.predicate {
                        PythonPredicate::None => py.None(),
                        PythonPredicate::PyArrow(s) => ("pyarrow", s).into_py_any(py)?,
                        PythonPredicate::Polars(e) => ("polars", e.node().0).into_py_any(py)?,
                    },
                    options
                        .n_rows
                        .map_or_else(|| Ok(py.None()), |s| s.into_py_any(py))?,
                )
                    .into_py_any(py)?,
            }
            .into_py_any(py)
        },
        IR::Slice { input, offset, len } => Slice {
            input: input.0,
            offset: *offset,
            len: *len,
        }
        .into_py_any(py),
        IR::Filter { input, predicate } => Filter {
            input: input.0,
            predicate: predicate.into(),
        }
        .into_py_any(py),
        // Must precede the general `IR::Scan` arm so that hive-partitioned
        // scans are rejected before any conversion is attempted.
        IR::Scan {
            hive_parts: Some(_),
            ..
        } => Err(PyNotImplementedError::new_err(
            "scan with hive partitioning",
        )),
        IR::Scan {
            sources,
            file_info: _,
            hive_parts: _,
            predicate,
            predicate_file_skip_applied,
            output_schema: _,
            scan_type,
            unified_scan_args,
        } => {
            Scan {
                paths: {
                    // Only path-based sources can be exported.
                    let paths = sources
                        .into_paths()
                        .ok_or_else(|| PyNotImplementedError::new_err("scan with BytesIO"))?;

                    let out = PyList::new(py, [] as [(); 0])?;

                    // Each path is appended as the result of `to_str()`.
                    for path in paths.iter() {
                        out.append(path.to_str())?;
                    }

                    out.into_py_any(py)?
                },
                // File info is not exported.
                file_info: py.None(),
                // The predicate is withheld when the node reports
                // `predicate_file_skip_applied == Some(true)`.
                predicate: predicate
                    .as_ref()
                    .filter(|_| *predicate_file_skip_applied != Some(true))
                    .map(|e| e.into()),
                file_options: PyFileOptions {
                    inner: (**unified_scan_args).clone(),
                },
                scan_type: scan_type_to_pyobject(py, scan_type, &unified_scan_args.cloud_options)?,
            }
        }
        .into_py_any(py),
        IR::DataFrameScan {
            df,
            schema: _,
            output_schema,
        } => DataFrameScan {
            df: PyDataFrame::new((**df).clone()),
            // Column names of the output schema, or `None` without one.
            projection: output_schema.as_ref().map_or_else(
                || Ok(py.None()),
                |s| {
                    s.iter_names()
                        .map(|s| s.as_str())
                        .collect::<Vec<_>>()
                        .into_py_any(py)
                },
            )?,
            selection: None,
        }
        .into_py_any(py),
        IR::SimpleProjection { input, columns: _ } => {
            SimpleProjection { input: input.0 }.into_py_any(py)
        },
        IR::Select {
            input,
            expr,
            schema: _,
            options,
        } => Select {
            expr: expr.iter().map(|e| e.into()).collect(),
            input: input.0,
            should_broadcast: options.should_broadcast,
        }
        .into_py_any(py),
        IR::Sort {
            input,
            by_column,
            slice,
            sort_options,
        } => Sort {
            input: input.0,
            by_column: by_column.iter().map(|e| e.into()).collect(),
            // Tuple order: (maintain_order, nulls_last, descending).
            sort_options: (
                sort_options.maintain_order,
                sort_options.nulls_last.clone(),
                sort_options.descending.clone(),
            ),
            slice: *slice,
        }
        .into_py_any(py),
        IR::Cache { input, id } => Cache {
            input: input.0,
            id_: id.as_u128(),
        }
        .into_py_any(py),
        IR::GroupBy {
            input,
            keys,
            aggs,
            schema: _,
            apply,
            maintain_order,
            options,
        } => GroupBy {
            input: input.0,
            keys: keys.iter().map(|e| e.into()).collect(),
            aggs: aggs.iter().map(|e| e.into()).collect(),
            // A group-by carrying an `apply` function cannot be exported.
            apply: apply.as_ref().map_or(Ok(()), |_| {
                Err(PyNotImplementedError::new_err(format!(
                    "apply inside GroupBy {plan:?}"
                )))
            })?,
            maintain_order: *maintain_order,
            options: PyGroupbyOptions::new(options.as_ref().clone()).into_py_any(py)?,
        }
        .into_py_any(py),
        IR::Join {
            input_left,
            input_right,
            schema: _,
            left_on,
            right_on,
            options,
        } => Join {
            input_left: input_left.0,
            input_right: input_right.0,
            left_on: left_on.iter().map(|e| e.into()).collect(),
            right_on: right_on.iter().map(|e| e.into()).collect(),
            // Exported as the tuple
            // (how, nulls_equal, slice, suffix, coalesce, maintain_order).
            options: {
                let how = &options.args.how;
                let name = Into::<&str>::into(how).into_pyobject(py)?;
                (
                    match how {
                        #[cfg(feature = "asof_join")]
                        JoinType::AsOf(_) => {
                            return Err(PyNotImplementedError::new_err("asof join"));
                        },
                        #[cfg(feature = "iejoin")]
                        JoinType::IEJoin => {
                            let Some(JoinTypeOptionsIR::IEJoin(ie_options)) = &options.options
                            else {
                                unreachable!()
                            };
                            // IEJoin exports (name, operator1, operator2);
                            // operator2 is `None` when absent.
                            (
                                name,
                                crate::Wrap(ie_options.operator1).into_py_any(py)?,
                                ie_options.operator2.as_ref().map_or_else(
                                    || Ok(py.None()),
                                    |op| crate::Wrap(*op).into_py_any(py),
                                )?,
                            )
                                .into_py_any(py)?
                        },
                        // A cross join that carries join-type options is
                        // reported as a nested loop join and rejected.
                        JoinType::Cross if options.options.is_some() => {
                            return Err(PyNotImplementedError::new_err("nested loop join"));
                        },
                        _ => name.into_any().unbind(),
                    },
                    options.args.nulls_equal,
                    options.args.slice,
                    options.args.suffix().as_str(),
                    options.args.coalesce.coalesce(how),
                    Into::<&str>::into(options.args.maintain_order),
                )
                    .into_py_any(py)?
            },
        }
        .into_py_any(py),
        IR::HStack {
            input,
            exprs,
            schema: _,
            options,
        } => HStack {
            input: input.0,
            exprs: exprs.iter().map(|e| e.into()).collect(),
            should_broadcast: options.should_broadcast,
        }
        .into_py_any(py),
        IR::Distinct { input, options } => Distinct {
            input: input.0,
            // Exported as the tuple (keep_strategy, subset, maintain_order, slice).
            options: (
                Into::<&str>::into(options.keep_strategy),
                options.subset.as_ref().map_or_else(
                    || Ok(py.None()),
                    |f| {
                        f.iter()
                            .map(|s| s.as_ref())
                            .collect::<Vec<&str>>()
                            .into_py_any(py)
                    },
                )?,
                options.maintain_order,
                options.slice,
            )
                .into_py_any(py)?,
        }
        .into_py_any(py),
        IR::MapFunction { input, function } => MapFunction {
            input: input.0,
            // Each supported function becomes a tuple starting with its name;
            // unsupported ones return early with `NotImplementedError`.
            function: match function {
                FunctionIR::OpaquePython(_) => {
                    return Err(PyNotImplementedError::new_err("opaque python mapfunction"));
                },
                FunctionIR::Opaque {
                    function: _,
                    schema: _,
                    predicate_pd: _,
                    projection_pd: _,
                    streamable: _,
                    fmt_str: _,
                } => return Err(PyNotImplementedError::new_err("opaque rust mapfunction")),
                FunctionIR::Unnest { columns, separator } => (
                    "unnest",
                    columns.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
                    separator.as_ref().map(|s| s.to_string()),
                )
                    .into_py_any(py)?,
                FunctionIR::Rechunk => ("rechunk",).into_py_any(py)?,
                FunctionIR::Explode { columns, schema: _ } => (
                    "explode",
                    columns.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
                )
                    .into_py_any(py)?,
                #[cfg(feature = "pivot")]
                FunctionIR::Unpivot { args, schema: _ } => (
                    "unpivot",
                    args.index.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
                    args.on.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
                    args.variable_name
                        .as_ref()
                        .map_or_else(|| Ok(py.None()), |s| s.as_str().into_py_any(py))?,
                    args.value_name
                        .as_ref()
                        .map_or_else(|| Ok(py.None()), |s| s.as_str().into_py_any(py))?,
                )
                    .into_py_any(py)?,
                // A missing offset is exported as 0.
                FunctionIR::RowIndex {
                    name,
                    schema: _,
                    offset,
                } => ("row_index", name.to_string(), offset.unwrap_or(0)).into_py_any(py)?,
                FunctionIR::FastCount {
                    sources,
                    scan_type,
                    cloud_options,
                    alias,
                } => {
                    // Only path-based sources can be exported.
                    let sources = sources
                        .into_paths()
                        .ok_or_else(|| {
                            PyNotImplementedError::new_err("FastCount with BytesIO sources")
                        })?
                        .iter()
                        .map(|p| p.to_str())
                        .collect::<Vec<_>>()
                        .into_py_any(py)?;

                    let scan_type = scan_type_to_pyobject(py, scan_type, cloud_options)?;

                    let alias = alias
                        .as_ref()
                        .map(|a| a.as_str())
                        .map_or_else(|| Ok(py.None()), |s| s.into_py_any(py))?;

                    ("fast_count", sources, scan_type, alias).into_py_any(py)?
                },
                FunctionIR::Hint(_) => return Err(PyNotImplementedError::new_err("hint ir")),
            },
        }
        .into_py_any(py),
        IR::Union { inputs, options } => Union {
            inputs: inputs.iter().map(|n| n.0).collect(),
            // Only the slice of the union options is exported.
            options: options.slice,
        }
        .into_py_any(py),
        IR::HConcat {
            inputs,
            schema: _,
            options: _,
        } => HConcat {
            inputs: inputs.iter().map(|n| n.0).collect(),
            options: (),
        }
        .into_py_any(py),
        IR::ExtContext {
            input,
            contexts,
            schema: _,
        } => ExtContext {
            input: input.0,
            contexts: contexts.iter().map(|n| n.0).collect(),
        }
        .into_py_any(py),
        IR::Sink { input, payload } => Sink {
            input: input.0,
            // The payload is handed to Python as a JSON string.
            payload: PyString::new(
                py,
                &serde_json::to_string(payload)
                    .map_err(|err| PyValueError::new_err(format!("{err:?}")))?,
            )
            .into(),
        }
        .into_py_any(py),
        IR::SinkMultiple { .. } => Err(PyNotImplementedError::new_err(
            "Not expecting to see a SinkMultiple node",
        )),
        #[cfg(feature = "merge_sorted")]
        IR::MergeSorted {
            input_left,
            input_right,
            key,
        } => MergeSorted {
            input_left: input_left.0,
            input_right: input_right.0,
            key: key.to_string(),
        }
        .into_py_any(py),
        IR::Invalid => Err(PyNotImplementedError::new_err("Invalid")),
    }
}