datafusion_functions_aggregate/
grouping.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! Defines physical expressions that can be evaluated at runtime during query execution.
19
20use std::any::Any;
21use std::fmt;
22
23use arrow::datatypes::Field;
24use arrow::datatypes::{DataType, FieldRef};
25use datafusion_common::{not_impl_err, Result};
26use datafusion_expr::function::AccumulatorArgs;
27use datafusion_expr::function::StateFieldsArgs;
28use datafusion_expr::utils::format_state_name;
29use datafusion_expr::{
30    Accumulator, AggregateUDFImpl, Documentation, Signature, Volatility,
31};
32use datafusion_macros::user_doc;
33
34make_udaf_expr_and_func!(
35    Grouping,
36    grouping,
37    expression,
38    "Returns 1 if the data is aggregated across the specified column or 0 for not aggregated in the result set.",
39    grouping_udaf
40);
41
42#[user_doc(
43    doc_section(label = "General Functions"),
44    description = "Returns 1 if the data is aggregated across the specified column, or 0 if it is not aggregated in the result set.",
45    syntax_example = "grouping(expression)",
46    sql_example = r#"```sql
47> SELECT column_name, GROUPING(column_name) AS group_column
48  FROM table_name
49  GROUP BY GROUPING SETS ((column_name), ());
50+-------------+-------------+
51| column_name | group_column |
52+-------------+-------------+
53| value1      | 0           |
54| value2      | 0           |
55| NULL        | 1           |
56+-------------+-------------+
57```"#,
58    argument(
59        name = "expression",
60        description = "Expression to evaluate whether data is aggregated across the specified column. Can be a constant, column, or function."
61    )
62)]
63pub struct Grouping {
64    signature: Signature,
65}
66
67impl fmt::Debug for Grouping {
68    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
69        f.debug_struct("Grouping")
70            .field("name", &self.name())
71            .field("signature", &self.signature)
72            .finish()
73    }
74}
75
76impl Default for Grouping {
77    fn default() -> Self {
78        Self::new()
79    }
80}
81
82impl Grouping {
83    /// Create a new GROUPING aggregate function.
84    pub fn new() -> Self {
85        Self {
86            signature: Signature::variadic_any(Volatility::Immutable),
87        }
88    }
89}
90
91impl AggregateUDFImpl for Grouping {
92    fn as_any(&self) -> &dyn Any {
93        self
94    }
95
96    fn name(&self) -> &str {
97        "grouping"
98    }
99
100    fn signature(&self) -> &Signature {
101        &self.signature
102    }
103
104    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
105        Ok(DataType::Int32)
106    }
107
108    fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<FieldRef>> {
109        Ok(vec![Field::new(
110            format_state_name(args.name, "grouping"),
111            DataType::Int32,
112            true,
113        )
114        .into()])
115    }
116
117    fn accumulator(&self, _acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
118        not_impl_err!(
119            "physical plan is not yet implemented for GROUPING aggregate function"
120        )
121    }
122
123    fn documentation(&self) -> Option<&Documentation> {
124        self.doc()
125    }
126}