datafusion_functions_aggregate/
bool_and_or.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! Defines physical expressions that can be evaluated at runtime during query execution
19
20use std::any::Any;
21use std::mem::size_of_val;
22
23use arrow::array::ArrayRef;
24use arrow::array::BooleanArray;
25use arrow::compute::bool_and as compute_bool_and;
26use arrow::compute::bool_or as compute_bool_or;
27use arrow::datatypes::Field;
28use arrow::datatypes::{DataType, FieldRef};
29
30use datafusion_common::internal_err;
31use datafusion_common::{downcast_value, not_impl_err};
32use datafusion_common::{Result, ScalarValue};
33use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs};
34use datafusion_expr::utils::{format_state_name, AggregateOrderSensitivity};
35use datafusion_expr::{
36    Accumulator, AggregateUDFImpl, Documentation, GroupsAccumulator, ReversedUDAF,
37    Signature, Volatility,
38};
39
40use datafusion_functions_aggregate_common::aggregate::groups_accumulator::bool_op::BooleanGroupsAccumulator;
41use datafusion_macros::user_doc;
42
43// returns the new value after bool_and/bool_or with the new values, taking nullability into account
44macro_rules! typed_bool_and_or_batch {
45    ($VALUES:expr, $ARRAYTYPE:ident, $SCALAR:ident, $OP:ident) => {{
46        let array = downcast_value!($VALUES, $ARRAYTYPE);
47        let delta = $OP(array);
48        Ok(ScalarValue::$SCALAR(delta))
49    }};
50}
51
52// bool_and/bool_or the array and returns a ScalarValue of its corresponding type.
53macro_rules! bool_and_or_batch {
54    ($VALUES:expr, $OP:ident) => {{
55        match $VALUES.data_type() {
56            DataType::Boolean => {
57                typed_bool_and_or_batch!($VALUES, BooleanArray, Boolean, $OP)
58            }
59            e => {
60                return internal_err!(
61                    "Bool and/Bool or is not expected to receive the type {e:?}"
62                );
63            }
64        }
65    }};
66}
67
68/// dynamically-typed bool_and(array) -> ScalarValue
69fn bool_and_batch(values: &ArrayRef) -> Result<ScalarValue> {
70    bool_and_or_batch!(values, compute_bool_and)
71}
72
73/// dynamically-typed bool_or(array) -> ScalarValue
74fn bool_or_batch(values: &ArrayRef) -> Result<ScalarValue> {
75    bool_and_or_batch!(values, compute_bool_or)
76}
77
78make_udaf_expr_and_func!(
79    BoolAnd,
80    bool_and,
81    expression,
82    "The values to combine with `AND`",
83    bool_and_udaf
84);
85
86make_udaf_expr_and_func!(
87    BoolOr,
88    bool_or,
89    expression,
90    "The values to combine with `OR`",
91    bool_or_udaf
92);
93
94#[user_doc(
95    doc_section(label = "General Functions"),
96    description = "Returns true if all non-null input values are true, otherwise false.",
97    syntax_example = "bool_and(expression)",
98    sql_example = r#"```sql
99> SELECT bool_and(column_name) FROM table_name;
100+----------------------------+
101| bool_and(column_name)       |
102+----------------------------+
103| true                        |
104+----------------------------+
105```"#,
106    standard_argument(name = "expression", prefix = "The")
107)]
108/// BOOL_AND aggregate expression
109#[derive(Debug)]
110pub struct BoolAnd {
111    signature: Signature,
112}
113
114impl BoolAnd {
115    fn new() -> Self {
116        Self {
117            signature: Signature::uniform(
118                1,
119                vec![DataType::Boolean],
120                Volatility::Immutable,
121            ),
122        }
123    }
124}
125
126impl Default for BoolAnd {
127    fn default() -> Self {
128        Self::new()
129    }
130}
131
132impl AggregateUDFImpl for BoolAnd {
133    fn as_any(&self) -> &dyn Any {
134        self
135    }
136
137    fn name(&self) -> &str {
138        "bool_and"
139    }
140
141    fn signature(&self) -> &Signature {
142        &self.signature
143    }
144
145    fn return_type(&self, _: &[DataType]) -> Result<DataType> {
146        Ok(DataType::Boolean)
147    }
148
149    fn accumulator(&self, _: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
150        Ok(Box::<BoolAndAccumulator>::default())
151    }
152
153    fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<FieldRef>> {
154        Ok(vec![Field::new(
155            format_state_name(args.name, self.name()),
156            DataType::Boolean,
157            true,
158        )
159        .into()])
160    }
161
162    fn groups_accumulator_supported(&self, _args: AccumulatorArgs) -> bool {
163        true
164    }
165
166    fn create_groups_accumulator(
167        &self,
168        args: AccumulatorArgs,
169    ) -> Result<Box<dyn GroupsAccumulator>> {
170        match args.return_field.data_type() {
171            DataType::Boolean => {
172                Ok(Box::new(BooleanGroupsAccumulator::new(|x, y| x && y, true)))
173            }
174            _ => not_impl_err!(
175                "GroupsAccumulator not supported for {} with {}",
176                args.name,
177                args.return_field.data_type()
178            ),
179        }
180    }
181
182    fn aliases(&self) -> &[String] {
183        &[]
184    }
185
186    fn order_sensitivity(&self) -> AggregateOrderSensitivity {
187        AggregateOrderSensitivity::Insensitive
188    }
189
190    fn reverse_expr(&self) -> ReversedUDAF {
191        ReversedUDAF::Identical
192    }
193
194    fn documentation(&self) -> Option<&Documentation> {
195        self.doc()
196    }
197}
198
199#[derive(Debug, Default)]
200struct BoolAndAccumulator {
201    acc: Option<bool>,
202}
203
204impl Accumulator for BoolAndAccumulator {
205    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
206        let values = &values[0];
207        self.acc = match (self.acc, bool_and_batch(values)?) {
208            (None, ScalarValue::Boolean(v)) => v,
209            (Some(v), ScalarValue::Boolean(None)) => Some(v),
210            (Some(a), ScalarValue::Boolean(Some(b))) => Some(a && b),
211            _ => unreachable!(),
212        };
213        Ok(())
214    }
215
216    fn evaluate(&mut self) -> Result<ScalarValue> {
217        Ok(ScalarValue::Boolean(self.acc))
218    }
219
220    fn size(&self) -> usize {
221        size_of_val(self)
222    }
223
224    fn state(&mut self) -> Result<Vec<ScalarValue>> {
225        Ok(vec![ScalarValue::Boolean(self.acc)])
226    }
227
228    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
229        self.update_batch(states)
230    }
231}
232
233#[user_doc(
234    doc_section(label = "General Functions"),
235    description = "Returns true if all non-null input values are true, otherwise false.",
236    syntax_example = "bool_and(expression)",
237    sql_example = r#"```sql
238> SELECT bool_and(column_name) FROM table_name;
239+----------------------------+
240| bool_and(column_name)       |
241+----------------------------+
242| true                        |
243+----------------------------+
244```"#,
245    standard_argument(name = "expression", prefix = "The")
246)]
247/// BOOL_OR aggregate expression
248#[derive(Debug, Clone)]
249pub struct BoolOr {
250    signature: Signature,
251}
252
253impl BoolOr {
254    fn new() -> Self {
255        Self {
256            signature: Signature::uniform(
257                1,
258                vec![DataType::Boolean],
259                Volatility::Immutable,
260            ),
261        }
262    }
263}
264
265impl Default for BoolOr {
266    fn default() -> Self {
267        Self::new()
268    }
269}
270
271impl AggregateUDFImpl for BoolOr {
272    fn as_any(&self) -> &dyn Any {
273        self
274    }
275
276    fn name(&self) -> &str {
277        "bool_or"
278    }
279
280    fn signature(&self) -> &Signature {
281        &self.signature
282    }
283
284    fn return_type(&self, _: &[DataType]) -> Result<DataType> {
285        Ok(DataType::Boolean)
286    }
287
288    fn accumulator(&self, _: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
289        Ok(Box::<BoolOrAccumulator>::default())
290    }
291
292    fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<FieldRef>> {
293        Ok(vec![Field::new(
294            format_state_name(args.name, self.name()),
295            DataType::Boolean,
296            true,
297        )
298        .into()])
299    }
300
301    fn groups_accumulator_supported(&self, _args: AccumulatorArgs) -> bool {
302        true
303    }
304
305    fn create_groups_accumulator(
306        &self,
307        args: AccumulatorArgs,
308    ) -> Result<Box<dyn GroupsAccumulator>> {
309        match args.return_field.data_type() {
310            DataType::Boolean => Ok(Box::new(BooleanGroupsAccumulator::new(
311                |x, y| x || y,
312                false,
313            ))),
314            _ => not_impl_err!(
315                "GroupsAccumulator not supported for {} with {}",
316                args.name,
317                args.return_field.data_type()
318            ),
319        }
320    }
321
322    fn aliases(&self) -> &[String] {
323        &[]
324    }
325
326    fn order_sensitivity(&self) -> AggregateOrderSensitivity {
327        AggregateOrderSensitivity::Insensitive
328    }
329
330    fn reverse_expr(&self) -> ReversedUDAF {
331        ReversedUDAF::Identical
332    }
333
334    fn documentation(&self) -> Option<&Documentation> {
335        self.doc()
336    }
337}
338
339#[derive(Debug, Default)]
340struct BoolOrAccumulator {
341    acc: Option<bool>,
342}
343
344impl Accumulator for BoolOrAccumulator {
345    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
346        let values = &values[0];
347        self.acc = match (self.acc, bool_or_batch(values)?) {
348            (None, ScalarValue::Boolean(v)) => v,
349            (Some(v), ScalarValue::Boolean(None)) => Some(v),
350            (Some(a), ScalarValue::Boolean(Some(b))) => Some(a || b),
351            _ => unreachable!(),
352        };
353        Ok(())
354    }
355
356    fn evaluate(&mut self) -> Result<ScalarValue> {
357        Ok(ScalarValue::Boolean(self.acc))
358    }
359
360    fn size(&self) -> usize {
361        size_of_val(self)
362    }
363
364    fn state(&mut self) -> Result<Vec<ScalarValue>> {
365        Ok(vec![ScalarValue::Boolean(self.acc)])
366    }
367
368    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
369        self.update_batch(states)
370    }
371}