vibesql_executor/pipeline/native_columnar.rs

use vibesql_ast::{Expression, SelectItem};
use vibesql_storage::Row;

use super::{ExecutionContext, ExecutionPipeline, PipelineInput, PipelineOutput};
use crate::{
    errors::ExecutorError,
    select::columnar::{
        extract_aggregates, extract_column_predicates, simd_filter_batch, AggregateOp,
        AggregateSource, AggregateSpec, ColumnarBatch,
    },
};

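/// Execution pipeline that works directly on columnar data, using SIMD
/// predicate filtering and columnar aggregation where the query shape allows
/// it, and falling back to row-at-a-time evaluation otherwise.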
pub struct NativeColumnarPipeline {
    #[allow(dead_code)]
    has_columnar_storage: bool,
}

impl NativeColumnarPipeline {
    /// Creates a pipeline, assuming native columnar storage is available.
    #[inline]
    pub fn new() -> Self {
        Self { has_columnar_storage: true }
    }

    /// Creates a pipeline with an explicit columnar-storage hint.
    #[inline]
    #[allow(dead_code)]
    pub fn with_storage(has_columnar_storage: bool) -> Self {
        Self { has_columnar_storage }
    }
}

impl Default for NativeColumnarPipeline {
    fn default() -> Self {
        Self::new()
    }
}

impl ExecutionPipeline for NativeColumnarPipeline {
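    /// Builds the combined expression evaluator from the execution context.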
    #[inline]
    fn create_evaluator<'a>(
        &self,
        ctx: &'a ExecutionContext<'a>,
    ) -> crate::evaluator::CombinedExpressionEvaluator<'a> {
        ctx.create_evaluator()
    }

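    /// Applies the optional filter predicate. Native columnar input is loaded
    /// straight from columnar storage and filtered with SIMD column predicates;
    /// other inputs are batched from rows first. Predicates that cannot be
    /// expressed as column predicates fall back to row-by-row evaluation.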
    fn apply_filter(
        &self,
        input: PipelineInput<'_>,
        predicate: Option<&Expression>,
        ctx: &ExecutionContext<'_>,
    ) -> Result<PipelineOutput, ExecutorError> {
        if let PipelineInput::NativeColumnar { table_name, column_indices: _ } = &input {
            // Load the table's native columnar representation; if it is
            // unavailable, fall back to row-by-row filtering.
            let columnar_table = match ctx.database.get_columnar(table_name) {
                Ok(Some(ct)) => ct,
                Ok(None) | Err(_) => {
                    return self.fallback_filter(input.into_rows(), predicate, ctx);
                }
            };

            if columnar_table.row_count() == 0 {
                return Ok(PipelineOutput::Empty);
            }

            let batch = ColumnarBatch::from_storage_columnar(&columnar_table)?;

            // No predicate: return every row.
            if predicate.is_none() {
                let rows = batch.to_rows()?;
                return Ok(PipelineOutput::from_rows(rows));
            }

            let predicate = predicate.unwrap();

            // Compile the predicate into SIMD-friendly column predicates; fall
            // back to the row-based evaluator if that is not possible.
            let predicates = match extract_column_predicates(predicate, ctx.schema) {
                Some(preds) => preds,
                None => {
                    let rows = batch.to_rows()?;
                    return self.fallback_filter(rows, Some(predicate), ctx);
                }
            };

            let filtered_batch =
                if predicates.is_empty() { batch } else { simd_filter_batch(&batch, &predicates)? };

            return Ok(PipelineOutput::from_batch(filtered_batch));
        }

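        // Row-based input: batch the rows and reuse the SIMD filter when the
        // predicate can be expressed as column predicates.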
        let rows = input.into_rows();

        if predicate.is_none() {
            return Ok(PipelineOutput::from_rows(rows));
        }

        let predicate = predicate.unwrap();

        let predicates = match extract_column_predicates(predicate, ctx.schema) {
            Some(preds) => preds,
            None => {
                return self.fallback_filter(rows, Some(predicate), ctx);
            }
        };

        if rows.is_empty() {
            return Ok(PipelineOutput::Empty);
        }

        let batch = ColumnarBatch::from_rows(&rows)?;

        let filtered_batch =
            if predicates.is_empty() { batch } else { simd_filter_batch(&batch, &predicates)? };

        Ok(PipelineOutput::from_batch(filtered_batch))
    }

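    /// Projects each row through the SELECT list using the row-based
    /// expression evaluator; columnar input is converted to rows first.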
    fn apply_projection(
        &self,
        input: PipelineInput<'_>,
        select_items: &[SelectItem],
        ctx: &ExecutionContext<'_>,
    ) -> Result<PipelineOutput, ExecutorError> {
        // Materialize the input as rows; projection is evaluated row by row.
        let rows = match input {
            PipelineInput::Batch(batch) => {
                if batch.row_count() == 0 {
                    return Ok(PipelineOutput::Empty);
                }
                batch.to_rows()?
            }
            PipelineInput::Rows(rows) => {
                if rows.is_empty() {
                    return Ok(PipelineOutput::Empty);
                }
                rows.to_vec()
            }
            PipelineInput::RowsOwned(rows) => {
                if rows.is_empty() {
                    return Ok(PipelineOutput::Empty);
                }
                rows
            }
            PipelineInput::Empty => return Ok(PipelineOutput::Empty),
            PipelineInput::NativeColumnar { .. } => {
                // Native columnar input is not projected directly; return an
                // empty result.
                return Ok(PipelineOutput::Empty);
            }
        };

        let evaluator = ctx.create_evaluator();
        let buffer_pool = vibesql_storage::QueryBufferPool::new();

        let mut projected_rows = Vec::with_capacity(rows.len());
        for row in rows {
            let projected = crate::select::projection::project_row_combined(
                &row,
                select_items,
                &evaluator,
                ctx.schema,
                &None,
                &buffer_pool,
            )?;
            projected_rows.push(projected);
        }

        Ok(PipelineOutput::from_rows(projected_rows))
    }

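    /// Computes aggregates (optionally grouped) directly over a columnar
    /// batch. Unsupported shapes (HAVING, complex aggregate expressions,
    /// SELECT lists that don't mirror the GROUP BY keys) return
    /// `UnsupportedFeature` so the caller can choose another pipeline.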
    fn apply_aggregation(
        &self,
        input: PipelineInput<'_>,
        select_items: &[SelectItem],
        group_by: Option<&[Expression]>,
        having: Option<&Expression>,
        ctx: &ExecutionContext<'_>,
    ) -> Result<PipelineOutput, ExecutorError> {
        // HAVING is not handled by this pipeline.
        if having.is_some() {
            return Err(ExecutorError::UnsupportedFeature(
                "HAVING not supported in native columnar pipeline".to_string(),
            ));
        }

        // Collect the SELECT expressions and turn them into aggregate specs.
        let agg_exprs: Vec<Expression> = select_items
            .iter()
            .filter_map(|item| {
                if let SelectItem::Expression { expr, .. } = item {
                    Some(expr.clone())
                } else {
                    None
                }
            })
            .collect();

        let agg_specs = match extract_aggregates(&agg_exprs, ctx.schema) {
            Some(specs) => specs,
            None => {
                return Err(ExecutorError::UnsupportedFeature(
                    "Complex aggregates not supported in native columnar pipeline".to_string(),
                ));
            }
        };

        let has_group_by = group_by.is_some_and(|exprs| !exprs.is_empty());
        let group_by_count = group_by.map_or(0, |exprs| exprs.len());

        // Count the non-aggregate SELECT expressions (these must be the group keys).
        let select_non_agg_count = select_items
            .iter()
            .filter(|item| {
                matches!(item, SelectItem::Expression { expr, .. }
                    if !matches!(expr, Expression::AggregateFunction { .. }))
            })
            .count();

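        // Every GROUP BY key must also appear in the SELECT list; otherwise the
        // output columns cannot be produced here.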
        if has_group_by && select_non_agg_count != group_by_count {
            return Err(ExecutorError::UnsupportedFeature(format!(
                "GROUP BY with SELECT list that doesn't include all group keys not supported in native columnar (SELECT has {} non-aggs, GROUP BY has {} keys)",
                select_non_agg_count, group_by_count
            )));
        }

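        // The columnar GROUP BY result orders group keys before aggregates, so
        // a SELECT list that places an aggregate before a group key cannot be
        // produced by this pipeline.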
        if has_group_by {
            let mut first_agg_pos = None;
            let mut last_non_agg_pos = None;
            for (i, item) in select_items.iter().enumerate() {
                if let SelectItem::Expression { expr, .. } = item {
                    if matches!(expr, Expression::AggregateFunction { .. }) {
                        if first_agg_pos.is_none() {
                            first_agg_pos = Some(i);
                        }
                    } else {
                        last_non_agg_pos = Some(i);
                    }
                }
            }
            if let (Some(first_agg), Some(last_non_agg)) = (first_agg_pos, last_non_agg_pos) {
                if first_agg < last_non_agg {
                    log::debug!(
                        "Native columnar fallback: aggregate at {} before non-aggregate at {}",
                        first_agg,
                        last_non_agg
                    );
                    return Err(ExecutorError::UnsupportedFeature(
                        "SELECT list with aggregates before GROUP BY columns not supported in native columnar".to_string(),
                    ));
                }
            }
        }

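        // With no input rows, a grouped query yields no groups, while an
        // ungrouped query yields a single row: COUNT is 0 and every other
        // aggregate is NULL.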
        let return_empty_result = |has_gb: bool, specs: &[AggregateSpec]| -> PipelineOutput {
            if has_gb {
                PipelineOutput::from_rows(vec![])
            } else {
                let values: Vec<vibesql_types::SqlValue> = specs
                    .iter()
                    .map(|spec| match spec.op {
                        AggregateOp::Count => vibesql_types::SqlValue::Integer(0),
                        _ => vibesql_types::SqlValue::Null,
                    })
                    .collect();
                PipelineOutput::from_rows(vec![Row::new(values)])
            }
        };

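        // Materialize the input as a columnar batch, loading native columnar
        // tables straight from storage.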
        let batch = match input {
            PipelineInput::Batch(batch) => batch,
            PipelineInput::Rows(rows) => {
                if rows.is_empty() {
                    return Ok(return_empty_result(has_group_by, &agg_specs));
                }
                ColumnarBatch::from_rows(rows)?
            }
            PipelineInput::RowsOwned(rows) => {
                if rows.is_empty() {
                    return Ok(return_empty_result(has_group_by, &agg_specs));
                }
                ColumnarBatch::from_rows(&rows)?
            }
            PipelineInput::Empty => {
                return Ok(return_empty_result(has_group_by, &agg_specs));
            }
            PipelineInput::NativeColumnar { table_name, .. } => {
                let columnar_table = match ctx.database.get_columnar(&table_name) {
                    Ok(Some(ct)) => ct,
                    Ok(None) | Err(_) => {
                        return Err(ExecutorError::Other(format!(
                            "Table '{}' not found for columnar aggregation",
                            table_name
                        )));
                    }
                };
                ColumnarBatch::from_storage_columnar(&columnar_table)?
            }
        };

        if batch.row_count() == 0 {
            return Ok(return_empty_result(has_group_by, &agg_specs));
        }

        // Grouped aggregation takes the columnar GROUP BY path.
        if let Some(group_exprs) = group_by {
            if !group_exprs.is_empty() {
                return self.execute_group_by(&batch, group_exprs, &agg_specs, ctx);
            }
        }

        // Aggregates over computed expressions need the schema to evaluate them.
        let needs_schema =
            agg_specs.iter().any(|spec| matches!(spec.source, AggregateSource::Expression(_)));
        let schema_ref = if needs_schema { Some(ctx.schema) } else { None };

        let results =
            crate::select::columnar::compute_aggregates_from_batch(&batch, &agg_specs, schema_ref)?;

        Ok(PipelineOutput::from_rows(vec![Row::new(results)]))
    }

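    /// This pipeline specializes in aggregation queries without joins; joined
    /// queries are left to the row-based pipelines.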
    fn supports_query_pattern(
        &self,
        has_aggregation: bool,
        _has_group_by: bool,
        has_joins: bool,
    ) -> bool {
        has_aggregation && !has_joins
    }

    #[inline]
    fn name(&self) -> &'static str {
        "NativeColumnarPipeline (SIMD)"
    }
}

impl NativeColumnarPipeline {
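    /// Row-by-row filter fallback used when a predicate cannot be expressed as
    /// SIMD column predicates.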
    fn fallback_filter(
        &self,
        rows: Vec<Row>,
        predicate: Option<&Expression>,
        ctx: &ExecutionContext<'_>,
    ) -> Result<PipelineOutput, ExecutorError> {
        if predicate.is_none() {
            return Ok(PipelineOutput::from_rows(rows));
        }

        let predicate = predicate.unwrap();
        let evaluator = ctx.create_evaluator();

        let mut filtered = Vec::with_capacity(rows.len());
        for row in rows {
            let value = evaluator.eval(predicate, &row)?;
            // Only a true boolean (or non-zero integer) keeps the row; false,
            // NULL, zero, and non-boolean values drop it.
            let include = match value {
                vibesql_types::SqlValue::Boolean(true) => true,
                vibesql_types::SqlValue::Boolean(false) | vibesql_types::SqlValue::Null => false,
                vibesql_types::SqlValue::Integer(0) => false,
                vibesql_types::SqlValue::Integer(_) => true,
                _ => false,
            };
            if include {
                filtered.push(row);
            }
        }

        Ok(PipelineOutput::from_rows(filtered))
    }

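    /// GROUP BY over a columnar batch. Only plain column references are
    /// supported as group keys, and only column-sourced (or COUNT(*))
    /// aggregates are supported; anything else returns `UnsupportedFeature`.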
    fn execute_group_by(
        &self,
        batch: &ColumnarBatch,
        group_exprs: &[Expression],
        agg_specs: &[AggregateSpec],
        ctx: &ExecutionContext<'_>,
    ) -> Result<PipelineOutput, ExecutorError> {
        // Resolve each GROUP BY expression to a column index in the schema.
        let group_cols: Vec<usize> = group_exprs
            .iter()
            .filter_map(|expr| match expr {
                Expression::ColumnRef(col_id) => {
                    ctx.schema.get_column_index(col_id.table_canonical(), col_id.column_canonical())
                }
                _ => None,
            })
            .collect();

        if group_cols.len() != group_exprs.len() {
            return Err(ExecutorError::UnsupportedFeature(
                "GROUP BY with non-column expressions not supported in native columnar".to_string(),
            ));
        }

        // Map each aggregate spec onto a source column; COUNT(*) uses column 0
        // as its source.
        let agg_cols: Vec<(usize, AggregateOp)> = agg_specs
            .iter()
            .filter_map(|spec| match &spec.source {
                AggregateSource::Column(idx) => Some((*idx, spec.op)),
                AggregateSource::CountStar => Some((0, AggregateOp::Count)),
                AggregateSource::Expression(_) => None,
            })
            .collect();

        if agg_cols.len() != agg_specs.len() {
            return Err(ExecutorError::UnsupportedFeature(
                "Expression aggregates not supported in native columnar GROUP BY".to_string(),
            ));
        }

        let result =
            crate::select::columnar::columnar_group_by_batch(batch, &group_cols, &agg_cols)?;

        Ok(PipelineOutput::from_rows(result))
    }
}

#[cfg(test)]
mod tests {
    use vibesql_catalog::TableSchema;
    use vibesql_types::SqlValue;

    use super::*;
    use crate::schema::CombinedSchema;

    fn create_test_setup() -> (vibesql_storage::Database, CombinedSchema) {
        let database = vibesql_storage::Database::new();
        let table_schema = TableSchema::new("test".to_string(), vec![]);
        let schema = CombinedSchema::from_table("test".to_string(), table_schema);
        (database, schema)
    }

    fn make_test_row(values: Vec<i64>) -> Row {
        Row::new(values.into_iter().map(SqlValue::Integer).collect::<Vec<_>>())
    }

    #[test]
    fn test_native_columnar_pipeline_name() {
        let pipeline = NativeColumnarPipeline::new();
        let name = pipeline.name();
        assert!(name.starts_with("NativeColumnarPipeline"));
    }

    #[test]
    fn test_native_columnar_pipeline_supports_aggregates() {
        let pipeline = NativeColumnarPipeline::new();

        let supports_simple = pipeline.supports_query_pattern(true, false, false);
        assert!(supports_simple);

        assert!(!pipeline.supports_query_pattern(true, false, true));
    }

    #[test]
    fn test_native_columnar_pipeline_filter_no_predicate() {
        let (database, schema) = create_test_setup();
        let pipeline = NativeColumnarPipeline::new();

        let rows = vec![make_test_row(vec![1, 2]), make_test_row(vec![3, 4])];
        let input = PipelineInput::from_rows_owned(rows);
        let ctx = ExecutionContext::new(&schema, &database);

        let result = pipeline.apply_filter(input, None, &ctx).unwrap();
        assert_eq!(result.row_count(), 2);
    }

    #[test]
    fn test_native_columnar_pipeline_default() {
        let pipeline = NativeColumnarPipeline::default();
        assert!(pipeline.name().starts_with("NativeColumnarPipeline"));
    }

    #[test]
    fn test_native_columnar_pipeline_limit_offset() {
        let pipeline = NativeColumnarPipeline::new();

        let rows = vec![
            make_test_row(vec![1]),
            make_test_row(vec![2]),
            make_test_row(vec![3]),
            make_test_row(vec![4]),
            make_test_row(vec![5]),
        ];
        let output = PipelineOutput::from_rows(rows);

        let result = pipeline.apply_limit_offset(output, Some(3), None).unwrap();
        assert_eq!(result.len(), 3);
    }
}
591}