datafusion_python/
expr.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use datafusion::logical_expr::expr::{AggregateFunctionParams, WindowFunctionParams};
19use datafusion::logical_expr::utils::exprlist_to_fields;
20use datafusion::logical_expr::{
21    ExprFuncBuilder, ExprFunctionExt, LogicalPlan, WindowFunctionDefinition,
22};
23use pyo3::IntoPyObjectExt;
24use pyo3::{basic::CompareOp, prelude::*};
25use std::convert::{From, Into};
26use std::sync::Arc;
27use window::PyWindowFrame;
28
29use datafusion::arrow::datatypes::{DataType, Field};
30use datafusion::arrow::pyarrow::PyArrowType;
31use datafusion::functions::core::expr_ext::FieldAccessor;
32use datafusion::logical_expr::{
33    col,
34    expr::{AggregateFunction, InList, InSubquery, ScalarFunction, WindowFunction},
35    lit, Between, BinaryExpr, Case, Cast, Expr, Like, Operator, TryCast,
36};
37
38use crate::common::data_type::{DataTypeMap, NullTreatment, PyScalarValue, RexType};
39use crate::errors::{
40    py_runtime_err, py_type_err, py_unsupported_variant_err, PyDataFusionError, PyDataFusionResult,
41};
42use crate::expr::aggregate_expr::PyAggregateFunction;
43use crate::expr::binary_expr::PyBinaryExpr;
44use crate::expr::column::PyColumn;
45use crate::expr::literal::PyLiteral;
46use crate::functions::add_builder_fns_to_window;
47use crate::pyarrow_util::scalar_to_pyarrow;
48use crate::sql::logical::PyLogicalPlan;
49
50use self::alias::PyAlias;
51use self::bool_expr::{
52    PyIsFalse, PyIsNotFalse, PyIsNotNull, PyIsNotTrue, PyIsNotUnknown, PyIsNull, PyIsTrue,
53    PyIsUnknown, PyNegative, PyNot,
54};
55use self::like::{PyILike, PyLike, PySimilarTo};
56use self::scalar_variable::PyScalarVariable;
57
58pub mod aggregate;
59pub mod aggregate_expr;
60pub mod alias;
61pub mod analyze;
62pub mod between;
63pub mod binary_expr;
64pub mod bool_expr;
65pub mod case;
66pub mod cast;
67pub mod column;
68pub mod conditional_expr;
69pub mod create_memory_table;
70pub mod create_view;
71pub mod distinct;
72pub mod drop_table;
73pub mod empty_relation;
74pub mod exists;
75pub mod explain;
76pub mod extension;
77pub mod filter;
78pub mod grouping_set;
79pub mod in_list;
80pub mod in_subquery;
81pub mod join;
82pub mod like;
83pub mod limit;
84pub mod literal;
85pub mod logical_node;
86pub mod placeholder;
87pub mod projection;
88pub mod repartition;
89pub mod scalar_subquery;
90pub mod scalar_variable;
91pub mod signature;
92pub mod sort;
93pub mod sort_expr;
94pub mod subquery;
95pub mod subquery_alias;
96pub mod table_scan;
97pub mod union;
98pub mod unnest;
99pub mod unnest_expr;
100pub mod window;
101
102use sort_expr::{to_sort_expressions, PySortExpr};
103
104/// A PyExpr that can be used on a DataFrame
105#[pyclass(name = "RawExpr", module = "datafusion.expr", subclass)]
106#[derive(Debug, Clone)]
107pub struct PyExpr {
108    pub expr: Expr,
109}
110
111impl From<PyExpr> for Expr {
112    fn from(expr: PyExpr) -> Expr {
113        expr.expr
114    }
115}
116
117impl From<Expr> for PyExpr {
118    fn from(expr: Expr) -> PyExpr {
119        PyExpr { expr }
120    }
121}
122
123/// Convert a list of DataFusion Expr to PyExpr
124pub fn py_expr_list(expr: &[Expr]) -> PyResult<Vec<PyExpr>> {
125    Ok(expr.iter().map(|e| PyExpr::from(e.clone())).collect())
126}
127
128#[pymethods]
129impl PyExpr {
130    /// Return the specific expression
131    fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
132        Python::with_gil(|_| {
133            match &self.expr {
134            Expr::Alias(alias) => Ok(PyAlias::from(alias.clone()).into_bound_py_any(py)?),
135            Expr::Column(col) => Ok(PyColumn::from(col.clone()).into_bound_py_any(py)?),
136            Expr::ScalarVariable(data_type, variables) => {
137                Ok(PyScalarVariable::new(data_type, variables).into_bound_py_any(py)?)
138            }
139            Expr::Like(value) => Ok(PyLike::from(value.clone()).into_bound_py_any(py)?),
140            Expr::Literal(value) => Ok(PyLiteral::from(value.clone()).into_bound_py_any(py)?),
141            Expr::BinaryExpr(expr) => Ok(PyBinaryExpr::from(expr.clone()).into_bound_py_any(py)?),
142            Expr::Not(expr) => Ok(PyNot::new(*expr.clone()).into_bound_py_any(py)?),
143            Expr::IsNotNull(expr) => Ok(PyIsNotNull::new(*expr.clone()).into_bound_py_any(py)?),
144            Expr::IsNull(expr) => Ok(PyIsNull::new(*expr.clone()).into_bound_py_any(py)?),
145            Expr::IsTrue(expr) => Ok(PyIsTrue::new(*expr.clone()).into_bound_py_any(py)?),
146            Expr::IsFalse(expr) => Ok(PyIsFalse::new(*expr.clone()).into_bound_py_any(py)?),
147            Expr::IsUnknown(expr) => Ok(PyIsUnknown::new(*expr.clone()).into_bound_py_any(py)?),
148            Expr::IsNotTrue(expr) => Ok(PyIsNotTrue::new(*expr.clone()).into_bound_py_any(py)?),
149            Expr::IsNotFalse(expr) => Ok(PyIsNotFalse::new(*expr.clone()).into_bound_py_any(py)?),
150            Expr::IsNotUnknown(expr) => Ok(PyIsNotUnknown::new(*expr.clone()).into_bound_py_any(py)?),
151            Expr::Negative(expr) => Ok(PyNegative::new(*expr.clone()).into_bound_py_any(py)?),
152            Expr::AggregateFunction(expr) => {
153                Ok(PyAggregateFunction::from(expr.clone()).into_bound_py_any(py)?)
154            }
155            Expr::SimilarTo(value) => Ok(PySimilarTo::from(value.clone()).into_bound_py_any(py)?),
156            Expr::Between(value) => Ok(between::PyBetween::from(value.clone()).into_bound_py_any(py)?),
157            Expr::Case(value) => Ok(case::PyCase::from(value.clone()).into_bound_py_any(py)?),
158            Expr::Cast(value) => Ok(cast::PyCast::from(value.clone()).into_bound_py_any(py)?),
159            Expr::TryCast(value) => Ok(cast::PyTryCast::from(value.clone()).into_bound_py_any(py)?),
160            Expr::ScalarFunction(value) => Err(py_unsupported_variant_err(format!(
161                "Converting Expr::ScalarFunction to a Python object is not implemented: {:?}",
162                value
163            ))),
164            Expr::WindowFunction(value) => Err(py_unsupported_variant_err(format!(
165                "Converting Expr::WindowFunction to a Python object is not implemented: {:?}",
166                value
167            ))),
168            Expr::InList(value) => Ok(in_list::PyInList::from(value.clone()).into_bound_py_any(py)?),
169            Expr::Exists(value) => Ok(exists::PyExists::from(value.clone()).into_bound_py_any(py)?),
170            Expr::InSubquery(value) => {
171                Ok(in_subquery::PyInSubquery::from(value.clone()).into_bound_py_any(py)?)
172            }
173            Expr::ScalarSubquery(value) => {
174                Ok(scalar_subquery::PyScalarSubquery::from(value.clone()).into_bound_py_any(py)?)
175            }
176            #[allow(deprecated)]
177            Expr::Wildcard { qualifier, options } => Err(py_unsupported_variant_err(format!(
178                "Converting Expr::Wildcard to a Python object is not implemented : {:?} {:?}",
179                qualifier, options
180            ))),
181            Expr::GroupingSet(value) => {
182                Ok(grouping_set::PyGroupingSet::from(value.clone()).into_bound_py_any(py)?)
183            }
184            Expr::Placeholder(value) => {
185                Ok(placeholder::PyPlaceholder::from(value.clone()).into_bound_py_any(py)?)
186            }
187            Expr::OuterReferenceColumn(data_type, column) => Err(py_unsupported_variant_err(format!(
188                "Converting Expr::OuterReferenceColumn to a Python object is not implemented: {:?} - {:?}",
189                data_type, column
190            ))),
191            Expr::Unnest(value) => Ok(unnest_expr::PyUnnestExpr::from(value.clone()).into_bound_py_any(py)?),
192        }
193        })
194    }
195
196    /// Returns the name of this expression as it should appear in a schema. This name
197    /// will not include any CAST expressions.
198    fn schema_name(&self) -> PyResult<String> {
199        Ok(format!("{}", self.expr.schema_name()))
200    }
201
202    /// Returns a full and complete string representation of this expression.
203    fn canonical_name(&self) -> PyResult<String> {
204        Ok(format!("{}", self.expr))
205    }
206
207    /// Returns the name of the Expr variant.
208    /// Ex: 'IsNotNull', 'Literal', 'BinaryExpr', etc
209    fn variant_name(&self) -> PyResult<&str> {
210        Ok(self.expr.variant_name())
211    }
212
213    fn __richcmp__(&self, other: PyExpr, op: CompareOp) -> PyExpr {
214        let expr = match op {
215            CompareOp::Lt => self.expr.clone().lt(other.expr),
216            CompareOp::Le => self.expr.clone().lt_eq(other.expr),
217            CompareOp::Eq => self.expr.clone().eq(other.expr),
218            CompareOp::Ne => self.expr.clone().not_eq(other.expr),
219            CompareOp::Gt => self.expr.clone().gt(other.expr),
220            CompareOp::Ge => self.expr.clone().gt_eq(other.expr),
221        };
222        expr.into()
223    }
224
225    fn __repr__(&self) -> PyResult<String> {
226        Ok(format!("Expr({})", self.expr))
227    }
228
229    fn __add__(&self, rhs: PyExpr) -> PyResult<PyExpr> {
230        Ok((self.expr.clone() + rhs.expr).into())
231    }
232
233    fn __sub__(&self, rhs: PyExpr) -> PyResult<PyExpr> {
234        Ok((self.expr.clone() - rhs.expr).into())
235    }
236
237    fn __truediv__(&self, rhs: PyExpr) -> PyResult<PyExpr> {
238        Ok((self.expr.clone() / rhs.expr).into())
239    }
240
241    fn __mul__(&self, rhs: PyExpr) -> PyResult<PyExpr> {
242        Ok((self.expr.clone() * rhs.expr).into())
243    }
244
245    fn __mod__(&self, rhs: PyExpr) -> PyResult<PyExpr> {
246        let expr = self.expr.clone() % rhs.expr;
247        Ok(expr.into())
248    }
249
250    fn __and__(&self, rhs: PyExpr) -> PyResult<PyExpr> {
251        Ok(self.expr.clone().and(rhs.expr).into())
252    }
253
254    fn __or__(&self, rhs: PyExpr) -> PyResult<PyExpr> {
255        Ok(self.expr.clone().or(rhs.expr).into())
256    }
257
258    fn __invert__(&self) -> PyResult<PyExpr> {
259        let expr = !self.expr.clone();
260        Ok(expr.into())
261    }
262
263    fn __getitem__(&self, key: &str) -> PyResult<PyExpr> {
264        Ok(self.expr.clone().field(key).into())
265    }
266
267    #[staticmethod]
268    pub fn literal(value: PyScalarValue) -> PyExpr {
269        lit(value.0).into()
270    }
271
272    #[staticmethod]
273    pub fn column(value: &str) -> PyExpr {
274        col(value).into()
275    }
276
277    /// assign a name to the PyExpr
278    pub fn alias(&self, name: &str) -> PyExpr {
279        self.expr.clone().alias(name).into()
280    }
281
282    /// Create a sort PyExpr from an existing PyExpr.
283    #[pyo3(signature = (ascending=true, nulls_first=true))]
284    pub fn sort(&self, ascending: bool, nulls_first: bool) -> PySortExpr {
285        self.expr.clone().sort(ascending, nulls_first).into()
286    }
287
288    pub fn is_null(&self) -> PyExpr {
289        self.expr.clone().is_null().into()
290    }
291
292    pub fn is_not_null(&self) -> PyExpr {
293        self.expr.clone().is_not_null().into()
294    }
295
296    pub fn cast(&self, to: PyArrowType<DataType>) -> PyExpr {
297        // self.expr.cast_to() requires DFSchema to validate that the cast
298        // is supported, omit that for now
299        let expr = Expr::Cast(Cast::new(Box::new(self.expr.clone()), to.0));
300        expr.into()
301    }
302
303    #[pyo3(signature = (low, high, negated=false))]
304    pub fn between(&self, low: PyExpr, high: PyExpr, negated: bool) -> PyExpr {
305        let expr = Expr::Between(Between::new(
306            Box::new(self.expr.clone()),
307            negated,
308            Box::new(low.into()),
309            Box::new(high.into()),
310        ));
311        expr.into()
312    }
313
314    /// A Rex (Row Expression) specifies a single row of data. That specification
315    /// could include user defined functions or types. RexType identifies the row
316    /// as one of the possible valid `RexTypes`.
317    pub fn rex_type(&self) -> PyResult<RexType> {
318        Ok(match self.expr {
319            Expr::Alias(..) => RexType::Alias,
320            Expr::Column(..) => RexType::Reference,
321            Expr::ScalarVariable(..) | Expr::Literal(..) => RexType::Literal,
322            Expr::BinaryExpr { .. }
323            | Expr::Not(..)
324            | Expr::IsNotNull(..)
325            | Expr::Negative(..)
326            | Expr::IsNull(..)
327            | Expr::Like { .. }
328            | Expr::SimilarTo { .. }
329            | Expr::Between { .. }
330            | Expr::Case { .. }
331            | Expr::Cast { .. }
332            | Expr::TryCast { .. }
333            | Expr::ScalarFunction { .. }
334            | Expr::AggregateFunction { .. }
335            | Expr::WindowFunction { .. }
336            | Expr::InList { .. }
337            | Expr::Exists { .. }
338            | Expr::InSubquery { .. }
339            | Expr::GroupingSet(..)
340            | Expr::IsTrue(..)
341            | Expr::IsFalse(..)
342            | Expr::IsUnknown(_)
343            | Expr::IsNotTrue(..)
344            | Expr::IsNotFalse(..)
345            | Expr::Placeholder { .. }
346            | Expr::OuterReferenceColumn(_, _)
347            | Expr::Unnest(_)
348            | Expr::IsNotUnknown(_) => RexType::Call,
349            Expr::ScalarSubquery(..) => RexType::ScalarSubquery,
350            #[allow(deprecated)]
351            Expr::Wildcard { .. } => {
352                return Err(py_unsupported_variant_err("Expr::Wildcard is unsupported"))
353            }
354        })
355    }
356
357    /// Given the current `Expr` return the DataTypeMap which represents the
358    /// PythonType, Arrow DataType, and SqlType Enum which represents
359    pub fn types(&self) -> PyResult<DataTypeMap> {
360        Self::_types(&self.expr)
361    }
362
363    /// Extracts the Expr value into a PyObject that can be shared with Python
364    pub fn python_value(&self, py: Python) -> PyResult<PyObject> {
365        match &self.expr {
366            Expr::Literal(scalar_value) => scalar_to_pyarrow(scalar_value, py),
367            _ => Err(py_type_err(format!(
368                "Non Expr::Literal encountered in types: {:?}",
369                &self.expr
370            ))),
371        }
372    }
373
374    /// Row expressions, Rex(s), operate on the concept of operands. Different variants of Expressions, Expr(s),
375    /// store those operands in different datastructures. This function examines the Expr variant and returns
376    /// the operands to the calling logic as a Vec of PyExpr instances.
377    pub fn rex_call_operands(&self) -> PyResult<Vec<PyExpr>> {
378        match &self.expr {
379            // Expr variants that are themselves the operand to return
380            Expr::Column(..) | Expr::ScalarVariable(..) | Expr::Literal(..) => {
381                Ok(vec![PyExpr::from(self.expr.clone())])
382            }
383
384            Expr::Alias(alias) => Ok(vec![PyExpr::from(*alias.expr.clone())]),
385
386            // Expr(s) that house the Expr instance to return in their bounded params
387            Expr::Not(expr)
388            | Expr::IsNull(expr)
389            | Expr::IsNotNull(expr)
390            | Expr::IsTrue(expr)
391            | Expr::IsFalse(expr)
392            | Expr::IsUnknown(expr)
393            | Expr::IsNotTrue(expr)
394            | Expr::IsNotFalse(expr)
395            | Expr::IsNotUnknown(expr)
396            | Expr::Negative(expr)
397            | Expr::Cast(Cast { expr, .. })
398            | Expr::TryCast(TryCast { expr, .. })
399            | Expr::InSubquery(InSubquery { expr, .. }) => Ok(vec![PyExpr::from(*expr.clone())]),
400
401            // Expr variants containing a collection of Expr(s) for operands
402            Expr::AggregateFunction(AggregateFunction {
403                params: AggregateFunctionParams { args, .. },
404                ..
405            })
406            | Expr::ScalarFunction(ScalarFunction { args, .. })
407            | Expr::WindowFunction(WindowFunction {
408                params: WindowFunctionParams { args, .. },
409                ..
410            }) => Ok(args.iter().map(|arg| PyExpr::from(arg.clone())).collect()),
411
412            // Expr(s) that require more specific processing
413            Expr::Case(Case {
414                expr,
415                when_then_expr,
416                else_expr,
417            }) => {
418                let mut operands: Vec<PyExpr> = Vec::new();
419
420                if let Some(e) = expr {
421                    for (when, then) in when_then_expr {
422                        operands.push(PyExpr::from(Expr::BinaryExpr(BinaryExpr::new(
423                            Box::new(*e.clone()),
424                            Operator::Eq,
425                            Box::new(*when.clone()),
426                        ))));
427                        operands.push(PyExpr::from(*then.clone()));
428                    }
429                } else {
430                    for (when, then) in when_then_expr {
431                        operands.push(PyExpr::from(*when.clone()));
432                        operands.push(PyExpr::from(*then.clone()));
433                    }
434                };
435
436                if let Some(e) = else_expr {
437                    operands.push(PyExpr::from(*e.clone()));
438                };
439
440                Ok(operands)
441            }
442            Expr::InList(InList { expr, list, .. }) => {
443                let mut operands: Vec<PyExpr> = vec![PyExpr::from(*expr.clone())];
444                for list_elem in list {
445                    operands.push(PyExpr::from(list_elem.clone()));
446                }
447
448                Ok(operands)
449            }
450            Expr::BinaryExpr(BinaryExpr { left, right, .. }) => Ok(vec![
451                PyExpr::from(*left.clone()),
452                PyExpr::from(*right.clone()),
453            ]),
454            Expr::Like(Like { expr, pattern, .. }) => Ok(vec![
455                PyExpr::from(*expr.clone()),
456                PyExpr::from(*pattern.clone()),
457            ]),
458            Expr::SimilarTo(Like { expr, pattern, .. }) => Ok(vec![
459                PyExpr::from(*expr.clone()),
460                PyExpr::from(*pattern.clone()),
461            ]),
462            Expr::Between(Between {
463                expr,
464                negated: _,
465                low,
466                high,
467            }) => Ok(vec![
468                PyExpr::from(*expr.clone()),
469                PyExpr::from(*low.clone()),
470                PyExpr::from(*high.clone()),
471            ]),
472
473            // Currently un-support/implemented Expr types for Rex Call operations
474            Expr::GroupingSet(..)
475            | Expr::Unnest(_)
476            | Expr::OuterReferenceColumn(_, _)
477            | Expr::ScalarSubquery(..)
478            | Expr::Placeholder { .. }
479            | Expr::Exists { .. } => Err(py_runtime_err(format!(
480                "Unimplemented Expr type: {}",
481                self.expr
482            ))),
483
484            #[allow(deprecated)]
485            Expr::Wildcard { .. } => {
486                Err(py_unsupported_variant_err("Expr::Wildcard is unsupported"))
487            }
488        }
489    }
490
491    /// Extracts the operator associated with a RexType::Call
492    pub fn rex_call_operator(&self) -> PyResult<String> {
493        Ok(match &self.expr {
494            Expr::BinaryExpr(BinaryExpr {
495                left: _,
496                op,
497                right: _,
498            }) => format!("{op}"),
499            Expr::ScalarFunction(ScalarFunction { func, args: _ }) => func.name().to_string(),
500            Expr::Cast { .. } => "cast".to_string(),
501            Expr::Between { .. } => "between".to_string(),
502            Expr::Case { .. } => "case".to_string(),
503            Expr::IsNull(..) => "is null".to_string(),
504            Expr::IsNotNull(..) => "is not null".to_string(),
505            Expr::IsTrue(_) => "is true".to_string(),
506            Expr::IsFalse(_) => "is false".to_string(),
507            Expr::IsUnknown(_) => "is unknown".to_string(),
508            Expr::IsNotTrue(_) => "is not true".to_string(),
509            Expr::IsNotFalse(_) => "is not false".to_string(),
510            Expr::IsNotUnknown(_) => "is not unknown".to_string(),
511            Expr::InList { .. } => "in list".to_string(),
512            Expr::Negative(..) => "negative".to_string(),
513            Expr::Not(..) => "not".to_string(),
514            Expr::Like(Like {
515                negated,
516                case_insensitive,
517                ..
518            }) => {
519                let name = if *case_insensitive { "ilike" } else { "like" };
520                if *negated {
521                    format!("not {name}")
522                } else {
523                    name.to_string()
524                }
525            }
526            Expr::SimilarTo(Like { negated, .. }) => {
527                if *negated {
528                    "not similar to".to_string()
529                } else {
530                    "similar to".to_string()
531                }
532            }
533            _ => {
534                return Err(py_type_err(format!(
535                    "Catch all triggered in get_operator_name: {:?}",
536                    &self.expr
537                )))
538            }
539        })
540    }
541
542    pub fn column_name(&self, plan: PyLogicalPlan) -> PyResult<String> {
543        self._column_name(&plan.plan()).map_err(py_runtime_err)
544    }
545
546    // Expression Function Builder functions
547
548    pub fn order_by(&self, order_by: Vec<PySortExpr>) -> PyExprFuncBuilder {
549        self.expr
550            .clone()
551            .order_by(to_sort_expressions(order_by))
552            .into()
553    }
554
555    pub fn filter(&self, filter: PyExpr) -> PyExprFuncBuilder {
556        self.expr.clone().filter(filter.expr.clone()).into()
557    }
558
559    pub fn distinct(&self) -> PyExprFuncBuilder {
560        self.expr.clone().distinct().into()
561    }
562
563    pub fn null_treatment(&self, null_treatment: NullTreatment) -> PyExprFuncBuilder {
564        self.expr
565            .clone()
566            .null_treatment(Some(null_treatment.into()))
567            .into()
568    }
569
570    pub fn partition_by(&self, partition_by: Vec<PyExpr>) -> PyExprFuncBuilder {
571        let partition_by = partition_by.iter().map(|e| e.expr.clone()).collect();
572        self.expr.clone().partition_by(partition_by).into()
573    }
574
575    pub fn window_frame(&self, window_frame: PyWindowFrame) -> PyExprFuncBuilder {
576        self.expr.clone().window_frame(window_frame.into()).into()
577    }
578
579    #[pyo3(signature = (partition_by=None, window_frame=None, order_by=None, null_treatment=None))]
580    pub fn over(
581        &self,
582        partition_by: Option<Vec<PyExpr>>,
583        window_frame: Option<PyWindowFrame>,
584        order_by: Option<Vec<PySortExpr>>,
585        null_treatment: Option<NullTreatment>,
586    ) -> PyDataFusionResult<PyExpr> {
587        match &self.expr {
588            Expr::AggregateFunction(agg_fn) => {
589                let window_fn = Expr::WindowFunction(WindowFunction::new(
590                    WindowFunctionDefinition::AggregateUDF(agg_fn.func.clone()),
591                    agg_fn.params.args.clone(),
592                ));
593
594                add_builder_fns_to_window(
595                    window_fn,
596                    partition_by,
597                    window_frame,
598                    order_by,
599                    null_treatment,
600                )
601            }
602            Expr::WindowFunction(_) => add_builder_fns_to_window(
603                self.expr.clone(),
604                partition_by,
605                window_frame,
606                order_by,
607                null_treatment,
608            ),
609            _ => Err(
610                PyDataFusionError::ExecutionError(datafusion::error::DataFusionError::Plan(
611                    format!("Using {} with `over` is not allowed. Must use an aggregate or window function.", self.expr.variant_name()),
612                ))
613            ),
614        }
615    }
616}
617
618#[pyclass(name = "ExprFuncBuilder", module = "datafusion.expr", subclass)]
619#[derive(Debug, Clone)]
620pub struct PyExprFuncBuilder {
621    pub builder: ExprFuncBuilder,
622}
623
624impl From<ExprFuncBuilder> for PyExprFuncBuilder {
625    fn from(builder: ExprFuncBuilder) -> Self {
626        Self { builder }
627    }
628}
629
630#[pymethods]
631impl PyExprFuncBuilder {
632    pub fn order_by(&self, order_by: Vec<PySortExpr>) -> PyExprFuncBuilder {
633        self.builder
634            .clone()
635            .order_by(to_sort_expressions(order_by))
636            .into()
637    }
638
639    pub fn filter(&self, filter: PyExpr) -> PyExprFuncBuilder {
640        self.builder.clone().filter(filter.expr.clone()).into()
641    }
642
643    pub fn distinct(&self) -> PyExprFuncBuilder {
644        self.builder.clone().distinct().into()
645    }
646
647    pub fn null_treatment(&self, null_treatment: NullTreatment) -> PyExprFuncBuilder {
648        self.builder
649            .clone()
650            .null_treatment(Some(null_treatment.into()))
651            .into()
652    }
653
654    pub fn partition_by(&self, partition_by: Vec<PyExpr>) -> PyExprFuncBuilder {
655        let partition_by = partition_by.iter().map(|e| e.expr.clone()).collect();
656        self.builder.clone().partition_by(partition_by).into()
657    }
658
659    pub fn window_frame(&self, window_frame: PyWindowFrame) -> PyExprFuncBuilder {
660        self.builder
661            .clone()
662            .window_frame(window_frame.into())
663            .into()
664    }
665
666    pub fn build(&self) -> PyDataFusionResult<PyExpr> {
667        Ok(self.builder.clone().build().map(|expr| expr.into())?)
668    }
669}
670
671impl PyExpr {
672    pub fn _column_name(&self, plan: &LogicalPlan) -> PyDataFusionResult<String> {
673        let field = Self::expr_to_field(&self.expr, plan)?;
674        Ok(field.name().to_owned())
675    }
676
677    /// Create a [Field] representing an [Expr], given an input [LogicalPlan] to resolve against
678    pub fn expr_to_field(expr: &Expr, input_plan: &LogicalPlan) -> PyDataFusionResult<Arc<Field>> {
679        let fields = exprlist_to_fields(&[expr.clone()], input_plan)?;
680        Ok(fields[0].1.clone())
681    }
682    fn _types(expr: &Expr) -> PyResult<DataTypeMap> {
683        match expr {
684            Expr::BinaryExpr(BinaryExpr {
685                left: _,
686                op,
687                right: _,
688            }) => match op {
689                Operator::Eq
690                | Operator::NotEq
691                | Operator::Lt
692                | Operator::LtEq
693                | Operator::Gt
694                | Operator::GtEq
695                | Operator::And
696                | Operator::Or
697                | Operator::IsDistinctFrom
698                | Operator::IsNotDistinctFrom
699                | Operator::RegexMatch
700                | Operator::RegexIMatch
701                | Operator::RegexNotMatch
702                | Operator::RegexNotIMatch
703                | Operator::LikeMatch
704                | Operator::ILikeMatch
705                | Operator::NotLikeMatch
706                | Operator::NotILikeMatch => DataTypeMap::map_from_arrow_type(&DataType::Boolean),
707                Operator::Plus | Operator::Minus | Operator::Multiply | Operator::Modulo => {
708                    DataTypeMap::map_from_arrow_type(&DataType::Int64)
709                }
710                Operator::Divide => DataTypeMap::map_from_arrow_type(&DataType::Float64),
711                Operator::StringConcat => DataTypeMap::map_from_arrow_type(&DataType::Utf8),
712                Operator::BitwiseShiftLeft
713                | Operator::BitwiseShiftRight
714                | Operator::BitwiseXor
715                | Operator::BitwiseAnd
716                | Operator::BitwiseOr => DataTypeMap::map_from_arrow_type(&DataType::Binary),
717                Operator::AtArrow | Operator::ArrowAt => {
718                    Err(py_type_err(format!("Unsupported expr: ${op}")))
719                }
720            },
721            Expr::Cast(Cast { expr: _, data_type }) => DataTypeMap::map_from_arrow_type(data_type),
722            Expr::Literal(scalar_value) => DataTypeMap::map_from_scalar_value(scalar_value),
723            _ => Err(py_type_err(format!(
724                "Non Expr::Literal encountered in types: {:?}",
725                expr
726            ))),
727        }
728    }
729}
730
731/// Initializes the `expr` module to match the pattern of `datafusion-expr` https://docs.rs/datafusion-expr/latest/datafusion_expr/
732pub(crate) fn init_module(m: &Bound<'_, PyModule>) -> PyResult<()> {
733    m.add_class::<PyExpr>()?;
734    m.add_class::<PyColumn>()?;
735    m.add_class::<PyLiteral>()?;
736    m.add_class::<PyBinaryExpr>()?;
737    m.add_class::<PyLiteral>()?;
738    m.add_class::<PyAggregateFunction>()?;
739    m.add_class::<PyNot>()?;
740    m.add_class::<PyIsNotNull>()?;
741    m.add_class::<PyIsNull>()?;
742    m.add_class::<PyIsTrue>()?;
743    m.add_class::<PyIsFalse>()?;
744    m.add_class::<PyIsUnknown>()?;
745    m.add_class::<PyIsNotTrue>()?;
746    m.add_class::<PyIsNotFalse>()?;
747    m.add_class::<PyIsNotUnknown>()?;
748    m.add_class::<PyNegative>()?;
749    m.add_class::<PyLike>()?;
750    m.add_class::<PyILike>()?;
751    m.add_class::<PySimilarTo>()?;
752    m.add_class::<PyScalarVariable>()?;
753    m.add_class::<alias::PyAlias>()?;
754    m.add_class::<in_list::PyInList>()?;
755    m.add_class::<exists::PyExists>()?;
756    m.add_class::<subquery::PySubquery>()?;
757    m.add_class::<in_subquery::PyInSubquery>()?;
758    m.add_class::<scalar_subquery::PyScalarSubquery>()?;
759    m.add_class::<placeholder::PyPlaceholder>()?;
760    m.add_class::<grouping_set::PyGroupingSet>()?;
761    m.add_class::<case::PyCase>()?;
762    m.add_class::<conditional_expr::PyCaseBuilder>()?;
763    m.add_class::<cast::PyCast>()?;
764    m.add_class::<cast::PyTryCast>()?;
765    m.add_class::<between::PyBetween>()?;
766    m.add_class::<explain::PyExplain>()?;
767    m.add_class::<limit::PyLimit>()?;
768    m.add_class::<aggregate::PyAggregate>()?;
769    m.add_class::<sort::PySort>()?;
770    m.add_class::<analyze::PyAnalyze>()?;
771    m.add_class::<empty_relation::PyEmptyRelation>()?;
772    m.add_class::<join::PyJoin>()?;
773    m.add_class::<join::PyJoinType>()?;
774    m.add_class::<join::PyJoinConstraint>()?;
775    m.add_class::<union::PyUnion>()?;
776    m.add_class::<unnest::PyUnnest>()?;
777    m.add_class::<unnest_expr::PyUnnestExpr>()?;
778    m.add_class::<extension::PyExtension>()?;
779    m.add_class::<filter::PyFilter>()?;
780    m.add_class::<projection::PyProjection>()?;
781    m.add_class::<table_scan::PyTableScan>()?;
782    m.add_class::<create_memory_table::PyCreateMemoryTable>()?;
783    m.add_class::<create_view::PyCreateView>()?;
784    m.add_class::<distinct::PyDistinct>()?;
785    m.add_class::<sort_expr::PySortExpr>()?;
786    m.add_class::<subquery_alias::PySubqueryAlias>()?;
787    m.add_class::<drop_table::PyDropTable>()?;
788    m.add_class::<repartition::PyPartitioning>()?;
789    m.add_class::<repartition::PyRepartition>()?;
790    m.add_class::<window::PyWindowExpr>()?;
791    m.add_class::<window::PyWindowFrame>()?;
792    m.add_class::<window::PyWindowFrameBound>()?;
793    Ok(())
794}