1mod aggregate;
35pub mod batch;
36mod executor;
37pub mod filter;
38mod scan;
39mod string_ops;
40
41pub mod simd_ops;
44
45mod simd_aggregate;
46pub mod simd_filter;
47mod simd_join;
48
49pub use aggregate::{
50 columnar_group_by, columnar_group_by_batch, compute_multiple_aggregates,
51 evaluate_expression_to_column, evaluate_expression_with_cached_column, extract_aggregates,
52 AggregateOp, AggregateSource, AggregateSpec,
53};
54pub use batch::{ColumnArray, ColumnarBatch};
55pub use executor::execute_columnar_batch;
56pub use filter::{
57 apply_columnar_filter, apply_columnar_filter_simd_streaming, create_filter_bitmap,
58 create_filter_bitmap_tree, evaluate_predicate_tree, extract_column_predicates,
59 extract_predicate_tree, ColumnPredicate, PredicateTree,
60};
61pub use scan::ColumnarScan;
62
63pub use aggregate::compute_aggregates_from_batch;
64pub use simd_aggregate::{can_use_simd_for_column, simd_aggregate_f64, simd_aggregate_i64};
65pub use simd_filter::{simd_create_filter_mask, simd_create_filter_mask_packed, simd_filter_batch};
66pub use simd_join::columnar_hash_join_inner;
67pub use simd_ops::PackedMask;
68
69use crate::errors::ExecutorError;
70use crate::schema::CombinedSchema;
71use log;
72use vibesql_storage::Row;
73use vibesql_types::SqlValue;
74
75pub fn execute_columnar_aggregate(
111 rows: &[Row],
112 predicates: &[ColumnPredicate],
113 aggregates: &[aggregate::AggregateSpec],
114 schema: Option<&CombinedSchema>,
115) -> Result<Vec<Row>, ExecutorError> {
116 if rows.is_empty() {
119 let values: Vec<SqlValue> = aggregates
120 .iter()
121 .map(|spec| match spec.op {
122 aggregate::AggregateOp::Count => SqlValue::Integer(0),
123 _ => SqlValue::Null,
124 })
125 .collect();
126 return Ok(vec![Row::new(values)]);
127 }
128
129 #[cfg(feature = "profile-q6")]
131 let batch_start = std::time::Instant::now();
132
133 let batch = ColumnarBatch::from_rows(rows)?;
134
135 #[cfg(feature = "profile-q6")]
136 {
137 let batch_time = batch_start.elapsed();
138 eprintln!("[PROFILE-Q6] Phase 1 - Convert to batch: {:?}", batch_time);
139 }
140
141 #[cfg(feature = "profile-q6")]
143 let filter_start = std::time::Instant::now();
144
145 let filtered_batch =
146 if predicates.is_empty() { batch.clone() } else { simd_filter_batch(&batch, predicates)? };
147
148 #[cfg(feature = "profile-q6")]
149 {
150 let filter_time = filter_start.elapsed();
151 eprintln!(
152 "[PROFILE-Q6] Phase 2 - SIMD filter: {:?} ({}/{} rows passed)",
153 filter_time,
154 filtered_batch.row_count(),
155 rows.len()
156 );
157 }
158
159 #[cfg(feature = "profile-q6")]
161 let agg_start = std::time::Instant::now();
162
163 let results = compute_aggregates_from_batch(&filtered_batch, aggregates, schema)?;
165
166 #[cfg(feature = "profile-q6")]
167 {
168 let agg_time = agg_start.elapsed();
169 eprintln!(
170 "[PROFILE-Q6] Phase 3 - Batch-native aggregate: {:?} ({} aggregates)",
171 agg_time,
172 aggregates.len()
173 );
174 }
175
176 Ok(vec![Row::new(results)])
178}
179
180pub fn fast_aggregate_on_rows(
203 rows: &[Row],
204 predicates: &[ColumnPredicate],
205 aggregates: &[aggregate::AggregateSpec],
206) -> Result<Vec<Row>, ExecutorError> {
207 use aggregate::{AggregateOp, AggregateSource};
208
209 if rows.is_empty() {
211 let values: Vec<SqlValue> = aggregates
212 .iter()
213 .map(|spec| match spec.op {
214 AggregateOp::Count => SqlValue::Integer(0),
215 _ => SqlValue::Null,
216 })
217 .collect();
218 return Ok(vec![Row::new(values)]);
219 }
220
221 struct Accumulator {
223 sum_f64: f64,
224 sum_i64: i64,
225 count: i64,
226 min_f64: Option<f64>,
227 max_f64: Option<f64>,
228 min_i64: Option<i64>,
229 max_i64: Option<i64>,
230 is_integer: bool,
231 }
232
233 let mut accumulators: Vec<Accumulator> = aggregates
234 .iter()
235 .map(|_| Accumulator {
236 sum_f64: 0.0,
237 sum_i64: 0,
238 count: 0,
239 min_f64: None,
240 max_f64: None,
241 min_i64: None,
242 max_i64: None,
243 is_integer: true,
244 })
245 .collect();
246
247 for row in rows {
249 let passes_filter = predicates.iter().all(|pred| evaluate_predicate(row, pred));
251
252 if !passes_filter {
253 continue;
254 }
255
256 for (i, spec) in aggregates.iter().enumerate() {
258 let acc = &mut accumulators[i];
259
260 match &spec.source {
261 AggregateSource::CountStar => {
262 acc.count += 1;
263 }
264 AggregateSource::Column(col_idx) => {
265 if let Some(value) = row.get(*col_idx) {
266 if !matches!(value, SqlValue::Null) {
267 acc.count += 1;
268 match value {
269 SqlValue::Integer(v) => {
270 acc.sum_i64 += v;
271 acc.sum_f64 += *v as f64;
272 acc.min_i64 = Some(acc.min_i64.map_or(*v, |m| m.min(*v)));
273 acc.max_i64 = Some(acc.max_i64.map_or(*v, |m| m.max(*v)));
274 acc.min_f64 =
275 Some(acc.min_f64.map_or(*v as f64, |m| m.min(*v as f64)));
276 acc.max_f64 =
277 Some(acc.max_f64.map_or(*v as f64, |m| m.max(*v as f64)));
278 }
279 SqlValue::Double(v) => {
280 acc.is_integer = false;
281 acc.sum_f64 += v;
282 acc.min_f64 = Some(acc.min_f64.map_or(*v, |m| m.min(*v)));
283 acc.max_f64 = Some(acc.max_f64.map_or(*v, |m| m.max(*v)));
284 }
285 SqlValue::Float(v) => {
286 acc.is_integer = false;
287 acc.sum_f64 += *v as f64;
288 acc.min_f64 =
289 Some(acc.min_f64.map_or(*v as f64, |m| m.min(*v as f64)));
290 acc.max_f64 =
291 Some(acc.max_f64.map_or(*v as f64, |m| m.max(*v as f64)));
292 }
293 SqlValue::Bigint(v) => {
294 acc.sum_i64 += v;
295 acc.sum_f64 += *v as f64;
296 acc.min_i64 = Some(acc.min_i64.map_or(*v, |m| m.min(*v)));
297 acc.max_i64 = Some(acc.max_i64.map_or(*v, |m| m.max(*v)));
298 acc.min_f64 =
299 Some(acc.min_f64.map_or(*v as f64, |m| m.min(*v as f64)));
300 acc.max_f64 =
301 Some(acc.max_f64.map_or(*v as f64, |m| m.max(*v as f64)));
302 }
303 SqlValue::Numeric(v) => {
304 acc.is_integer = false;
305 acc.sum_f64 += v;
306 acc.min_f64 = Some(acc.min_f64.map_or(*v, |m| m.min(*v)));
307 acc.max_f64 = Some(acc.max_f64.map_or(*v, |m| m.max(*v)));
308 }
309 _ => {}
310 }
311 }
312 }
313 }
314 AggregateSource::Expression(expr) => {
315 if let Some(value) = eval_simple_expression(row, expr) {
318 acc.count += 1;
319 acc.is_integer = false;
320 acc.sum_f64 += value;
321 acc.min_f64 = Some(acc.min_f64.map_or(value, |m| m.min(value)));
322 acc.max_f64 = Some(acc.max_f64.map_or(value, |m| m.max(value)));
323 }
324 }
325 }
326 }
327 }
328
329 let values: Vec<SqlValue> = aggregates
331 .iter()
332 .zip(accumulators.iter())
333 .map(|(spec, acc)| match spec.op {
334 AggregateOp::Count => SqlValue::Integer(acc.count),
335 AggregateOp::Sum => {
336 if acc.count == 0 {
337 SqlValue::Null
338 } else if acc.is_integer {
339 SqlValue::Integer(acc.sum_i64)
340 } else {
341 SqlValue::Double(acc.sum_f64)
342 }
343 }
344 AggregateOp::Avg => {
345 if acc.count == 0 {
346 SqlValue::Null
347 } else {
348 SqlValue::Double(acc.sum_f64 / acc.count as f64)
349 }
350 }
351 AggregateOp::Min => {
352 if acc.is_integer {
353 acc.min_i64.map(SqlValue::Integer).unwrap_or(SqlValue::Null)
354 } else {
355 acc.min_f64.map(SqlValue::Double).unwrap_or(SqlValue::Null)
356 }
357 }
358 AggregateOp::Max => {
359 if acc.is_integer {
360 acc.max_i64.map(SqlValue::Integer).unwrap_or(SqlValue::Null)
361 } else {
362 acc.max_f64.map(SqlValue::Double).unwrap_or(SqlValue::Null)
363 }
364 }
365 })
366 .collect();
367
368 Ok(vec![Row::new(values)])
369}
370
371fn evaluate_predicate(row: &Row, predicate: &ColumnPredicate) -> bool {
373 match predicate {
374 ColumnPredicate::LessThan { column_idx, value } => row
375 .get(*column_idx)
376 .map(|v| compare_values(v, value) == std::cmp::Ordering::Less)
377 .unwrap_or(false),
378 ColumnPredicate::LessThanOrEqual { column_idx, value } => row
379 .get(*column_idx)
380 .map(|v| compare_values(v, value) != std::cmp::Ordering::Greater)
381 .unwrap_or(false),
382 ColumnPredicate::GreaterThan { column_idx, value } => row
383 .get(*column_idx)
384 .map(|v| compare_values(v, value) == std::cmp::Ordering::Greater)
385 .unwrap_or(false),
386 ColumnPredicate::GreaterThanOrEqual { column_idx, value } => row
387 .get(*column_idx)
388 .map(|v| compare_values(v, value) != std::cmp::Ordering::Less)
389 .unwrap_or(false),
390 ColumnPredicate::Equal { column_idx, value } => row
391 .get(*column_idx)
392 .map(|v| compare_values(v, value) == std::cmp::Ordering::Equal)
393 .unwrap_or(false),
394 ColumnPredicate::NotEqual { column_idx, value } => row
395 .get(*column_idx)
396 .map(|v| compare_values(v, value) != std::cmp::Ordering::Equal)
397 .unwrap_or(false),
398 ColumnPredicate::Between { column_idx, low, high } => row
399 .get(*column_idx)
400 .map(|v| {
401 compare_values(v, low) != std::cmp::Ordering::Less
402 && compare_values(v, high) != std::cmp::Ordering::Greater
403 })
404 .unwrap_or(false),
405 ColumnPredicate::Like { column_idx, pattern, negated } => {
406 let matches = row
408 .get(*column_idx)
409 .map(|v| {
410 if let SqlValue::Varchar(s) = v {
411 let pattern_str = pattern.as_str();
414 if let Some(inner) =
415 pattern_str.strip_prefix('%').and_then(|s| s.strip_suffix('%'))
416 {
417 s.contains(inner)
418 } else if let Some(suffix) = pattern_str.strip_prefix('%') {
419 s.ends_with(suffix)
420 } else if let Some(prefix) = pattern_str.strip_suffix('%') {
421 s.starts_with(prefix)
422 } else {
423 s == pattern_str
424 }
425 } else {
426 false
427 }
428 })
429 .unwrap_or(false);
430 if *negated {
431 !matches
432 } else {
433 matches
434 }
435 }
436 ColumnPredicate::InList { column_idx, values, negated } => {
437 let matches = row
439 .get(*column_idx)
440 .map(|v| {
441 values
442 .iter()
443 .any(|list_val| compare_values(v, list_val) == std::cmp::Ordering::Equal)
444 })
445 .unwrap_or(false);
446 if *negated {
447 !matches
448 } else {
449 matches
450 }
451 }
452 }
453}
454
455fn compare_values(a: &SqlValue, b: &SqlValue) -> std::cmp::Ordering {
457 use std::cmp::Ordering;
458
459 match (a, b) {
460 (SqlValue::Integer(a), SqlValue::Integer(b)) => a.cmp(b),
461 (SqlValue::Bigint(a), SqlValue::Bigint(b)) => a.cmp(b),
462 (SqlValue::Double(a), SqlValue::Double(b)) => a.partial_cmp(b).unwrap_or(Ordering::Equal),
463 (SqlValue::Float(a), SqlValue::Float(b)) => a.partial_cmp(b).unwrap_or(Ordering::Equal),
464 (SqlValue::Integer(a), SqlValue::Double(b)) => {
466 (*a as f64).partial_cmp(b).unwrap_or(Ordering::Equal)
467 }
468 (SqlValue::Double(a), SqlValue::Integer(b)) => {
469 a.partial_cmp(&(*b as f64)).unwrap_or(Ordering::Equal)
470 }
471 (SqlValue::Integer(a), SqlValue::Bigint(b)) => (*a).cmp(b),
472 (SqlValue::Bigint(a), SqlValue::Integer(b)) => a.cmp(&{ *b }),
473 (SqlValue::Varchar(a), SqlValue::Varchar(b)) => a.cmp(b),
475 (SqlValue::Date(a), SqlValue::Date(b)) => {
477 match a.year.cmp(&b.year) {
479 Ordering::Equal => match a.month.cmp(&b.month) {
480 Ordering::Equal => a.day.cmp(&b.day),
481 other => other,
482 },
483 other => other,
484 }
485 }
486 (SqlValue::Null, _) | (_, SqlValue::Null) => Ordering::Equal, _ => Ordering::Equal, }
490}
491
492#[allow(clippy::only_used_in_recursion)]
494fn eval_simple_expression(row: &Row, expr: &vibesql_ast::Expression) -> Option<f64> {
495 use vibesql_ast::{BinaryOperator, Expression};
496
497 match expr {
498 Expression::BinaryOp { left, op, right } => {
499 let left_val = eval_simple_expression(row, left)?;
500 let right_val = eval_simple_expression(row, right)?;
501 match op {
502 BinaryOperator::Multiply => Some(left_val * right_val),
503 BinaryOperator::Divide => Some(left_val / right_val),
504 BinaryOperator::Plus => Some(left_val + right_val),
505 BinaryOperator::Minus => Some(left_val - right_val),
506 _ => None,
507 }
508 }
509 Expression::ColumnRef { column, .. } => {
510 log::debug!(
514 "fast_aggregate_on_rows: ColumnRef '{}' requires schema resolution, skipping fast path",
515 column
516 );
517 None
518 }
519 Expression::Literal(val) => match val {
520 SqlValue::Integer(v) => Some(*v as f64),
521 SqlValue::Double(v) => Some(*v),
522 SqlValue::Float(v) => Some(*v as f64),
523 SqlValue::Bigint(v) => Some(*v as f64),
524 SqlValue::Numeric(v) => Some(*v),
525 _ => None,
526 },
527 _ => None,
528 }
529}
530
531pub fn execute_columnar(
550 rows: &[Row],
551 filter: Option<&vibesql_ast::Expression>,
552 aggregates: &[vibesql_ast::Expression],
553 schema: &CombinedSchema,
554) -> Option<Result<Vec<Row>, ExecutorError>> {
555 log::debug!(" Executing columnar query with {} rows", rows.len());
556
557 let predicates = if let Some(filter_expr) = filter {
559 match extract_column_predicates(filter_expr, schema) {
560 Some(preds) => {
561 log::debug!(" ✓ Extracted {} column predicates for SIMD filtering", preds.len());
562 preds
563 }
564 None => {
565 log::debug!(" ✗ Filter too complex for columnar optimization");
566 return None; }
568 }
569 } else {
570 log::debug!(" No filter predicates");
571 vec![] };
573
574 let agg_specs = match extract_aggregates(aggregates, schema) {
576 Some(specs) => {
577 log::debug!(" ✓ Extracted {} aggregate operations", specs.len());
578 for (i, spec) in specs.iter().enumerate() {
579 log::debug!(" Aggregate {}: {:?}", i + 1, spec.op);
580 }
581 specs
582 }
583 None => {
584 log::debug!(" ✗ Aggregates too complex for columnar optimization");
585 return None; }
587 };
588
589 let needs_schema = agg_specs
591 .iter()
592 .any(|spec| matches!(spec.source, aggregate::AggregateSource::Expression(_)));
593 let schema_ref = if needs_schema { Some(schema) } else { None };
594
595 log::debug!(" Executing SIMD-accelerated columnar aggregation");
596 Some(execute_columnar_aggregate(rows, &predicates, &agg_specs, schema_ref))
597}
598
#[cfg(test)]
mod tests {
    use super::*;
    use crate::schema::CombinedSchema;
    use vibesql_ast::{BinaryOperator, Expression};
    use vibesql_catalog::{ColumnSchema, TableSchema};
    use vibesql_types::{DataType, Date};

    /// Absolute tolerance used when comparing floating-point results.
    const TOLERANCE: f64 = 0.001;

    /// Returns the payload of `value` when it is a `SqlValue::Double`.
    fn double_of(value: Option<&SqlValue>) -> Option<f64> {
        match value {
            Some(SqlValue::Double(d)) => Some(*d),
            _ => None,
        }
    }

    /// True when `actual` is within `TOLERANCE` of `expected`.
    fn close_to(actual: f64, expected: f64) -> bool {
        (actual - expected).abs() < TOLERANCE
    }

    /// Shorthand for building an `AggregateSpec`.
    fn spec(op: AggregateOp, source: AggregateSource) -> AggregateSpec {
        AggregateSpec { op, source }
    }

    /// Builds a two-column `(Integer quantity, Double price)` row.
    fn quantity_price_row(quantity: i64, price: f64) -> Row {
        Row::new(vec![SqlValue::Integer(quantity), SqlValue::Double(price)])
    }

    /// Unqualified column reference.
    fn column(name: &str) -> Expression {
        Expression::ColumnRef { table: None, column: name.to_string() }
    }

    /// Single-argument aggregate function call.
    fn aggregate_call(name: &str, distinct: bool, arg: Expression) -> Expression {
        Expression::AggregateFunction { name: name.to_string(), distinct, args: vec![arg] }
    }

    /// Asserts that the columnar path was taken and succeeded, then returns
    /// its rows.
    fn unwrap_rows(outcome: Option<Result<Vec<Row>, ExecutorError>>) -> Vec<Row> {
        assert!(outcome.is_some());
        let outcome = outcome.unwrap();
        assert!(outcome.is_ok());
        outcome.unwrap()
    }

    /// Schema for table `test` with columns `quantity` and `price`.
    fn make_test_schema() -> CombinedSchema {
        let columns = vec![
            ColumnSchema::new("quantity".to_string(), DataType::Integer, false),
            ColumnSchema::new("price".to_string(), DataType::DoublePrecision, false),
        ];
        let table = TableSchema::new("test".to_string(), columns);
        CombinedSchema::from_table("test".to_string(), table)
    }

    /// Four `(quantity, price)` rows matching `make_test_schema`.
    fn make_test_rows_for_ast() -> Vec<Row> {
        vec![
            quantity_price_row(10, 1.5),
            quantity_price_row(20, 2.5),
            quantity_price_row(30, 3.5),
            quantity_price_row(40, 4.5),
        ]
    }

    /// Filtered SUM/COUNT: two of the four rows satisfy both predicates.
    #[test]
    fn test_columnar_pipeline_filtered_sum() {
        let lineitem = |quantity: i64, price: f64, discount: f64, shipped: Date| {
            Row::new(vec![
                SqlValue::Integer(quantity),
                SqlValue::Double(price),
                SqlValue::Double(discount),
                SqlValue::Date(shipped),
            ])
        };
        let rows = vec![
            lineitem(10, 100.0, 0.06, Date::new(1994, 6, 1).unwrap()),
            lineitem(25, 200.0, 0.06, Date::new(1994, 7, 1).unwrap()),
            lineitem(15, 150.0, 0.05, Date::new(1994, 8, 1).unwrap()),
            lineitem(20, 180.0, 0.08, Date::new(1994, 9, 1).unwrap()),
        ];

        let quantity_below_24 =
            ColumnPredicate::LessThan { column_idx: 0, value: SqlValue::Integer(24) };
        let discount_in_range = ColumnPredicate::Between {
            column_idx: 2,
            low: SqlValue::Double(0.05),
            high: SqlValue::Double(0.07),
        };
        let predicates = vec![quantity_below_24, discount_in_range];

        let aggregates = vec![
            spec(AggregateOp::Sum, AggregateSource::Column(1)),
            spec(AggregateOp::Count, AggregateSource::Column(0)),
        ];

        let result = execute_columnar_aggregate(&rows, &predicates, &aggregates, None).unwrap();
        assert_eq!(result.len(), 1);

        let result_row = &result[0];
        // Rows 1 and 3 pass: 100.0 + 150.0.
        assert!(double_of(result_row.get(0)).map_or(false, |sum| close_to(sum, 250.0)));
        assert_eq!(result_row.get(1), Some(&SqlValue::Integer(2)));
    }

    /// SUM/AVG/MAX over all rows when no predicate is supplied.
    #[test]
    fn test_columnar_pipeline_no_filter() {
        let rows = vec![
            quantity_price_row(10, 1.5),
            quantity_price_row(20, 2.5),
            quantity_price_row(30, 3.5),
        ];
        let predicates: Vec<ColumnPredicate> = Vec::new();
        let aggregates = vec![
            spec(AggregateOp::Sum, AggregateSource::Column(0)),
            spec(AggregateOp::Avg, AggregateSource::Column(1)),
            spec(AggregateOp::Max, AggregateSource::Column(0)),
        ];

        let result = execute_columnar_aggregate(&rows, &predicates, &aggregates, None).unwrap();
        assert_eq!(result.len(), 1);

        let result_row = &result[0];
        assert_eq!(result_row.get(0), Some(&SqlValue::Integer(60)));
        assert!(double_of(result_row.get(1)).map_or(false, |avg| close_to(avg, 2.5)));
        assert_eq!(result_row.get(2), Some(&SqlValue::Integer(30)));
    }

    /// A predicate that rejects every row yields SUM = NULL and COUNT = 0.
    #[test]
    fn test_columnar_pipeline_empty_result() {
        let rows: Vec<Row> =
            [100, 200].iter().map(|&n| Row::new(vec![SqlValue::Integer(n)])).collect();
        let predicates =
            vec![ColumnPredicate::LessThan { column_idx: 0, value: SqlValue::Integer(50) }];
        let aggregates = vec![
            spec(AggregateOp::Sum, AggregateSource::Column(0)),
            spec(AggregateOp::Count, AggregateSource::Column(0)),
        ];

        let result = execute_columnar_aggregate(&rows, &predicates, &aggregates, None).unwrap();
        assert_eq!(result.len(), 1);

        let result_row = &result[0];
        assert_eq!(result_row.get(0), Some(&SqlValue::Null));
        assert_eq!(result_row.get(1), Some(&SqlValue::Integer(0)));
    }

    /// SUM(price) from AST input without a filter.
    #[test]
    fn test_execute_columnar_simple_aggregate() {
        let rows = make_test_rows_for_ast();
        let schema = make_test_schema();
        let aggregates = vec![aggregate_call("SUM", false, column("price"))];

        let result_rows = unwrap_rows(execute_columnar(&rows, None, &aggregates, &schema));
        assert_eq!(result_rows.len(), 1);
        assert_eq!(result_rows[0].len(), 1);

        // 1.5 + 2.5 + 3.5 + 4.5
        match double_of(result_rows[0].get(0)) {
            Some(sum) => assert!(close_to(sum, 12.0)),
            None => panic!("Expected Double value"),
        }
    }

    /// SUM(price) restricted by `quantity < 25`.
    #[test]
    fn test_execute_columnar_with_filter() {
        let rows = make_test_rows_for_ast();
        let schema = make_test_schema();

        let filter = Expression::BinaryOp {
            left: Box::new(column("quantity")),
            op: BinaryOperator::LessThan,
            right: Box::new(Expression::Literal(SqlValue::Integer(25))),
        };
        let aggregates = vec![aggregate_call("SUM", false, column("price"))];

        let result_rows =
            unwrap_rows(execute_columnar(&rows, Some(&filter), &aggregates, &schema));
        assert_eq!(result_rows.len(), 1);
        assert_eq!(result_rows[0].len(), 1);

        // Only the quantity 10 and 20 rows pass: 1.5 + 2.5.
        match double_of(result_rows[0].get(0)) {
            Some(sum) => assert!(close_to(sum, 4.0)),
            None => panic!("Expected Double value"),
        }
    }

    /// SUM(price), COUNT(*) and AVG(quantity) in one call.
    #[test]
    fn test_execute_columnar_multiple_aggregates() {
        let rows = make_test_rows_for_ast();
        let schema = make_test_schema();

        let aggregates = vec![
            aggregate_call("SUM", false, column("price")),
            aggregate_call("COUNT", false, Expression::Wildcard),
            aggregate_call("AVG", false, column("quantity")),
        ];

        let result_rows = unwrap_rows(execute_columnar(&rows, None, &aggregates, &schema));
        assert_eq!(result_rows.len(), 1);
        assert_eq!(result_rows[0].len(), 3);

        let output = &result_rows[0];

        match double_of(output.get(0)) {
            Some(sum) => assert!(close_to(sum, 12.0)),
            None => panic!("Expected Double value for SUM"),
        }

        assert_eq!(output.get(1), Some(&SqlValue::Integer(4)));

        // (10 + 20 + 30 + 40) / 4
        match double_of(output.get(2)) {
            Some(avg) => assert!(close_to(avg, 25.0)),
            None => panic!("Expected Double value for AVG"),
        }
    }

    /// DISTINCT aggregates are rejected by the columnar path.
    #[test]
    fn test_execute_columnar_unsupported_distinct() {
        let rows = make_test_rows_for_ast();
        let schema = make_test_schema();
        let aggregates = vec![aggregate_call("SUM", true, column("price"))];

        assert!(execute_columnar(&rows, None, &aggregates, &schema).is_none());
    }

    /// A scalar subquery used as the filter is rejected by the columnar path.
    #[test]
    fn test_execute_columnar_unsupported_complex_filter() {
        let rows = make_test_rows_for_ast();
        let schema = make_test_schema();

        let empty_select = vibesql_ast::SelectStmt {
            with_clause: None,
            distinct: false,
            select_list: vec![],
            into_table: None,
            into_variables: None,
            from: None,
            where_clause: None,
            group_by: None,
            having: None,
            order_by: None,
            limit: None,
            offset: None,
            set_operation: None,
        };
        let filter = Expression::ScalarSubquery(Box::new(empty_select));
        let aggregates = vec![aggregate_call("SUM", false, column("price"))];

        assert!(execute_columnar(&rows, Some(&filter), &aggregates, &schema).is_none());
    }
}