1#[cfg(feature = "iejoin")]
2use polars::prelude::JoinTypeOptionsIR;
3use polars_core::prelude::IdxSize;
4use polars_ops::prelude::JoinType;
5use polars_plan::plans::IR;
6use polars_plan::prelude::{
7 FileCount, FileScan, FileScanOptions, FunctionIR, PythonPredicate, PythonScanSource,
8};
9use pyo3::exceptions::{PyNotImplementedError, PyValueError};
10use pyo3::prelude::*;
11use pyo3::IntoPyObjectExt;
12
13use super::expr_nodes::PyGroupbyOptions;
14use crate::lazyframe::visit::PyExprIR;
15use crate::PyDataFrame;
16
17#[pyclass]
18pub struct PythonScan {
20 #[pyo3(get)]
21 options: PyObject,
22}
23
24#[pyclass]
25pub struct Slice {
27 #[pyo3(get)]
28 input: usize,
29 #[pyo3(get)]
30 offset: i64,
31 #[pyo3(get)]
32 len: IdxSize,
33}
34
35#[pyclass]
36pub struct Filter {
38 #[pyo3(get)]
39 input: usize,
40 #[pyo3(get)]
41 predicate: PyExprIR,
42}
43
44#[pyclass]
45#[derive(Clone)]
46pub struct PyFileOptions {
47 inner: FileScanOptions,
48}
49
50#[pymethods]
51impl PyFileOptions {
52 #[getter]
53 fn n_rows(&self) -> Option<(i64, usize)> {
54 self.inner.slice
55 }
56 #[getter]
57 fn with_columns(&self) -> Option<Vec<&str>> {
58 self.inner
59 .with_columns
60 .as_ref()?
61 .iter()
62 .map(|x| x.as_str())
63 .collect::<Vec<_>>()
64 .into()
65 }
66 #[getter]
67 fn cache(&self, _py: Python<'_>) -> bool {
68 self.inner.cache
69 }
70 #[getter]
71 fn row_index(&self) -> Option<(&str, IdxSize)> {
72 self.inner
73 .row_index
74 .as_ref()
75 .map(|n| (n.name.as_str(), n.offset))
76 }
77 #[getter]
78 fn rechunk(&self, _py: Python<'_>) -> bool {
79 self.inner.rechunk
80 }
81 #[getter]
82 fn file_counter(&self, _py: Python<'_>) -> FileCount {
83 self.inner.file_counter
84 }
85 #[getter]
86 fn hive_options(&self, _py: Python<'_>) -> PyResult<PyObject> {
87 Err(PyNotImplementedError::new_err("hive options"))
88 }
89}
90
91#[pyclass]
92pub struct Scan {
94 #[pyo3(get)]
95 paths: PyObject,
96 #[pyo3(get)]
97 file_info: PyObject,
98 #[pyo3(get)]
99 predicate: Option<PyExprIR>,
100 #[pyo3(get)]
101 file_options: PyFileOptions,
102 #[pyo3(get)]
103 scan_type: PyObject,
104}
105
106#[pyclass]
107pub struct DataFrameScan {
109 #[pyo3(get)]
110 df: PyDataFrame,
111 #[pyo3(get)]
112 projection: PyObject,
113 #[pyo3(get)]
114 selection: Option<PyExprIR>,
115}
116
117#[pyclass]
118pub struct SimpleProjection {
120 #[pyo3(get)]
121 input: usize,
122}
123
124#[pyclass]
125pub struct Select {
127 #[pyo3(get)]
128 input: usize,
129 #[pyo3(get)]
130 expr: Vec<PyExprIR>,
131 #[pyo3(get)]
132 should_broadcast: bool,
133}
134
135#[pyclass]
136pub struct Sort {
138 #[pyo3(get)]
139 input: usize,
140 #[pyo3(get)]
141 by_column: Vec<PyExprIR>,
142 #[pyo3(get)]
143 sort_options: (bool, Vec<bool>, Vec<bool>),
144 #[pyo3(get)]
145 slice: Option<(i64, usize)>,
146}
147
148#[pyclass]
149pub struct Cache {
151 #[pyo3(get)]
152 input: usize,
153 #[pyo3(get)]
154 id_: usize,
155 #[pyo3(get)]
156 cache_hits: u32,
157}
158
159#[pyclass]
160pub struct GroupBy {
162 #[pyo3(get)]
163 input: usize,
164 #[pyo3(get)]
165 keys: Vec<PyExprIR>,
166 #[pyo3(get)]
167 aggs: Vec<PyExprIR>,
168 #[pyo3(get)]
169 apply: (),
170 #[pyo3(get)]
171 maintain_order: bool,
172 #[pyo3(get)]
173 options: PyObject,
174}
175
176#[pyclass]
177pub struct Join {
179 #[pyo3(get)]
180 input_left: usize,
181 #[pyo3(get)]
182 input_right: usize,
183 #[pyo3(get)]
184 left_on: Vec<PyExprIR>,
185 #[pyo3(get)]
186 right_on: Vec<PyExprIR>,
187 #[pyo3(get)]
188 options: PyObject,
189}
190
191#[pyclass]
192pub struct HStack {
194 #[pyo3(get)]
195 input: usize,
196 #[pyo3(get)]
197 exprs: Vec<PyExprIR>,
198 #[pyo3(get)]
199 should_broadcast: bool,
200}
201
202#[pyclass]
203pub struct Reduce {
205 #[pyo3(get)]
206 input: usize,
207 #[pyo3(get)]
208 exprs: Vec<PyExprIR>,
209}
210
211#[pyclass]
212pub struct Distinct {
214 #[pyo3(get)]
215 input: usize,
216 #[pyo3(get)]
217 options: PyObject,
218}
219#[pyclass]
220pub struct MapFunction {
222 #[pyo3(get)]
223 input: usize,
224 #[pyo3(get)]
225 function: PyObject,
226}
227#[pyclass]
228pub struct Union {
229 #[pyo3(get)]
230 inputs: Vec<usize>,
231 #[pyo3(get)]
232 options: Option<(i64, usize)>,
233}
234#[pyclass]
235pub struct HConcat {
237 #[pyo3(get)]
238 inputs: Vec<usize>,
239 #[pyo3(get)]
240 options: (),
241}
242#[pyclass]
243pub struct ExtContext {
245 #[pyo3(get)]
246 input: usize,
247 #[pyo3(get)]
248 contexts: Vec<usize>,
249}
250
251#[pyclass]
252pub struct Sink {
253 #[pyo3(get)]
254 input: usize,
255 #[pyo3(get)]
256 payload: PyObject,
257}
258
259pub(crate) fn into_py(py: Python<'_>, plan: &IR) -> PyResult<PyObject> {
260 match plan {
261 IR::PythonScan { options } => {
262 let python_src = match options.python_source {
263 PythonScanSource::Pyarrow => "pyarrow",
264 PythonScanSource::Cuda => "cuda",
265 PythonScanSource::IOPlugin => "io_plugin",
266 };
267
268 PythonScan {
269 options: (
270 options
271 .scan_fn
272 .as_ref()
273 .map_or_else(|| py.None(), |s| s.0.clone_ref(py)),
274 options.with_columns.as_ref().map_or_else(
275 || Ok(py.None()),
276 |cols| {
277 cols.iter()
278 .map(|x| x.as_str())
279 .collect::<Vec<_>>()
280 .into_py_any(py)
281 },
282 )?,
283 python_src,
284 match &options.predicate {
285 PythonPredicate::None => py.None(),
286 PythonPredicate::PyArrow(s) => ("pyarrow", s).into_py_any(py)?,
287 PythonPredicate::Polars(e) => ("polars", e.node().0).into_py_any(py)?,
288 },
289 options
290 .n_rows
291 .map_or_else(|| Ok(py.None()), |s| s.into_py_any(py))?,
292 )
293 .into_py_any(py)?,
294 }
295 .into_py_any(py)
296 },
297 IR::Slice { input, offset, len } => Slice {
298 input: input.0,
299 offset: *offset,
300 len: *len,
301 }
302 .into_py_any(py),
303 IR::Filter { input, predicate } => Filter {
304 input: input.0,
305 predicate: predicate.into(),
306 }
307 .into_py_any(py),
308 IR::Scan {
309 hive_parts: Some(_),
310 ..
311 } => Err(PyNotImplementedError::new_err(
312 "scan with hive partitioning",
313 )),
314 IR::Scan {
315 sources,
316 file_info: _,
317 hive_parts: _,
318 predicate,
319 output_schema: _,
320 scan_type,
321 file_options,
322 } => Scan {
323 paths: sources
324 .into_paths()
325 .ok_or_else(|| PyNotImplementedError::new_err("scan with BytesIO"))?
326 .into_py_any(py)?,
327 file_info: py.None(),
329 predicate: predicate.as_ref().map(|e| e.into()),
330 file_options: PyFileOptions {
331 inner: file_options.clone(),
332 },
333 scan_type: match scan_type {
334 #[cfg(feature = "csv")]
335 FileScan::Csv {
336 options,
337 cloud_options,
338 } => {
339 let options = serde_json::to_string(options)
342 .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
343 let cloud_options = serde_json::to_string(cloud_options)
344 .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
345 ("csv", options, cloud_options).into_py_any(py)?
346 },
347 #[cfg(feature = "parquet")]
348 FileScan::Parquet {
349 options,
350 cloud_options,
351 ..
352 } => {
353 let options = serde_json::to_string(options)
354 .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
355 let cloud_options = serde_json::to_string(cloud_options)
356 .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
357 ("parquet", options, cloud_options).into_py_any(py)?
358 },
359 #[cfg(feature = "ipc")]
360 FileScan::Ipc { .. } => return Err(PyNotImplementedError::new_err("ipc scan")),
361 #[cfg(feature = "json")]
362 FileScan::NDJson { options, .. } => {
363 let options = serde_json::to_string(options)
365 .map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
366 ("ndjson", options).into_py_any(py)?
367 },
368 FileScan::Anonymous { .. } => {
369 return Err(PyNotImplementedError::new_err("anonymous scan"))
370 },
371 },
372 }
373 .into_py_any(py),
374 IR::DataFrameScan {
375 df,
376 schema: _,
377 output_schema,
378 } => DataFrameScan {
379 df: PyDataFrame::new((**df).clone()),
380 projection: output_schema.as_ref().map_or_else(
381 || Ok(py.None()),
382 |s| {
383 s.iter_names()
384 .map(|s| s.as_str())
385 .collect::<Vec<_>>()
386 .into_py_any(py)
387 },
388 )?,
389 selection: None,
390 }
391 .into_py_any(py),
392 IR::SimpleProjection { input, columns: _ } => {
393 SimpleProjection { input: input.0 }.into_py_any(py)
394 },
395 IR::Select {
396 input,
397 expr,
398 schema: _,
399 options,
400 } => Select {
401 expr: expr.iter().map(|e| e.into()).collect(),
402 input: input.0,
403 should_broadcast: options.should_broadcast,
404 }
405 .into_py_any(py),
406 IR::Sort {
407 input,
408 by_column,
409 slice,
410 sort_options,
411 } => Sort {
412 input: input.0,
413 by_column: by_column.iter().map(|e| e.into()).collect(),
414 sort_options: (
415 sort_options.maintain_order,
416 sort_options.nulls_last.clone(),
417 sort_options.descending.clone(),
418 ),
419 slice: *slice,
420 }
421 .into_py_any(py),
422 IR::Cache {
423 input,
424 id,
425 cache_hits,
426 } => Cache {
427 input: input.0,
428 id_: *id,
429 cache_hits: *cache_hits,
430 }
431 .into_py_any(py),
432 IR::GroupBy {
433 input,
434 keys,
435 aggs,
436 schema: _,
437 apply,
438 maintain_order,
439 options,
440 } => GroupBy {
441 input: input.0,
442 keys: keys.iter().map(|e| e.into()).collect(),
443 aggs: aggs.iter().map(|e| e.into()).collect(),
444 apply: apply.as_ref().map_or(Ok(()), |_| {
445 Err(PyNotImplementedError::new_err(format!(
446 "apply inside GroupBy {:?}",
447 plan
448 )))
449 })?,
450 maintain_order: *maintain_order,
451 options: PyGroupbyOptions::new(options.as_ref().clone()).into_py_any(py)?,
452 }
453 .into_py_any(py),
454 IR::Join {
455 input_left,
456 input_right,
457 schema: _,
458 left_on,
459 right_on,
460 options,
461 } => Join {
462 input_left: input_left.0,
463 input_right: input_right.0,
464 left_on: left_on.iter().map(|e| e.into()).collect(),
465 right_on: right_on.iter().map(|e| e.into()).collect(),
466 options: {
467 let how = &options.args.how;
468 let name = Into::<&str>::into(how).into_pyobject(py)?;
469 (
470 match how {
471 #[cfg(feature = "asof_join")]
472 JoinType::AsOf(_) => {
473 return Err(PyNotImplementedError::new_err("asof join"))
474 },
475 #[cfg(feature = "iejoin")]
476 JoinType::IEJoin => {
477 let Some(JoinTypeOptionsIR::IEJoin(ie_options)) = &options.options
478 else {
479 unreachable!()
480 };
481 (
482 name,
483 crate::Wrap(ie_options.operator1).into_py_any(py)?,
484 ie_options.operator2.as_ref().map_or_else(
485 || Ok(py.None()),
486 |op| crate::Wrap(*op).into_py_any(py),
487 )?,
488 )
489 .into_py_any(py)?
490 },
491 JoinType::Cross if options.options.is_some() => {
494 return Err(PyNotImplementedError::new_err("nested loop join"))
495 },
496 _ => name.into_any().unbind(),
497 },
498 options.args.join_nulls,
499 options.args.slice,
500 options.args.suffix().as_str(),
501 options.args.coalesce.coalesce(how),
502 Into::<&str>::into(options.args.maintain_order),
503 )
504 .into_py_any(py)?
505 },
506 }
507 .into_py_any(py),
508 IR::HStack {
509 input,
510 exprs,
511 schema: _,
512 options,
513 } => HStack {
514 input: input.0,
515 exprs: exprs.iter().map(|e| e.into()).collect(),
516 should_broadcast: options.should_broadcast,
517 }
518 .into_py_any(py),
519 IR::Distinct { input, options } => Distinct {
520 input: input.0,
521 options: (
522 Into::<&str>::into(options.keep_strategy),
523 options.subset.as_ref().map_or_else(
524 || Ok(py.None()),
525 |f| {
526 f.iter()
527 .map(|s| s.as_ref())
528 .collect::<Vec<&str>>()
529 .into_py_any(py)
530 },
531 )?,
532 options.maintain_order,
533 options.slice,
534 )
535 .into_py_any(py)?,
536 }
537 .into_py_any(py),
538 IR::MapFunction { input, function } => MapFunction {
539 input: input.0,
540 function: match function {
541 FunctionIR::OpaquePython(_) => {
542 return Err(PyNotImplementedError::new_err("opaque python mapfunction"))
543 },
544 FunctionIR::Opaque {
545 function: _,
546 schema: _,
547 predicate_pd: _,
548 projection_pd: _,
549 streamable: _,
550 fmt_str: _,
551 } => return Err(PyNotImplementedError::new_err("opaque rust mapfunction")),
552 FunctionIR::Pipeline {
553 function: _,
554 schema: _,
555 original: _,
556 } => return Err(PyNotImplementedError::new_err("pipeline mapfunction")),
557 FunctionIR::Unnest { columns } => (
558 "unnest",
559 columns.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
560 )
561 .into_py_any(py)?,
562 FunctionIR::Rechunk => ("rechunk",).into_py_any(py)?,
563 #[cfg(feature = "merge_sorted")]
564 FunctionIR::MergeSorted { column } => {
565 ("merge_sorted", column.to_string()).into_py_any(py)?
566 },
567 FunctionIR::Rename {
568 existing,
569 new,
570 swapping,
571 schema: _,
572 } => (
573 "rename",
574 existing.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
575 new.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
576 *swapping,
577 )
578 .into_py_any(py)?,
579 FunctionIR::Explode { columns, schema: _ } => (
580 "explode",
581 columns.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
582 )
583 .into_py_any(py)?,
584 #[cfg(feature = "pivot")]
585 FunctionIR::Unpivot { args, schema: _ } => (
586 "unpivot",
587 args.index.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
588 args.on.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
589 args.variable_name
590 .as_ref()
591 .map_or_else(|| Ok(py.None()), |s| s.as_str().into_py_any(py))?,
592 args.value_name
593 .as_ref()
594 .map_or_else(|| Ok(py.None()), |s| s.as_str().into_py_any(py))?,
595 )
596 .into_py_any(py)?,
597 FunctionIR::RowIndex {
598 name,
599 schema: _,
600 offset,
601 } => ("row_index", name.to_string(), offset.unwrap_or(0)).into_py_any(py)?,
602 FunctionIR::FastCount {
603 sources: _,
604 scan_type: _,
605 alias: _,
606 } => return Err(PyNotImplementedError::new_err("function count")),
607 },
608 }
609 .into_py_any(py),
610 IR::Union { inputs, options } => Union {
611 inputs: inputs.iter().map(|n| n.0).collect(),
612 options: options.slice,
614 }
615 .into_py_any(py),
616 IR::HConcat {
617 inputs,
618 schema: _,
619 options: _,
620 } => HConcat {
621 inputs: inputs.iter().map(|n| n.0).collect(),
622 options: (),
623 }
624 .into_py_any(py),
625 IR::ExtContext {
626 input,
627 contexts,
628 schema: _,
629 } => ExtContext {
630 input: input.0,
631 contexts: contexts.iter().map(|n| n.0).collect(),
632 }
633 .into_py_any(py),
634 IR::Sink {
635 input: _,
636 payload: _,
637 } => Err(PyNotImplementedError::new_err(
638 "Not expecting to see a Sink node",
639 )),
640 IR::Invalid => Err(PyNotImplementedError::new_err("Invalid")),
641 }
642}