vibesql_executor/pipeline/native_columnar.rs

use vibesql_ast::{Expression, SelectItem};
use vibesql_storage::Row;

use crate::errors::ExecutorError;

use super::{ExecutionContext, ExecutionPipeline, PipelineInput, PipelineOutput};

use crate::select::columnar::{
    extract_aggregates, extract_column_predicates, simd_filter_batch, AggregateOp, AggregateSource,
    AggregateSpec, ColumnarBatch,
};
pub struct NativeColumnarPipeline {
    #[allow(dead_code)]
    has_columnar_storage: bool,
}

impl NativeColumnarPipeline {
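    /// Creates a pipeline that assumes native columnar storage is available.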
    #[inline]
    pub fn new() -> Self {
        Self { has_columnar_storage: true }
    }

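    /// Creates a pipeline with an explicit columnar-storage availability flag.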
    #[inline]
    #[allow(dead_code)]
    pub fn with_storage(has_columnar_storage: bool) -> Self {
        Self { has_columnar_storage }
    }
}

impl Default for NativeColumnarPipeline {
    fn default() -> Self {
        Self::new()
    }
}

impl ExecutionPipeline for NativeColumnarPipeline {
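    /// Delegates evaluator construction to the execution context.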
    #[inline]
    fn create_evaluator<'a>(
        &self,
        ctx: &'a ExecutionContext<'a>,
    ) -> crate::evaluator::CombinedExpressionEvaluator<'a> {
        ctx.create_evaluator()
    }

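    /// Applies the WHERE predicate. Native columnar tables are loaded as a
    /// `ColumnarBatch` and simple column predicates are evaluated with
    /// `simd_filter_batch`; row inputs are batched first. Predicates that
    /// cannot be decomposed into column predicates fall back to
    /// `fallback_filter`.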
    fn apply_filter(
        &self,
        input: PipelineInput<'_>,
        predicate: Option<&Expression>,
        ctx: &ExecutionContext<'_>,
    ) -> Result<PipelineOutput, ExecutorError> {
        if let PipelineInput::NativeColumnar { table_name, column_indices: _ } = &input {
            let columnar_table = match ctx.database.get_columnar(table_name) {
                Ok(Some(ct)) => ct,
                Ok(None) | Err(_) => {
                    return self.fallback_filter(input.into_rows(), predicate, ctx);
                }
            };

            if columnar_table.row_count() == 0 {
                return Ok(PipelineOutput::Empty);
            }

            let batch = ColumnarBatch::from_storage_columnar(&columnar_table)?;

            if predicate.is_none() {
                let rows = batch.to_rows()?;
                return Ok(PipelineOutput::from_rows(rows));
            }

            let predicate = predicate.unwrap();

            let predicates = match extract_column_predicates(predicate, ctx.schema) {
                Some(preds) => preds,
                None => {
                    let rows = batch.to_rows()?;
                    return self.fallback_filter(rows, Some(predicate), ctx);
                }
            };

            let filtered_batch =
                if predicates.is_empty() { batch } else { simd_filter_batch(&batch, &predicates)? };

            return Ok(PipelineOutput::from_batch(filtered_batch));
        }

        let rows = input.into_rows();

        if predicate.is_none() {
            return Ok(PipelineOutput::from_rows(rows));
        }

        let predicate = predicate.unwrap();

        let predicates = match extract_column_predicates(predicate, ctx.schema) {
            Some(preds) => preds,
            None => {
                return self.fallback_filter(rows, Some(predicate), ctx);
            }
        };

        if rows.is_empty() {
            return Ok(PipelineOutput::Empty);
        }

        let batch = ColumnarBatch::from_rows(&rows)?;

        let filtered_batch =
            if predicates.is_empty() { batch } else { simd_filter_batch(&batch, &predicates)? };

        Ok(PipelineOutput::from_batch(filtered_batch))
    }

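    /// Projects `select_items` row by row. Batch input is materialized into
    /// rows first; native columnar input is not handled here and yields an
    /// empty output.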
    fn apply_projection(
        &self,
        input: PipelineInput<'_>,
        select_items: &[SelectItem],
        ctx: &ExecutionContext<'_>,
    ) -> Result<PipelineOutput, ExecutorError> {
        let rows = match input {
            PipelineInput::Batch(batch) => {
                if batch.row_count() == 0 {
                    return Ok(PipelineOutput::Empty);
                }
                batch.to_rows()?
            }
            PipelineInput::Rows(rows) => {
                if rows.is_empty() {
                    return Ok(PipelineOutput::Empty);
                }
                rows.to_vec()
            }
            PipelineInput::RowsOwned(rows) => {
                if rows.is_empty() {
                    return Ok(PipelineOutput::Empty);
                }
                rows
            }
            PipelineInput::Empty => return Ok(PipelineOutput::Empty),
            PipelineInput::NativeColumnar { .. } => {
                return Ok(PipelineOutput::Empty);
            }
        };

        let evaluator = ctx.create_evaluator();
        let buffer_pool = vibesql_storage::QueryBufferPool::new();

        let mut projected_rows = Vec::with_capacity(rows.len());
        for row in rows {
            let projected = crate::select::projection::project_row_combined(
                &row,
                select_items,
                &evaluator,
                ctx.schema,
                &None,
                &buffer_pool,
            )?;
            projected_rows.push(projected);
        }

        Ok(PipelineOutput::from_rows(projected_rows))
    }

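    /// Computes aggregates over a `ColumnarBatch`. Empty input produces a
    /// single row with 0 for COUNT and NULL for other aggregates; non-empty
    /// GROUP BY queries are routed to `execute_group_by`.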
    fn apply_aggregation(
        &self,
        input: PipelineInput<'_>,
        select_items: &[SelectItem],
        group_by: Option<&[Expression]>,
        _having: Option<&Expression>,
        ctx: &ExecutionContext<'_>,
    ) -> Result<PipelineOutput, ExecutorError> {
        let agg_exprs: Vec<Expression> = select_items
            .iter()
            .filter_map(|item| {
                if let SelectItem::Expression { expr, .. } = item {
                    Some(expr.clone())
                } else {
                    None
                }
            })
            .collect();

        let agg_specs = match extract_aggregates(&agg_exprs, ctx.schema) {
            Some(specs) => specs,
            None => {
                return Err(ExecutorError::UnsupportedFeature(
                    "Complex aggregates not supported in native columnar pipeline".to_string(),
                ));
            }
        };

        let batch = match input {
            PipelineInput::Batch(batch) => batch,
            PipelineInput::Rows(rows) => {
                if rows.is_empty() {
                    let values: Vec<vibesql_types::SqlValue> = agg_specs
                        .iter()
                        .map(|spec| match spec.op {
                            AggregateOp::Count => vibesql_types::SqlValue::Integer(0),
                            _ => vibesql_types::SqlValue::Null,
                        })
                        .collect();
                    return Ok(PipelineOutput::from_rows(vec![Row::new(values)]));
                }
                ColumnarBatch::from_rows(rows)?
            }
            PipelineInput::RowsOwned(rows) => {
                if rows.is_empty() {
                    let values: Vec<vibesql_types::SqlValue> = agg_specs
                        .iter()
                        .map(|spec| match spec.op {
                            AggregateOp::Count => vibesql_types::SqlValue::Integer(0),
                            _ => vibesql_types::SqlValue::Null,
                        })
                        .collect();
                    return Ok(PipelineOutput::from_rows(vec![Row::new(values)]));
                }
                ColumnarBatch::from_rows(&rows)?
            }
            PipelineInput::Empty => {
                let values: Vec<vibesql_types::SqlValue> = agg_specs
                    .iter()
                    .map(|spec| match spec.op {
                        AggregateOp::Count => vibesql_types::SqlValue::Integer(0),
                        _ => vibesql_types::SqlValue::Null,
                    })
                    .collect();
                return Ok(PipelineOutput::from_rows(vec![Row::new(values)]));
            }
            PipelineInput::NativeColumnar { table_name, .. } => {
                let columnar_table = match ctx.database.get_columnar(&table_name) {
                    Ok(Some(ct)) => ct,
                    Ok(None) | Err(_) => {
                        return Err(ExecutorError::Other(format!(
                            "Table '{}' not found for columnar aggregation",
                            table_name
                        )));
                    }
                };
                ColumnarBatch::from_storage_columnar(&columnar_table)?
            }
        };

        if batch.row_count() == 0 {
            let values: Vec<vibesql_types::SqlValue> = agg_specs
                .iter()
                .map(|spec| match spec.op {
                    AggregateOp::Count => vibesql_types::SqlValue::Integer(0),
                    _ => vibesql_types::SqlValue::Null,
                })
                .collect();
            return Ok(PipelineOutput::from_rows(vec![Row::new(values)]));
        }

        if let Some(group_exprs) = group_by {
            if !group_exprs.is_empty() {
                return self.execute_group_by(&batch, group_exprs, &agg_specs, ctx);
            }
        }

        let needs_schema =
            agg_specs.iter().any(|spec| matches!(spec.source, AggregateSource::Expression(_)));
        let schema_ref = if needs_schema { Some(ctx.schema) } else { None };

        let results =
            crate::select::columnar::compute_aggregates_from_batch(&batch, &agg_specs, schema_ref)?;

        Ok(PipelineOutput::from_rows(vec![Row::new(results)]))
    }

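    /// This pipeline targets aggregation queries without joins.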
    fn supports_query_pattern(
        &self,
        has_aggregation: bool,
        _has_group_by: bool,
        has_joins: bool,
    ) -> bool {
        has_aggregation && !has_joins
    }

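    /// Returns the pipeline's display name.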
    #[inline]
    fn name(&self) -> &'static str {
        "NativeColumnarPipeline (SIMD)"
    }
}

impl NativeColumnarPipeline {
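    /// Row-at-a-time filter used when the SIMD path cannot evaluate the
    /// predicate. Rows are kept for TRUE or non-zero integer results;
    /// FALSE, NULL, zero, and all other values drop the row.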
    fn fallback_filter(
        &self,
        rows: Vec<Row>,
        predicate: Option<&Expression>,
        ctx: &ExecutionContext<'_>,
    ) -> Result<PipelineOutput, ExecutorError> {
        if predicate.is_none() {
            return Ok(PipelineOutput::from_rows(rows));
        }

        let predicate = predicate.unwrap();
        let evaluator = ctx.create_evaluator();

        let mut filtered = Vec::with_capacity(rows.len());
        for row in rows {
            let value = evaluator.eval(predicate, &row)?;
            let include = match value {
                vibesql_types::SqlValue::Boolean(true) => true,
                vibesql_types::SqlValue::Boolean(false) | vibesql_types::SqlValue::Null => false,
                vibesql_types::SqlValue::Integer(0) => false,
                vibesql_types::SqlValue::Integer(_) => true,
                _ => false,
            };
            if include {
                filtered.push(row);
            }
        }

        Ok(PipelineOutput::from_rows(filtered))
    }

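    /// GROUP BY over a `ColumnarBatch`. Only plain column references are
    /// accepted as grouping keys, and only column-backed aggregates (plus
    /// COUNT(*)) are supported; anything else is rejected as unsupported.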
    fn execute_group_by(
        &self,
        batch: &ColumnarBatch,
        group_exprs: &[Expression],
        agg_specs: &[AggregateSpec],
        ctx: &ExecutionContext<'_>,
    ) -> Result<PipelineOutput, ExecutorError> {
        let group_cols: Vec<usize> = group_exprs
            .iter()
            .filter_map(|expr| match expr {
                Expression::ColumnRef { table, column } => {
                    ctx.schema.get_column_index(table.as_deref(), column.as_str())
                }
                _ => None,
            })
            .collect();

        if group_cols.len() != group_exprs.len() {
            return Err(ExecutorError::UnsupportedFeature(
                "GROUP BY with non-column expressions not supported in native columnar".to_string(),
            ));
        }

        let agg_cols: Vec<(usize, AggregateOp)> = agg_specs
            .iter()
            .filter_map(|spec| match &spec.source {
                AggregateSource::Column(idx) => Some((*idx, spec.op)),
                AggregateSource::CountStar => Some((0, AggregateOp::Count)),
                AggregateSource::Expression(_) => None,
            })
            .collect();

        if agg_cols.len() != agg_specs.len() {
            return Err(ExecutorError::UnsupportedFeature(
                "Expression aggregates not supported in native columnar GROUP BY".to_string(),
            ));
        }

        let result =
            crate::select::columnar::columnar_group_by_batch(batch, &group_cols, &agg_cols)?;

        Ok(PipelineOutput::from_rows(result))
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::schema::CombinedSchema;
    use vibesql_catalog::TableSchema;
    use vibesql_types::SqlValue;

    fn create_test_setup() -> (vibesql_storage::Database, CombinedSchema) {
        let database = vibesql_storage::Database::new();
        let table_schema = TableSchema::new("test".to_string(), vec![]);
        let schema = CombinedSchema::from_table("test".to_string(), table_schema);
        (database, schema)
    }

    fn make_test_row(values: Vec<i64>) -> Row {
        Row::new(values.into_iter().map(SqlValue::Integer).collect())
    }

    #[test]
    fn test_native_columnar_pipeline_name() {
        let pipeline = NativeColumnarPipeline::new();
        let name = pipeline.name();
        assert!(name.starts_with("NativeColumnarPipeline"));
    }

    #[test]
    fn test_native_columnar_pipeline_supports_aggregates() {
        let pipeline = NativeColumnarPipeline::new();

        let supports_simple = pipeline.supports_query_pattern(true, false, false);
        assert!(supports_simple);

        assert!(!pipeline.supports_query_pattern(true, false, true));
    }

    #[test]
    fn test_native_columnar_pipeline_filter_no_predicate() {
        let (database, schema) = create_test_setup();
        let pipeline = NativeColumnarPipeline::new();

        let rows = vec![make_test_row(vec![1, 2]), make_test_row(vec![3, 4])];
        let input = PipelineInput::from_rows_owned(rows);
        let ctx = ExecutionContext::new(&schema, &database);

        let result = pipeline.apply_filter(input, None, &ctx).unwrap();
        assert_eq!(result.row_count(), 2);
    }

    #[test]
    fn test_native_columnar_pipeline_default() {
        let pipeline = NativeColumnarPipeline::default();
        assert!(pipeline.name().starts_with("NativeColumnarPipeline"));
    }

    #[test]
    fn test_native_columnar_pipeline_limit_offset() {
        let pipeline = NativeColumnarPipeline::new();

        let rows = vec![
            make_test_row(vec![1]),
            make_test_row(vec![2]),
            make_test_row(vec![3]),
            make_test_row(vec![4]),
            make_test_row(vec![5]),
        ];
        let output = PipelineOutput::from_rows(rows);

        let result = pipeline.apply_limit_offset(output, Some(3), None).unwrap();
        assert_eq!(result.len(), 3);
    }
}