vibesql_executor/select/columnar/executor/
mod.rs1#![allow(clippy::needless_range_loop, clippy::unnecessary_map_or)]
7mod aggregate;
29mod expression;
30mod fused;
31
32use super::aggregate::{AggregateOp, AggregateSpec};
33use super::batch::ColumnarBatch;
34use super::filter::ColumnPredicate;
35use super::simd_filter::simd_filter_batch;
36use crate::errors::ExecutorError;
37use crate::schema::CombinedSchema;
38use vibesql_storage::Row;
39use vibesql_types::SqlValue;
40
41use aggregate::compute_batch_aggregates;
42use fused::{can_use_fused_aggregation, execute_fused_filter_aggregate};
43
44pub fn execute_columnar_batch(
61 batch: &ColumnarBatch,
62 predicates: &[ColumnPredicate],
63 aggregates: &[AggregateSpec],
64 _schema: Option<&CombinedSchema>,
65) -> Result<Vec<Row>, ExecutorError> {
66 if batch.row_count() == 0 {
68 let values: Vec<SqlValue> = aggregates
69 .iter()
70 .map(|spec| match spec.op {
71 AggregateOp::Count => SqlValue::Integer(0),
72 _ => SqlValue::Null,
73 })
74 .collect();
75 return Ok(vec![Row::new(values)]);
76 }
77
78 if !predicates.is_empty() && can_use_fused_aggregation(aggregates) {
81 if let Ok(results) = execute_fused_filter_aggregate(batch, predicates, aggregates) {
82 return Ok(vec![Row::new(results)]);
83 }
84 }
86
87 #[cfg(feature = "profile-q6")]
92 let filter_start = std::time::Instant::now();
93
94 let filtered_batch =
95 if predicates.is_empty() { batch.clone() } else { simd_filter_batch(batch, predicates)? };
96
97 #[cfg(feature = "profile-q6")]
98 {
99 let filter_time = filter_start.elapsed();
100 eprintln!(
101 "[PROFILE-Q6] Phase 1 - SIMD Filter: {:?} ({}/{} rows passed)",
102 filter_time,
103 filtered_batch.row_count(),
104 batch.row_count()
105 );
106 }
107
108 #[cfg(feature = "profile-q6")]
110 let agg_start = std::time::Instant::now();
111
112 let results = compute_batch_aggregates(&filtered_batch, aggregates)?;
113
114 #[cfg(feature = "profile-q6")]
115 {
116 let agg_time = agg_start.elapsed();
117 eprintln!(
118 "[PROFILE-Q6] Phase 2 - SIMD Aggregate: {:?} ({} aggregates)",
119 agg_time,
120 aggregates.len()
121 );
122 }
123
124 Ok(vec![Row::new(results)])
126}
127
#[cfg(test)]
mod tests {
    use super::super::aggregate::AggregateSource;
    use super::super::batch::ColumnarBatch;
    use super::*;

    /// Builds a four-row fixture with an integer column (10, 20, 30, 40) and a
    /// double column (1.5, 2.5, 3.5, 4.5).
    fn make_test_batch() -> ColumnarBatch {
        let rows: Vec<Row> = [(10, 1.5), (20, 2.5), (30, 3.5), (40, 4.5)]
            .iter()
            .map(|&(int_val, dbl_val)| {
                Row::new(vec![SqlValue::Integer(int_val), SqlValue::Double(dbl_val)])
            })
            .collect();
        ColumnarBatch::from_rows(&rows).unwrap()
    }

    /// Runs the executor without a schema, checks that exactly one row came
    /// back, and hands that row to the caller.
    fn run_single_row(
        batch: &ColumnarBatch,
        predicates: &[ColumnPredicate],
        aggregates: &[AggregateSpec],
    ) -> Row {
        let mut rows = execute_columnar_batch(batch, predicates, aggregates, None).unwrap();
        assert_eq!(rows.len(), 1);
        rows.pop().unwrap()
    }

    #[test]
    fn test_execute_columnar_batch_sum() {
        let specs = [AggregateSpec { op: AggregateOp::Sum, source: AggregateSource::Column(0) }];

        let row = run_single_row(&make_test_batch(), &[], &specs);

        // 10 + 20 + 30 + 40
        assert_eq!(row.get(0), Some(&SqlValue::Integer(100)));
    }

    #[test]
    fn test_execute_columnar_batch_with_filter() {
        let filters =
            [ColumnPredicate::LessThan { column_idx: 0, value: SqlValue::Integer(25) }];
        let specs = [AggregateSpec { op: AggregateOp::Sum, source: AggregateSource::Column(0) }];

        let row = run_single_row(&make_test_batch(), &filters, &specs);

        // Only 10 and 20 are below 25.
        assert_eq!(row.get(0), Some(&SqlValue::Integer(30)));
    }

    #[test]
    fn test_execute_columnar_batch_multiple_aggregates() {
        let specs = [
            AggregateSpec { op: AggregateOp::Sum, source: AggregateSource::Column(0) },
            AggregateSpec { op: AggregateOp::Avg, source: AggregateSource::Column(1) },
            AggregateSpec { op: AggregateOp::Count, source: AggregateSource::CountStar },
        ];

        let row = run_single_row(&make_test_batch(), &[], &specs);
        assert_eq!(row.len(), 3);

        // SUM(col0)
        assert_eq!(row.get(0), Some(&SqlValue::Integer(100)));

        // AVG(col1) = (1.5 + 2.5 + 3.5 + 4.5) / 4
        match row.get(1) {
            Some(SqlValue::Double(avg)) => assert!((avg - 3.0).abs() < 0.001),
            _ => panic!("Expected Double for AVG"),
        }

        // COUNT(*)
        assert_eq!(row.get(2), Some(&SqlValue::Integer(4)));
    }

    #[test]
    fn test_execute_columnar_batch_empty() {
        let empty = ColumnarBatch::new(2);
        let specs = [
            AggregateSpec { op: AggregateOp::Sum, source: AggregateSource::Column(0) },
            AggregateSpec { op: AggregateOp::Count, source: AggregateSource::CountStar },
        ];

        let row = run_single_row(&empty, &[], &specs);

        // SUM over no rows is NULL, COUNT over no rows is 0.
        assert_eq!(row.get(0), Some(&SqlValue::Null));
        assert_eq!(row.get(1), Some(&SqlValue::Integer(0)));
    }

    #[test]
    fn test_execute_columnar_batch_with_nulls() {
        let input = vec![
            Row::new(vec![SqlValue::Integer(10), SqlValue::Double(1.5)]),
            // NULL in the integer column
            Row::new(vec![SqlValue::Null, SqlValue::Double(2.5)]),
            // NULL in the double column
            Row::new(vec![SqlValue::Integer(30), SqlValue::Null]),
            Row::new(vec![SqlValue::Integer(40), SqlValue::Double(4.5)]),
        ];
        let batch = ColumnarBatch::from_rows(&input).unwrap();

        let specs = [
            AggregateSpec { op: AggregateOp::Sum, source: AggregateSource::Column(0) },
            AggregateSpec { op: AggregateOp::Sum, source: AggregateSource::Column(1) },
            AggregateSpec { op: AggregateOp::Count, source: AggregateSource::CountStar },
        ];

        let row = run_single_row(&batch, &[], &specs);

        // SUM(col0) ignores the NULL: 10 + 30 + 40
        assert_eq!(row.get(0), Some(&SqlValue::Integer(80)));

        // SUM(col1) ignores the NULL: 1.5 + 2.5 + 4.5
        match row.get(1) {
            Some(SqlValue::Double(sum)) => assert!((sum - 8.5).abs() < 0.001),
            _ => panic!("Expected Double for SUM(col1)"),
        }

        // COUNT(*) still counts every row.
        assert_eq!(row.get(2), Some(&SqlValue::Integer(4)));
    }

    #[test]
    fn test_execute_columnar_batch_with_nulls_and_filter() {
        let input = vec![
            Row::new(vec![SqlValue::Integer(10), SqlValue::Double(1.0)]),
            // Passes the filter but contributes nothing to the sum.
            Row::new(vec![SqlValue::Integer(20), SqlValue::Null]),
            Row::new(vec![SqlValue::Integer(30), SqlValue::Double(3.0)]),
            Row::new(vec![SqlValue::Integer(40), SqlValue::Double(4.0)]),
        ];
        let batch = ColumnarBatch::from_rows(&input).unwrap();

        // Keeps the rows whose first column is 10, 20 or 30.
        let filters =
            [ColumnPredicate::LessThan { column_idx: 0, value: SqlValue::Integer(35) }];
        let specs = [AggregateSpec { op: AggregateOp::Sum, source: AggregateSource::Column(1) }];

        let row = run_single_row(&batch, &filters, &specs);

        // 1.0 + 3.0, with the NULL skipped.
        match row.get(0) {
            Some(SqlValue::Double(sum)) => assert!((sum - 4.0).abs() < 0.001),
            _ => panic!("Expected Double for SUM(col1)"),
        }
    }
}