vibesql_executor/select/columnar/executor/
mod.rs1#![allow(clippy::needless_range_loop, clippy::unnecessary_map_or)]
7mod fused;
29mod aggregate;
30mod expression;
31
32use super::batch::ColumnarBatch;
33use super::aggregate::{AggregateOp, AggregateSpec};
34use super::filter::ColumnPredicate;
35use super::simd_filter::simd_filter_batch;
36use crate::errors::ExecutorError;
37use crate::schema::CombinedSchema;
38use vibesql_storage::Row;
39use vibesql_types::SqlValue;
40
41use fused::{can_use_fused_aggregation, execute_fused_filter_aggregate};
42use aggregate::compute_batch_aggregates;
43
44pub fn execute_columnar_batch(
61 batch: &ColumnarBatch,
62 predicates: &[ColumnPredicate],
63 aggregates: &[AggregateSpec],
64 _schema: Option<&CombinedSchema>,
65) -> Result<Vec<Row>, ExecutorError> {
66 if batch.row_count() == 0 {
68 let values: Vec<SqlValue> = aggregates
69 .iter()
70 .map(|spec| match spec.op {
71 AggregateOp::Count => SqlValue::Integer(0),
72 _ => SqlValue::Null,
73 })
74 .collect();
75 return Ok(vec![Row::new(values)]);
76 }
77
78 if !predicates.is_empty() && can_use_fused_aggregation(aggregates) {
81 if let Ok(results) = execute_fused_filter_aggregate(batch, predicates, aggregates) {
82 return Ok(vec![Row::new(results)]);
83 }
84 }
86
87 #[cfg(feature = "profile-q6")]
92 let filter_start = std::time::Instant::now();
93
94 let filtered_batch = if predicates.is_empty() {
95 batch.clone()
96 } else {
97 simd_filter_batch(batch, predicates)?
98 };
99
100 #[cfg(feature = "profile-q6")]
101 {
102 let filter_time = filter_start.elapsed();
103 eprintln!(
104 "[PROFILE-Q6] Phase 1 - SIMD Filter: {:?} ({}/{} rows passed)",
105 filter_time,
106 filtered_batch.row_count(),
107 batch.row_count()
108 );
109 }
110
111 #[cfg(feature = "profile-q6")]
113 let agg_start = std::time::Instant::now();
114
115 let results = compute_batch_aggregates(&filtered_batch, aggregates)?;
116
117 #[cfg(feature = "profile-q6")]
118 {
119 let agg_time = agg_start.elapsed();
120 eprintln!(
121 "[PROFILE-Q6] Phase 2 - SIMD Aggregate: {:?} ({} aggregates)",
122 agg_time,
123 aggregates.len()
124 );
125 }
126
127 Ok(vec![Row::new(results)])
129}
130
131#[cfg(test)]
132mod tests {
133 use super::*;
134 use super::super::aggregate::AggregateSource;
135 use super::super::batch::ColumnarBatch;
136
137 fn make_test_batch() -> ColumnarBatch {
138 let rows = vec![
139 Row::new(vec![SqlValue::Integer(10), SqlValue::Double(1.5)]),
140 Row::new(vec![SqlValue::Integer(20), SqlValue::Double(2.5)]),
141 Row::new(vec![SqlValue::Integer(30), SqlValue::Double(3.5)]),
142 Row::new(vec![SqlValue::Integer(40), SqlValue::Double(4.5)]),
143 ];
144 ColumnarBatch::from_rows(&rows).unwrap()
145 }
146
147 #[test]
148 fn test_execute_columnar_batch_sum() {
149 let batch = make_test_batch();
150 let aggregates = vec![
151 AggregateSpec {
152 op: AggregateOp::Sum,
153 source: AggregateSource::Column(0),
154 },
155 ];
156
157 let result = execute_columnar_batch(&batch, &[], &aggregates, None).unwrap();
158 assert_eq!(result.len(), 1);
159 assert_eq!(result[0].get(0), Some(&SqlValue::Integer(100)));
160 }
161
162 #[test]
163 fn test_execute_columnar_batch_with_filter() {
164 let batch = make_test_batch();
165 let predicates = vec![
166 ColumnPredicate::LessThan {
167 column_idx: 0,
168 value: SqlValue::Integer(25),
169 },
170 ];
171 let aggregates = vec![
172 AggregateSpec {
173 op: AggregateOp::Sum,
174 source: AggregateSource::Column(0),
175 },
176 ];
177
178 let result = execute_columnar_batch(&batch, &predicates, &aggregates, None).unwrap();
179 assert_eq!(result.len(), 1);
180 assert_eq!(result[0].get(0), Some(&SqlValue::Integer(30)));
182 }
183
184 #[test]
185 fn test_execute_columnar_batch_multiple_aggregates() {
186 let batch = make_test_batch();
187 let aggregates = vec![
188 AggregateSpec {
189 op: AggregateOp::Sum,
190 source: AggregateSource::Column(0),
191 },
192 AggregateSpec {
193 op: AggregateOp::Avg,
194 source: AggregateSource::Column(1),
195 },
196 AggregateSpec {
197 op: AggregateOp::Count,
198 source: AggregateSource::CountStar,
199 },
200 ];
201
202 let result = execute_columnar_batch(&batch, &[], &aggregates, None).unwrap();
203 assert_eq!(result.len(), 1);
204 assert_eq!(result[0].len(), 3);
205
206 assert_eq!(result[0].get(0), Some(&SqlValue::Integer(100)));
208
209 if let Some(SqlValue::Double(avg)) = result[0].get(1) {
211 assert!((avg - 3.0).abs() < 0.001);
212 } else {
213 panic!("Expected Double for AVG");
214 }
215
216 assert_eq!(result[0].get(2), Some(&SqlValue::Integer(4)));
218 }
219
220 #[test]
221 fn test_execute_columnar_batch_empty() {
222 let batch = ColumnarBatch::new(2);
223 let aggregates = vec![
224 AggregateSpec {
225 op: AggregateOp::Sum,
226 source: AggregateSource::Column(0),
227 },
228 AggregateSpec {
229 op: AggregateOp::Count,
230 source: AggregateSource::CountStar,
231 },
232 ];
233
234 let result = execute_columnar_batch(&batch, &[], &aggregates, None).unwrap();
235 assert_eq!(result.len(), 1);
236 assert_eq!(result[0].get(0), Some(&SqlValue::Null)); assert_eq!(result[0].get(1), Some(&SqlValue::Integer(0))); }
239
240 #[test]
243 fn test_execute_columnar_batch_with_nulls() {
244 let rows = vec![
246 Row::new(vec![SqlValue::Integer(10), SqlValue::Double(1.5)]),
247 Row::new(vec![SqlValue::Null, SqlValue::Double(2.5)]), Row::new(vec![SqlValue::Integer(30), SqlValue::Null]), Row::new(vec![SqlValue::Integer(40), SqlValue::Double(4.5)]),
250 ];
251 let batch = ColumnarBatch::from_rows(&rows).unwrap();
252
253 let aggregates = vec![
254 AggregateSpec {
255 op: AggregateOp::Sum,
256 source: AggregateSource::Column(0),
257 },
258 AggregateSpec {
259 op: AggregateOp::Sum,
260 source: AggregateSource::Column(1),
261 },
262 AggregateSpec {
263 op: AggregateOp::Count,
264 source: AggregateSource::CountStar,
265 },
266 ];
267
268 let result = execute_columnar_batch(&batch, &[], &aggregates, None).unwrap();
269 assert_eq!(result.len(), 1);
270
271 assert_eq!(result[0].get(0), Some(&SqlValue::Integer(80)));
273
274 if let Some(SqlValue::Double(sum)) = result[0].get(1) {
276 assert!((sum - 8.5).abs() < 0.001);
277 } else {
278 panic!("Expected Double for SUM(col1)");
279 }
280
281 assert_eq!(result[0].get(2), Some(&SqlValue::Integer(4)));
283 }
284
285 #[test]
288 fn test_execute_columnar_batch_with_nulls_and_filter() {
289 let rows = vec![
290 Row::new(vec![SqlValue::Integer(10), SqlValue::Double(1.0)]),
291 Row::new(vec![SqlValue::Integer(20), SqlValue::Null]), Row::new(vec![SqlValue::Integer(30), SqlValue::Double(3.0)]),
293 Row::new(vec![SqlValue::Integer(40), SqlValue::Double(4.0)]),
294 ];
295 let batch = ColumnarBatch::from_rows(&rows).unwrap();
296
297 let predicates = vec![
299 ColumnPredicate::LessThan {
300 column_idx: 0,
301 value: SqlValue::Integer(35),
302 },
303 ];
304
305 let aggregates = vec![
306 AggregateSpec {
307 op: AggregateOp::Sum,
308 source: AggregateSource::Column(1),
309 },
310 ];
311
312 let result = execute_columnar_batch(&batch, &predicates, &aggregates, None).unwrap();
313 assert_eq!(result.len(), 1);
314
315 if let Some(SqlValue::Double(sum)) = result[0].get(0) {
317 assert!((sum - 4.0).abs() < 0.001);
318 } else {
319 panic!("Expected Double for SUM(col1)");
320 }
321 }
322}