datafusion_python/expr/
aggregate.rs1use std::fmt::{self, Display, Formatter};
19
20use datafusion::common::DataFusionError;
21use datafusion::logical_expr::expr::{AggregateFunction, AggregateFunctionParams, Alias};
22use datafusion::logical_expr::logical_plan::Aggregate;
23use datafusion::logical_expr::Expr;
24use pyo3::prelude::*;
25use pyo3::IntoPyObjectExt;
26
27use super::logical_node::LogicalNode;
28use crate::common::df_schema::PyDFSchema;
29use crate::errors::py_type_err;
30use crate::expr::PyExpr;
31use crate::sql::logical::PyLogicalPlan;
32
33#[pyclass(frozen, name = "Aggregate", module = "datafusion.expr", subclass)]
34#[derive(Clone)]
35pub struct PyAggregate {
36 aggregate: Aggregate,
37}
38
39impl From<Aggregate> for PyAggregate {
40 fn from(aggregate: Aggregate) -> PyAggregate {
41 PyAggregate { aggregate }
42 }
43}
44
45impl TryFrom<PyAggregate> for Aggregate {
46 type Error = DataFusionError;
47
48 fn try_from(agg: PyAggregate) -> Result<Self, Self::Error> {
49 Ok(agg.aggregate)
50 }
51}
52
53impl Display for PyAggregate {
54 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
55 write!(
56 f,
57 "Aggregate
58 \nGroupBy(s): {:?}
59 \nAggregates(s): {:?}
60 \nInput: {:?}
61 \nProjected Schema: {:?}",
62 &self.aggregate.group_expr,
63 &self.aggregate.aggr_expr,
64 self.aggregate.input,
65 self.aggregate.schema
66 )
67 }
68}
69
70#[pymethods]
71impl PyAggregate {
72 fn group_by_exprs(&self) -> PyResult<Vec<PyExpr>> {
74 Ok(self
75 .aggregate
76 .group_expr
77 .iter()
78 .map(|e| PyExpr::from(e.clone()))
79 .collect())
80 }
81
82 fn aggregate_exprs(&self) -> PyResult<Vec<PyExpr>> {
84 Ok(self
85 .aggregate
86 .aggr_expr
87 .iter()
88 .map(|e| PyExpr::from(e.clone()))
89 .collect())
90 }
91
92 pub fn agg_expressions(&self) -> PyResult<Vec<PyExpr>> {
94 Ok(self
95 .aggregate
96 .aggr_expr
97 .iter()
98 .map(|e| PyExpr::from(e.clone()))
99 .collect())
100 }
101
102 pub fn agg_func_name(&self, expr: PyExpr) -> PyResult<String> {
103 Self::_agg_func_name(&expr.expr)
104 }
105
106 pub fn aggregation_arguments(&self, expr: PyExpr) -> PyResult<Vec<PyExpr>> {
107 self._aggregation_arguments(&expr.expr)
108 }
109
110 fn input(&self) -> PyResult<Vec<PyLogicalPlan>> {
112 Ok(Self::inputs(self))
113 }
114
115 fn schema(&self) -> PyDFSchema {
117 (*self.aggregate.schema).clone().into()
118 }
119
120 fn __repr__(&self) -> PyResult<String> {
121 Ok(format!("Aggregate({self})"))
122 }
123}
124
125impl PyAggregate {
126 #[allow(clippy::only_used_in_recursion)]
127 fn _aggregation_arguments(&self, expr: &Expr) -> PyResult<Vec<PyExpr>> {
128 match expr {
129 Expr::Alias(Alias { expr, .. }) => self._aggregation_arguments(expr.as_ref()),
131 Expr::AggregateFunction(AggregateFunction {
132 func: _,
133 params: AggregateFunctionParams { args, .. },
134 ..
135 }) => Ok(args.iter().map(|e| PyExpr::from(e.clone())).collect()),
136 _ => Err(py_type_err(
137 "Encountered a non Aggregate type in aggregation_arguments",
138 )),
139 }
140 }
141
142 fn _agg_func_name(expr: &Expr) -> PyResult<String> {
143 match expr {
144 Expr::Alias(Alias { expr, .. }) => Self::_agg_func_name(expr.as_ref()),
145 Expr::AggregateFunction(AggregateFunction { func, .. }) => Ok(func.name().to_owned()),
146 _ => Err(py_type_err(
147 "Encountered a non Aggregate type in agg_func_name",
148 )),
149 }
150 }
151}
152
153impl LogicalNode for PyAggregate {
154 fn inputs(&self) -> Vec<PyLogicalPlan> {
155 vec![PyLogicalPlan::from((*self.aggregate.input).clone())]
156 }
157
158 fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
159 self.clone().into_bound_py_any(py)
160 }
161}