datafusion_functions_aggregate/
bool_and_or.rs1use std::any::Any;
21use std::mem::size_of_val;
22
23use arrow::array::ArrayRef;
24use arrow::array::BooleanArray;
25use arrow::compute::bool_and as compute_bool_and;
26use arrow::compute::bool_or as compute_bool_or;
27use arrow::datatypes::Field;
28use arrow::datatypes::{DataType, FieldRef};
29
30use datafusion_common::internal_err;
31use datafusion_common::{downcast_value, not_impl_err};
32use datafusion_common::{Result, ScalarValue};
33use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs};
34use datafusion_expr::utils::{format_state_name, AggregateOrderSensitivity};
35use datafusion_expr::{
36 Accumulator, AggregateUDFImpl, Documentation, GroupsAccumulator, ReversedUDAF,
37 Signature, Volatility,
38};
39
40use datafusion_functions_aggregate_common::aggregate::groups_accumulator::bool_op::BooleanGroupsAccumulator;
41use datafusion_macros::user_doc;
42
/// Reduces a dynamically typed array with a typed boolean kernel.
///
/// Arguments:
/// * `$VALUES`    - expression evaluating to the array to reduce
/// * `$ARRAYTYPE` - concrete array type `$VALUES` is downcast to
/// * `$SCALAR`    - `ScalarValue` variant used to wrap the kernel result
/// * `$OP`        - kernel function applied to the downcast array
///
/// Expands to an `Ok(ScalarValue::$SCALAR(..))` expression.
/// NOTE(review): `downcast_value!` comes from `datafusion_common`; presumably
/// it bails out of the enclosing function on a type mismatch — confirm there.
macro_rules! typed_bool_and_or_batch {
    ($VALUES:expr, $ARRAYTYPE:ident, $SCALAR:ident, $OP:ident) => {{
        let typed_array = downcast_value!($VALUES, $ARRAYTYPE);
        let reduced = $OP(typed_array);
        Ok(ScalarValue::$SCALAR(reduced))
    }};
}
51
/// Applies the kernel `$OP` to `$VALUES` after checking its data type.
///
/// Only `DataType::Boolean` input is accepted; it is forwarded to
/// `typed_bool_and_or_batch!` with `BooleanArray` / `ScalarValue::Boolean`.
/// For any other data type the macro `return`s an internal error from the
/// enclosing function, so it must be expanded inside a function whose return
/// type can carry that error.
macro_rules! bool_and_or_batch {
    ($VALUES:expr, $OP:ident) => {{
        // Guard clause: reject everything that is not a Boolean array up front.
        let e = $VALUES.data_type();
        if !matches!(e, DataType::Boolean) {
            return internal_err!(
                "Bool and/Bool or is not expected to receive the type {e:?}"
            );
        }
        typed_bool_and_or_batch!($VALUES, BooleanArray, Boolean, $OP)
    }};
}
67
/// Reduces one batch of `values` with arrow's `bool_and` kernel (imported as
/// `compute_bool_and`) and wraps the kernel result in `ScalarValue::Boolean`.
///
/// # Errors
/// Returns an internal error when `values` is not of type `DataType::Boolean`.
fn bool_and_batch(values: &ArrayRef) -> Result<ScalarValue> {
    bool_and_or_batch!(values, compute_bool_and)
}
72
/// Reduces one batch of `values` with arrow's `bool_or` kernel (imported as
/// `compute_bool_or`) and wraps the kernel result in `ScalarValue::Boolean`.
///
/// # Errors
/// Returns an internal error when `values` is not of type `DataType::Boolean`.
fn bool_or_batch(values: &ArrayRef) -> Result<ScalarValue> {
    bool_and_or_batch!(values, compute_bool_or)
}
77
// Registers `BoolAnd` through the crate's `make_udaf_expr_and_func!` macro,
// which is defined outside this file.
// NOTE(review): presumably this generates the `bool_and(expression)` expression
// builder and the `bool_and_udaf()` accessor — confirm against the macro.
make_udaf_expr_and_func!(
    BoolAnd,
    bool_and,
    expression,
    "The values to combine with `AND`",
    bool_and_udaf
);
85
// Registers `BoolOr` through the crate's `make_udaf_expr_and_func!` macro,
// which is defined outside this file.
// NOTE(review): presumably this generates the `bool_or(expression)` expression
// builder and the `bool_or_udaf()` accessor — confirm against the macro.
make_udaf_expr_and_func!(
    BoolOr,
    bool_or,
    expression,
    "The values to combine with `OR`",
    bool_or_udaf
);
93
94#[user_doc(
95 doc_section(label = "General Functions"),
96 description = "Returns true if all non-null input values are true, otherwise false.",
97 syntax_example = "bool_and(expression)",
98 sql_example = r#"```sql
99> SELECT bool_and(column_name) FROM table_name;
100+----------------------------+
101| bool_and(column_name) |
102+----------------------------+
103| true |
104+----------------------------+
105```"#,
106 standard_argument(name = "expression", prefix = "The")
107)]
108#[derive(Debug)]
110pub struct BoolAnd {
111 signature: Signature,
112}
113
114impl BoolAnd {
115 fn new() -> Self {
116 Self {
117 signature: Signature::uniform(
118 1,
119 vec![DataType::Boolean],
120 Volatility::Immutable,
121 ),
122 }
123 }
124}
125
impl Default for BoolAnd {
    /// Equivalent to `BoolAnd::new()`.
    fn default() -> Self {
        Self::new()
    }
}
131
132impl AggregateUDFImpl for BoolAnd {
133 fn as_any(&self) -> &dyn Any {
134 self
135 }
136
137 fn name(&self) -> &str {
138 "bool_and"
139 }
140
141 fn signature(&self) -> &Signature {
142 &self.signature
143 }
144
145 fn return_type(&self, _: &[DataType]) -> Result<DataType> {
146 Ok(DataType::Boolean)
147 }
148
149 fn accumulator(&self, _: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
150 Ok(Box::<BoolAndAccumulator>::default())
151 }
152
153 fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<FieldRef>> {
154 Ok(vec![Field::new(
155 format_state_name(args.name, self.name()),
156 DataType::Boolean,
157 true,
158 )
159 .into()])
160 }
161
162 fn groups_accumulator_supported(&self, _args: AccumulatorArgs) -> bool {
163 true
164 }
165
166 fn create_groups_accumulator(
167 &self,
168 args: AccumulatorArgs,
169 ) -> Result<Box<dyn GroupsAccumulator>> {
170 match args.return_field.data_type() {
171 DataType::Boolean => {
172 Ok(Box::new(BooleanGroupsAccumulator::new(|x, y| x && y, true)))
173 }
174 _ => not_impl_err!(
175 "GroupsAccumulator not supported for {} with {}",
176 args.name,
177 args.return_field.data_type()
178 ),
179 }
180 }
181
182 fn aliases(&self) -> &[String] {
183 &[]
184 }
185
186 fn order_sensitivity(&self) -> AggregateOrderSensitivity {
187 AggregateOrderSensitivity::Insensitive
188 }
189
190 fn reverse_expr(&self) -> ReversedUDAF {
191 ReversedUDAF::Identical
192 }
193
194 fn documentation(&self) -> Option<&Documentation> {
195 self.doc()
196 }
197}
198
/// Accumulator state for the `bool_and` aggregate.
#[derive(Debug, Default)]
struct BoolAndAccumulator {
    /// Running `AND` of the batch results seen so far. Starts as `None` (the
    /// derived default) and stays `None` until a batch yields `Some(_)`.
    acc: Option<bool>,
}
203
204impl Accumulator for BoolAndAccumulator {
205 fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
206 let values = &values[0];
207 self.acc = match (self.acc, bool_and_batch(values)?) {
208 (None, ScalarValue::Boolean(v)) => v,
209 (Some(v), ScalarValue::Boolean(None)) => Some(v),
210 (Some(a), ScalarValue::Boolean(Some(b))) => Some(a && b),
211 _ => unreachable!(),
212 };
213 Ok(())
214 }
215
216 fn evaluate(&mut self) -> Result<ScalarValue> {
217 Ok(ScalarValue::Boolean(self.acc))
218 }
219
220 fn size(&self) -> usize {
221 size_of_val(self)
222 }
223
224 fn state(&mut self) -> Result<Vec<ScalarValue>> {
225 Ok(vec![ScalarValue::Boolean(self.acc)])
226 }
227
228 fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
229 self.update_batch(states)
230 }
231}
232
233#[user_doc(
234 doc_section(label = "General Functions"),
235 description = "Returns true if all non-null input values are true, otherwise false.",
236 syntax_example = "bool_and(expression)",
237 sql_example = r#"```sql
238> SELECT bool_and(column_name) FROM table_name;
239+----------------------------+
240| bool_and(column_name) |
241+----------------------------+
242| true |
243+----------------------------+
244```"#,
245 standard_argument(name = "expression", prefix = "The")
246)]
247#[derive(Debug, Clone)]
249pub struct BoolOr {
250 signature: Signature,
251}
252
253impl BoolOr {
254 fn new() -> Self {
255 Self {
256 signature: Signature::uniform(
257 1,
258 vec![DataType::Boolean],
259 Volatility::Immutable,
260 ),
261 }
262 }
263}
264
impl Default for BoolOr {
    /// Equivalent to `BoolOr::new()`.
    fn default() -> Self {
        Self::new()
    }
}
270
271impl AggregateUDFImpl for BoolOr {
272 fn as_any(&self) -> &dyn Any {
273 self
274 }
275
276 fn name(&self) -> &str {
277 "bool_or"
278 }
279
280 fn signature(&self) -> &Signature {
281 &self.signature
282 }
283
284 fn return_type(&self, _: &[DataType]) -> Result<DataType> {
285 Ok(DataType::Boolean)
286 }
287
288 fn accumulator(&self, _: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
289 Ok(Box::<BoolOrAccumulator>::default())
290 }
291
292 fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<FieldRef>> {
293 Ok(vec![Field::new(
294 format_state_name(args.name, self.name()),
295 DataType::Boolean,
296 true,
297 )
298 .into()])
299 }
300
301 fn groups_accumulator_supported(&self, _args: AccumulatorArgs) -> bool {
302 true
303 }
304
305 fn create_groups_accumulator(
306 &self,
307 args: AccumulatorArgs,
308 ) -> Result<Box<dyn GroupsAccumulator>> {
309 match args.return_field.data_type() {
310 DataType::Boolean => Ok(Box::new(BooleanGroupsAccumulator::new(
311 |x, y| x || y,
312 false,
313 ))),
314 _ => not_impl_err!(
315 "GroupsAccumulator not supported for {} with {}",
316 args.name,
317 args.return_field.data_type()
318 ),
319 }
320 }
321
322 fn aliases(&self) -> &[String] {
323 &[]
324 }
325
326 fn order_sensitivity(&self) -> AggregateOrderSensitivity {
327 AggregateOrderSensitivity::Insensitive
328 }
329
330 fn reverse_expr(&self) -> ReversedUDAF {
331 ReversedUDAF::Identical
332 }
333
334 fn documentation(&self) -> Option<&Documentation> {
335 self.doc()
336 }
337}
338
/// Accumulator state for the `bool_or` aggregate.
#[derive(Debug, Default)]
struct BoolOrAccumulator {
    /// Running `OR` of the batch results seen so far. Starts as `None` (the
    /// derived default) and stays `None` until a batch yields `Some(_)`.
    acc: Option<bool>,
}
343
344impl Accumulator for BoolOrAccumulator {
345 fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
346 let values = &values[0];
347 self.acc = match (self.acc, bool_or_batch(values)?) {
348 (None, ScalarValue::Boolean(v)) => v,
349 (Some(v), ScalarValue::Boolean(None)) => Some(v),
350 (Some(a), ScalarValue::Boolean(Some(b))) => Some(a || b),
351 _ => unreachable!(),
352 };
353 Ok(())
354 }
355
356 fn evaluate(&mut self) -> Result<ScalarValue> {
357 Ok(ScalarValue::Boolean(self.acc))
358 }
359
360 fn size(&self) -> usize {
361 size_of_val(self)
362 }
363
364 fn state(&mut self) -> Result<Vec<ScalarValue>> {
365 Ok(vec![ScalarValue::Boolean(self.acc)])
366 }
367
368 fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
369 self.update_batch(states)
370 }
371}