datafusion_functions_aggregate/
bool_and_or.rs1use std::any::Any;
21use std::mem::size_of_val;
22
23use arrow::array::ArrayRef;
24use arrow::array::BooleanArray;
25use arrow::compute::bool_and as compute_bool_and;
26use arrow::compute::bool_or as compute_bool_or;
27use arrow::datatypes::Field;
28use arrow::datatypes::{DataType, FieldRef};
29
30use datafusion_common::internal_err;
31use datafusion_common::{Result, ScalarValue};
32use datafusion_common::{downcast_value, not_impl_err};
33use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs};
34use datafusion_expr::utils::{AggregateOrderSensitivity, format_state_name};
35use datafusion_expr::{
36 Accumulator, AggregateUDFImpl, Documentation, GroupsAccumulator, ReversedUDAF,
37 Signature, Volatility,
38};
39
40use datafusion_functions_aggregate_common::aggregate::groups_accumulator::bool_op::BooleanGroupsAccumulator;
41use datafusion_macros::user_doc;
42
43macro_rules! typed_bool_and_or_batch {
45 ($VALUES:expr, $ARRAYTYPE:ident, $SCALAR:ident, $OP:ident) => {{
46 let array = downcast_value!($VALUES, $ARRAYTYPE);
47 let delta = $OP(array);
48 Ok(ScalarValue::$SCALAR(delta))
49 }};
50}
51
52macro_rules! bool_and_or_batch {
54 ($VALUES:expr, $OP:ident) => {{
55 match $VALUES.data_type() {
56 DataType::Boolean => {
57 typed_bool_and_or_batch!($VALUES, BooleanArray, Boolean, $OP)
58 }
59 e => {
60 return internal_err!(
61 "Bool and/Bool or is not expected to receive the type {e:?}"
62 );
63 }
64 }
65 }};
66}
67
68fn bool_and_batch(values: &ArrayRef) -> Result<ScalarValue> {
70 bool_and_or_batch!(values, compute_bool_and)
71}
72
73fn bool_or_batch(values: &ArrayRef) -> Result<ScalarValue> {
75 bool_and_or_batch!(values, compute_bool_or)
76}
77
// Registers the `BoolAnd` aggregate. Judging by the arguments, the macro
// (defined elsewhere in this crate) presumably generates a `bool_and`
// expression helper taking one `expression` argument and a `bool_and_udaf`
// accessor — confirm against the macro definition.
make_udaf_expr_and_func!(
    BoolAnd,
    bool_and,
    expression,
    "The values to combine with `AND`",
    bool_and_udaf
);

// Same registration as above, for the `BoolOr` aggregate (`bool_or` /
// `bool_or_udaf`).
make_udaf_expr_and_func!(
    BoolOr,
    bool_or,
    expression,
    "The values to combine with `OR`",
    bool_or_udaf
);
93
94#[user_doc(
95 doc_section(label = "General Functions"),
96 description = "Returns true if all non-null input values are true, otherwise false.",
97 syntax_example = "bool_and(expression)",
98 sql_example = r#"```sql
99> SELECT bool_and(column_name) FROM table_name;
100+----------------------------+
101| bool_and(column_name) |
102+----------------------------+
103| true |
104+----------------------------+
105```"#,
106 standard_argument(name = "expression", prefix = "The")
107)]
108#[derive(Debug, PartialEq, Eq, Hash)]
110pub struct BoolAnd {
111 signature: Signature,
112}
113
114impl BoolAnd {
115 fn new() -> Self {
116 Self {
117 signature: Signature::exact(vec![DataType::Boolean], Volatility::Immutable),
118 }
119 }
120}
121
122impl Default for BoolAnd {
123 fn default() -> Self {
124 Self::new()
125 }
126}
127
128impl AggregateUDFImpl for BoolAnd {
129 fn as_any(&self) -> &dyn Any {
130 self
131 }
132
133 fn name(&self) -> &str {
134 "bool_and"
135 }
136
137 fn signature(&self) -> &Signature {
138 &self.signature
139 }
140
141 fn return_type(&self, _: &[DataType]) -> Result<DataType> {
142 Ok(DataType::Boolean)
143 }
144
145 fn accumulator(&self, _: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
146 Ok(Box::<BoolAndAccumulator>::default())
147 }
148
149 fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<FieldRef>> {
150 Ok(vec![
151 Field::new(
152 format_state_name(args.name, self.name()),
153 DataType::Boolean,
154 true,
155 )
156 .into(),
157 ])
158 }
159
160 fn groups_accumulator_supported(&self, _args: AccumulatorArgs) -> bool {
161 true
162 }
163
164 fn create_groups_accumulator(
165 &self,
166 args: AccumulatorArgs,
167 ) -> Result<Box<dyn GroupsAccumulator>> {
168 match args.return_field.data_type() {
169 DataType::Boolean => {
170 Ok(Box::new(BooleanGroupsAccumulator::new(|x, y| x && y, true)))
171 }
172 _ => not_impl_err!(
173 "GroupsAccumulator not supported for {} with {}",
174 args.name,
175 args.return_field.data_type()
176 ),
177 }
178 }
179
180 fn order_sensitivity(&self) -> AggregateOrderSensitivity {
181 AggregateOrderSensitivity::Insensitive
182 }
183
184 fn reverse_expr(&self) -> ReversedUDAF {
185 ReversedUDAF::Identical
186 }
187
188 fn documentation(&self) -> Option<&Documentation> {
189 self.doc()
190 }
191}
192
/// Row-based accumulator for `bool_and`.
#[derive(Default, Debug)]
struct BoolAndAccumulator {
    /// Running AND of the values seen so far; starts as `None` and stays
    /// `None` while every batch reduces to `None`.
    acc: Option<bool>,
}
197
198impl Accumulator for BoolAndAccumulator {
199 fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
200 let values = &values[0];
201 self.acc = match (self.acc, bool_and_batch(values)?) {
202 (None, ScalarValue::Boolean(v)) => v,
203 (Some(v), ScalarValue::Boolean(None)) => Some(v),
204 (Some(a), ScalarValue::Boolean(Some(b))) => Some(a && b),
205 _ => unreachable!(),
206 };
207 Ok(())
208 }
209
210 fn evaluate(&mut self) -> Result<ScalarValue> {
211 Ok(ScalarValue::Boolean(self.acc))
212 }
213
214 fn size(&self) -> usize {
215 size_of_val(self)
216 }
217
218 fn state(&mut self) -> Result<Vec<ScalarValue>> {
219 Ok(vec![ScalarValue::Boolean(self.acc)])
220 }
221
222 fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
223 self.update_batch(states)
224 }
225}
226
227#[user_doc(
228 doc_section(label = "General Functions"),
229 description = "Returns true if all non-null input values are true, otherwise false.",
230 syntax_example = "bool_and(expression)",
231 sql_example = r#"```sql
232> SELECT bool_and(column_name) FROM table_name;
233+----------------------------+
234| bool_and(column_name) |
235+----------------------------+
236| true |
237+----------------------------+
238```"#,
239 standard_argument(name = "expression", prefix = "The")
240)]
241#[derive(Debug, Clone, PartialEq, Eq, Hash)]
243pub struct BoolOr {
244 signature: Signature,
245}
246
247impl BoolOr {
248 fn new() -> Self {
249 Self {
250 signature: Signature::exact(vec![DataType::Boolean], Volatility::Immutable),
251 }
252 }
253}
254
255impl Default for BoolOr {
256 fn default() -> Self {
257 Self::new()
258 }
259}
260
261impl AggregateUDFImpl for BoolOr {
262 fn as_any(&self) -> &dyn Any {
263 self
264 }
265
266 fn name(&self) -> &str {
267 "bool_or"
268 }
269
270 fn signature(&self) -> &Signature {
271 &self.signature
272 }
273
274 fn return_type(&self, _: &[DataType]) -> Result<DataType> {
275 Ok(DataType::Boolean)
276 }
277
278 fn accumulator(&self, _: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
279 Ok(Box::<BoolOrAccumulator>::default())
280 }
281
282 fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<FieldRef>> {
283 Ok(vec![
284 Field::new(
285 format_state_name(args.name, self.name()),
286 DataType::Boolean,
287 true,
288 )
289 .into(),
290 ])
291 }
292
293 fn groups_accumulator_supported(&self, _args: AccumulatorArgs) -> bool {
294 true
295 }
296
297 fn create_groups_accumulator(
298 &self,
299 args: AccumulatorArgs,
300 ) -> Result<Box<dyn GroupsAccumulator>> {
301 match args.return_field.data_type() {
302 DataType::Boolean => Ok(Box::new(BooleanGroupsAccumulator::new(
303 |x, y| x || y,
304 false,
305 ))),
306 _ => not_impl_err!(
307 "GroupsAccumulator not supported for {} with {}",
308 args.name,
309 args.return_field.data_type()
310 ),
311 }
312 }
313
314 fn order_sensitivity(&self) -> AggregateOrderSensitivity {
315 AggregateOrderSensitivity::Insensitive
316 }
317
318 fn reverse_expr(&self) -> ReversedUDAF {
319 ReversedUDAF::Identical
320 }
321
322 fn documentation(&self) -> Option<&Documentation> {
323 self.doc()
324 }
325}
326
/// Row-based accumulator for `bool_or`.
#[derive(Default, Debug)]
struct BoolOrAccumulator {
    /// Running OR of the values seen so far; starts as `None` and stays
    /// `None` while every batch reduces to `None`.
    acc: Option<bool>,
}
331
332impl Accumulator for BoolOrAccumulator {
333 fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
334 let values = &values[0];
335 self.acc = match (self.acc, bool_or_batch(values)?) {
336 (None, ScalarValue::Boolean(v)) => v,
337 (Some(v), ScalarValue::Boolean(None)) => Some(v),
338 (Some(a), ScalarValue::Boolean(Some(b))) => Some(a || b),
339 _ => unreachable!(),
340 };
341 Ok(())
342 }
343
344 fn evaluate(&mut self) -> Result<ScalarValue> {
345 Ok(ScalarValue::Boolean(self.acc))
346 }
347
348 fn size(&self) -> usize {
349 size_of_val(self)
350 }
351
352 fn state(&mut self) -> Result<Vec<ScalarValue>> {
353 Ok(vec![ScalarValue::Boolean(self.acc)])
354 }
355
356 fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
357 self.update_batch(states)
358 }
359}