vibesql_executor/select/columnar/executor/
mod.rs1#![allow(clippy::needless_range_loop, clippy::unnecessary_map_or)]
7mod aggregate;
29mod expression;
30mod fused;
31
32use aggregate::compute_batch_aggregates;
33use fused::{can_use_fused_aggregation, execute_fused_filter_aggregate};
34use vibesql_storage::Row;
35use vibesql_types::SqlValue;
36
37use super::{
38 aggregate::{AggregateOp, AggregateSpec},
39 batch::ColumnarBatch,
40 filter::ColumnPredicate,
41 simd_filter::simd_filter_batch,
42};
43use crate::{errors::ExecutorError, schema::CombinedSchema};
44
45pub fn execute_columnar_batch(
62 batch: &ColumnarBatch,
63 predicates: &[ColumnPredicate],
64 aggregates: &[AggregateSpec],
65 _schema: Option<&CombinedSchema>,
66) -> Result<Vec<Row>, ExecutorError> {
67 if batch.row_count() == 0 {
69 let values: Vec<SqlValue> = aggregates
70 .iter()
71 .map(|spec| match spec.op {
72 AggregateOp::Count => SqlValue::Integer(0),
73 _ => SqlValue::Null,
74 })
75 .collect();
76 return Ok(vec![Row::new(values)]);
77 }
78
79 if !predicates.is_empty() && can_use_fused_aggregation(aggregates) {
82 if let Ok(results) = execute_fused_filter_aggregate(batch, predicates, aggregates) {
83 return Ok(vec![Row::new(results)]);
84 }
85 }
87
88 #[cfg(feature = "profile-q6")]
93 let filter_start = std::time::Instant::now();
94
95 let filtered_batch =
96 if predicates.is_empty() { batch.clone() } else { simd_filter_batch(batch, predicates)? };
97
98 #[cfg(feature = "profile-q6")]
99 {
100 let filter_time = filter_start.elapsed();
101 eprintln!(
102 "[PROFILE-Q6] Phase 1 - SIMD Filter: {:?} ({}/{} rows passed)",
103 filter_time,
104 filtered_batch.row_count(),
105 batch.row_count()
106 );
107 }
108
109 #[cfg(feature = "profile-q6")]
111 let agg_start = std::time::Instant::now();
112
113 let results = compute_batch_aggregates(&filtered_batch, aggregates)?;
114
115 #[cfg(feature = "profile-q6")]
116 {
117 let agg_time = agg_start.elapsed();
118 eprintln!(
119 "[PROFILE-Q6] Phase 2 - SIMD Aggregate: {:?} ({} aggregates)",
120 agg_time,
121 aggregates.len()
122 );
123 }
124
125 Ok(vec![Row::new(results)])
127}
128
#[cfg(test)]
mod tests {
    use super::{
        super::{aggregate::AggregateSource, batch::ColumnarBatch},
        *,
    };

    /// Builds a four-row, two-column batch with no NULLs:
    /// column 0 = 10, 20, 30, 40 (integers); column 1 = 1.5, 2.5, 3.5, 4.5 (doubles).
    fn make_test_batch() -> ColumnarBatch {
        let rows: Vec<Row> = [(10, 1.5), (20, 2.5), (30, 3.5), (40, 4.5)]
            .iter()
            .map(|&(int_val, dbl_val)| {
                Row::new(vec![SqlValue::Integer(int_val), SqlValue::Double(dbl_val)])
            })
            .collect();
        ColumnarBatch::from_rows(&rows).unwrap()
    }

    /// SUM over column 0 with no predicates: 10 + 20 + 30 + 40 = 100.
    #[test]
    fn test_execute_columnar_batch_sum() {
        let input = make_test_batch();
        let specs =
            vec![AggregateSpec { op: AggregateOp::Sum, source: AggregateSource::Column(0) }];

        let output = execute_columnar_batch(&input, &[], &specs, None).unwrap();

        assert_eq!(output.len(), 1);
        let row = &output[0];
        assert_eq!(row.get(0), Some(&SqlValue::Integer(100)));
    }

    /// SUM over column 0 restricted to `col0 < 25`: only 10 and 20 qualify.
    #[test]
    fn test_execute_columnar_batch_with_filter() {
        let input = make_test_batch();
        let filters =
            vec![ColumnPredicate::LessThan { column_idx: 0, value: SqlValue::Integer(25) }];
        let specs =
            vec![AggregateSpec { op: AggregateOp::Sum, source: AggregateSource::Column(0) }];

        let output = execute_columnar_batch(&input, &filters, &specs, None).unwrap();

        assert_eq!(output.len(), 1);
        let row = &output[0];
        assert_eq!(row.get(0), Some(&SqlValue::Integer(30)));
    }

    /// SUM(col0), AVG(col1) and COUNT(*) computed together in one call.
    #[test]
    fn test_execute_columnar_batch_multiple_aggregates() {
        let input = make_test_batch();
        let specs = vec![
            AggregateSpec { op: AggregateOp::Sum, source: AggregateSource::Column(0) },
            AggregateSpec { op: AggregateOp::Avg, source: AggregateSource::Column(1) },
            AggregateSpec { op: AggregateOp::Count, source: AggregateSource::CountStar },
        ];

        let output = execute_columnar_batch(&input, &[], &specs, None).unwrap();

        assert_eq!(output.len(), 1);
        let row = &output[0];
        assert_eq!(row.len(), 3);

        // SUM(col0) = 100
        assert_eq!(row.get(0), Some(&SqlValue::Integer(100)));

        // AVG(col1) = (1.5 + 2.5 + 3.5 + 4.5) / 4 = 3.0
        match row.get(1) {
            Some(SqlValue::Double(avg)) => assert!((avg - 3.0).abs() < 0.001),
            _ => panic!("Expected Double for AVG"),
        }

        // COUNT(*) = 4
        assert_eq!(row.get(2), Some(&SqlValue::Integer(4)));
    }

    /// A batch with no rows yields NULL for SUM and 0 for COUNT.
    #[test]
    fn test_execute_columnar_batch_empty() {
        let input = ColumnarBatch::new(2);
        let specs = vec![
            AggregateSpec { op: AggregateOp::Sum, source: AggregateSource::Column(0) },
            AggregateSpec { op: AggregateOp::Count, source: AggregateSource::CountStar },
        ];

        let output = execute_columnar_batch(&input, &[], &specs, None).unwrap();

        assert_eq!(output.len(), 1);
        let row = &output[0];
        assert_eq!(row.get(0), Some(&SqlValue::Null));
        assert_eq!(row.get(1), Some(&SqlValue::Integer(0)));
    }

    /// NULL cells are left out of SUM, while COUNT(*) still counts every row.
    #[test]
    fn test_execute_columnar_batch_with_nulls() {
        let source_rows = vec![
            Row::new(vec![SqlValue::Integer(10), SqlValue::Double(1.5)]),
            // NULL in column 0
            Row::new(vec![SqlValue::Null, SqlValue::Double(2.5)]),
            // NULL in column 1
            Row::new(vec![SqlValue::Integer(30), SqlValue::Null]),
            Row::new(vec![SqlValue::Integer(40), SqlValue::Double(4.5)]),
        ];
        let input = ColumnarBatch::from_rows(&source_rows).unwrap();

        let specs = vec![
            AggregateSpec { op: AggregateOp::Sum, source: AggregateSource::Column(0) },
            AggregateSpec { op: AggregateOp::Sum, source: AggregateSource::Column(1) },
            AggregateSpec { op: AggregateOp::Count, source: AggregateSource::CountStar },
        ];

        let output = execute_columnar_batch(&input, &[], &specs, None).unwrap();

        assert_eq!(output.len(), 1);
        let row = &output[0];

        // SUM(col0) = 10 + 30 + 40 = 80
        assert_eq!(row.get(0), Some(&SqlValue::Integer(80)));

        // SUM(col1) = 1.5 + 2.5 + 4.5 = 8.5
        match row.get(1) {
            Some(SqlValue::Double(sum)) => assert!((sum - 8.5).abs() < 0.001),
            _ => panic!("Expected Double for SUM(col1)"),
        }

        // COUNT(*) = 4
        assert_eq!(row.get(2), Some(&SqlValue::Integer(4)));
    }

    /// Filter on column 0 combined with a NULL in the aggregated column 1.
    #[test]
    fn test_execute_columnar_batch_with_nulls_and_filter() {
        let source_rows = vec![
            Row::new(vec![SqlValue::Integer(10), SqlValue::Double(1.0)]),
            // Passes the filter but has NULL in the summed column
            Row::new(vec![SqlValue::Integer(20), SqlValue::Null]),
            Row::new(vec![SqlValue::Integer(30), SqlValue::Double(3.0)]),
            Row::new(vec![SqlValue::Integer(40), SqlValue::Double(4.0)]),
        ];
        let input = ColumnarBatch::from_rows(&source_rows).unwrap();

        // col0 < 35 keeps the first three rows.
        let filters =
            vec![ColumnPredicate::LessThan { column_idx: 0, value: SqlValue::Integer(35) }];

        let specs =
            vec![AggregateSpec { op: AggregateOp::Sum, source: AggregateSource::Column(1) }];

        let output = execute_columnar_batch(&input, &filters, &specs, None).unwrap();

        assert_eq!(output.len(), 1);
        let row = &output[0];

        // SUM(col1) over the kept rows = 1.0 + 3.0 = 4.0
        match row.get(0) {
            Some(SqlValue::Double(sum)) => assert!((sum - 4.0).abs() < 0.001),
            _ => panic!("Expected Double for SUM(col1)"),
        }
    }
}