datafusion_functions_aggregate_common/aggregate/groups_accumulator/
bool_op.rs1use std::sync::Arc;
19
20use crate::aggregate::groups_accumulator::nulls::filtered_null_mask;
21use arrow::array::{ArrayRef, AsArray, BooleanArray, BooleanBufferBuilder};
22use arrow::buffer::BooleanBuffer;
23use datafusion_common::Result;
24use datafusion_expr_common::groups_accumulator::{EmitTo, GroupsAccumulator};
25
26use super::accumulate::NullState;
27
28#[derive(Debug)]
38pub struct BooleanGroupsAccumulator<F>
39where
40 F: Fn(bool, bool) -> bool + Send + Sync,
41{
42 values: BooleanBufferBuilder,
44
45 null_state: NullState,
47
48 bool_fn: F,
50
51 identity: bool,
54}
55
56impl<F> BooleanGroupsAccumulator<F>
57where
58 F: Fn(bool, bool) -> bool + Send + Sync,
59{
60 pub fn new(bool_fn: F, identity: bool) -> Self {
61 Self {
62 values: BooleanBufferBuilder::new(0),
63 null_state: NullState::new(),
64 bool_fn,
65 identity,
66 }
67 }
68}
69
70impl<F> GroupsAccumulator for BooleanGroupsAccumulator<F>
71where
72 F: Fn(bool, bool) -> bool + Send + Sync,
73{
74 fn update_batch(
75 &mut self,
76 values: &[ArrayRef],
77 group_indices: &[usize],
78 opt_filter: Option<&BooleanArray>,
79 total_num_groups: usize,
80 ) -> Result<()> {
81 assert_eq!(values.len(), 1, "single argument to update_batch");
82 let values = values[0].as_boolean();
83
84 if self.values.len() < total_num_groups {
85 let new_groups = total_num_groups - self.values.len();
86 self.values.append_n(new_groups, self.identity);
89 }
90
91 self.null_state.accumulate_boolean(
93 group_indices,
94 values,
95 opt_filter,
96 total_num_groups,
97 |group_index, new_value| {
98 let current_value = self.values.get_bit(group_index);
99 let value = (self.bool_fn)(current_value, new_value);
100 self.values.set_bit(group_index, value);
101 },
102 );
103
104 Ok(())
105 }
106
107 fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
108 let values = self.values.finish();
109
110 let values = match emit_to {
111 EmitTo::All => values,
112 EmitTo::First(n) => {
113 let first_n: BooleanBuffer = values.iter().take(n).collect();
114 for v in values.iter().skip(n) {
116 self.values.append(v);
117 }
118 first_n
119 }
120 };
121
122 let nulls = self.null_state.build(emit_to);
123 let values = BooleanArray::new(values, Some(nulls));
124 Ok(Arc::new(values))
125 }
126
127 fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
128 self.evaluate(emit_to).map(|arr| vec![arr])
129 }
130
131 fn merge_batch(
132 &mut self,
133 values: &[ArrayRef],
134 group_indices: &[usize],
135 opt_filter: Option<&BooleanArray>,
136 total_num_groups: usize,
137 ) -> Result<()> {
138 self.update_batch(values, group_indices, opt_filter, total_num_groups)
140 }
141
142 fn size(&self) -> usize {
143 self.values.capacity() / 8 + self.null_state.size()
145 }
146
147 fn convert_to_state(
148 &self,
149 values: &[ArrayRef],
150 opt_filter: Option<&BooleanArray>,
151 ) -> Result<Vec<ArrayRef>> {
152 let values = values[0].as_boolean().clone();
153
154 let values_null_buffer_filtered = filtered_null_mask(opt_filter, &values);
155 let (values_buf, _) = values.into_parts();
156 let values_filtered = BooleanArray::new(values_buf, values_null_buffer_filtered);
157
158 Ok(vec![Arc::new(values_filtered)])
159 }
160
161 fn supports_convert_to_state(&self) -> bool {
162 true
163 }
164}