1#[cfg(feature = "iejoin")]
2use polars::prelude::JoinTypeOptionsIR;
3use polars::prelude::python_dsl::PythonScanSource;
4use polars_core::prelude::IdxSize;
5use polars_io::cloud::CloudOptions;
6use polars_ops::prelude::JoinType;
7use polars_plan::plans::IR;
8use polars_plan::prelude::{FileScan, FunctionIR, PythonPredicate, UnifiedScanArgs};
9use pyo3::IntoPyObjectExt;
10use pyo3::exceptions::{PyNotImplementedError, PyValueError};
11use pyo3::prelude::*;
12use pyo3::types::PyString;
13
14use super::expr_nodes::PyGroupbyOptions;
15use crate::PyDataFrame;
16use crate::lazyframe::visit::PyExprIR;
17
18fn scan_type_to_pyobject(
19 py: Python<'_>,
20 scan_type: &FileScan,
21 cloud_options: &Option<CloudOptions>,
22) -> PyResult<PyObject> {
23 match scan_type {
24 #[cfg(feature = "csv")]
25 FileScan::Csv { options } => {
26 let options = serde_json::to_string(options)
27 .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
28 let cloud_options = serde_json::to_string(cloud_options)
29 .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
30 Ok(("csv", options, cloud_options).into_py_any(py)?)
31 },
32 #[cfg(feature = "parquet")]
33 FileScan::Parquet { options, .. } => {
34 let options = serde_json::to_string(options)
35 .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
36 let cloud_options = serde_json::to_string(cloud_options)
37 .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
38 Ok(("parquet", options, cloud_options).into_py_any(py)?)
39 },
40 #[cfg(feature = "ipc")]
41 FileScan::Ipc { .. } => Err(PyNotImplementedError::new_err("ipc scan")),
42 #[cfg(feature = "json")]
43 FileScan::NDJson { options, .. } => {
44 let options = serde_json::to_string(options)
45 .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
46 Ok(("ndjson", options).into_py_any(py)?)
47 },
48 FileScan::PythonDataset { .. } => {
49 Err(PyNotImplementedError::new_err("python dataset scan"))
50 },
51 FileScan::Anonymous { .. } => Err(PyNotImplementedError::new_err("anonymous scan")),
52 }
53}
54
55#[pyclass]
56pub struct PythonScan {
58 #[pyo3(get)]
59 options: PyObject,
60}
61
62#[pyclass]
63pub struct Slice {
65 #[pyo3(get)]
66 input: usize,
67 #[pyo3(get)]
68 offset: i64,
69 #[pyo3(get)]
70 len: IdxSize,
71}
72
73#[pyclass]
74pub struct Filter {
76 #[pyo3(get)]
77 input: usize,
78 #[pyo3(get)]
79 predicate: PyExprIR,
80}
81
82#[pyclass]
83#[derive(Clone)]
84pub struct PyFileOptions {
85 inner: UnifiedScanArgs,
86}
87
88#[pymethods]
89impl PyFileOptions {
90 #[getter]
91 fn n_rows(&self) -> Option<(i64, usize)> {
92 self.inner
93 .pre_slice
94 .clone()
95 .map(|slice| <(i64, usize)>::try_from(slice).unwrap())
96 }
97 #[getter]
98 fn with_columns(&self) -> Option<Vec<&str>> {
99 self.inner
100 .projection
101 .as_ref()?
102 .iter()
103 .map(|x| x.as_str())
104 .collect::<Vec<_>>()
105 .into()
106 }
107 #[getter]
108 fn cache(&self, _py: Python<'_>) -> bool {
109 self.inner.cache
110 }
111 #[getter]
112 fn row_index(&self) -> Option<(&str, IdxSize)> {
113 self.inner
114 .row_index
115 .as_ref()
116 .map(|n| (n.name.as_str(), n.offset))
117 }
118 #[getter]
119 fn rechunk(&self, _py: Python<'_>) -> bool {
120 self.inner.rechunk
121 }
122 #[getter]
123 fn hive_options(&self, _py: Python<'_>) -> PyResult<PyObject> {
124 Err(PyNotImplementedError::new_err("hive options"))
125 }
126 #[getter]
127 fn include_file_paths(&self, _py: Python<'_>) -> Option<&str> {
128 self.inner.include_file_paths.as_deref()
129 }
130}
131
132#[pyclass]
133pub struct Scan {
135 #[pyo3(get)]
136 paths: PyObject,
137 #[pyo3(get)]
138 file_info: PyObject,
139 #[pyo3(get)]
140 predicate: Option<PyExprIR>,
141 #[pyo3(get)]
142 file_options: PyFileOptions,
143 #[pyo3(get)]
144 scan_type: PyObject,
145}
146
147#[pyclass]
148pub struct DataFrameScan {
150 #[pyo3(get)]
151 df: PyDataFrame,
152 #[pyo3(get)]
153 projection: PyObject,
154 #[pyo3(get)]
155 selection: Option<PyExprIR>,
156}
157
158#[pyclass]
159pub struct SimpleProjection {
161 #[pyo3(get)]
162 input: usize,
163}
164
165#[pyclass]
166pub struct Select {
168 #[pyo3(get)]
169 input: usize,
170 #[pyo3(get)]
171 expr: Vec<PyExprIR>,
172 #[pyo3(get)]
173 should_broadcast: bool,
174}
175
176#[pyclass]
177pub struct Sort {
179 #[pyo3(get)]
180 input: usize,
181 #[pyo3(get)]
182 by_column: Vec<PyExprIR>,
183 #[pyo3(get)]
184 sort_options: (bool, Vec<bool>, Vec<bool>),
185 #[pyo3(get)]
186 slice: Option<(i64, usize)>,
187}
188
189#[pyclass]
190pub struct Cache {
192 #[pyo3(get)]
193 input: usize,
194 #[pyo3(get)]
195 id_: usize,
196 #[pyo3(get)]
197 cache_hits: u32,
198}
199
200#[pyclass]
201pub struct GroupBy {
203 #[pyo3(get)]
204 input: usize,
205 #[pyo3(get)]
206 keys: Vec<PyExprIR>,
207 #[pyo3(get)]
208 aggs: Vec<PyExprIR>,
209 #[pyo3(get)]
210 apply: (),
211 #[pyo3(get)]
212 maintain_order: bool,
213 #[pyo3(get)]
214 options: PyObject,
215}
216
217#[pyclass]
218pub struct Join {
220 #[pyo3(get)]
221 input_left: usize,
222 #[pyo3(get)]
223 input_right: usize,
224 #[pyo3(get)]
225 left_on: Vec<PyExprIR>,
226 #[pyo3(get)]
227 right_on: Vec<PyExprIR>,
228 #[pyo3(get)]
229 options: PyObject,
230}
231
232#[pyclass]
233pub struct MergeSorted {
235 #[pyo3(get)]
236 input_left: usize,
237 #[pyo3(get)]
238 input_right: usize,
239 #[pyo3(get)]
240 key: String,
241}
242
243#[pyclass]
244pub struct HStack {
246 #[pyo3(get)]
247 input: usize,
248 #[pyo3(get)]
249 exprs: Vec<PyExprIR>,
250 #[pyo3(get)]
251 should_broadcast: bool,
252}
253
254#[pyclass]
255pub struct Reduce {
257 #[pyo3(get)]
258 input: usize,
259 #[pyo3(get)]
260 exprs: Vec<PyExprIR>,
261}
262
263#[pyclass]
264pub struct Distinct {
266 #[pyo3(get)]
267 input: usize,
268 #[pyo3(get)]
269 options: PyObject,
270}
271#[pyclass]
272pub struct MapFunction {
274 #[pyo3(get)]
275 input: usize,
276 #[pyo3(get)]
277 function: PyObject,
278}
279#[pyclass]
280pub struct Union {
281 #[pyo3(get)]
282 inputs: Vec<usize>,
283 #[pyo3(get)]
284 options: Option<(i64, usize)>,
285}
286#[pyclass]
287pub struct HConcat {
289 #[pyo3(get)]
290 inputs: Vec<usize>,
291 #[pyo3(get)]
292 options: (),
293}
294#[pyclass]
295pub struct ExtContext {
297 #[pyo3(get)]
298 input: usize,
299 #[pyo3(get)]
300 contexts: Vec<usize>,
301}
302
303#[pyclass]
304pub struct Sink {
305 #[pyo3(get)]
306 input: usize,
307 #[pyo3(get)]
308 payload: PyObject,
309}
310
/// Convert a single [`IR`] node into its Python-visible wrapper object.
///
/// Child nodes are not converted recursively: they are referenced by the
/// index stored in their handle (`.0`). Node kinds, or node options, that
/// have no Python representation produce a `NotImplementedError`; JSON
/// serialization failures produce a `ValueError`.
pub(crate) fn into_py(py: Python<'_>, plan: &IR) -> PyResult<PyObject> {
    match plan {
        IR::PythonScan { options } => {
            // Stable string tag describing where the Python scan comes from.
            let python_src = match options.python_source {
                PythonScanSource::Pyarrow => "pyarrow",
                PythonScanSource::Cuda => "cuda",
                PythonScanSource::IOPlugin => "io_plugin",
            };

            PythonScan {
                // Tuple layout:
                // (scan_fn | None, with_columns | None, python_src, predicate, n_rows | None)
                options: (
                    options
                        .scan_fn
                        .as_ref()
                        .map_or_else(|| py.None(), |s| s.0.clone_ref(py)),
                    options.with_columns.as_ref().map_or_else(
                        || Ok(py.None()),
                        |cols| {
                            cols.iter()
                                .map(|x| x.as_str())
                                .collect::<Vec<_>>()
                                .into_py_any(py)
                        },
                    )?,
                    python_src,
                    // Predicate is None, ("pyarrow", <pyarrow predicate>) or
                    // ("polars", <expression node index>).
                    match &options.predicate {
                        PythonPredicate::None => py.None(),
                        PythonPredicate::PyArrow(s) => ("pyarrow", s).into_py_any(py)?,
                        PythonPredicate::Polars(e) => ("polars", e.node().0).into_py_any(py)?,
                    },
                    options
                        .n_rows
                        .map_or_else(|| Ok(py.None()), |s| s.into_py_any(py))?,
                )
                    .into_py_any(py)?,
            }
            .into_py_any(py)
        },
        IR::Slice { input, offset, len } => Slice {
            input: input.0,
            offset: *offset,
            len: *len,
        }
        .into_py_any(py),
        IR::Filter { input, predicate } => Filter {
            input: input.0,
            predicate: predicate.into(),
        }
        .into_py_any(py),
        // Hive-partitioned scans are rejected here, so the next `Scan` arm
        // only ever sees scans without hive partitions.
        IR::Scan {
            hive_parts: Some(_),
            ..
        } => Err(PyNotImplementedError::new_err(
            "scan with hive partitioning",
        )),
        IR::Scan {
            sources,
            file_info: _,
            hive_parts: _,
            predicate,
            output_schema: _,
            scan_type,
            unified_scan_args,
            id: _,
        } => Scan {
            // `into_paths` yields None for sources that are not file paths;
            // those are reported as unsupported BytesIO scans.
            paths: sources
                .into_paths()
                .ok_or_else(|| PyNotImplementedError::new_err("scan with BytesIO"))?
                .into_py_any(py)?,
            // File info is not exported to Python.
            file_info: py.None(),
            predicate: predicate.as_ref().map(|e| e.into()),
            file_options: PyFileOptions {
                inner: (**unified_scan_args).clone(),
            },
            scan_type: scan_type_to_pyobject(py, scan_type, &unified_scan_args.cloud_options)?,
        }
        .into_py_any(py),
        IR::DataFrameScan {
            df,
            schema: _,
            output_schema,
        } => DataFrameScan {
            df: PyDataFrame::new((**df).clone()),
            // Column names of the output schema, or None when it is absent.
            projection: output_schema.as_ref().map_or_else(
                || Ok(py.None()),
                |s| {
                    s.iter_names()
                        .map(|s| s.as_str())
                        .collect::<Vec<_>>()
                        .into_py_any(py)
                },
            )?,
            // No selection predicate is exported for in-memory scans.
            selection: None,
        }
        .into_py_any(py),
        // Only the input index is exported; the projected columns are not.
        IR::SimpleProjection { input, columns: _ } => {
            SimpleProjection { input: input.0 }.into_py_any(py)
        },
        IR::Select {
            input,
            expr,
            schema: _,
            options,
        } => Select {
            expr: expr.iter().map(|e| e.into()).collect(),
            input: input.0,
            should_broadcast: options.should_broadcast,
        }
        .into_py_any(py),
        IR::Sort {
            input,
            by_column,
            slice,
            sort_options,
        } => Sort {
            input: input.0,
            by_column: by_column.iter().map(|e| e.into()).collect(),
            // Exported as (maintain_order, nulls_last, descending).
            sort_options: (
                sort_options.maintain_order,
                sort_options.nulls_last.clone(),
                sort_options.descending.clone(),
            ),
            slice: *slice,
        }
        .into_py_any(py),
        IR::Cache {
            input,
            id,
            cache_hits,
        } => Cache {
            input: input.0,
            id_: *id,
            cache_hits: *cache_hits,
        }
        .into_py_any(py),
        IR::GroupBy {
            input,
            keys,
            aggs,
            schema: _,
            apply,
            maintain_order,
            options,
        } => GroupBy {
            input: input.0,
            keys: keys.iter().map(|e| e.into()).collect(),
            aggs: aggs.iter().map(|e| e.into()).collect(),
            // A group-by carrying an `apply` function cannot be exported; the
            // error message embeds the Debug form of the whole node.
            apply: apply.as_ref().map_or(Ok(()), |_| {
                Err(PyNotImplementedError::new_err(format!(
                    "apply inside GroupBy {:?}",
                    plan
                )))
            })?,
            maintain_order: *maintain_order,
            options: PyGroupbyOptions::new(options.as_ref().clone()).into_py_any(py)?,
        }
        .into_py_any(py),
        IR::Join {
            input_left,
            input_right,
            schema: _,
            left_on,
            right_on,
            options,
        } => Join {
            input_left: input_left.0,
            input_right: input_right.0,
            left_on: left_on.iter().map(|e| e.into()).collect(),
            right_on: right_on.iter().map(|e| e.into()).collect(),
            // Tuple layout:
            // (how, nulls_equal, slice, suffix, coalesce, maintain_order)
            // where `how` is the join-type name, or for IEJoin a tuple
            // (name, operator1, operator2 | None).
            options: {
                let how = &options.args.how;
                let name = Into::<&str>::into(how).into_pyobject(py)?;
                (
                    match how {
                        #[cfg(feature = "asof_join")]
                        JoinType::AsOf(_) => {
                            return Err(PyNotImplementedError::new_err("asof join"));
                        },
                        #[cfg(feature = "iejoin")]
                        JoinType::IEJoin => {
                            // NOTE(review): assumes an IEJoin always carries
                            // IEJoin options; panics via `unreachable!` otherwise.
                            let Some(JoinTypeOptionsIR::IEJoin(ie_options)) = &options.options
                            else {
                                unreachable!()
                            };
                            (
                                name,
                                crate::Wrap(ie_options.operator1).into_py_any(py)?,
                                ie_options.operator2.as_ref().map_or_else(
                                    || Ok(py.None()),
                                    |op| crate::Wrap(*op).into_py_any(py),
                                )?,
                            )
                                .into_py_any(py)?
                        },
                        // A cross join that carries extra join-type options is
                        // a nested loop join, which has no Python representation.
                        JoinType::Cross if options.options.is_some() => {
                            return Err(PyNotImplementedError::new_err("nested loop join"));
                        },
                        _ => name.into_any().unbind(),
                    },
                    options.args.nulls_equal,
                    options.args.slice,
                    options.args.suffix().as_str(),
                    options.args.coalesce.coalesce(how),
                    Into::<&str>::into(options.args.maintain_order),
                )
                    .into_py_any(py)?
            },
        }
        .into_py_any(py),
        IR::HStack {
            input,
            exprs,
            schema: _,
            options,
        } => HStack {
            input: input.0,
            exprs: exprs.iter().map(|e| e.into()).collect(),
            should_broadcast: options.should_broadcast,
        }
        .into_py_any(py),
        IR::Distinct { input, options } => Distinct {
            input: input.0,
            // Tuple layout: (keep_strategy, subset | None, maintain_order, slice)
            options: (
                Into::<&str>::into(options.keep_strategy),
                options.subset.as_ref().map_or_else(
                    || Ok(py.None()),
                    |f| {
                        f.iter()
                            .map(|s| s.as_ref())
                            .collect::<Vec<&str>>()
                            .into_py_any(py)
                    },
                )?,
                options.maintain_order,
                options.slice,
            )
                .into_py_any(py)?,
        }
        .into_py_any(py),
        IR::MapFunction { input, function } => MapFunction {
            input: input.0,
            // Opaque Python/Rust functions and pipelines are rejected; every
            // other function becomes a tuple whose first element is a string
            // tag naming the function.
            function: match function {
                FunctionIR::OpaquePython(_) => {
                    return Err(PyNotImplementedError::new_err("opaque python mapfunction"));
                },
                FunctionIR::Opaque {
                    function: _,
                    schema: _,
                    predicate_pd: _,
                    projection_pd: _,
                    streamable: _,
                    fmt_str: _,
                } => return Err(PyNotImplementedError::new_err("opaque rust mapfunction")),
                FunctionIR::Pipeline {
                    function: _,
                    schema: _,
                    original: _,
                } => return Err(PyNotImplementedError::new_err("pipeline mapfunction")),
                FunctionIR::Unnest { columns } => (
                    "unnest",
                    columns.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
                )
                    .into_py_any(py)?,
                FunctionIR::Rechunk => ("rechunk",).into_py_any(py)?,
                FunctionIR::Rename {
                    existing,
                    new,
                    swapping,
                    schema: _,
                } => (
                    "rename",
                    existing.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
                    new.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
                    *swapping,
                )
                    .into_py_any(py)?,
                FunctionIR::Explode { columns, schema: _ } => (
                    "explode",
                    columns.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
                )
                    .into_py_any(py)?,
                #[cfg(feature = "pivot")]
                FunctionIR::Unpivot { args, schema: _ } => (
                    "unpivot",
                    args.index.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
                    args.on.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
                    args.variable_name
                        .as_ref()
                        .map_or_else(|| Ok(py.None()), |s| s.as_str().into_py_any(py))?,
                    args.value_name
                        .as_ref()
                        .map_or_else(|| Ok(py.None()), |s| s.as_str().into_py_any(py))?,
                )
                    .into_py_any(py)?,
                // A missing row-index offset is exported as 0.
                FunctionIR::RowIndex {
                    name,
                    schema: _,
                    offset,
                } => ("row_index", name.to_string(), offset.unwrap_or(0)).into_py_any(py)?,
                // Exported as ("fast_count", paths, scan_type, alias | None).
                FunctionIR::FastCount {
                    sources,
                    scan_type,
                    cloud_options,
                    alias,
                } => {
                    let sources = sources
                        .into_paths()
                        .ok_or_else(|| {
                            PyNotImplementedError::new_err("FastCount with BytesIO sources")
                        })?
                        .into_py_any(py)?;

                    let scan_type = scan_type_to_pyobject(py, scan_type, cloud_options)?;

                    let alias = alias
                        .as_ref()
                        .map(|a| a.as_str())
                        .map_or_else(|| Ok(py.None()), |s| s.into_py_any(py))?;

                    ("fast_count", sources, scan_type, alias).into_py_any(py)?
                },
            },
        }
        .into_py_any(py),
        IR::Union { inputs, options } => Union {
            inputs: inputs.iter().map(|n| n.0).collect(),
            // Only the slice of the union options is exported.
            options: options.slice,
        }
        .into_py_any(py),
        // Horizontal-concat options are not exported.
        IR::HConcat {
            inputs,
            schema: _,
            options: _,
        } => HConcat {
            inputs: inputs.iter().map(|n| n.0).collect(),
            options: (),
        }
        .into_py_any(py),
        IR::ExtContext {
            input,
            contexts,
            schema: _,
        } => ExtContext {
            input: input.0,
            contexts: contexts.iter().map(|n| n.0).collect(),
        }
        .into_py_any(py),
        IR::Sink { input, payload } => Sink {
            input: input.0,
            // The sink payload is handed to Python as a JSON string.
            payload: PyString::new(
                py,
                &serde_json::to_string(payload)
                    .map_err(|err| PyValueError::new_err(format!("{err:?}")))?,
            )
            .into(),
        }
        .into_py_any(py),
        IR::SinkMultiple { .. } => Err(PyNotImplementedError::new_err(
            "Not expecting to see a SinkMultiple node",
        )),
        #[cfg(feature = "merge_sorted")]
        IR::MergeSorted {
            input_left,
            input_right,
            key,
        } => MergeSorted {
            input_left: input_left.0,
            input_right: input_right.0,
            key: key.to_string(),
        }
        .into_py_any(py),
        IR::Invalid => Err(PyNotImplementedError::new_err("Invalid")),
    }
}