1#[cfg(feature = "iejoin")]
2use polars::prelude::JoinTypeOptionsIR;
3use polars::prelude::deletion::DeletionFilesList;
4use polars::prelude::python_dsl::PythonScanSource;
5use polars_core::prelude::IdxSize;
6use polars_io::cloud::CloudOptions;
7use polars_ops::prelude::JoinType;
8use polars_plan::plans::IR;
9use polars_plan::prelude::{FileScan, FunctionIR, PythonPredicate, UnifiedScanArgs};
10use pyo3::IntoPyObjectExt;
11use pyo3::exceptions::{PyNotImplementedError, PyValueError};
12use pyo3::prelude::*;
13use pyo3::types::{PyDict, PyList, PyString};
14
15use super::expr_nodes::PyGroupbyOptions;
16use crate::PyDataFrame;
17use crate::lazyframe::visit::PyExprIR;
18
19fn scan_type_to_pyobject(
20 py: Python<'_>,
21 scan_type: &FileScan,
22 cloud_options: &Option<CloudOptions>,
23) -> PyResult<PyObject> {
24 match scan_type {
25 #[cfg(feature = "csv")]
26 FileScan::Csv { options } => {
27 let options = serde_json::to_string(options)
28 .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
29 let cloud_options = serde_json::to_string(cloud_options)
30 .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
31 Ok(("csv", options, cloud_options).into_py_any(py)?)
32 },
33 #[cfg(feature = "parquet")]
34 FileScan::Parquet { options, .. } => {
35 let options = serde_json::to_string(options)
36 .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
37 let cloud_options = serde_json::to_string(cloud_options)
38 .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
39 Ok(("parquet", options, cloud_options).into_py_any(py)?)
40 },
41 #[cfg(feature = "ipc")]
42 FileScan::Ipc { .. } => Err(PyNotImplementedError::new_err("ipc scan")),
43 #[cfg(feature = "json")]
44 FileScan::NDJson { options, .. } => {
45 let options = serde_json::to_string(options)
46 .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
47 Ok(("ndjson", options).into_py_any(py)?)
48 },
49 FileScan::PythonDataset { .. } => {
50 Err(PyNotImplementedError::new_err("python dataset scan"))
51 },
52 FileScan::Anonymous { .. } => Err(PyNotImplementedError::new_err("anonymous scan")),
53 }
54}
55
#[pyclass]
/// Python-visible form of an `IR::PythonScan` node.
pub struct PythonScan {
    /// Tuple `(scan_fn, with_columns, source, predicate, n_rows)` as built by `into_py`.
    /// `source` is one of `"pyarrow"`, `"cuda"`, `"io_plugin"`; `predicate` is `None`,
    /// `("pyarrow", <pyarrow predicate>)` or `("polars", <expression node index>)`;
    /// absent `scan_fn` / `with_columns` / `n_rows` are `None`.
    #[pyo3(get)]
    options: PyObject,
}
62
#[pyclass]
/// Python-visible form of an `IR::Slice` node.
pub struct Slice {
    /// Input plan node, exposed as the raw `usize` of its node handle.
    #[pyo3(get)]
    input: usize,
    /// Signed row offset at which the slice starts.
    #[pyo3(get)]
    offset: i64,
    /// Number of rows in the slice.
    #[pyo3(get)]
    len: IdxSize,
}
73
#[pyclass]
/// Python-visible form of an `IR::Filter` node.
pub struct Filter {
    /// Input plan node, exposed as the raw `usize` of its node handle.
    #[pyo3(get)]
    input: usize,
    /// Expression used as the filter predicate.
    #[pyo3(get)]
    predicate: PyExprIR,
}
82
#[pyclass]
#[derive(Clone)]
/// Read-only Python view over the [`UnifiedScanArgs`] of a file scan.
pub struct PyFileOptions {
    /// The wrapped scan arguments; exposed through the getters in the `#[pymethods]` block.
    inner: UnifiedScanArgs,
}
88
89#[pymethods]
90impl PyFileOptions {
91 #[getter]
92 fn n_rows(&self) -> Option<(i64, usize)> {
93 self.inner
94 .pre_slice
95 .clone()
96 .map(|slice| <(i64, usize)>::try_from(slice).unwrap())
97 }
98 #[getter]
99 fn with_columns(&self) -> Option<Vec<&str>> {
100 self.inner
101 .projection
102 .as_ref()?
103 .iter()
104 .map(|x| x.as_str())
105 .collect::<Vec<_>>()
106 .into()
107 }
108 #[getter]
109 fn cache(&self, _py: Python<'_>) -> bool {
110 self.inner.cache
111 }
112 #[getter]
113 fn row_index(&self) -> Option<(&str, IdxSize)> {
114 self.inner
115 .row_index
116 .as_ref()
117 .map(|n| (n.name.as_str(), n.offset))
118 }
119 #[getter]
120 fn rechunk(&self, _py: Python<'_>) -> bool {
121 self.inner.rechunk
122 }
123 #[getter]
124 fn hive_options(&self, _py: Python<'_>) -> PyResult<PyObject> {
125 Err(PyNotImplementedError::new_err("hive options"))
126 }
127 #[getter]
128 fn include_file_paths(&self, _py: Python<'_>) -> Option<&str> {
129 self.inner.include_file_paths.as_deref()
130 }
131
132 #[getter]
136 fn deletion_files(&self, py: Python<'_>) -> PyResult<PyObject> {
137 Ok(match &self.inner.deletion_files {
138 None => py.None().into_any(),
139
140 Some(DeletionFilesList::IcebergPositionDelete(paths)) => {
141 let out = PyDict::new(py);
142
143 for (k, v) in paths.iter() {
144 out.set_item(*k, v.as_ref())?;
145 }
146
147 ("iceberg-position-delete", out)
148 .into_pyobject(py)?
149 .into_any()
150 .unbind()
151 },
152 })
153 }
154}
155
#[pyclass]
/// Python-visible form of an `IR::Scan` node (file scan without hive partitioning).
pub struct Scan {
    /// Python list of source paths: a `str` for each UTF-8 path, otherwise the path value.
    #[pyo3(get)]
    paths: PyObject,
    /// Always `None`; file info is not exported by `into_py`.
    #[pyo3(get)]
    file_info: PyObject,
    /// Optional predicate pushed down into the scan.
    #[pyo3(get)]
    predicate: Option<PyExprIR>,
    /// The scan's unified arguments (slice, projection, row index, ...).
    #[pyo3(get)]
    file_options: PyFileOptions,
    /// Format tuple produced by `scan_type_to_pyobject`.
    #[pyo3(get)]
    scan_type: PyObject,
}
170
#[pyclass]
/// Python-visible form of an `IR::DataFrameScan` node (scan of an in-memory DataFrame).
pub struct DataFrameScan {
    /// Clone of the scanned DataFrame.
    #[pyo3(get)]
    df: PyDataFrame,
    /// List of output-schema column names, or `None` when there is no output schema.
    #[pyo3(get)]
    projection: PyObject,
    /// Always `None` as built by `into_py`.
    #[pyo3(get)]
    selection: Option<PyExprIR>,
}
181
#[pyclass]
/// Python-visible form of an `IR::SimpleProjection` node.
///
/// Only the input is exported; the projected columns are not carried over by `into_py`.
pub struct SimpleProjection {
    /// Input plan node, exposed as the raw `usize` of its node handle.
    #[pyo3(get)]
    input: usize,
}
188
#[pyclass]
/// Python-visible form of an `IR::Select` node.
pub struct Select {
    /// Input plan node, exposed as the raw `usize` of its node handle.
    #[pyo3(get)]
    input: usize,
    /// Selected expressions.
    #[pyo3(get)]
    expr: Vec<PyExprIR>,
    /// The `should_broadcast` flag of the select options.
    #[pyo3(get)]
    should_broadcast: bool,
}
199
#[pyclass]
/// Python-visible form of an `IR::Sort` node.
pub struct Sort {
    /// Input plan node, exposed as the raw `usize` of its node handle.
    #[pyo3(get)]
    input: usize,
    /// Expressions to sort by.
    #[pyo3(get)]
    by_column: Vec<PyExprIR>,
    /// `(maintain_order, nulls_last, descending)` taken from the sort options.
    #[pyo3(get)]
    sort_options: (bool, Vec<bool>, Vec<bool>),
    /// Optional `(offset, len)` slice applied with the sort.
    #[pyo3(get)]
    slice: Option<(i64, usize)>,
}
212
#[pyclass]
/// Python-visible form of an `IR::Cache` node.
pub struct Cache {
    /// Input plan node, exposed as the raw `usize` of its node handle.
    #[pyo3(get)]
    input: usize,
    /// Cache id, converted with `to_usize()`.
    #[pyo3(get)]
    id_: usize,
    /// The node's `cache_hits` counter.
    #[pyo3(get)]
    cache_hits: u32,
}
223
#[pyclass]
/// Python-visible form of an `IR::GroupBy` node.
pub struct GroupBy {
    /// Input plan node, exposed as the raw `usize` of its node handle.
    #[pyo3(get)]
    input: usize,
    /// Group-by key expressions.
    #[pyo3(get)]
    keys: Vec<PyExprIR>,
    /// Aggregation expressions.
    #[pyo3(get)]
    aggs: Vec<PyExprIR>,
    /// Always `()`: `into_py` rejects group-bys that carry an `apply` function.
    #[pyo3(get)]
    apply: (),
    /// Whether group order is maintained.
    #[pyo3(get)]
    maintain_order: bool,
    /// `PyGroupbyOptions` wrapper around the group-by options.
    #[pyo3(get)]
    options: PyObject,
}
240
#[pyclass]
/// Python-visible form of an `IR::Join` node.
pub struct Join {
    /// Left input plan node, exposed as the raw `usize` of its node handle.
    #[pyo3(get)]
    input_left: usize,
    /// Right input plan node, exposed as the raw `usize` of its node handle.
    #[pyo3(get)]
    input_right: usize,
    /// Join-key expressions on the left side.
    #[pyo3(get)]
    left_on: Vec<PyExprIR>,
    /// Join-key expressions on the right side.
    #[pyo3(get)]
    right_on: Vec<PyExprIR>,
    /// Tuple `(how, nulls_equal, slice, suffix, coalesce, maintain_order)`; `how` is the
    /// join-type name, or for IEJoin `(name, operator1, operator2-or-None)`.
    #[pyo3(get)]
    options: PyObject,
}
255
#[pyclass]
/// Python-visible form of an `IR::MergeSorted` node.
pub struct MergeSorted {
    /// Left input plan node, exposed as the raw `usize` of its node handle.
    #[pyo3(get)]
    input_left: usize,
    /// Right input plan node, exposed as the raw `usize` of its node handle.
    #[pyo3(get)]
    input_right: usize,
    /// Merge key, converted with `to_string()` (presumably a column name — confirm).
    #[pyo3(get)]
    key: String,
}
266
#[pyclass]
/// Python-visible form of an `IR::HStack` node.
pub struct HStack {
    /// Input plan node, exposed as the raw `usize` of its node handle.
    #[pyo3(get)]
    input: usize,
    /// Expressions whose results are stacked onto the input.
    #[pyo3(get)]
    exprs: Vec<PyExprIR>,
    /// The `should_broadcast` flag of the hstack options.
    #[pyo3(get)]
    should_broadcast: bool,
}
277
#[pyclass]
/// Node exposing an input and a list of expressions.
///
/// NOTE(review): `into_py` has no arm that builds this type; confirm whether it is still used.
pub struct Reduce {
    /// Input plan node, exposed as the raw `usize` of its node handle.
    #[pyo3(get)]
    input: usize,
    /// Expressions evaluated by the node.
    #[pyo3(get)]
    exprs: Vec<PyExprIR>,
}
286
#[pyclass]
/// Python-visible form of an `IR::Distinct` node.
pub struct Distinct {
    /// Input plan node, exposed as the raw `usize` of its node handle.
    #[pyo3(get)]
    input: usize,
    /// Tuple `(keep_strategy, subset-or-None, maintain_order, slice)`.
    #[pyo3(get)]
    options: PyObject,
}
#[pyclass]
/// Python-visible form of an `IR::MapFunction` node.
pub struct MapFunction {
    /// Input plan node, exposed as the raw `usize` of its node handle.
    #[pyo3(get)]
    input: usize,
    /// Tuple whose first element names the function: `"unnest"`, `"rechunk"`, `"explode"`,
    /// `"unpivot"`, `"row_index"` or `"fast_count"`, followed by its arguments.
    #[pyo3(get)]
    function: PyObject,
}
#[pyclass]
/// Python-visible form of an `IR::Union` node.
pub struct Union {
    /// Input plan nodes, exposed as the raw `usize` of their node handles.
    #[pyo3(get)]
    inputs: Vec<usize>,
    /// The union's optional `(offset, len)` slice.
    #[pyo3(get)]
    options: Option<(i64, usize)>,
}
#[pyclass]
/// Python-visible form of an `IR::HConcat` node.
pub struct HConcat {
    /// Input plan nodes, exposed as the raw `usize` of their node handles.
    #[pyo3(get)]
    inputs: Vec<usize>,
    /// Always `()`; the hconcat options are not exported by `into_py`.
    #[pyo3(get)]
    options: (),
}
#[pyclass]
/// Python-visible form of an `IR::ExtContext` node.
pub struct ExtContext {
    /// Input plan node, exposed as the raw `usize` of its node handle.
    #[pyo3(get)]
    input: usize,
    /// Context plan nodes, exposed as the raw `usize` of their node handles.
    #[pyo3(get)]
    contexts: Vec<usize>,
}
326
#[pyclass]
/// Python-visible form of an `IR::Sink` node.
pub struct Sink {
    /// Input plan node, exposed as the raw `usize` of its node handle.
    #[pyo3(get)]
    input: usize,
    /// The sink payload serialised to a JSON string.
    #[pyo3(get)]
    payload: PyObject,
}
334
335pub(crate) fn into_py(py: Python<'_>, plan: &IR) -> PyResult<PyObject> {
336 match plan {
337 IR::PythonScan { options } => {
338 let python_src = match options.python_source {
339 PythonScanSource::Pyarrow => "pyarrow",
340 PythonScanSource::Cuda => "cuda",
341 PythonScanSource::IOPlugin => "io_plugin",
342 };
343
344 PythonScan {
345 options: (
346 options
347 .scan_fn
348 .as_ref()
349 .map_or_else(|| py.None(), |s| s.0.clone_ref(py)),
350 options.with_columns.as_ref().map_or_else(
351 || Ok(py.None()),
352 |cols| {
353 cols.iter()
354 .map(|x| x.as_str())
355 .collect::<Vec<_>>()
356 .into_py_any(py)
357 },
358 )?,
359 python_src,
360 match &options.predicate {
361 PythonPredicate::None => py.None(),
362 PythonPredicate::PyArrow(s) => ("pyarrow", s).into_py_any(py)?,
363 PythonPredicate::Polars(e) => ("polars", e.node().0).into_py_any(py)?,
364 },
365 options
366 .n_rows
367 .map_or_else(|| Ok(py.None()), |s| s.into_py_any(py))?,
368 )
369 .into_py_any(py)?,
370 }
371 .into_py_any(py)
372 },
373 IR::Slice { input, offset, len } => Slice {
374 input: input.0,
375 offset: *offset,
376 len: *len,
377 }
378 .into_py_any(py),
379 IR::Filter { input, predicate } => Filter {
380 input: input.0,
381 predicate: predicate.into(),
382 }
383 .into_py_any(py),
384 IR::Scan {
385 hive_parts: Some(_),
386 ..
387 } => Err(PyNotImplementedError::new_err(
388 "scan with hive partitioning",
389 )),
390 IR::Scan {
391 sources,
392 file_info: _,
393 hive_parts: _,
394 predicate,
395 output_schema: _,
396 scan_type,
397 unified_scan_args,
398 id: _,
399 } => {
400 Scan {
401 paths: {
402 let paths = sources
403 .into_paths()
404 .ok_or_else(|| PyNotImplementedError::new_err("scan with BytesIO"))?;
405
406 let out = PyList::new(py, [] as [(); 0])?;
407
408 for path in paths.iter() {
411 if let Some(path) = path.to_str() {
412 out.append(path)?
413 } else {
414 out.append(path)?
415 }
416 }
417
418 out.into_py_any(py)?
419 },
420 file_info: py.None(),
422 predicate: predicate.as_ref().map(|e| e.into()),
423 file_options: PyFileOptions {
424 inner: (**unified_scan_args).clone(),
425 },
426 scan_type: scan_type_to_pyobject(py, scan_type, &unified_scan_args.cloud_options)?,
427 }
428 }
429 .into_py_any(py),
430 IR::DataFrameScan {
431 df,
432 schema: _,
433 output_schema,
434 } => DataFrameScan {
435 df: PyDataFrame::new((**df).clone()),
436 projection: output_schema.as_ref().map_or_else(
437 || Ok(py.None()),
438 |s| {
439 s.iter_names()
440 .map(|s| s.as_str())
441 .collect::<Vec<_>>()
442 .into_py_any(py)
443 },
444 )?,
445 selection: None,
446 }
447 .into_py_any(py),
448 IR::SimpleProjection { input, columns: _ } => {
449 SimpleProjection { input: input.0 }.into_py_any(py)
450 },
451 IR::Select {
452 input,
453 expr,
454 schema: _,
455 options,
456 } => Select {
457 expr: expr.iter().map(|e| e.into()).collect(),
458 input: input.0,
459 should_broadcast: options.should_broadcast,
460 }
461 .into_py_any(py),
462 IR::Sort {
463 input,
464 by_column,
465 slice,
466 sort_options,
467 } => Sort {
468 input: input.0,
469 by_column: by_column.iter().map(|e| e.into()).collect(),
470 sort_options: (
471 sort_options.maintain_order,
472 sort_options.nulls_last.clone(),
473 sort_options.descending.clone(),
474 ),
475 slice: *slice,
476 }
477 .into_py_any(py),
478 IR::Cache {
479 input,
480 id,
481 cache_hits,
482 } => Cache {
483 input: input.0,
484 id_: id.to_usize(),
485 cache_hits: *cache_hits,
486 }
487 .into_py_any(py),
488 IR::GroupBy {
489 input,
490 keys,
491 aggs,
492 schema: _,
493 apply,
494 maintain_order,
495 options,
496 } => GroupBy {
497 input: input.0,
498 keys: keys.iter().map(|e| e.into()).collect(),
499 aggs: aggs.iter().map(|e| e.into()).collect(),
500 apply: apply.as_ref().map_or(Ok(()), |_| {
501 Err(PyNotImplementedError::new_err(format!(
502 "apply inside GroupBy {plan:?}"
503 )))
504 })?,
505 maintain_order: *maintain_order,
506 options: PyGroupbyOptions::new(options.as_ref().clone()).into_py_any(py)?,
507 }
508 .into_py_any(py),
509 IR::Join {
510 input_left,
511 input_right,
512 schema: _,
513 left_on,
514 right_on,
515 options,
516 } => Join {
517 input_left: input_left.0,
518 input_right: input_right.0,
519 left_on: left_on.iter().map(|e| e.into()).collect(),
520 right_on: right_on.iter().map(|e| e.into()).collect(),
521 options: {
522 let how = &options.args.how;
523 let name = Into::<&str>::into(how).into_pyobject(py)?;
524 (
525 match how {
526 #[cfg(feature = "asof_join")]
527 JoinType::AsOf(_) => {
528 return Err(PyNotImplementedError::new_err("asof join"));
529 },
530 #[cfg(feature = "iejoin")]
531 JoinType::IEJoin => {
532 let Some(JoinTypeOptionsIR::IEJoin(ie_options)) = &options.options
533 else {
534 unreachable!()
535 };
536 (
537 name,
538 crate::Wrap(ie_options.operator1).into_py_any(py)?,
539 ie_options.operator2.as_ref().map_or_else(
540 || Ok(py.None()),
541 |op| crate::Wrap(*op).into_py_any(py),
542 )?,
543 )
544 .into_py_any(py)?
545 },
546 JoinType::Cross if options.options.is_some() => {
549 return Err(PyNotImplementedError::new_err("nested loop join"));
550 },
551 _ => name.into_any().unbind(),
552 },
553 options.args.nulls_equal,
554 options.args.slice,
555 options.args.suffix().as_str(),
556 options.args.coalesce.coalesce(how),
557 Into::<&str>::into(options.args.maintain_order),
558 )
559 .into_py_any(py)?
560 },
561 }
562 .into_py_any(py),
563 IR::HStack {
564 input,
565 exprs,
566 schema: _,
567 options,
568 } => HStack {
569 input: input.0,
570 exprs: exprs.iter().map(|e| e.into()).collect(),
571 should_broadcast: options.should_broadcast,
572 }
573 .into_py_any(py),
574 IR::Distinct { input, options } => Distinct {
575 input: input.0,
576 options: (
577 Into::<&str>::into(options.keep_strategy),
578 options.subset.as_ref().map_or_else(
579 || Ok(py.None()),
580 |f| {
581 f.iter()
582 .map(|s| s.as_ref())
583 .collect::<Vec<&str>>()
584 .into_py_any(py)
585 },
586 )?,
587 options.maintain_order,
588 options.slice,
589 )
590 .into_py_any(py)?,
591 }
592 .into_py_any(py),
593 IR::MapFunction { input, function } => MapFunction {
594 input: input.0,
595 function: match function {
596 FunctionIR::OpaquePython(_) => {
597 return Err(PyNotImplementedError::new_err("opaque python mapfunction"));
598 },
599 FunctionIR::Opaque {
600 function: _,
601 schema: _,
602 predicate_pd: _,
603 projection_pd: _,
604 streamable: _,
605 fmt_str: _,
606 } => return Err(PyNotImplementedError::new_err("opaque rust mapfunction")),
607 FunctionIR::Unnest { columns } => (
608 "unnest",
609 columns.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
610 )
611 .into_py_any(py)?,
612 FunctionIR::Rechunk => ("rechunk",).into_py_any(py)?,
613 FunctionIR::Explode { columns, schema: _ } => (
614 "explode",
615 columns.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
616 )
617 .into_py_any(py)?,
618 #[cfg(feature = "pivot")]
619 FunctionIR::Unpivot { args, schema: _ } => (
620 "unpivot",
621 args.index.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
622 args.on.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
623 args.variable_name
624 .as_ref()
625 .map_or_else(|| Ok(py.None()), |s| s.as_str().into_py_any(py))?,
626 args.value_name
627 .as_ref()
628 .map_or_else(|| Ok(py.None()), |s| s.as_str().into_py_any(py))?,
629 )
630 .into_py_any(py)?,
631 FunctionIR::RowIndex {
632 name,
633 schema: _,
634 offset,
635 } => ("row_index", name.to_string(), offset.unwrap_or(0)).into_py_any(py)?,
636 FunctionIR::FastCount {
637 sources,
638 scan_type,
639 cloud_options,
640 alias,
641 } => {
642 let sources = sources
643 .into_paths()
644 .ok_or_else(|| {
645 PyNotImplementedError::new_err("FastCount with BytesIO sources")
646 })?
647 .into_py_any(py)?;
648
649 let scan_type = scan_type_to_pyobject(py, scan_type, cloud_options)?;
650
651 let alias = alias
652 .as_ref()
653 .map(|a| a.as_str())
654 .map_or_else(|| Ok(py.None()), |s| s.into_py_any(py))?;
655
656 ("fast_count", sources, scan_type, alias).into_py_any(py)?
657 },
658 },
659 }
660 .into_py_any(py),
661 IR::Union { inputs, options } => Union {
662 inputs: inputs.iter().map(|n| n.0).collect(),
663 options: options.slice,
665 }
666 .into_py_any(py),
667 IR::HConcat {
668 inputs,
669 schema: _,
670 options: _,
671 } => HConcat {
672 inputs: inputs.iter().map(|n| n.0).collect(),
673 options: (),
674 }
675 .into_py_any(py),
676 IR::ExtContext {
677 input,
678 contexts,
679 schema: _,
680 } => ExtContext {
681 input: input.0,
682 contexts: contexts.iter().map(|n| n.0).collect(),
683 }
684 .into_py_any(py),
685 IR::Sink { input, payload } => Sink {
686 input: input.0,
687 payload: PyString::new(
688 py,
689 &serde_json::to_string(payload)
690 .map_err(|err| PyValueError::new_err(format!("{err:?}")))?,
691 )
692 .into(),
693 }
694 .into_py_any(py),
695 IR::SinkMultiple { .. } => Err(PyNotImplementedError::new_err(
696 "Not expecting to see a SinkMultiple node",
697 )),
698 #[cfg(feature = "merge_sorted")]
699 IR::MergeSorted {
700 input_left,
701 input_right,
702 key,
703 } => MergeSorted {
704 input_left: input_left.0,
705 input_right: input_right.0,
706 key: key.to_string(),
707 }
708 .into_py_any(py),
709 IR::Invalid => Err(PyNotImplementedError::new_err("Invalid")),
710 }
711}