datafusion_functions_aggregate_common/aggregate/groups_accumulator/
bool_op.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use std::sync::Arc;
19
20use crate::aggregate::groups_accumulator::nulls::filtered_null_mask;
21use arrow::array::{ArrayRef, AsArray, BooleanArray, BooleanBufferBuilder};
22use arrow::buffer::BooleanBuffer;
23use datafusion_common::Result;
24use datafusion_expr_common::groups_accumulator::{EmitTo, GroupsAccumulator};
25
26use super::accumulate::NullState;
27
28/// An accumulator that implements a single operation over a
29/// [`BooleanArray`] where the accumulated state is also boolean (such
30/// as [`BitAndAssign`])
31///
32/// F: The function to apply to two elements. The first argument is
33/// the existing value and should be updated with the second value
34/// (e.g. [`BitAndAssign`] style).
35///
36/// [`BitAndAssign`]: std::ops::BitAndAssign
37#[derive(Debug)]
38pub struct BooleanGroupsAccumulator<F>
39where
40    F: Fn(bool, bool) -> bool + Send + Sync,
41{
42    /// values per group
43    values: BooleanBufferBuilder,
44
45    /// Track nulls in the input / filters
46    null_state: NullState,
47
48    /// Function that computes the output
49    bool_fn: F,
50
51    /// The identity element for the boolean operation.
52    /// Any value combined with this returns the original value.
53    identity: bool,
54}
55
56impl<F> BooleanGroupsAccumulator<F>
57where
58    F: Fn(bool, bool) -> bool + Send + Sync,
59{
60    pub fn new(bool_fn: F, identity: bool) -> Self {
61        Self {
62            values: BooleanBufferBuilder::new(0),
63            null_state: NullState::new(),
64            bool_fn,
65            identity,
66        }
67    }
68}
69
70impl<F> GroupsAccumulator for BooleanGroupsAccumulator<F>
71where
72    F: Fn(bool, bool) -> bool + Send + Sync,
73{
74    fn update_batch(
75        &mut self,
76        values: &[ArrayRef],
77        group_indices: &[usize],
78        opt_filter: Option<&BooleanArray>,
79        total_num_groups: usize,
80    ) -> Result<()> {
81        assert_eq!(values.len(), 1, "single argument to update_batch");
82        let values = values[0].as_boolean();
83
84        if self.values.len() < total_num_groups {
85            let new_groups = total_num_groups - self.values.len();
86            // Fill with the identity element, so that when the first non-null value is encountered,
87            // it will combine with the identity and the result will be the first non-null value itself.
88            self.values.append_n(new_groups, self.identity);
89        }
90
91        // NullState dispatches / handles tracking nulls and groups that saw no values
92        self.null_state.accumulate_boolean(
93            group_indices,
94            values,
95            opt_filter,
96            total_num_groups,
97            |group_index, new_value| {
98                let current_value = self.values.get_bit(group_index);
99                let value = (self.bool_fn)(current_value, new_value);
100                self.values.set_bit(group_index, value);
101            },
102        );
103
104        Ok(())
105    }
106
107    fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
108        let values = self.values.finish();
109
110        let values = match emit_to {
111            EmitTo::All => values,
112            EmitTo::First(n) => {
113                let first_n: BooleanBuffer = values.iter().take(n).collect();
114                // put n+1 back into self.values
115                for v in values.iter().skip(n) {
116                    self.values.append(v);
117                }
118                first_n
119            }
120        };
121
122        let nulls = self.null_state.build(emit_to);
123        let values = BooleanArray::new(values, Some(nulls));
124        Ok(Arc::new(values))
125    }
126
127    fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
128        self.evaluate(emit_to).map(|arr| vec![arr])
129    }
130
131    fn merge_batch(
132        &mut self,
133        values: &[ArrayRef],
134        group_indices: &[usize],
135        opt_filter: Option<&BooleanArray>,
136        total_num_groups: usize,
137    ) -> Result<()> {
138        // update / merge are the same
139        self.update_batch(values, group_indices, opt_filter, total_num_groups)
140    }
141
142    fn size(&self) -> usize {
143        // capacity is in bits, so convert to bytes
144        self.values.capacity() / 8 + self.null_state.size()
145    }
146
147    fn convert_to_state(
148        &self,
149        values: &[ArrayRef],
150        opt_filter: Option<&BooleanArray>,
151    ) -> Result<Vec<ArrayRef>> {
152        let values = values[0].as_boolean().clone();
153
154        let values_null_buffer_filtered = filtered_null_mask(opt_filter, &values);
155        let (values_buf, _) = values.into_parts();
156        let values_filtered = BooleanArray::new(values_buf, values_null_buffer_filtered);
157
158        Ok(vec![Arc::new(values_filtered)])
159    }
160
161    fn supports_convert_to_state(&self) -> bool {
162        true
163    }
164}