datafusion_python/expr/aggregate.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::{self, Display, Formatter};
19
20use datafusion::common::DataFusionError;
21use datafusion::logical_expr::expr::{AggregateFunction, AggregateFunctionParams, Alias};
22use datafusion::logical_expr::logical_plan::Aggregate;
23use datafusion::logical_expr::Expr;
24use pyo3::prelude::*;
25use pyo3::IntoPyObjectExt;
26
27use super::logical_node::LogicalNode;
28use crate::common::df_schema::PyDFSchema;
29use crate::errors::py_type_err;
30use crate::expr::PyExpr;
31use crate::sql::logical::PyLogicalPlan;
32
/// Python-facing wrapper around a DataFusion logical-plan `Aggregate` node.
///
/// Registered with Python as class `Aggregate` in module `datafusion.expr`;
/// the `#[pyclass]` attribute also declares it `frozen` and `subclass`.
#[pyclass(frozen, name = "Aggregate", module = "datafusion.expr", subclass)]
#[derive(Clone)]
pub struct PyAggregate {
    /// The wrapped DataFusion `Aggregate` plan node.
    aggregate: Aggregate,
}
38
39impl From<Aggregate> for PyAggregate {
40    fn from(aggregate: Aggregate) -> PyAggregate {
41        PyAggregate { aggregate }
42    }
43}
44
45impl TryFrom<PyAggregate> for Aggregate {
46    type Error = DataFusionError;
47
48    fn try_from(agg: PyAggregate) -> Result<Self, Self::Error> {
49        Ok(agg.aggregate)
50    }
51}
52
53impl Display for PyAggregate {
54    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
55        write!(
56            f,
57            "Aggregate
58            \nGroupBy(s): {:?}
59            \nAggregates(s): {:?}
60            \nInput: {:?}
61            \nProjected Schema: {:?}",
62            &self.aggregate.group_expr,
63            &self.aggregate.aggr_expr,
64            self.aggregate.input,
65            self.aggregate.schema
66        )
67    }
68}
69
70#[pymethods]
71impl PyAggregate {
72    /// Retrieves the grouping expressions for this `Aggregate`
73    fn group_by_exprs(&self) -> PyResult<Vec<PyExpr>> {
74        Ok(self
75            .aggregate
76            .group_expr
77            .iter()
78            .map(|e| PyExpr::from(e.clone()))
79            .collect())
80    }
81
82    /// Retrieves the aggregate expressions for this `Aggregate`
83    fn aggregate_exprs(&self) -> PyResult<Vec<PyExpr>> {
84        Ok(self
85            .aggregate
86            .aggr_expr
87            .iter()
88            .map(|e| PyExpr::from(e.clone()))
89            .collect())
90    }
91
92    /// Returns the inner Aggregate Expr(s)
93    pub fn agg_expressions(&self) -> PyResult<Vec<PyExpr>> {
94        Ok(self
95            .aggregate
96            .aggr_expr
97            .iter()
98            .map(|e| PyExpr::from(e.clone()))
99            .collect())
100    }
101
102    pub fn agg_func_name(&self, expr: PyExpr) -> PyResult<String> {
103        Self::_agg_func_name(&expr.expr)
104    }
105
106    pub fn aggregation_arguments(&self, expr: PyExpr) -> PyResult<Vec<PyExpr>> {
107        self._aggregation_arguments(&expr.expr)
108    }
109
110    // Retrieves the input `LogicalPlan` to this `Aggregate` node
111    fn input(&self) -> PyResult<Vec<PyLogicalPlan>> {
112        Ok(Self::inputs(self))
113    }
114
115    // Resulting Schema for this `Aggregate` node instance
116    fn schema(&self) -> PyDFSchema {
117        (*self.aggregate.schema).clone().into()
118    }
119
120    fn __repr__(&self) -> PyResult<String> {
121        Ok(format!("Aggregate({self})"))
122    }
123}
124
125impl PyAggregate {
126    #[allow(clippy::only_used_in_recursion)]
127    fn _aggregation_arguments(&self, expr: &Expr) -> PyResult<Vec<PyExpr>> {
128        match expr {
129            // TODO: This Alias logic seems to be returning some strange results that we should investigate
130            Expr::Alias(Alias { expr, .. }) => self._aggregation_arguments(expr.as_ref()),
131            Expr::AggregateFunction(AggregateFunction {
132                func: _,
133                params: AggregateFunctionParams { args, .. },
134                ..
135            }) => Ok(args.iter().map(|e| PyExpr::from(e.clone())).collect()),
136            _ => Err(py_type_err(
137                "Encountered a non Aggregate type in aggregation_arguments",
138            )),
139        }
140    }
141
142    fn _agg_func_name(expr: &Expr) -> PyResult<String> {
143        match expr {
144            Expr::Alias(Alias { expr, .. }) => Self::_agg_func_name(expr.as_ref()),
145            Expr::AggregateFunction(AggregateFunction { func, .. }) => Ok(func.name().to_owned()),
146            _ => Err(py_type_err(
147                "Encountered a non Aggregate type in agg_func_name",
148            )),
149        }
150    }
151}
152
153impl LogicalNode for PyAggregate {
154    fn inputs(&self) -> Vec<PyLogicalPlan> {
155        vec![PyLogicalPlan::from((*self.aggregate.input).clone())]
156    }
157
158    fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
159        self.clone().into_bound_py_any(py)
160    }
161}