1#[cfg(feature = "iejoin")]
2use polars::prelude::JoinTypeOptionsIR;
3use polars::prelude::python_dsl::PythonScanSource;
4use polars_core::prelude::IdxSize;
5use polars_io::cloud::CloudOptions;
6use polars_ops::prelude::JoinType;
7use polars_plan::plans::IR;
8use polars_plan::prelude::{FileScan, FunctionIR, PythonPredicate, UnifiedScanArgs};
9use pyo3::IntoPyObjectExt;
10use pyo3::exceptions::{PyNotImplementedError, PyValueError};
11use pyo3::prelude::*;
12use pyo3::types::PyString;
13
14use super::expr_nodes::PyGroupbyOptions;
15use crate::PyDataFrame;
16use crate::lazyframe::visit::PyExprIR;
17
18fn scan_type_to_pyobject(
19 py: Python,
20 scan_type: &FileScan,
21 cloud_options: &Option<CloudOptions>,
22) -> PyResult<PyObject> {
23 match scan_type {
24 #[cfg(feature = "csv")]
25 FileScan::Csv { options } => {
26 let options = serde_json::to_string(options)
27 .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
28 let cloud_options = serde_json::to_string(cloud_options)
29 .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
30 Ok(("csv", options, cloud_options).into_py_any(py)?)
31 },
32 #[cfg(feature = "parquet")]
33 FileScan::Parquet { options, .. } => {
34 let options = serde_json::to_string(options)
35 .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
36 let cloud_options = serde_json::to_string(cloud_options)
37 .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
38 Ok(("parquet", options, cloud_options).into_py_any(py)?)
39 },
40 #[cfg(feature = "ipc")]
41 FileScan::Ipc { .. } => Err(PyNotImplementedError::new_err("ipc scan")),
42 #[cfg(feature = "json")]
43 FileScan::NDJson { options, .. } => {
44 let options = serde_json::to_string(options)
45 .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
46 Ok(("ndjson", options).into_py_any(py)?)
47 },
48 FileScan::PythonDataset { .. } => {
49 Err(PyNotImplementedError::new_err("python dataset scan"))
50 },
51 FileScan::Anonymous { .. } => Err(PyNotImplementedError::new_err("anonymous scan")),
52 }
53}
54
55#[pyclass]
56pub struct PythonScan {
58 #[pyo3(get)]
59 options: PyObject,
60}
61
62#[pyclass]
63pub struct Slice {
65 #[pyo3(get)]
66 input: usize,
67 #[pyo3(get)]
68 offset: i64,
69 #[pyo3(get)]
70 len: IdxSize,
71}
72
73#[pyclass]
74pub struct Filter {
76 #[pyo3(get)]
77 input: usize,
78 #[pyo3(get)]
79 predicate: PyExprIR,
80}
81
/// Python view over a scan's [`UnifiedScanArgs`], exposed to Python only through the
/// read-only getters defined in this file's `#[pymethods] impl PyFileOptions` block.
#[pyclass]
#[derive(Clone)]
pub struct PyFileOptions {
    // Owned copy of the IR node's scan arguments (cloned out of the IR in `into_py`).
    inner: UnifiedScanArgs,
}
87
88#[pymethods]
89impl PyFileOptions {
90 #[getter]
91 fn n_rows(&self) -> Option<(i64, usize)> {
92 self.inner
93 .pre_slice
94 .clone()
95 .map(|slice| <(i64, usize)>::try_from(slice).unwrap())
96 }
97 #[getter]
98 fn with_columns(&self) -> Option<Vec<&str>> {
99 self.inner
100 .projection
101 .as_ref()?
102 .iter()
103 .map(|x| x.as_str())
104 .collect::<Vec<_>>()
105 .into()
106 }
107 #[getter]
108 fn cache(&self, _py: Python<'_>) -> bool {
109 self.inner.cache
110 }
111 #[getter]
112 fn row_index(&self) -> Option<(&str, IdxSize)> {
113 self.inner
114 .row_index
115 .as_ref()
116 .map(|n| (n.name.as_str(), n.offset))
117 }
118 #[getter]
119 fn rechunk(&self, _py: Python<'_>) -> bool {
120 self.inner.rechunk
121 }
122 #[getter]
123 fn hive_options(&self, _py: Python<'_>) -> PyResult<PyObject> {
124 Err(PyNotImplementedError::new_err("hive options"))
125 }
126 #[getter]
127 fn include_file_paths(&self, _py: Python<'_>) -> Option<&str> {
128 self.inner.include_file_paths.as_deref()
129 }
130}
131
132#[pyclass]
133pub struct Scan {
135 #[pyo3(get)]
136 paths: PyObject,
137 #[pyo3(get)]
138 file_info: PyObject,
139 #[pyo3(get)]
140 predicate: Option<PyExprIR>,
141 #[pyo3(get)]
142 file_options: PyFileOptions,
143 #[pyo3(get)]
144 scan_type: PyObject,
145}
146
147#[pyclass]
148pub struct DataFrameScan {
150 #[pyo3(get)]
151 df: PyDataFrame,
152 #[pyo3(get)]
153 projection: PyObject,
154 #[pyo3(get)]
155 selection: Option<PyExprIR>,
156}
157
158#[pyclass]
159pub struct SimpleProjection {
161 #[pyo3(get)]
162 input: usize,
163}
164
165#[pyclass]
166pub struct Select {
168 #[pyo3(get)]
169 input: usize,
170 #[pyo3(get)]
171 expr: Vec<PyExprIR>,
172 #[pyo3(get)]
173 should_broadcast: bool,
174}
175
176#[pyclass]
177pub struct Sort {
179 #[pyo3(get)]
180 input: usize,
181 #[pyo3(get)]
182 by_column: Vec<PyExprIR>,
183 #[pyo3(get)]
184 sort_options: (bool, Vec<bool>, Vec<bool>),
185 #[pyo3(get)]
186 slice: Option<(i64, usize)>,
187}
188
189#[pyclass]
190pub struct Cache {
192 #[pyo3(get)]
193 input: usize,
194 #[pyo3(get)]
195 id_: usize,
196 #[pyo3(get)]
197 cache_hits: u32,
198}
199
200#[pyclass]
201pub struct GroupBy {
203 #[pyo3(get)]
204 input: usize,
205 #[pyo3(get)]
206 keys: Vec<PyExprIR>,
207 #[pyo3(get)]
208 aggs: Vec<PyExprIR>,
209 #[pyo3(get)]
210 apply: (),
211 #[pyo3(get)]
212 maintain_order: bool,
213 #[pyo3(get)]
214 options: PyObject,
215}
216
217#[pyclass]
218pub struct Join {
220 #[pyo3(get)]
221 input_left: usize,
222 #[pyo3(get)]
223 input_right: usize,
224 #[pyo3(get)]
225 left_on: Vec<PyExprIR>,
226 #[pyo3(get)]
227 right_on: Vec<PyExprIR>,
228 #[pyo3(get)]
229 options: PyObject,
230}
231
232#[pyclass]
233pub struct MergeSorted {
235 #[pyo3(get)]
236 input_left: usize,
237 #[pyo3(get)]
238 input_right: usize,
239 #[pyo3(get)]
240 key: String,
241}
242
243#[pyclass]
244pub struct HStack {
246 #[pyo3(get)]
247 input: usize,
248 #[pyo3(get)]
249 exprs: Vec<PyExprIR>,
250 #[pyo3(get)]
251 should_broadcast: bool,
252}
253
254#[pyclass]
255pub struct Reduce {
257 #[pyo3(get)]
258 input: usize,
259 #[pyo3(get)]
260 exprs: Vec<PyExprIR>,
261}
262
263#[pyclass]
264pub struct Distinct {
266 #[pyo3(get)]
267 input: usize,
268 #[pyo3(get)]
269 options: PyObject,
270}
271#[pyclass]
272pub struct MapFunction {
274 #[pyo3(get)]
275 input: usize,
276 #[pyo3(get)]
277 function: PyObject,
278}
279#[pyclass]
280pub struct Union {
281 #[pyo3(get)]
282 inputs: Vec<usize>,
283 #[pyo3(get)]
284 options: Option<(i64, usize)>,
285}
286#[pyclass]
287pub struct HConcat {
289 #[pyo3(get)]
290 inputs: Vec<usize>,
291 #[pyo3(get)]
292 options: (),
293}
294#[pyclass]
295pub struct ExtContext {
297 #[pyo3(get)]
298 input: usize,
299 #[pyo3(get)]
300 contexts: Vec<usize>,
301}
302
303#[pyclass]
304pub struct Sink {
305 #[pyo3(get)]
306 input: usize,
307 #[pyo3(get)]
308 payload: PyObject,
309}
310
311pub(crate) fn into_py(py: Python<'_>, plan: &IR) -> PyResult<PyObject> {
312 match plan {
313 IR::PythonScan { options } => {
314 let python_src = match options.python_source {
315 PythonScanSource::Pyarrow => "pyarrow",
316 PythonScanSource::Cuda => "cuda",
317 PythonScanSource::IOPlugin => "io_plugin",
318 };
319
320 PythonScan {
321 options: (
322 options
323 .scan_fn
324 .as_ref()
325 .map_or_else(|| py.None(), |s| s.0.clone_ref(py)),
326 options.with_columns.as_ref().map_or_else(
327 || Ok(py.None()),
328 |cols| {
329 cols.iter()
330 .map(|x| x.as_str())
331 .collect::<Vec<_>>()
332 .into_py_any(py)
333 },
334 )?,
335 python_src,
336 match &options.predicate {
337 PythonPredicate::None => py.None(),
338 PythonPredicate::PyArrow(s) => ("pyarrow", s).into_py_any(py)?,
339 PythonPredicate::Polars(e) => ("polars", e.node().0).into_py_any(py)?,
340 },
341 options
342 .n_rows
343 .map_or_else(|| Ok(py.None()), |s| s.into_py_any(py))?,
344 )
345 .into_py_any(py)?,
346 }
347 .into_py_any(py)
348 },
349 IR::Slice { input, offset, len } => Slice {
350 input: input.0,
351 offset: *offset,
352 len: *len,
353 }
354 .into_py_any(py),
355 IR::Filter { input, predicate } => Filter {
356 input: input.0,
357 predicate: predicate.into(),
358 }
359 .into_py_any(py),
360 IR::Scan {
361 hive_parts: Some(_),
362 ..
363 } => Err(PyNotImplementedError::new_err(
364 "scan with hive partitioning",
365 )),
366 IR::Scan {
367 sources,
368 file_info: _,
369 hive_parts: _,
370 predicate,
371 output_schema: _,
372 scan_type,
373 unified_scan_args,
374 } => Scan {
375 paths: sources
376 .into_paths()
377 .ok_or_else(|| PyNotImplementedError::new_err("scan with BytesIO"))?
378 .into_py_any(py)?,
379 file_info: py.None(),
381 predicate: predicate.as_ref().map(|e| e.into()),
382 file_options: PyFileOptions {
383 inner: (**unified_scan_args).clone(),
384 },
385 scan_type: scan_type_to_pyobject(py, scan_type, &unified_scan_args.cloud_options)?,
386 }
387 .into_py_any(py),
388 IR::DataFrameScan {
389 df,
390 schema: _,
391 output_schema,
392 } => DataFrameScan {
393 df: PyDataFrame::new((**df).clone()),
394 projection: output_schema.as_ref().map_or_else(
395 || Ok(py.None()),
396 |s| {
397 s.iter_names()
398 .map(|s| s.as_str())
399 .collect::<Vec<_>>()
400 .into_py_any(py)
401 },
402 )?,
403 selection: None,
404 }
405 .into_py_any(py),
406 IR::SimpleProjection { input, columns: _ } => {
407 SimpleProjection { input: input.0 }.into_py_any(py)
408 },
409 IR::Select {
410 input,
411 expr,
412 schema: _,
413 options,
414 } => Select {
415 expr: expr.iter().map(|e| e.into()).collect(),
416 input: input.0,
417 should_broadcast: options.should_broadcast,
418 }
419 .into_py_any(py),
420 IR::Sort {
421 input,
422 by_column,
423 slice,
424 sort_options,
425 } => Sort {
426 input: input.0,
427 by_column: by_column.iter().map(|e| e.into()).collect(),
428 sort_options: (
429 sort_options.maintain_order,
430 sort_options.nulls_last.clone(),
431 sort_options.descending.clone(),
432 ),
433 slice: *slice,
434 }
435 .into_py_any(py),
436 IR::Cache {
437 input,
438 id,
439 cache_hits,
440 } => Cache {
441 input: input.0,
442 id_: *id,
443 cache_hits: *cache_hits,
444 }
445 .into_py_any(py),
446 IR::GroupBy {
447 input,
448 keys,
449 aggs,
450 schema: _,
451 apply,
452 maintain_order,
453 options,
454 } => GroupBy {
455 input: input.0,
456 keys: keys.iter().map(|e| e.into()).collect(),
457 aggs: aggs.iter().map(|e| e.into()).collect(),
458 apply: apply.as_ref().map_or(Ok(()), |_| {
459 Err(PyNotImplementedError::new_err(format!(
460 "apply inside GroupBy {:?}",
461 plan
462 )))
463 })?,
464 maintain_order: *maintain_order,
465 options: PyGroupbyOptions::new(options.as_ref().clone()).into_py_any(py)?,
466 }
467 .into_py_any(py),
468 IR::Join {
469 input_left,
470 input_right,
471 schema: _,
472 left_on,
473 right_on,
474 options,
475 } => Join {
476 input_left: input_left.0,
477 input_right: input_right.0,
478 left_on: left_on.iter().map(|e| e.into()).collect(),
479 right_on: right_on.iter().map(|e| e.into()).collect(),
480 options: {
481 let how = &options.args.how;
482 let name = Into::<&str>::into(how).into_pyobject(py)?;
483 (
484 match how {
485 #[cfg(feature = "asof_join")]
486 JoinType::AsOf(_) => {
487 return Err(PyNotImplementedError::new_err("asof join"));
488 },
489 #[cfg(feature = "iejoin")]
490 JoinType::IEJoin => {
491 let Some(JoinTypeOptionsIR::IEJoin(ie_options)) = &options.options
492 else {
493 unreachable!()
494 };
495 (
496 name,
497 crate::Wrap(ie_options.operator1).into_py_any(py)?,
498 ie_options.operator2.as_ref().map_or_else(
499 || Ok(py.None()),
500 |op| crate::Wrap(*op).into_py_any(py),
501 )?,
502 )
503 .into_py_any(py)?
504 },
505 JoinType::Cross if options.options.is_some() => {
508 return Err(PyNotImplementedError::new_err("nested loop join"));
509 },
510 _ => name.into_any().unbind(),
511 },
512 options.args.nulls_equal,
513 options.args.slice,
514 options.args.suffix().as_str(),
515 options.args.coalesce.coalesce(how),
516 Into::<&str>::into(options.args.maintain_order),
517 )
518 .into_py_any(py)?
519 },
520 }
521 .into_py_any(py),
522 IR::HStack {
523 input,
524 exprs,
525 schema: _,
526 options,
527 } => HStack {
528 input: input.0,
529 exprs: exprs.iter().map(|e| e.into()).collect(),
530 should_broadcast: options.should_broadcast,
531 }
532 .into_py_any(py),
533 IR::Distinct { input, options } => Distinct {
534 input: input.0,
535 options: (
536 Into::<&str>::into(options.keep_strategy),
537 options.subset.as_ref().map_or_else(
538 || Ok(py.None()),
539 |f| {
540 f.iter()
541 .map(|s| s.as_ref())
542 .collect::<Vec<&str>>()
543 .into_py_any(py)
544 },
545 )?,
546 options.maintain_order,
547 options.slice,
548 )
549 .into_py_any(py)?,
550 }
551 .into_py_any(py),
552 IR::MapFunction { input, function } => MapFunction {
553 input: input.0,
554 function: match function {
555 FunctionIR::OpaquePython(_) => {
556 return Err(PyNotImplementedError::new_err("opaque python mapfunction"));
557 },
558 FunctionIR::Opaque {
559 function: _,
560 schema: _,
561 predicate_pd: _,
562 projection_pd: _,
563 streamable: _,
564 fmt_str: _,
565 } => return Err(PyNotImplementedError::new_err("opaque rust mapfunction")),
566 FunctionIR::Pipeline {
567 function: _,
568 schema: _,
569 original: _,
570 } => return Err(PyNotImplementedError::new_err("pipeline mapfunction")),
571 FunctionIR::Unnest { columns } => (
572 "unnest",
573 columns.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
574 )
575 .into_py_any(py)?,
576 FunctionIR::Rechunk => ("rechunk",).into_py_any(py)?,
577 FunctionIR::Rename {
578 existing,
579 new,
580 swapping,
581 schema: _,
582 } => (
583 "rename",
584 existing.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
585 new.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
586 *swapping,
587 )
588 .into_py_any(py)?,
589 FunctionIR::Explode { columns, schema: _ } => (
590 "explode",
591 columns.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
592 )
593 .into_py_any(py)?,
594 #[cfg(feature = "pivot")]
595 FunctionIR::Unpivot { args, schema: _ } => (
596 "unpivot",
597 args.index.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
598 args.on.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
599 args.variable_name
600 .as_ref()
601 .map_or_else(|| Ok(py.None()), |s| s.as_str().into_py_any(py))?,
602 args.value_name
603 .as_ref()
604 .map_or_else(|| Ok(py.None()), |s| s.as_str().into_py_any(py))?,
605 )
606 .into_py_any(py)?,
607 FunctionIR::RowIndex {
608 name,
609 schema: _,
610 offset,
611 } => ("row_index", name.to_string(), offset.unwrap_or(0)).into_py_any(py)?,
612 FunctionIR::FastCount {
613 sources,
614 scan_type,
615 cloud_options,
616 alias,
617 } => {
618 let sources = sources
619 .into_paths()
620 .ok_or_else(|| {
621 PyNotImplementedError::new_err("FastCount with BytesIO sources")
622 })?
623 .into_py_any(py)?;
624
625 let scan_type = scan_type_to_pyobject(py, scan_type, cloud_options)?;
626
627 let alias = alias
628 .as_ref()
629 .map(|a| a.as_str())
630 .map_or_else(|| Ok(py.None()), |s| s.into_py_any(py))?;
631
632 ("fast_count", sources, scan_type, alias).into_py_any(py)?
633 },
634 },
635 }
636 .into_py_any(py),
637 IR::Union { inputs, options } => Union {
638 inputs: inputs.iter().map(|n| n.0).collect(),
639 options: options.slice,
641 }
642 .into_py_any(py),
643 IR::HConcat {
644 inputs,
645 schema: _,
646 options: _,
647 } => HConcat {
648 inputs: inputs.iter().map(|n| n.0).collect(),
649 options: (),
650 }
651 .into_py_any(py),
652 IR::ExtContext {
653 input,
654 contexts,
655 schema: _,
656 } => ExtContext {
657 input: input.0,
658 contexts: contexts.iter().map(|n| n.0).collect(),
659 }
660 .into_py_any(py),
661 IR::Sink { input, payload } => Sink {
662 input: input.0,
663 payload: PyString::new(
664 py,
665 &serde_json::to_string(payload)
666 .map_err(|err| PyValueError::new_err(format!("{err:?}")))?,
667 )
668 .into(),
669 }
670 .into_py_any(py),
671 IR::SinkMultiple { .. } => Err(PyNotImplementedError::new_err(
672 "Not expecting to see a SinkMultiple node",
673 )),
674 #[cfg(feature = "merge_sorted")]
675 IR::MergeSorted {
676 input_left,
677 input_right,
678 key,
679 } => MergeSorted {
680 input_left: input_left.0,
681 input_right: input_right.0,
682 key: key.to_string(),
683 }
684 .into_py_any(py),
685 IR::Invalid => Err(PyNotImplementedError::new_err("Invalid")),
686 }
687}