1use std::collections::HashMap;
19use std::convert::{From, Into};
20use std::sync::Arc;
21
22use datafusion::arrow::datatypes::{DataType, Field};
23use datafusion::arrow::pyarrow::PyArrowType;
24use datafusion::functions::core::expr_ext::FieldAccessor;
25use datafusion::logical_expr::expr::{
26 AggregateFunction, AggregateFunctionParams, FieldMetadata, InList, InSubquery, ScalarFunction,
27 WindowFunction,
28};
29use datafusion::logical_expr::utils::exprlist_to_fields;
30use datafusion::logical_expr::{
31 Between, BinaryExpr, Case, Cast, Expr, ExprFuncBuilder, ExprFunctionExt, Like, LogicalPlan,
32 Operator, TryCast, WindowFunctionDefinition, col, lit, lit_with_metadata,
33};
34use pyo3::IntoPyObjectExt;
35use pyo3::basic::CompareOp;
36use pyo3::prelude::*;
37use window::PyWindowFrame;
38
39use self::alias::PyAlias;
40use self::bool_expr::{
41 PyIsFalse, PyIsNotFalse, PyIsNotNull, PyIsNotTrue, PyIsNotUnknown, PyIsNull, PyIsTrue,
42 PyIsUnknown, PyNegative, PyNot,
43};
44use self::like::{PyILike, PyLike, PySimilarTo};
45use self::scalar_variable::PyScalarVariable;
46use crate::common::data_type::{DataTypeMap, NullTreatment, PyScalarValue, RexType};
47use crate::errors::{PyDataFusionResult, py_runtime_err, py_type_err, py_unsupported_variant_err};
48use crate::expr::aggregate_expr::PyAggregateFunction;
49use crate::expr::binary_expr::PyBinaryExpr;
50use crate::expr::column::PyColumn;
51use crate::expr::literal::PyLiteral;
52use crate::functions::add_builder_fns_to_window;
53use crate::pyarrow_util::scalar_to_pyarrow;
54use crate::sql::logical::PyLogicalPlan;
55
56pub mod aggregate;
57pub mod aggregate_expr;
58pub mod alias;
59pub mod analyze;
60pub mod between;
61pub mod binary_expr;
62pub mod bool_expr;
63pub mod case;
64pub mod cast;
65pub mod column;
66pub mod conditional_expr;
67pub mod copy_to;
68pub mod create_catalog;
69pub mod create_catalog_schema;
70pub mod create_external_table;
71pub mod create_function;
72pub mod create_index;
73pub mod create_memory_table;
74pub mod create_view;
75pub mod describe_table;
76pub mod distinct;
77pub mod dml;
78pub mod drop_catalog_schema;
79pub mod drop_function;
80pub mod drop_table;
81pub mod drop_view;
82pub mod empty_relation;
83pub mod exists;
84pub mod explain;
85pub mod extension;
86pub mod filter;
87pub mod grouping_set;
88pub mod in_list;
89pub mod in_subquery;
90pub mod join;
91pub mod like;
92pub mod limit;
93pub mod literal;
94pub mod logical_node;
95pub mod placeholder;
96pub mod projection;
97pub mod recursive_query;
98pub mod repartition;
99pub mod scalar_subquery;
100pub mod scalar_variable;
101pub mod signature;
102pub mod sort;
103pub mod sort_expr;
104pub mod statement;
105pub mod subquery;
106pub mod subquery_alias;
107pub mod table_scan;
108pub mod union;
109pub mod unnest;
110pub mod unnest_expr;
111pub mod values;
112pub mod window;
113
114use sort_expr::{PySortExpr, to_sort_expressions};
115
/// Python-facing wrapper around a DataFusion logical [`Expr`].
///
/// Exposed to Python as `RawExpr` in the `datafusion.expr` module; the class is
/// declared frozen and may be subclassed.
#[pyclass(frozen, name = "RawExpr", module = "datafusion.expr", subclass)]
#[derive(Debug, Clone)]
pub struct PyExpr {
    /// The wrapped DataFusion expression.
    pub expr: Expr,
}
122
123impl From<PyExpr> for Expr {
124 fn from(expr: PyExpr) -> Expr {
125 expr.expr
126 }
127}
128
129impl From<Expr> for PyExpr {
130 fn from(expr: Expr) -> PyExpr {
131 PyExpr { expr }
132 }
133}
134
135pub fn py_expr_list(expr: &[Expr]) -> PyResult<Vec<PyExpr>> {
137 Ok(expr.iter().map(|e| PyExpr::from(e.clone())).collect())
138}
139
140#[pymethods]
141impl PyExpr {
142 fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
144 Python::attach(|_| match &self.expr {
145 Expr::Alias(alias) => Ok(PyAlias::from(alias.clone()).into_bound_py_any(py)?),
146 Expr::Column(col) => Ok(PyColumn::from(col.clone()).into_bound_py_any(py)?),
147 Expr::ScalarVariable(field, variables) => {
148 Ok(PyScalarVariable::new(field, variables).into_bound_py_any(py)?)
149 }
150 Expr::Like(value) => Ok(PyLike::from(value.clone()).into_bound_py_any(py)?),
151 Expr::Literal(value, metadata) => Ok(PyLiteral::new_with_metadata(
152 value.clone(),
153 metadata.clone(),
154 )
155 .into_bound_py_any(py)?),
156 Expr::BinaryExpr(expr) => Ok(PyBinaryExpr::from(expr.clone()).into_bound_py_any(py)?),
157 Expr::Not(expr) => Ok(PyNot::new(*expr.clone()).into_bound_py_any(py)?),
158 Expr::IsNotNull(expr) => Ok(PyIsNotNull::new(*expr.clone()).into_bound_py_any(py)?),
159 Expr::IsNull(expr) => Ok(PyIsNull::new(*expr.clone()).into_bound_py_any(py)?),
160 Expr::IsTrue(expr) => Ok(PyIsTrue::new(*expr.clone()).into_bound_py_any(py)?),
161 Expr::IsFalse(expr) => Ok(PyIsFalse::new(*expr.clone()).into_bound_py_any(py)?),
162 Expr::IsUnknown(expr) => Ok(PyIsUnknown::new(*expr.clone()).into_bound_py_any(py)?),
163 Expr::IsNotTrue(expr) => Ok(PyIsNotTrue::new(*expr.clone()).into_bound_py_any(py)?),
164 Expr::IsNotFalse(expr) => Ok(PyIsNotFalse::new(*expr.clone()).into_bound_py_any(py)?),
165 Expr::IsNotUnknown(expr) => {
166 Ok(PyIsNotUnknown::new(*expr.clone()).into_bound_py_any(py)?)
167 }
168 Expr::Negative(expr) => Ok(PyNegative::new(*expr.clone()).into_bound_py_any(py)?),
169 Expr::AggregateFunction(expr) => {
170 Ok(PyAggregateFunction::from(expr.clone()).into_bound_py_any(py)?)
171 }
172 Expr::SimilarTo(value) => Ok(PySimilarTo::from(value.clone()).into_bound_py_any(py)?),
173 Expr::Between(value) => {
174 Ok(between::PyBetween::from(value.clone()).into_bound_py_any(py)?)
175 }
176 Expr::Case(value) => Ok(case::PyCase::from(value.clone()).into_bound_py_any(py)?),
177 Expr::Cast(value) => Ok(cast::PyCast::from(value.clone()).into_bound_py_any(py)?),
178 Expr::TryCast(value) => Ok(cast::PyTryCast::from(value.clone()).into_bound_py_any(py)?),
179 Expr::ScalarFunction(value) => Err(py_unsupported_variant_err(format!(
180 "Converting Expr::ScalarFunction to a Python object is not implemented: {value:?}"
181 ))),
182 Expr::WindowFunction(value) => Err(py_unsupported_variant_err(format!(
183 "Converting Expr::WindowFunction to a Python object is not implemented: {value:?}"
184 ))),
185 Expr::InList(value) => {
186 Ok(in_list::PyInList::from(value.clone()).into_bound_py_any(py)?)
187 }
188 Expr::Exists(value) => Ok(exists::PyExists::from(value.clone()).into_bound_py_any(py)?),
189 Expr::InSubquery(value) => {
190 Ok(in_subquery::PyInSubquery::from(value.clone()).into_bound_py_any(py)?)
191 }
192 Expr::ScalarSubquery(value) => {
193 Ok(scalar_subquery::PyScalarSubquery::from(value.clone()).into_bound_py_any(py)?)
194 }
195 #[allow(deprecated)]
196 Expr::Wildcard { qualifier, options } => Err(py_unsupported_variant_err(format!(
197 "Converting Expr::Wildcard to a Python object is not implemented : {qualifier:?} {options:?}"
198 ))),
199 Expr::GroupingSet(value) => {
200 Ok(grouping_set::PyGroupingSet::from(value.clone()).into_bound_py_any(py)?)
201 }
202 Expr::Placeholder(value) => {
203 Ok(placeholder::PyPlaceholder::from(value.clone()).into_bound_py_any(py)?)
204 }
205 Expr::OuterReferenceColumn(data_type, column) => {
206 Err(py_unsupported_variant_err(format!(
207 "Converting Expr::OuterReferenceColumn to a Python object is not implemented: {data_type:?} - {column:?}"
208 )))
209 }
210 Expr::Unnest(value) => {
211 Ok(unnest_expr::PyUnnestExpr::from(value.clone()).into_bound_py_any(py)?)
212 }
213 })
214 }
215
216 fn schema_name(&self) -> PyResult<String> {
219 Ok(format!("{}", self.expr.schema_name()))
220 }
221
222 fn canonical_name(&self) -> PyResult<String> {
224 Ok(format!("{}", self.expr))
225 }
226
227 fn variant_name(&self) -> PyResult<&str> {
230 Ok(self.expr.variant_name())
231 }
232
233 fn __richcmp__(&self, other: PyExpr, op: CompareOp) -> PyExpr {
234 let expr = match op {
235 CompareOp::Lt => self.expr.clone().lt(other.expr),
236 CompareOp::Le => self.expr.clone().lt_eq(other.expr),
237 CompareOp::Eq => self.expr.clone().eq(other.expr),
238 CompareOp::Ne => self.expr.clone().not_eq(other.expr),
239 CompareOp::Gt => self.expr.clone().gt(other.expr),
240 CompareOp::Ge => self.expr.clone().gt_eq(other.expr),
241 };
242 expr.into()
243 }
244
245 fn __repr__(&self) -> PyResult<String> {
246 Ok(format!("Expr({})", self.expr))
247 }
248
249 fn __add__(&self, rhs: PyExpr) -> PyResult<PyExpr> {
250 Ok((self.expr.clone() + rhs.expr).into())
251 }
252
253 fn __sub__(&self, rhs: PyExpr) -> PyResult<PyExpr> {
254 Ok((self.expr.clone() - rhs.expr).into())
255 }
256
257 fn __truediv__(&self, rhs: PyExpr) -> PyResult<PyExpr> {
258 Ok((self.expr.clone() / rhs.expr).into())
259 }
260
261 fn __mul__(&self, rhs: PyExpr) -> PyResult<PyExpr> {
262 Ok((self.expr.clone() * rhs.expr).into())
263 }
264
265 fn __mod__(&self, rhs: PyExpr) -> PyResult<PyExpr> {
266 let expr = self.expr.clone() % rhs.expr;
267 Ok(expr.into())
268 }
269
270 fn __and__(&self, rhs: PyExpr) -> PyResult<PyExpr> {
271 Ok(self.expr.clone().and(rhs.expr).into())
272 }
273
274 fn __or__(&self, rhs: PyExpr) -> PyResult<PyExpr> {
275 Ok(self.expr.clone().or(rhs.expr).into())
276 }
277
278 fn __invert__(&self) -> PyResult<PyExpr> {
279 let expr = !self.expr.clone();
280 Ok(expr.into())
281 }
282
283 fn __getitem__(&self, key: &str) -> PyResult<PyExpr> {
284 Ok(self.expr.clone().field(key).into())
285 }
286
287 #[staticmethod]
288 pub fn literal(value: PyScalarValue) -> PyExpr {
289 lit(value.0).into()
290 }
291
292 #[staticmethod]
293 pub fn literal_with_metadata(
294 value: PyScalarValue,
295 metadata: HashMap<String, String>,
296 ) -> PyExpr {
297 let metadata = FieldMetadata::new(metadata.into_iter().collect());
298 lit_with_metadata(value.0, Some(metadata)).into()
299 }
300
301 #[staticmethod]
302 pub fn column(value: &str) -> PyExpr {
303 col(value).into()
304 }
305
306 #[pyo3(signature = (name, metadata=None))]
308 pub fn alias(&self, name: &str, metadata: Option<HashMap<String, String>>) -> PyExpr {
309 let metadata = metadata.map(|m| FieldMetadata::new(m.into_iter().collect()));
310 self.expr.clone().alias_with_metadata(name, metadata).into()
311 }
312
313 #[pyo3(signature = (ascending=true, nulls_first=true))]
315 pub fn sort(&self, ascending: bool, nulls_first: bool) -> PySortExpr {
316 self.expr.clone().sort(ascending, nulls_first).into()
317 }
318
319 pub fn is_null(&self) -> PyExpr {
320 self.expr.clone().is_null().into()
321 }
322
323 pub fn is_not_null(&self) -> PyExpr {
324 self.expr.clone().is_not_null().into()
325 }
326
327 pub fn cast(&self, to: PyArrowType<DataType>) -> PyExpr {
328 let expr = Expr::Cast(Cast::new(Box::new(self.expr.clone()), to.0));
331 expr.into()
332 }
333
334 #[pyo3(signature = (low, high, negated=false))]
335 pub fn between(&self, low: PyExpr, high: PyExpr, negated: bool) -> PyExpr {
336 let expr = Expr::Between(Between::new(
337 Box::new(self.expr.clone()),
338 negated,
339 Box::new(low.into()),
340 Box::new(high.into()),
341 ));
342 expr.into()
343 }
344
345 pub fn rex_type(&self) -> PyResult<RexType> {
349 Ok(match self.expr {
350 Expr::Alias(..) => RexType::Alias,
351 Expr::Column(..) => RexType::Reference,
352 Expr::ScalarVariable(..) | Expr::Literal(..) => RexType::Literal,
353 Expr::BinaryExpr { .. }
354 | Expr::Not(..)
355 | Expr::IsNotNull(..)
356 | Expr::Negative(..)
357 | Expr::IsNull(..)
358 | Expr::Like { .. }
359 | Expr::SimilarTo { .. }
360 | Expr::Between { .. }
361 | Expr::Case { .. }
362 | Expr::Cast { .. }
363 | Expr::TryCast { .. }
364 | Expr::ScalarFunction { .. }
365 | Expr::AggregateFunction { .. }
366 | Expr::WindowFunction { .. }
367 | Expr::InList { .. }
368 | Expr::Exists { .. }
369 | Expr::InSubquery { .. }
370 | Expr::GroupingSet(..)
371 | Expr::IsTrue(..)
372 | Expr::IsFalse(..)
373 | Expr::IsUnknown(_)
374 | Expr::IsNotTrue(..)
375 | Expr::IsNotFalse(..)
376 | Expr::Placeholder { .. }
377 | Expr::OuterReferenceColumn(_, _)
378 | Expr::Unnest(_)
379 | Expr::IsNotUnknown(_) => RexType::Call,
380 Expr::ScalarSubquery(..) => RexType::ScalarSubquery,
381 #[allow(deprecated)]
382 Expr::Wildcard { .. } => {
383 return Err(py_unsupported_variant_err("Expr::Wildcard is unsupported"));
384 }
385 })
386 }
387
388 pub fn types(&self) -> PyResult<DataTypeMap> {
391 Self::_types(&self.expr)
392 }
393
394 pub fn python_value<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
396 match &self.expr {
397 Expr::Literal(scalar_value, _) => scalar_to_pyarrow(scalar_value, py),
398 _ => Err(py_type_err(format!(
399 "Non Expr::Literal encountered in types: {:?}",
400 &self.expr
401 ))),
402 }
403 }
404
405 pub fn rex_call_operands(&self) -> PyResult<Vec<PyExpr>> {
409 match &self.expr {
410 Expr::Column(..) | Expr::ScalarVariable(..) | Expr::Literal(..) => {
412 Ok(vec![PyExpr::from(self.expr.clone())])
413 }
414
415 Expr::Alias(alias) => Ok(vec![PyExpr::from(*alias.expr.clone())]),
416
417 Expr::Not(expr)
419 | Expr::IsNull(expr)
420 | Expr::IsNotNull(expr)
421 | Expr::IsTrue(expr)
422 | Expr::IsFalse(expr)
423 | Expr::IsUnknown(expr)
424 | Expr::IsNotTrue(expr)
425 | Expr::IsNotFalse(expr)
426 | Expr::IsNotUnknown(expr)
427 | Expr::Negative(expr)
428 | Expr::Cast(Cast { expr, .. })
429 | Expr::TryCast(TryCast { expr, .. })
430 | Expr::InSubquery(InSubquery { expr, .. }) => Ok(vec![PyExpr::from(*expr.clone())]),
431
432 Expr::AggregateFunction(AggregateFunction {
434 params: AggregateFunctionParams { args, .. },
435 ..
436 })
437 | Expr::ScalarFunction(ScalarFunction { args, .. }) => {
438 Ok(args.iter().map(|arg| PyExpr::from(arg.clone())).collect())
439 }
440 Expr::WindowFunction(boxed_window_fn) => {
441 let args = &boxed_window_fn.params.args;
442 Ok(args.iter().map(|arg| PyExpr::from(arg.clone())).collect())
443 }
444
445 Expr::Case(Case {
447 expr,
448 when_then_expr,
449 else_expr,
450 }) => {
451 let mut operands: Vec<PyExpr> = Vec::new();
452
453 if let Some(e) = expr {
454 for (when, then) in when_then_expr {
455 operands.push(PyExpr::from(Expr::BinaryExpr(BinaryExpr::new(
456 Box::new(*e.clone()),
457 Operator::Eq,
458 Box::new(*when.clone()),
459 ))));
460 operands.push(PyExpr::from(*then.clone()));
461 }
462 } else {
463 for (when, then) in when_then_expr {
464 operands.push(PyExpr::from(*when.clone()));
465 operands.push(PyExpr::from(*then.clone()));
466 }
467 };
468
469 if let Some(e) = else_expr {
470 operands.push(PyExpr::from(*e.clone()));
471 };
472
473 Ok(operands)
474 }
475 Expr::InList(InList { expr, list, .. }) => {
476 let mut operands: Vec<PyExpr> = vec![PyExpr::from(*expr.clone())];
477 for list_elem in list {
478 operands.push(PyExpr::from(list_elem.clone()));
479 }
480
481 Ok(operands)
482 }
483 Expr::BinaryExpr(BinaryExpr { left, right, .. }) => Ok(vec![
484 PyExpr::from(*left.clone()),
485 PyExpr::from(*right.clone()),
486 ]),
487 Expr::Like(Like { expr, pattern, .. }) => Ok(vec![
488 PyExpr::from(*expr.clone()),
489 PyExpr::from(*pattern.clone()),
490 ]),
491 Expr::SimilarTo(Like { expr, pattern, .. }) => Ok(vec![
492 PyExpr::from(*expr.clone()),
493 PyExpr::from(*pattern.clone()),
494 ]),
495 Expr::Between(Between {
496 expr,
497 negated: _,
498 low,
499 high,
500 }) => Ok(vec![
501 PyExpr::from(*expr.clone()),
502 PyExpr::from(*low.clone()),
503 PyExpr::from(*high.clone()),
504 ]),
505
506 Expr::GroupingSet(..)
508 | Expr::Unnest(_)
509 | Expr::OuterReferenceColumn(_, _)
510 | Expr::ScalarSubquery(..)
511 | Expr::Placeholder { .. }
512 | Expr::Exists { .. } => Err(py_runtime_err(format!(
513 "Unimplemented Expr type: {}",
514 self.expr
515 ))),
516
517 #[allow(deprecated)]
518 Expr::Wildcard { .. } => {
519 Err(py_unsupported_variant_err("Expr::Wildcard is unsupported"))
520 }
521 }
522 }
523
524 pub fn rex_call_operator(&self) -> PyResult<String> {
526 Ok(match &self.expr {
527 Expr::BinaryExpr(BinaryExpr {
528 left: _,
529 op,
530 right: _,
531 }) => format!("{op}"),
532 Expr::ScalarFunction(ScalarFunction { func, args: _ }) => func.name().to_string(),
533 Expr::Cast { .. } => "cast".to_string(),
534 Expr::Between { .. } => "between".to_string(),
535 Expr::Case { .. } => "case".to_string(),
536 Expr::IsNull(..) => "is null".to_string(),
537 Expr::IsNotNull(..) => "is not null".to_string(),
538 Expr::IsTrue(_) => "is true".to_string(),
539 Expr::IsFalse(_) => "is false".to_string(),
540 Expr::IsUnknown(_) => "is unknown".to_string(),
541 Expr::IsNotTrue(_) => "is not true".to_string(),
542 Expr::IsNotFalse(_) => "is not false".to_string(),
543 Expr::IsNotUnknown(_) => "is not unknown".to_string(),
544 Expr::InList { .. } => "in list".to_string(),
545 Expr::Negative(..) => "negative".to_string(),
546 Expr::Not(..) => "not".to_string(),
547 Expr::Like(Like {
548 negated,
549 case_insensitive,
550 ..
551 }) => {
552 let name = if *case_insensitive { "ilike" } else { "like" };
553 if *negated {
554 format!("not {name}")
555 } else {
556 name.to_string()
557 }
558 }
559 Expr::SimilarTo(Like { negated, .. }) => {
560 if *negated {
561 "not similar to".to_string()
562 } else {
563 "similar to".to_string()
564 }
565 }
566 _ => {
567 return Err(py_type_err(format!(
568 "Catch all triggered in get_operator_name: {:?}",
569 &self.expr
570 )));
571 }
572 })
573 }
574
575 pub fn column_name(&self, plan: PyLogicalPlan) -> PyResult<String> {
576 self._column_name(&plan.plan()).map_err(py_runtime_err)
577 }
578
579 pub fn order_by(&self, order_by: Vec<PySortExpr>) -> PyExprFuncBuilder {
582 self.expr
583 .clone()
584 .order_by(to_sort_expressions(order_by))
585 .into()
586 }
587
588 pub fn filter(&self, filter: PyExpr) -> PyExprFuncBuilder {
589 self.expr.clone().filter(filter.expr.clone()).into()
590 }
591
592 pub fn distinct(&self) -> PyExprFuncBuilder {
593 self.expr.clone().distinct().into()
594 }
595
596 pub fn null_treatment(&self, null_treatment: NullTreatment) -> PyExprFuncBuilder {
597 self.expr
598 .clone()
599 .null_treatment(Some(null_treatment.into()))
600 .into()
601 }
602
603 pub fn partition_by(&self, partition_by: Vec<PyExpr>) -> PyExprFuncBuilder {
604 let partition_by = partition_by.iter().map(|e| e.expr.clone()).collect();
605 self.expr.clone().partition_by(partition_by).into()
606 }
607
608 pub fn window_frame(&self, window_frame: PyWindowFrame) -> PyExprFuncBuilder {
609 self.expr.clone().window_frame(window_frame.into()).into()
610 }
611
612 #[pyo3(signature = (partition_by=None, window_frame=None, order_by=None, null_treatment=None))]
613 pub fn over(
614 &self,
615 partition_by: Option<Vec<PyExpr>>,
616 window_frame: Option<PyWindowFrame>,
617 order_by: Option<Vec<PySortExpr>>,
618 null_treatment: Option<NullTreatment>,
619 ) -> PyDataFusionResult<PyExpr> {
620 match &self.expr {
621 Expr::AggregateFunction(agg_fn) => {
622 let window_fn = Expr::WindowFunction(Box::new(WindowFunction::new(
623 WindowFunctionDefinition::AggregateUDF(agg_fn.func.clone()),
624 agg_fn.params.args.clone(),
625 )));
626
627 add_builder_fns_to_window(
628 window_fn,
629 partition_by,
630 window_frame,
631 order_by,
632 null_treatment,
633 )
634 }
635 Expr::WindowFunction(_) => add_builder_fns_to_window(
636 self.expr.clone(),
637 partition_by,
638 window_frame,
639 order_by,
640 null_treatment,
641 ),
642 _ => Err(datafusion::error::DataFusionError::Plan(format!(
643 "Using {} with `over` is not allowed. Must use an aggregate or window function.",
644 self.expr.variant_name()
645 ))
646 .into()),
647 }
648 }
649}
650
/// Python-facing wrapper around a DataFusion [`ExprFuncBuilder`].
///
/// Exposed to Python as `ExprFuncBuilder` in the `datafusion.expr` module; the class
/// is declared frozen and may be subclassed.
#[pyclass(frozen, name = "ExprFuncBuilder", module = "datafusion.expr", subclass)]
#[derive(Debug, Clone)]
pub struct PyExprFuncBuilder {
    /// The wrapped DataFusion builder.
    pub builder: ExprFuncBuilder,
}
656
657impl From<ExprFuncBuilder> for PyExprFuncBuilder {
658 fn from(builder: ExprFuncBuilder) -> Self {
659 Self { builder }
660 }
661}
662
663#[pymethods]
664impl PyExprFuncBuilder {
665 pub fn order_by(&self, order_by: Vec<PySortExpr>) -> PyExprFuncBuilder {
666 self.builder
667 .clone()
668 .order_by(to_sort_expressions(order_by))
669 .into()
670 }
671
672 pub fn filter(&self, filter: PyExpr) -> PyExprFuncBuilder {
673 self.builder.clone().filter(filter.expr.clone()).into()
674 }
675
676 pub fn distinct(&self) -> PyExprFuncBuilder {
677 self.builder.clone().distinct().into()
678 }
679
680 pub fn null_treatment(&self, null_treatment: NullTreatment) -> PyExprFuncBuilder {
681 self.builder
682 .clone()
683 .null_treatment(Some(null_treatment.into()))
684 .into()
685 }
686
687 pub fn partition_by(&self, partition_by: Vec<PyExpr>) -> PyExprFuncBuilder {
688 let partition_by = partition_by.iter().map(|e| e.expr.clone()).collect();
689 self.builder.clone().partition_by(partition_by).into()
690 }
691
692 pub fn window_frame(&self, window_frame: PyWindowFrame) -> PyExprFuncBuilder {
693 self.builder
694 .clone()
695 .window_frame(window_frame.into())
696 .into()
697 }
698
699 pub fn build(&self) -> PyDataFusionResult<PyExpr> {
700 Ok(self.builder.clone().build().map(|expr| expr.into())?)
701 }
702}
703
704impl PyExpr {
705 pub fn _column_name(&self, plan: &LogicalPlan) -> PyDataFusionResult<String> {
706 let field = Self::expr_to_field(&self.expr, plan)?;
707 Ok(field.name().to_owned())
708 }
709
710 pub fn expr_to_field(expr: &Expr, input_plan: &LogicalPlan) -> PyDataFusionResult<Arc<Field>> {
712 let fields = exprlist_to_fields(std::slice::from_ref(expr), input_plan)?;
713 Ok(fields[0].1.clone())
714 }
715 fn _types(expr: &Expr) -> PyResult<DataTypeMap> {
716 match expr {
717 Expr::BinaryExpr(BinaryExpr {
718 left: _,
719 op,
720 right: _,
721 }) => match op {
722 Operator::Eq
723 | Operator::NotEq
724 | Operator::Lt
725 | Operator::LtEq
726 | Operator::Gt
727 | Operator::GtEq
728 | Operator::And
729 | Operator::Or
730 | Operator::IsDistinctFrom
731 | Operator::IsNotDistinctFrom
732 | Operator::RegexMatch
733 | Operator::RegexIMatch
734 | Operator::RegexNotMatch
735 | Operator::RegexNotIMatch
736 | Operator::LikeMatch
737 | Operator::ILikeMatch
738 | Operator::NotLikeMatch
739 | Operator::NotILikeMatch => DataTypeMap::map_from_arrow_type(&DataType::Boolean),
740 Operator::Plus | Operator::Minus | Operator::Multiply | Operator::Modulo => {
741 DataTypeMap::map_from_arrow_type(&DataType::Int64)
742 }
743 Operator::Divide => DataTypeMap::map_from_arrow_type(&DataType::Float64),
744 Operator::StringConcat => DataTypeMap::map_from_arrow_type(&DataType::Utf8),
745 Operator::BitwiseShiftLeft
746 | Operator::BitwiseShiftRight
747 | Operator::BitwiseXor
748 | Operator::BitwiseAnd
749 | Operator::BitwiseOr => DataTypeMap::map_from_arrow_type(&DataType::Binary),
750 Operator::AtArrow
751 | Operator::ArrowAt
752 | Operator::Arrow
753 | Operator::LongArrow
754 | Operator::HashArrow
755 | Operator::HashLongArrow
756 | Operator::AtAt
757 | Operator::IntegerDivide
758 | Operator::HashMinus
759 | Operator::AtQuestion
760 | Operator::Question
761 | Operator::QuestionAnd
762 | Operator::QuestionPipe => Err(py_type_err(format!("Unsupported expr: ${op}"))),
763 },
764 Expr::Cast(Cast { expr: _, data_type }) => DataTypeMap::map_from_arrow_type(data_type),
765 Expr::Literal(scalar_value, _) => DataTypeMap::map_from_scalar_value(scalar_value),
766 _ => Err(py_type_err(format!(
767 "Non Expr::Literal encountered in types: {expr:?}"
768 ))),
769 }
770 }
771}
772
773pub(crate) fn init_module(m: &Bound<'_, PyModule>) -> PyResult<()> {
775 m.add_class::<PyExpr>()?;
776 m.add_class::<PyColumn>()?;
777 m.add_class::<PyLiteral>()?;
778 m.add_class::<PyBinaryExpr>()?;
779 m.add_class::<PyLiteral>()?;
780 m.add_class::<PyAggregateFunction>()?;
781 m.add_class::<PyNot>()?;
782 m.add_class::<PyIsNotNull>()?;
783 m.add_class::<PyIsNull>()?;
784 m.add_class::<PyIsTrue>()?;
785 m.add_class::<PyIsFalse>()?;
786 m.add_class::<PyIsUnknown>()?;
787 m.add_class::<PyIsNotTrue>()?;
788 m.add_class::<PyIsNotFalse>()?;
789 m.add_class::<PyIsNotUnknown>()?;
790 m.add_class::<PyNegative>()?;
791 m.add_class::<PyLike>()?;
792 m.add_class::<PyILike>()?;
793 m.add_class::<PySimilarTo>()?;
794 m.add_class::<PyScalarVariable>()?;
795 m.add_class::<alias::PyAlias>()?;
796 m.add_class::<in_list::PyInList>()?;
797 m.add_class::<exists::PyExists>()?;
798 m.add_class::<subquery::PySubquery>()?;
799 m.add_class::<in_subquery::PyInSubquery>()?;
800 m.add_class::<scalar_subquery::PyScalarSubquery>()?;
801 m.add_class::<placeholder::PyPlaceholder>()?;
802 m.add_class::<grouping_set::PyGroupingSet>()?;
803 m.add_class::<case::PyCase>()?;
804 m.add_class::<conditional_expr::PyCaseBuilder>()?;
805 m.add_class::<cast::PyCast>()?;
806 m.add_class::<cast::PyTryCast>()?;
807 m.add_class::<between::PyBetween>()?;
808 m.add_class::<explain::PyExplain>()?;
809 m.add_class::<limit::PyLimit>()?;
810 m.add_class::<aggregate::PyAggregate>()?;
811 m.add_class::<sort::PySort>()?;
812 m.add_class::<analyze::PyAnalyze>()?;
813 m.add_class::<empty_relation::PyEmptyRelation>()?;
814 m.add_class::<join::PyJoin>()?;
815 m.add_class::<join::PyJoinType>()?;
816 m.add_class::<join::PyJoinConstraint>()?;
817 m.add_class::<union::PyUnion>()?;
818 m.add_class::<unnest::PyUnnest>()?;
819 m.add_class::<unnest_expr::PyUnnestExpr>()?;
820 m.add_class::<extension::PyExtension>()?;
821 m.add_class::<filter::PyFilter>()?;
822 m.add_class::<projection::PyProjection>()?;
823 m.add_class::<table_scan::PyTableScan>()?;
824 m.add_class::<create_memory_table::PyCreateMemoryTable>()?;
825 m.add_class::<create_view::PyCreateView>()?;
826 m.add_class::<distinct::PyDistinct>()?;
827 m.add_class::<sort_expr::PySortExpr>()?;
828 m.add_class::<subquery_alias::PySubqueryAlias>()?;
829 m.add_class::<drop_table::PyDropTable>()?;
830 m.add_class::<repartition::PyPartitioning>()?;
831 m.add_class::<repartition::PyRepartition>()?;
832 m.add_class::<window::PyWindowExpr>()?;
833 m.add_class::<window::PyWindowFrame>()?;
834 m.add_class::<window::PyWindowFrameBound>()?;
835 m.add_class::<copy_to::PyCopyTo>()?;
836 m.add_class::<copy_to::PyFileType>()?;
837 m.add_class::<create_catalog::PyCreateCatalog>()?;
838 m.add_class::<create_catalog_schema::PyCreateCatalogSchema>()?;
839 m.add_class::<create_external_table::PyCreateExternalTable>()?;
840 m.add_class::<create_function::PyCreateFunction>()?;
841 m.add_class::<create_function::PyOperateFunctionArg>()?;
842 m.add_class::<create_function::PyCreateFunctionBody>()?;
843 m.add_class::<create_index::PyCreateIndex>()?;
844 m.add_class::<describe_table::PyDescribeTable>()?;
845 m.add_class::<dml::PyDmlStatement>()?;
846 m.add_class::<drop_catalog_schema::PyDropCatalogSchema>()?;
847 m.add_class::<drop_function::PyDropFunction>()?;
848 m.add_class::<drop_view::PyDropView>()?;
849 m.add_class::<recursive_query::PyRecursiveQuery>()?;
850
851 m.add_class::<statement::PyTransactionStart>()?;
852 m.add_class::<statement::PyTransactionEnd>()?;
853 m.add_class::<statement::PySetVariable>()?;
854 m.add_class::<statement::PyPrepare>()?;
855 m.add_class::<statement::PyExecute>()?;
856 m.add_class::<statement::PyDeallocate>()?;
857 m.add_class::<values::PyValues>()?;
858 m.add_class::<statement::PyTransactionAccessMode>()?;
859 m.add_class::<statement::PyTransactionConclusion>()?;
860 m.add_class::<statement::PyTransactionIsolationLevel>()?;
861
862 Ok(())
863}