datafusion_python/expr/
aggregate.rs1use std::fmt::{self, Display, Formatter};
19
20use datafusion::common::DataFusionError;
21use datafusion::logical_expr::Expr;
22use datafusion::logical_expr::expr::{AggregateFunction, AggregateFunctionParams, Alias};
23use datafusion::logical_expr::logical_plan::Aggregate;
24use pyo3::IntoPyObjectExt;
25use pyo3::prelude::*;
26
27use super::logical_node::LogicalNode;
28use crate::common::df_schema::PyDFSchema;
29use crate::errors::py_type_err;
30use crate::expr::PyExpr;
31use crate::sql::logical::PyLogicalPlan;
32
33#[pyclass(
34 from_py_object,
35 frozen,
36 name = "Aggregate",
37 module = "datafusion.expr",
38 subclass
39)]
40#[derive(Clone)]
41pub struct PyAggregate {
42 aggregate: Aggregate,
43}
44
45impl From<Aggregate> for PyAggregate {
46 fn from(aggregate: Aggregate) -> PyAggregate {
47 PyAggregate { aggregate }
48 }
49}
50
51impl TryFrom<PyAggregate> for Aggregate {
52 type Error = DataFusionError;
53
54 fn try_from(agg: PyAggregate) -> Result<Self, Self::Error> {
55 Ok(agg.aggregate)
56 }
57}
58
59impl Display for PyAggregate {
60 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
61 write!(
62 f,
63 "Aggregate
64 \nGroupBy(s): {:?}
65 \nAggregates(s): {:?}
66 \nInput: {:?}
67 \nProjected Schema: {:?}",
68 &self.aggregate.group_expr,
69 &self.aggregate.aggr_expr,
70 self.aggregate.input,
71 self.aggregate.schema
72 )
73 }
74}
75
76#[pymethods]
77impl PyAggregate {
78 fn group_by_exprs(&self) -> PyResult<Vec<PyExpr>> {
80 Ok(self
81 .aggregate
82 .group_expr
83 .iter()
84 .map(|e| PyExpr::from(e.clone()))
85 .collect())
86 }
87
88 fn aggregate_exprs(&self) -> PyResult<Vec<PyExpr>> {
90 Ok(self
91 .aggregate
92 .aggr_expr
93 .iter()
94 .map(|e| PyExpr::from(e.clone()))
95 .collect())
96 }
97
98 pub fn agg_expressions(&self) -> PyResult<Vec<PyExpr>> {
100 Ok(self
101 .aggregate
102 .aggr_expr
103 .iter()
104 .map(|e| PyExpr::from(e.clone()))
105 .collect())
106 }
107
108 pub fn agg_func_name(&self, expr: PyExpr) -> PyResult<String> {
109 Self::_agg_func_name(&expr.expr)
110 }
111
112 pub fn aggregation_arguments(&self, expr: PyExpr) -> PyResult<Vec<PyExpr>> {
113 self._aggregation_arguments(&expr.expr)
114 }
115
116 fn input(&self) -> PyResult<Vec<PyLogicalPlan>> {
118 Ok(Self::inputs(self))
119 }
120
121 fn schema(&self) -> PyDFSchema {
123 (*self.aggregate.schema).clone().into()
124 }
125
126 fn __repr__(&self) -> PyResult<String> {
127 Ok(format!("Aggregate({self})"))
128 }
129}
130
131impl PyAggregate {
132 #[allow(clippy::only_used_in_recursion)]
133 fn _aggregation_arguments(&self, expr: &Expr) -> PyResult<Vec<PyExpr>> {
134 match expr {
135 Expr::Alias(Alias { expr, .. }) => self._aggregation_arguments(expr.as_ref()),
137 Expr::AggregateFunction(AggregateFunction {
138 func: _,
139 params: AggregateFunctionParams { args, .. },
140 ..
141 }) => Ok(args.iter().map(|e| PyExpr::from(e.clone())).collect()),
142 _ => Err(py_type_err(
143 "Encountered a non Aggregate type in aggregation_arguments",
144 )),
145 }
146 }
147
148 fn _agg_func_name(expr: &Expr) -> PyResult<String> {
149 match expr {
150 Expr::Alias(Alias { expr, .. }) => Self::_agg_func_name(expr.as_ref()),
151 Expr::AggregateFunction(AggregateFunction { func, .. }) => Ok(func.name().to_owned()),
152 _ => Err(py_type_err(
153 "Encountered a non Aggregate type in agg_func_name",
154 )),
155 }
156 }
157}
158
159impl LogicalNode for PyAggregate {
160 fn inputs(&self) -> Vec<PyLogicalPlan> {
161 vec![PyLogicalPlan::from((*self.aggregate.input).clone())]
162 }
163
164 fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
165 self.clone().into_bound_py_any(py)
166 }
167}