Skip to main content

datafusion_python/expr/
aggregate.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::{self, Display, Formatter};
19
20use datafusion::common::DataFusionError;
21use datafusion::logical_expr::Expr;
22use datafusion::logical_expr::expr::{AggregateFunction, AggregateFunctionParams, Alias};
23use datafusion::logical_expr::logical_plan::Aggregate;
24use pyo3::IntoPyObjectExt;
25use pyo3::prelude::*;
26
27use super::logical_node::LogicalNode;
28use crate::common::df_schema::PyDFSchema;
29use crate::errors::py_type_err;
30use crate::expr::PyExpr;
31use crate::sql::logical::PyLogicalPlan;
32
33#[pyclass(
34    from_py_object,
35    frozen,
36    name = "Aggregate",
37    module = "datafusion.expr",
38    subclass
39)]
40#[derive(Clone)]
41pub struct PyAggregate {
42    aggregate: Aggregate,
43}
44
45impl From<Aggregate> for PyAggregate {
46    fn from(aggregate: Aggregate) -> PyAggregate {
47        PyAggregate { aggregate }
48    }
49}
50
51impl TryFrom<PyAggregate> for Aggregate {
52    type Error = DataFusionError;
53
54    fn try_from(agg: PyAggregate) -> Result<Self, Self::Error> {
55        Ok(agg.aggregate)
56    }
57}
58
59impl Display for PyAggregate {
60    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
61        write!(
62            f,
63            "Aggregate
64            \nGroupBy(s): {:?}
65            \nAggregates(s): {:?}
66            \nInput: {:?}
67            \nProjected Schema: {:?}",
68            &self.aggregate.group_expr,
69            &self.aggregate.aggr_expr,
70            self.aggregate.input,
71            self.aggregate.schema
72        )
73    }
74}
75
76#[pymethods]
77impl PyAggregate {
78    /// Retrieves the grouping expressions for this `Aggregate`
79    fn group_by_exprs(&self) -> PyResult<Vec<PyExpr>> {
80        Ok(self
81            .aggregate
82            .group_expr
83            .iter()
84            .map(|e| PyExpr::from(e.clone()))
85            .collect())
86    }
87
88    /// Retrieves the aggregate expressions for this `Aggregate`
89    fn aggregate_exprs(&self) -> PyResult<Vec<PyExpr>> {
90        Ok(self
91            .aggregate
92            .aggr_expr
93            .iter()
94            .map(|e| PyExpr::from(e.clone()))
95            .collect())
96    }
97
98    /// Returns the inner Aggregate Expr(s)
99    pub fn agg_expressions(&self) -> PyResult<Vec<PyExpr>> {
100        Ok(self
101            .aggregate
102            .aggr_expr
103            .iter()
104            .map(|e| PyExpr::from(e.clone()))
105            .collect())
106    }
107
108    pub fn agg_func_name(&self, expr: PyExpr) -> PyResult<String> {
109        Self::_agg_func_name(&expr.expr)
110    }
111
112    pub fn aggregation_arguments(&self, expr: PyExpr) -> PyResult<Vec<PyExpr>> {
113        self._aggregation_arguments(&expr.expr)
114    }
115
116    // Retrieves the input `LogicalPlan` to this `Aggregate` node
117    fn input(&self) -> PyResult<Vec<PyLogicalPlan>> {
118        Ok(Self::inputs(self))
119    }
120
121    // Resulting Schema for this `Aggregate` node instance
122    fn schema(&self) -> PyDFSchema {
123        (*self.aggregate.schema).clone().into()
124    }
125
126    fn __repr__(&self) -> PyResult<String> {
127        Ok(format!("Aggregate({self})"))
128    }
129}
130
131impl PyAggregate {
132    #[allow(clippy::only_used_in_recursion)]
133    fn _aggregation_arguments(&self, expr: &Expr) -> PyResult<Vec<PyExpr>> {
134        match expr {
135            // TODO: This Alias logic seems to be returning some strange results that we should investigate
136            Expr::Alias(Alias { expr, .. }) => self._aggregation_arguments(expr.as_ref()),
137            Expr::AggregateFunction(AggregateFunction {
138                func: _,
139                params: AggregateFunctionParams { args, .. },
140                ..
141            }) => Ok(args.iter().map(|e| PyExpr::from(e.clone())).collect()),
142            _ => Err(py_type_err(
143                "Encountered a non Aggregate type in aggregation_arguments",
144            )),
145        }
146    }
147
148    fn _agg_func_name(expr: &Expr) -> PyResult<String> {
149        match expr {
150            Expr::Alias(Alias { expr, .. }) => Self::_agg_func_name(expr.as_ref()),
151            Expr::AggregateFunction(AggregateFunction { func, .. }) => Ok(func.name().to_owned()),
152            _ => Err(py_type_err(
153                "Encountered a non Aggregate type in agg_func_name",
154            )),
155        }
156    }
157}
158
159impl LogicalNode for PyAggregate {
160    fn inputs(&self) -> Vec<PyLogicalPlan> {
161        vec![PyLogicalPlan::from((*self.aggregate.input).clone())]
162    }
163
164    fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
165        self.clone().into_bound_py_any(py)
166    }
167}