datafusion_functions_aggregate/
grouping.rs1use arrow::datatypes::Field;
21use arrow::datatypes::{DataType, FieldRef};
22use datafusion_common::{Result, not_impl_err};
23use datafusion_expr::function::AccumulatorArgs;
24use datafusion_expr::function::StateFieldsArgs;
25use datafusion_expr::utils::format_state_name;
26use datafusion_expr::{
27 Accumulator, AggregateUDFImpl, Documentation, Signature, Volatility,
28};
29use datafusion_macros::user_doc;
30
31make_udaf_expr_and_func!(
32 Grouping,
33 grouping,
34 expression,
35 "Returns 1 if the data is aggregated across the specified column or 0 for not aggregated in the result set.",
36 grouping_udaf
37);
38
39#[user_doc(
40 doc_section(label = "General Functions"),
41 description = "Returns 1 if the data is aggregated across the specified column, or 0 if it is not aggregated in the result set.",
42 syntax_example = "grouping(expression)",
43 sql_example = r#"```sql
44> SELECT column_name, GROUPING(column_name) AS group_column
45 FROM table_name
46 GROUP BY GROUPING SETS ((column_name), ());
47+-------------+-------------+
48| column_name | group_column |
49+-------------+-------------+
50| value1 | 0 |
51| value2 | 0 |
52| NULL | 1 |
53+-------------+-------------+
54```"#,
55 argument(
56 name = "expression",
57 description = "Expression to evaluate whether data is aggregated across the specified column. Can be a constant, column, or function."
58 )
59)]
60#[derive(PartialEq, Eq, Hash, Debug)]
61pub struct Grouping {
62 signature: Signature,
63}
64
65impl Default for Grouping {
66 fn default() -> Self {
67 Self::new()
68 }
69}
70
71impl Grouping {
72 pub fn new() -> Self {
74 Self {
75 signature: Signature::variadic_any(Volatility::Immutable),
76 }
77 }
78}
79
80impl AggregateUDFImpl for Grouping {
81 fn name(&self) -> &str {
82 "grouping"
83 }
84
85 fn signature(&self) -> &Signature {
86 &self.signature
87 }
88
89 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
90 Ok(DataType::Int32)
91 }
92
93 fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<FieldRef>> {
94 Ok(vec![
95 Field::new(
96 format_state_name(args.name, "grouping"),
97 DataType::Int32,
98 true,
99 )
100 .into(),
101 ])
102 }
103
104 fn accumulator(&self, _acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
105 not_impl_err!(
106 "physical plan is not yet implemented for GROUPING aggregate function"
107 )
108 }
109
110 fn documentation(&self) -> Option<&Documentation> {
111 self.doc()
112 }
113}