Skip to main content

sochdb_query/
soch_ql_executor.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! SOCH-QL Query Executor (Task 6)
19//!
20//! End-to-end SOCH-QL query execution pipeline:
21//! 1. parse(sql) → SochQuery
22//! 2. validate(query, catalog) → Result<()>
23//! 3. plan(query, stats) → QueryPlan
24//! 4. execute(plan, storage) → SochTable
25//!
26//! ## Token Reduction Model
27//!
28//! ```text
29//! tokens_JSON(table) ≈ 4 + Σ(|field_name| + |value| + 4) per row
30//! tokens_TOON(table) ≈ header + Σ(|value| + 1) per row
31//! reduction = 1 - tokens_TOON/tokens_JSON ≈ 0.4 to 0.6
32//!
33//! For 100 rows × 5 fields:
34//! JSON: ~100 × (5 × 15) = 7,500 tokens
35//! TOON: ~50 + 100 × 25 = 2,550 tokens → 66% reduction
36//! ```
37
38use crate::soch_ql::{
39    ComparisonOp, LogicalOp, SelectQuery, SochQlParser, SochQuery, SochResult, SochValue,
40    SortDirection, WhereClause,
41};
42#[cfg(test)]
43use crate::soch_ql::{Condition, OrderBy};
44use sochdb_core::{Catalog, Result, SochDBError, SochRow, SochValue as CoreSochValue};
45#[cfg(test)]
46use sochdb_core::{SochSchema, SochType};
47use std::collections::HashMap;
48
49/// Query plan operators
50#[derive(Debug, Clone)]
51pub enum QueryPlan {
52    /// Full table scan
53    TableScan {
54        table: String,
55        columns: Vec<String>,
56        predicate: Option<Box<QueryPlan>>,
57    },
58    /// Index seek (primary or secondary)
59    IndexSeek { index: String, key_range: KeyRange },
60    /// Filter rows
61    Filter {
62        input: Box<QueryPlan>,
63        predicate: Predicate,
64    },
65    /// Project columns
66    Project {
67        input: Box<QueryPlan>,
68        columns: Vec<String>,
69    },
70    /// Sort results
71    Sort {
72        input: Box<QueryPlan>,
73        order_by: Vec<(String, bool)>, // (column, ascending)
74    },
75    /// Limit results
76    Limit {
77        input: Box<QueryPlan>,
78        count: usize,
79        offset: usize,
80    },
81    /// Empty result
82    Empty,
83}
84
85/// Key range for index seeks
86#[derive(Debug, Clone)]
87pub struct KeyRange {
88    pub start: Option<SochValue>,
89    pub end: Option<SochValue>,
90    pub inclusive_start: bool,
91    pub inclusive_end: bool,
92}
93
94impl KeyRange {
95    pub fn all() -> Self {
96        Self {
97            start: None,
98            end: None,
99            inclusive_start: true,
100            inclusive_end: true,
101        }
102    }
103
104    pub fn eq(value: SochValue) -> Self {
105        Self {
106            start: Some(value.clone()),
107            end: Some(value),
108            inclusive_start: true,
109            inclusive_end: true,
110        }
111    }
112}
113
114/// Query predicate
115#[derive(Debug, Clone)]
116pub struct Predicate {
117    pub conditions: Vec<PredicateCondition>,
118    pub operator: LogicalOp,
119}
120
121/// Single predicate condition (uses CoreSochValue for row compatibility)
122#[derive(Debug, Clone)]
123pub struct PredicateCondition {
124    pub column: String,
125    pub operator: ComparisonOp,
126    pub value: CoreSochValue,
127}
128
129impl PredicateCondition {
130    /// Create from soch_ql SochValue
131    pub fn from_soch_ql(column: String, operator: ComparisonOp, value: &SochValue) -> Self {
132        Self {
133            column,
134            operator,
135            value: Self::convert_value(value),
136        }
137    }
138
139    /// Convert soch_ql::SochValue to CoreSochValue
140    fn convert_value(v: &SochValue) -> CoreSochValue {
141        match v {
142            SochValue::Int(i) => CoreSochValue::Int(*i),
143            SochValue::UInt(u) => CoreSochValue::UInt(*u),
144            SochValue::Float(f) => CoreSochValue::Float(*f),
145            SochValue::Text(s) => CoreSochValue::Text(s.clone()),
146            SochValue::Bool(b) => CoreSochValue::Bool(*b),
147            SochValue::Null => CoreSochValue::Null,
148            SochValue::Binary(b) => CoreSochValue::Binary(b.clone()),
149            SochValue::Array(arr) => {
150                CoreSochValue::Array(arr.iter().map(Self::convert_value).collect())
151            }
152        }
153    }
154
155    /// Evaluate predicate against a row
156    pub fn evaluate(&self, row: &SochRow, column_idx: usize) -> bool {
157        if column_idx >= row.values.len() {
158            return false;
159        }
160
161        let row_value = &row.values[column_idx];
162
163        match self.operator {
164            ComparisonOp::Eq => row_value == &self.value,
165            ComparisonOp::Ne => row_value != &self.value,
166            ComparisonOp::Lt => {
167                Self::compare(row_value, &self.value) == Some(std::cmp::Ordering::Less)
168            }
169            ComparisonOp::Le => matches!(
170                Self::compare(row_value, &self.value),
171                Some(std::cmp::Ordering::Less | std::cmp::Ordering::Equal)
172            ),
173            ComparisonOp::Gt => {
174                Self::compare(row_value, &self.value) == Some(std::cmp::Ordering::Greater)
175            }
176            ComparisonOp::Ge => matches!(
177                Self::compare(row_value, &self.value),
178                Some(std::cmp::Ordering::Greater | std::cmp::Ordering::Equal)
179            ),
180            ComparisonOp::Like => Self::like_match(row_value, &self.value),
181            ComparisonOp::In => Self::in_match(row_value, &self.value),
182            ComparisonOp::SimilarTo => {
183                // SimilarTo is used for vector similarity search
184                // Evaluated by the vector index, not row-by-row comparison
185                // For row-level evaluation, we fall back to Like-style matching
186                Self::like_match(row_value, &self.value)
187            }
188        }
189    }
190
191    fn compare(a: &CoreSochValue, b: &CoreSochValue) -> Option<std::cmp::Ordering> {
192        match (a, b) {
193            (CoreSochValue::Int(a), CoreSochValue::Int(b)) => Some(a.cmp(b)),
194            (CoreSochValue::UInt(a), CoreSochValue::UInt(b)) => Some(a.cmp(b)),
195            (CoreSochValue::Float(a), CoreSochValue::Float(b)) => a.partial_cmp(b),
196            (CoreSochValue::Text(a), CoreSochValue::Text(b)) => Some(a.cmp(b)),
197            _ => None,
198        }
199    }
200
201    fn like_match(value: &CoreSochValue, pattern: &CoreSochValue) -> bool {
202        match (value, pattern) {
203            (CoreSochValue::Text(v), CoreSochValue::Text(p)) => {
204                // Canonical LIKE matcher: % matches any run, _ matches one char,
205                // all other characters (including regex metacharacters) literal.
206                crate::like::like_match(v, p)
207            }
208            _ => false,
209        }
210    }
211
212    fn in_match(value: &CoreSochValue, list: &CoreSochValue) -> bool {
213        match list {
214            CoreSochValue::Array(values) => values.iter().any(|v| value == v),
215            _ => value == list, // Single value comparison fallback
216        }
217    }
218}
219
220impl Predicate {
221    /// Evaluate predicate against a row
222    pub fn evaluate(&self, row: &SochRow, column_map: &HashMap<String, usize>) -> bool {
223        let results: Vec<bool> = self
224            .conditions
225            .iter()
226            .map(|cond| {
227                column_map
228                    .get(&cond.column)
229                    .map(|&idx| cond.evaluate(row, idx))
230                    .unwrap_or(false)
231            })
232            .collect();
233
234        match self.operator {
235            LogicalOp::And => results.iter().all(|&r| r),
236            LogicalOp::Or => results.iter().any(|&r| r),
237        }
238    }
239}
240
241/// SOCH-QL Query Executor
242///
243/// When constructed with a `StorageBackend`, queries read real data from storage.
244/// Without a backend (`new()`), returns empty result sets (for testing/planning only).
245pub struct SochQlExecutor {
246    storage: Option<std::sync::Arc<dyn crate::optimizer_integration::StorageBackend>>,
247}
248
249impl SochQlExecutor {
250    /// Create a new executor without storage (returns empty results)
251    pub fn new() -> Self {
252        Self { storage: None }
253    }
254
255    /// Create an executor wired to a real storage backend
256    pub fn with_storage(
257        storage: std::sync::Arc<dyn crate::optimizer_integration::StorageBackend>,
258    ) -> Self {
259        Self {
260            storage: Some(storage),
261        }
262    }
263
264    /// Execute a SOCH-QL query string
265    pub fn execute(&self, query: &str, catalog: &Catalog) -> Result<SochResult> {
266        // Parse
267        let parsed = SochQlParser::parse(query)
268            .map_err(|e| SochDBError::InvalidArgument(format!("Parse error: {:?}", e)))?;
269
270        // Validate
271        self.validate(&parsed, catalog)?;
272
273        // Plan
274        let plan = self.plan(&parsed, catalog)?;
275
276        // Execute
277        self.execute_plan(&plan, catalog)
278    }
279
280    /// Validate a parsed query against the catalog
281    pub fn validate(&self, query: &SochQuery, catalog: &Catalog) -> Result<()> {
282        match query {
283            SochQuery::Select(select) => {
284                // Check table exists
285                if catalog.get_table(&select.table).is_none() {
286                    return Err(SochDBError::NotFound(format!(
287                        "Table '{}' not found",
288                        select.table
289                    )));
290                }
291
292                // Check columns exist (if not *)
293                if let Some(entry) = catalog.get_table(&select.table)
294                    && let Some(schema) = &entry.schema
295                {
296                    for col in &select.columns {
297                        if col != "*" && !schema.fields.iter().any(|f| &f.name == col) {
298                            return Err(SochDBError::InvalidArgument(format!(
299                                "Column '{}' not found in table '{}'",
300                                col, select.table
301                            )));
302                        }
303                    }
304                }
305
306                Ok(())
307            }
308            SochQuery::Insert(insert) => {
309                // Check table exists
310                if catalog.get_table(&insert.table).is_none() {
311                    return Err(SochDBError::NotFound(format!(
312                        "Table '{}' not found",
313                        insert.table
314                    )));
315                }
316                Ok(())
317            }
318            SochQuery::CreateTable(create) => {
319                // Check table doesn't exist
320                if catalog.get_table(&create.table).is_some() {
321                    return Err(SochDBError::InvalidArgument(format!(
322                        "Table '{}' already exists",
323                        create.table
324                    )));
325                }
326                Ok(())
327            }
328            SochQuery::DropTable { table } => {
329                if catalog.get_table(table).is_none() {
330                    return Err(SochDBError::NotFound(format!(
331                        "Table '{}' not found",
332                        table
333                    )));
334                }
335                Ok(())
336            }
337        }
338    }
339
340    /// Generate a query plan
341    pub fn plan(&self, query: &SochQuery, catalog: &Catalog) -> Result<QueryPlan> {
342        match query {
343            SochQuery::Select(select) => self.plan_select(select, catalog),
344            _ => Err(SochDBError::InvalidArgument(
345                "Only SELECT queries can be planned".to_string(),
346            )),
347        }
348    }
349
350    fn plan_select(&self, select: &SelectQuery, _catalog: &Catalog) -> Result<QueryPlan> {
351        // Start with table scan
352        let mut plan = QueryPlan::TableScan {
353            table: select.table.clone(),
354            columns: select.columns.clone(),
355            predicate: None,
356        };
357
358        // Add filter if WHERE clause present
359        if let Some(where_clause) = &select.where_clause {
360            let predicate = self.build_predicate(where_clause);
361            plan = QueryPlan::Filter {
362                input: Box::new(plan),
363                predicate,
364            };
365        }
366
367        // Add projection (column selection)
368        if !select.columns.contains(&"*".to_string()) {
369            plan = QueryPlan::Project {
370                input: Box::new(plan),
371                columns: select.columns.clone(),
372            };
373        }
374
375        // Add sort if ORDER BY present
376        if let Some(order_by) = &select.order_by {
377            plan = QueryPlan::Sort {
378                input: Box::new(plan),
379                order_by: vec![(
380                    order_by.column.clone(),
381                    matches!(order_by.direction, SortDirection::Asc),
382                )],
383            };
384        }
385
386        // Add limit if present
387        if select.limit.is_some() || select.offset.is_some() {
388            plan = QueryPlan::Limit {
389                input: Box::new(plan),
390                count: select.limit.unwrap_or(usize::MAX),
391                offset: select.offset.unwrap_or(0),
392            };
393        }
394
395        Ok(plan)
396    }
397
398    fn build_predicate(&self, where_clause: &WhereClause) -> Predicate {
399        Predicate {
400            conditions: where_clause
401                .conditions
402                .iter()
403                .map(|c| PredicateCondition::from_soch_ql(c.column.clone(), c.operator, &c.value))
404                .collect(),
405            operator: where_clause.operator,
406        }
407    }
408
409    /// Execute a query plan
410    ///
411    /// When a `StorageBackend` is present, this reads real data from storage.
412    /// Without one, returns empty result sets (legacy behavior).
413    #[allow(clippy::only_used_in_recursion)]
414    pub fn execute_plan(&self, plan: &QueryPlan, catalog: &Catalog) -> Result<SochResult> {
415        match plan {
416            QueryPlan::Empty => Ok(SochResult {
417                table: "result".to_string(),
418                columns: vec![],
419                rows: vec![],
420            }),
421            QueryPlan::TableScan { table, columns, .. } => {
422                // Get schema from catalog
423                let schema_columns = if let Some(entry) = catalog.get_table(table) {
424                    if let Some(schema) = &entry.schema {
425                        if columns.contains(&"*".to_string()) {
426                            schema.fields.iter().map(|f| f.name.clone()).collect()
427                        } else {
428                            columns.clone()
429                        }
430                    } else {
431                        columns.clone()
432                    }
433                } else {
434                    columns.clone()
435                };
436
437                // ---- Phase 0: Read from storage if backend is wired ----
438                let rows = if let Some(storage) = &self.storage {
439                    let raw_rows = storage.table_scan(table, &schema_columns, None)?;
440                    raw_rows
441                        .into_iter()
442                        .map(|row| {
443                            schema_columns
444                                .iter()
445                                .map(|c| row.get(c).cloned().unwrap_or(SochValue::Null))
446                                .collect()
447                        })
448                        .collect()
449                } else {
450                    vec![] // No storage backend — empty results (legacy)
451                };
452
453                Ok(SochResult {
454                    table: table.clone(),
455                    columns: schema_columns,
456                    rows,
457                })
458            }
459            QueryPlan::Filter { input, predicate } => {
460                let mut result = self.execute_plan(input, catalog)?;
461                // Build column index map
462                let col_map: HashMap<String, usize> = result
463                    .columns
464                    .iter()
465                    .enumerate()
466                    .map(|(i, c)| (c.clone(), i))
467                    .collect();
468                // Filter rows using predicate
469                result.rows.retain(|row| {
470                    let matches: Vec<bool> = predicate
471                        .conditions
472                        .iter()
473                        .map(|cond| {
474                            if let Some(&idx) = col_map.get(&cond.column) {
475                                if idx < row.len() {
476                                    Self::eval_predicate_condition(cond, &row[idx])
477                                } else {
478                                    false
479                                }
480                            } else {
481                                false
482                            }
483                        })
484                        .collect();
485                    match predicate.operator {
486                        LogicalOp::And => matches.iter().all(|&m| m),
487                        LogicalOp::Or => matches.iter().any(|&m| m),
488                    }
489                });
490                Ok(result)
491            }
492            QueryPlan::Project { input, columns } => {
493                let mut result = self.execute_plan(input, catalog)?;
494                // Build index map from current columns
495                let col_map: HashMap<String, usize> = result
496                    .columns
497                    .iter()
498                    .enumerate()
499                    .map(|(i, c)| (c.clone(), i))
500                    .collect();
501                // Re-project rows
502                result.rows = result
503                    .rows
504                    .into_iter()
505                    .map(|row| {
506                        columns
507                            .iter()
508                            .map(|c| {
509                                col_map
510                                    .get(c)
511                                    .and_then(|&i| row.get(i).cloned())
512                                    .unwrap_or(SochValue::Null)
513                            })
514                            .collect()
515                    })
516                    .collect();
517                result.columns = columns.clone();
518                Ok(result)
519            }
520            QueryPlan::Sort { input, order_by } => {
521                let mut result = self.execute_plan(input, catalog)?;
522                let col_map: HashMap<String, usize> = result
523                    .columns
524                    .iter()
525                    .enumerate()
526                    .map(|(i, c)| (c.clone(), i))
527                    .collect();
528                result.rows.sort_by(|a, b| {
529                    for (col, ascending) in order_by {
530                        if let Some(&idx) = col_map.get(col) {
531                            let va = a.get(idx);
532                            let vb = b.get(idx);
533                            let cmp = Self::compare_soch_values(va, vb);
534                            let cmp = if *ascending { cmp } else { cmp.reverse() };
535                            if cmp != std::cmp::Ordering::Equal {
536                                return cmp;
537                            }
538                        }
539                    }
540                    std::cmp::Ordering::Equal
541                });
542                Ok(result)
543            }
544            QueryPlan::Limit {
545                input,
546                count,
547                offset,
548            } => {
549                let mut result = self.execute_plan(input, catalog)?;
550                result.rows = result.rows.into_iter().skip(*offset).take(*count).collect();
551                Ok(result)
552            }
553            QueryPlan::IndexSeek { .. } => {
554                // Index seek — fallback to empty (TODO: wire to storage index)
555                Ok(SochResult {
556                    table: "result".to_string(),
557                    columns: vec![],
558                    rows: vec![],
559                })
560            }
561        }
562    }
563
564    /// Compare two optional SochValues for sorting
565    fn compare_soch_values(a: Option<&SochValue>, b: Option<&SochValue>) -> std::cmp::Ordering {
566        match (a, b) {
567            (None, None) => std::cmp::Ordering::Equal,
568            (None, Some(_)) => std::cmp::Ordering::Less,
569            (Some(_), None) => std::cmp::Ordering::Greater,
570            (Some(a), Some(b)) => match (a, b) {
571                (SochValue::Int(a), SochValue::Int(b)) => a.cmp(b),
572                (SochValue::UInt(a), SochValue::UInt(b)) => a.cmp(b),
573                (SochValue::Float(a), SochValue::Float(b)) => {
574                    a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal)
575                }
576                (SochValue::Text(a), SochValue::Text(b)) => a.cmp(b),
577                (SochValue::Bool(a), SochValue::Bool(b)) => a.cmp(b),
578                _ => std::cmp::Ordering::Equal,
579            },
580        }
581    }
582
583    /// Evaluate a single predicate condition against a SochValue
584    fn eval_predicate_condition(cond: &PredicateCondition, value: &SochValue) -> bool {
585        // Convert SochValue to CoreSochValue for comparison
586        let core_val = PredicateCondition::convert_value(value);
587        match cond.operator {
588            ComparisonOp::Eq => core_val == cond.value,
589            ComparisonOp::Ne => core_val != cond.value,
590            ComparisonOp::Lt => {
591                PredicateCondition::compare(&core_val, &cond.value)
592                    == Some(std::cmp::Ordering::Less)
593            }
594            ComparisonOp::Le => matches!(
595                PredicateCondition::compare(&core_val, &cond.value),
596                Some(std::cmp::Ordering::Less | std::cmp::Ordering::Equal)
597            ),
598            ComparisonOp::Gt => {
599                PredicateCondition::compare(&core_val, &cond.value)
600                    == Some(std::cmp::Ordering::Greater)
601            }
602            ComparisonOp::Ge => matches!(
603                PredicateCondition::compare(&core_val, &cond.value),
604                Some(std::cmp::Ordering::Greater | std::cmp::Ordering::Equal)
605            ),
606            ComparisonOp::Like => PredicateCondition::like_match(&core_val, &cond.value),
607            ComparisonOp::In => PredicateCondition::in_match(&core_val, &cond.value),
608            ComparisonOp::SimilarTo => PredicateCondition::like_match(&core_val, &cond.value),
609        }
610    }
611}
612
613impl Default for SochQlExecutor {
614    fn default() -> Self {
615        Self { storage: None }
616    }
617}
618
619/// Execute a SOCH-QL query (convenience function)
620pub fn execute_sochql(query: &str, catalog: &Catalog) -> Result<SochResult> {
621    SochQlExecutor::new().execute(query, catalog)
622}
623
624/// Estimate token reduction for a TOON result vs JSON
625pub fn estimate_token_reduction(result: &SochResult) -> TokenReductionStats {
626    let row_count = result.rows.len();
627    let col_count = result.columns.len();
628
629    if row_count == 0 || col_count == 0 {
630        return TokenReductionStats::default();
631    }
632
633    // Estimate JSON tokens
634    // Format: [{"col1": "val1", "col2": "val2"}, ...]
635    let avg_col_name_len: usize = result.columns.iter().map(|c| c.len()).sum::<usize>() / col_count;
636    let avg_value_len = 10; // Rough estimate
637
638    // JSON: 2 (brackets) + row_count * (2 + col_count * (col_name + 4 + value))
639    let json_tokens = 2 + row_count * (2 + col_count * (avg_col_name_len + 4 + avg_value_len));
640
641    // TOON: header + row_count * (col_count * value + col_count)
642    // Header: table[count]{col1,col2,...}:
643    let header_tokens = result.table.len() + 10 + result.columns.join(",").len();
644    let soch_tokens = header_tokens + row_count * (col_count * avg_value_len + col_count);
645
646    let reduction = 1.0 - (soch_tokens as f64 / json_tokens as f64);
647
648    TokenReductionStats {
649        json_tokens,
650        soch_tokens,
651        reduction_percent: (reduction * 100.0) as u32,
652        row_count,
653        col_count,
654    }
655}
656
/// Outcome of a TOON-versus-JSON token estimate.
///
/// The `Default` value has every field set to zero.
#[derive(Debug, Clone, Default)]
pub struct TokenReductionStats {
    /// Estimated number of tokens for the JSON encoding.
    pub json_tokens: usize,
    /// Estimated number of tokens for the TOON encoding.
    pub soch_tokens: usize,
    /// Estimated saving of TOON over JSON, in whole percent.
    pub reduction_percent: u32,
    /// Number of rows in the measured result.
    pub row_count: usize,
    /// Number of columns in the measured result.
    pub col_count: usize,
}
671
672// ============================================================================
673// Task 12: Vectorized Predicate Evaluation
674// ============================================================================
675
/// Selection vector for batch predicate evaluation.
///
/// Holds the indices of the rows of a batch that are still selected. Once the
/// selection is empty, callers can stop evaluating the batch.
#[derive(Debug, Clone)]
pub struct SelectionVector {
    /// Indices of the selected rows.
    indices: Vec<u32>,
    /// Number of rows in the batch the selection refers to.
    batch_size: usize,
}

impl SelectionVector {
    /// Selection containing every row index `0..batch_size`.
    pub fn all(batch_size: usize) -> Self {
        let upper = batch_size as u32;
        Self {
            indices: (0..upper).collect(),
            batch_size,
        }
    }

    /// Selection with no rows and a batch size of zero.
    pub fn empty() -> Self {
        Self::from_indices(Vec::new(), 0)
    }

    /// Selection made of the given indices for a batch of `batch_size` rows.
    pub fn from_indices(indices: Vec<u32>, batch_size: usize) -> Self {
        Self {
            indices,
            batch_size,
        }
    }

    /// `true` when no row is selected (the short-circuit condition).
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Number of selected rows.
    #[inline]
    pub fn len(&self) -> usize {
        self.indices.len()
    }

    /// Size of the batch this selection refers to.
    #[inline]
    pub fn batch_size(&self) -> usize {
        self.batch_size
    }

    /// Fraction of the batch that is selected; `0.0` for a batch size of zero.
    #[inline]
    pub fn selectivity(&self) -> f64 {
        match self.batch_size {
            0 => 0.0,
            total => self.len() as f64 / total as f64,
        }
    }

    /// Iterate over the selected indices in stored order.
    pub fn iter(&self) -> impl Iterator<Item = u32> + '_ {
        self.indices.iter().copied()
    }

    /// New selection holding the indices for which `pred` returns `true`;
    /// the batch size is carried over.
    pub fn filter<F>(&self, pred: F) -> Self
    where
        F: Fn(u32) -> bool,
    {
        let mut kept = Vec::new();
        for &idx in &self.indices {
            if pred(idx) {
                kept.push(idx);
            }
        }
        Self {
            indices: kept,
            batch_size: self.batch_size,
        }
    }

    /// Append `start_idx + bit` for every set bit of `mask`, lowest bit first
    /// (for SIMD comparison results).
    pub fn extend_masked(&mut self, start_idx: usize, mask: u16) {
        let selected = (0..16usize)
            .filter(|&bit| (mask >> bit) & 1 == 1)
            .map(|bit| (start_idx + bit) as u32);
        self.indices.extend(selected);
    }
}
766
767/// Columnar batch for vectorized processing
768#[derive(Debug, Clone)]
769pub struct ColumnBatch {
770    /// Column values
771    pub values: Vec<CoreSochValue>,
772    /// Column name
773    pub name: String,
774}
775
776impl ColumnBatch {
777    /// Create from column data
778    pub fn new(name: String, values: Vec<CoreSochValue>) -> Self {
779        Self { values, name }
780    }
781
782    /// Get value at index
783    #[inline]
784    pub fn get(&self, idx: usize) -> Option<&CoreSochValue> {
785        self.values.get(idx)
786    }
787
788    /// Get raw integer data pointer (for SIMD)
789    #[allow(dead_code)]
790    pub fn as_i64_slice(&self) -> Option<Vec<i64>> {
791        self.values
792            .iter()
793            .map(|v| match v {
794                CoreSochValue::Int(i) => Some(*i),
795                CoreSochValue::UInt(u) => Some(*u as i64),
796                _ => None,
797            })
798            .collect()
799    }
800
801    /// Batch size
802    pub fn len(&self) -> usize {
803        self.values.len()
804    }
805
806    /// Is empty
807    pub fn is_empty(&self) -> bool {
808        self.values.is_empty()
809    }
810}
811
/// Predicate that can be evaluated over a whole column batch.
///
/// `col_idx` is the position of the column the predicate reads.
#[derive(Debug, Clone)]
pub enum VectorPredicate {
    /// Integer value is greater than `threshold`.
    IntGt { col_idx: usize, threshold: i64 },
    /// Integer value is less than `threshold`.
    IntLt { col_idx: usize, threshold: i64 },
    /// Integer value equals `value`.
    IntEq { col_idx: usize, value: i64 },
    /// Integer value is greater than or equal to `threshold`.
    IntGe { col_idx: usize, threshold: i64 },
    /// Integer value is less than or equal to `threshold`.
    IntLe { col_idx: usize, threshold: i64 },
    /// String value equals `value`.
    StrEq { col_idx: usize, value: String },
    /// String value starts with `prefix`.
    StrPrefix { col_idx: usize, prefix: String },
    /// Boolean value equals `value`.
    BoolEq { col_idx: usize, value: bool },
    /// Value is null.
    IsNull { col_idx: usize },
    /// Value is not null.
    IsNotNull { col_idx: usize },
}
836
/// Executor that evaluates predicates over column batches.
///
/// ## Performance Characteristics
///
/// Figures quoted by the original author:
///
/// - Traditional row-at-a-time: ~100M rows/sec (branch misprediction bound)
/// - Vectorized with selection vector: ~1B rows/sec (branch-free)
///
/// ## Usage
///
/// ```ignore
/// let executor = VectorizedExecutor::new(1024);
/// let columns = vec![/* column batches */];
/// let predicates = vec![/* predicates */];
/// let selection = executor.evaluate_batch(&columns, &predicates);
/// ```
pub struct VectorizedExecutor {
    /// Upper bound on the number of rows processed per batch.
    batch_size: usize,
}
856
857impl VectorizedExecutor {
858    /// Create with specified batch size
859    pub fn new(batch_size: usize) -> Self {
860        Self { batch_size }
861    }
862
863    /// Default batch size (1024 rows)
864    pub fn default_batch_size() -> usize {
865        1024
866    }
867
868    /// Evaluate predicates on columnar batch
869    ///
870    /// Returns selection vector of rows that pass all predicates.
871    /// Short-circuits at batch level when selection becomes empty.
872    pub fn evaluate_batch(
873        &self,
874        columns: &[ColumnBatch],
875        predicates: &[VectorPredicate],
876    ) -> SelectionVector {
877        if columns.is_empty() {
878            return SelectionVector::empty();
879        }
880
881        let batch_size = columns[0].len().min(self.batch_size);
882        let mut selection = SelectionVector::all(batch_size);
883
884        // Process predicates with short-circuit
885        for predicate in predicates {
886            if selection.is_empty() {
887                break; // Batch-level short-circuit
888            }
889
890            selection = match predicate {
891                VectorPredicate::IntGt { col_idx, threshold } => {
892                    self.filter_int_gt(&columns[*col_idx], *threshold, &selection)
893                }
894                VectorPredicate::IntLt { col_idx, threshold } => {
895                    self.filter_int_lt(&columns[*col_idx], *threshold, &selection)
896                }
897                VectorPredicate::IntEq { col_idx, value } => {
898                    self.filter_int_eq(&columns[*col_idx], *value, &selection)
899                }
900                VectorPredicate::IntGe { col_idx, threshold } => {
901                    self.filter_int_ge(&columns[*col_idx], *threshold, &selection)
902                }
903                VectorPredicate::IntLe { col_idx, threshold } => {
904                    self.filter_int_le(&columns[*col_idx], *threshold, &selection)
905                }
906                VectorPredicate::StrEq { col_idx, value } => {
907                    self.filter_str_eq(&columns[*col_idx], value, &selection)
908                }
909                VectorPredicate::StrPrefix { col_idx, prefix } => {
910                    self.filter_str_prefix(&columns[*col_idx], prefix, &selection)
911                }
912                VectorPredicate::BoolEq { col_idx, value } => {
913                    self.filter_bool_eq(&columns[*col_idx], *value, &selection)
914                }
915                VectorPredicate::IsNull { col_idx } => {
916                    self.filter_is_null(&columns[*col_idx], &selection)
917                }
918                VectorPredicate::IsNotNull { col_idx } => {
919                    self.filter_is_not_null(&columns[*col_idx], &selection)
920                }
921            };
922        }
923
924        selection
925    }
926
927    /// Filter: column > threshold
928    #[inline]
929    fn filter_int_gt(
930        &self,
931        column: &ColumnBatch,
932        threshold: i64,
933        selection: &SelectionVector,
934    ) -> SelectionVector {
935        selection.filter(|idx| match column.get(idx as usize) {
936            Some(CoreSochValue::Int(v)) => *v > threshold,
937            Some(CoreSochValue::UInt(v)) => (*v as i64) > threshold,
938            _ => false,
939        })
940    }
941
942    /// Filter: column < threshold
943    #[inline]
944    fn filter_int_lt(
945        &self,
946        column: &ColumnBatch,
947        threshold: i64,
948        selection: &SelectionVector,
949    ) -> SelectionVector {
950        selection.filter(|idx| match column.get(idx as usize) {
951            Some(CoreSochValue::Int(v)) => *v < threshold,
952            Some(CoreSochValue::UInt(v)) => (*v as i64) < threshold,
953            _ => false,
954        })
955    }
956
957    /// Filter: column == value
958    #[inline]
959    fn filter_int_eq(
960        &self,
961        column: &ColumnBatch,
962        value: i64,
963        selection: &SelectionVector,
964    ) -> SelectionVector {
965        selection.filter(|idx| match column.get(idx as usize) {
966            Some(CoreSochValue::Int(v)) => *v == value,
967            Some(CoreSochValue::UInt(v)) => (*v as i64) == value,
968            _ => false,
969        })
970    }
971
972    /// Filter: column >= threshold
973    #[inline]
974    fn filter_int_ge(
975        &self,
976        column: &ColumnBatch,
977        threshold: i64,
978        selection: &SelectionVector,
979    ) -> SelectionVector {
980        selection.filter(|idx| match column.get(idx as usize) {
981            Some(CoreSochValue::Int(v)) => *v >= threshold,
982            Some(CoreSochValue::UInt(v)) => (*v as i64) >= threshold,
983            _ => false,
984        })
985    }
986
987    /// Filter: column <= threshold
988    #[inline]
989    fn filter_int_le(
990        &self,
991        column: &ColumnBatch,
992        threshold: i64,
993        selection: &SelectionVector,
994    ) -> SelectionVector {
995        selection.filter(|idx| match column.get(idx as usize) {
996            Some(CoreSochValue::Int(v)) => *v <= threshold,
997            Some(CoreSochValue::UInt(v)) => (*v as i64) <= threshold,
998            _ => false,
999        })
1000    }
1001
1002    /// Filter: column == value (string)
1003    #[inline]
1004    fn filter_str_eq(
1005        &self,
1006        column: &ColumnBatch,
1007        value: &str,
1008        selection: &SelectionVector,
1009    ) -> SelectionVector {
1010        selection.filter(|idx| match column.get(idx as usize) {
1011            Some(CoreSochValue::Text(s)) => s == value,
1012            _ => false,
1013        })
1014    }
1015
1016    /// Filter: column starts with prefix
1017    #[inline]
1018    fn filter_str_prefix(
1019        &self,
1020        column: &ColumnBatch,
1021        prefix: &str,
1022        selection: &SelectionVector,
1023    ) -> SelectionVector {
1024        selection.filter(|idx| match column.get(idx as usize) {
1025            Some(CoreSochValue::Text(s)) => s.starts_with(prefix),
1026            _ => false,
1027        })
1028    }
1029
1030    /// Filter: column == value (bool)
1031    #[inline]
1032    fn filter_bool_eq(
1033        &self,
1034        column: &ColumnBatch,
1035        value: bool,
1036        selection: &SelectionVector,
1037    ) -> SelectionVector {
1038        selection.filter(|idx| match column.get(idx as usize) {
1039            Some(CoreSochValue::Bool(b)) => *b == value,
1040            _ => false,
1041        })
1042    }
1043
1044    /// Filter: column IS NULL
1045    #[inline]
1046    fn filter_is_null(&self, column: &ColumnBatch, selection: &SelectionVector) -> SelectionVector {
1047        selection.filter(|idx| matches!(column.get(idx as usize), Some(CoreSochValue::Null)))
1048    }
1049
1050    /// Filter: column IS NOT NULL
1051    #[inline]
1052    fn filter_is_not_null(
1053        &self,
1054        column: &ColumnBatch,
1055        selection: &SelectionVector,
1056    ) -> SelectionVector {
1057        selection
1058            .filter(|idx| !matches!(column.get(idx as usize), Some(CoreSochValue::Null) | None))
1059    }
1060
1061    /// Materialize selected rows from columnar data
1062    pub fn materialize(
1063        &self,
1064        columns: &[ColumnBatch],
1065        selection: &SelectionVector,
1066    ) -> Vec<SochRow> {
1067        selection
1068            .iter()
1069            .map(|idx| {
1070                let values: Vec<CoreSochValue> = columns
1071                    .iter()
1072                    .map(|col| {
1073                        col.get(idx as usize)
1074                            .cloned()
1075                            .unwrap_or(CoreSochValue::Null)
1076                    })
1077                    .collect();
1078                SochRow::new(values)
1079            })
1080            .collect()
1081    }
1082
1083    /// Convert row-oriented data to columnar batches
1084    pub fn row_to_columnar(&self, rows: &[SochRow], column_names: &[String]) -> Vec<ColumnBatch> {
1085        if rows.is_empty() || column_names.is_empty() {
1086            return vec![];
1087        }
1088
1089        let num_cols = column_names.len().min(rows[0].values.len());
1090
1091        (0..num_cols)
1092            .map(|col_idx| {
1093                let values: Vec<CoreSochValue> = rows
1094                    .iter()
1095                    .map(|row| {
1096                        row.values
1097                            .get(col_idx)
1098                            .cloned()
1099                            .unwrap_or(CoreSochValue::Null)
1100                    })
1101                    .collect();
1102                ColumnBatch::new(column_names[col_idx].clone(), values)
1103            })
1104            .collect()
1105    }
1106}
1107
1108impl Default for VectorizedExecutor {
1109    fn default() -> Self {
1110        Self::new(Self::default_batch_size())
1111    }
1112}
1113
/// Counters describing one run of vectorized execution
#[derive(Debug, Clone, Default)]
pub struct VectorizedStats {
    /// Number of rows processed
    pub rows_processed: usize,
    /// Number of rows selected
    pub rows_selected: usize,
    /// Number of predicates evaluated
    pub predicates_evaluated: usize,
    /// Number of short-circuits triggered
    pub short_circuits: usize,
    /// Processing time, in microseconds
    pub time_us: u64,
}

impl VectorizedStats {
    /// Fraction of processed rows that were selected; `0.0` when no rows were processed
    pub fn selectivity(&self) -> f64 {
        match self.rows_processed {
            0 => 0.0,
            processed => self.rows_selected as f64 / processed as f64,
        }
    }

    /// Throughput in rows per second; `0.0` when no time was recorded
    pub fn rows_per_sec(&self) -> f64 {
        if self.time_us == 0 {
            return 0.0;
        }

        let elapsed_secs = self.time_us as f64 / 1_000_000.0;
        self.rows_processed as f64 / elapsed_secs
    }
}
1148
#[cfg(test)]
mod tests {
    use super::*;

    // ------------------------------------------------------------------------
    // Shared fixtures
    // ------------------------------------------------------------------------

    /// Catalog containing a single `users(id, name, score)` table.
    fn test_catalog() -> Catalog {
        let users = SochSchema::new("users")
            .field("id", SochType::UInt)
            .field("name", SochType::Text)
            .field("score", SochType::Float);

        let mut db = Catalog::new("test_db");
        db.create_table(users, 1).unwrap();
        db
    }

    /// `SELECT <columns> FROM <table>` without filter, ordering, or paging.
    fn plain_select(table: &str, columns: &[&str]) -> SelectQuery {
        SelectQuery {
            columns: columns.iter().map(|c| c.to_string()).collect(),
            table: table.to_string(),
            where_clause: None,
            order_by: None,
            limit: None,
            offset: None,
        }
    }

    /// Column batch of signed integers.
    fn int_column(name: &str, values: impl Iterator<Item = i64>) -> ColumnBatch {
        ColumnBatch::new(name.to_string(), values.map(CoreSochValue::Int).collect())
    }

    /// Row indices kept by a selection, in order.
    fn picked(selection: &SelectionVector) -> Vec<u32> {
        selection.iter().collect()
    }

    // ------------------------------------------------------------------------
    // Validation, planning, and row-level predicates
    // ------------------------------------------------------------------------

    #[test]
    fn test_validate_select() {
        let catalog = test_catalog();
        let executor = SochQlExecutor::new();
        let query = SochQuery::Select(plain_select("users", &["id", "name"]));

        let outcome = executor.validate(&query, &catalog);
        assert!(outcome.is_ok());
    }

    #[test]
    fn test_validate_nonexistent_table() {
        let catalog = test_catalog();
        let executor = SochQlExecutor::new();
        let query = SochQuery::Select(plain_select("nonexistent", &["*"]));

        let outcome = executor.validate(&query, &catalog);
        assert!(outcome.is_err());
    }

    #[test]
    fn test_plan_select() {
        let catalog = test_catalog();
        let executor = SochQlExecutor::new();

        let score_above_80 = Condition {
            column: "score".to_string(),
            operator: ComparisonOp::Gt,
            value: SochValue::Float(80.0),
        };
        let select = SelectQuery {
            where_clause: Some(WhereClause {
                conditions: vec![score_above_80],
                operator: LogicalOp::And,
            }),
            order_by: Some(OrderBy {
                column: "score".to_string(),
                direction: SortDirection::Desc,
            }),
            limit: Some(10),
            ..plain_select("users", &["id", "name"])
        };

        let plan = executor.plan_select(&select, &catalog).unwrap();

        // Peel the plan from the outside in: Limit(Sort(Project(Filter(TableScan))))
        let QueryPlan::Limit {
            input: below_limit,
            count,
            ..
        } = plan
        else {
            panic!("Expected Limit")
        };
        assert_eq!(count, 10);

        let QueryPlan::Sort {
            input: below_sort,
            order_by,
        } = *below_limit
        else {
            panic!("Expected Sort")
        };
        assert_eq!(order_by[0].0, "score");
        assert!(!order_by[0].1); // Descending = false

        let QueryPlan::Project {
            input: below_project,
            columns,
        } = *below_sort
        else {
            panic!("Expected Project")
        };
        assert_eq!(columns, vec!["id", "name"]);

        let QueryPlan::Filter { predicate, .. } = *below_project else {
            panic!("Expected Filter")
        };
        assert_eq!(predicate.conditions.len(), 1);
    }

    #[test]
    fn test_predicate_evaluation() {
        let user_row = |id, name: &str, score| {
            SochRow::new(vec![
                CoreSochValue::UInt(id),
                CoreSochValue::Text(name.to_string()),
                CoreSochValue::Float(score),
            ])
        };
        let passing = user_row(1, "Alice", 95.0);
        let failing = user_row(2, "Bob", 75.0);

        let score_gt_80 = PredicateCondition {
            column: "score".to_string(),
            operator: ComparisonOp::Gt,
            value: CoreSochValue::Float(80.0),
        };

        // `score` is the third value of each row.
        assert!(score_gt_80.evaluate(&passing, 2));
        assert!(!score_gt_80.evaluate(&failing, 2));
    }

    #[test]
    fn test_token_reduction() {
        // Several rows plus long field names, so the per-row saving is visible
        let mut rows = Vec::with_capacity(20);
        for i in 0..20 {
            rows.push(vec![
                SochValue::UInt(i as u64),
                SochValue::Text(format!("User Number {}", i)),
                SochValue::Text(format!("user{}@example.com", i)),
                SochValue::Text("2024-01-15".to_string()),
                SochValue::Text("2024-03-20".to_string()),
            ]);
        }

        let result = SochResult {
            table: "user_statistics".to_string(),
            columns: [
                "user_id",
                "full_name",
                "email_address",
                "registration_date",
                "last_login",
            ]
            .iter()
            .map(|c| c.to_string())
            .collect(),
            rows,
        };

        let stats = estimate_token_reduction(&result);

        println!("JSON tokens: {}", stats.json_tokens);
        println!("TOON tokens: {}", stats.soch_tokens);
        println!("Reduction: {}%", stats.reduction_percent);

        // Column names repeat per row in JSON, so TOON must come out smaller
        assert!(stats.soch_tokens < stats.json_tokens);
        assert!(stats.reduction_percent > 0); // Any reduction is valuable
    }

    // ========================================================================
    // Task 12: Vectorized Predicate Evaluation Tests
    // ========================================================================

    #[test]
    fn test_selection_vector_basic() {
        let everything = SelectionVector::all(100);
        assert_eq!(everything.len(), 100);
        assert!(!everything.is_empty());
        assert_eq!(everything.selectivity(), 1.0);

        let nothing = SelectionVector::empty();
        assert!(nothing.is_empty());
        assert_eq!(nothing.selectivity(), 0.0);
    }

    #[test]
    fn test_selection_vector_filter() {
        // Keep only even indices
        let evens = SelectionVector::all(10).filter(|row| row % 2 == 0);

        assert_eq!(evens.len(), 5);
        assert_eq!(picked(&evens), vec![0, 2, 4, 6, 8]);
    }

    #[test]
    fn test_vectorized_int_filter() {
        let executor = VectorizedExecutor::new(1024);

        // Column holding the values 0-9
        let batch = [int_column("value", 0..10)];
        let greater_than_five = [VectorPredicate::IntGt {
            col_idx: 0,
            threshold: 5,
        }];

        let selection = executor.evaluate_batch(&batch, &greater_than_five);

        // Should select 6, 7, 8, 9 (4 values > 5)
        assert_eq!(selection.len(), 4);
        assert_eq!(picked(&selection), vec![6, 7, 8, 9]);
    }

    #[test]
    fn test_vectorized_multiple_predicates() {
        let executor = VectorizedExecutor::new(1024);

        let ids = int_column("id", 0..100);
        let active = ColumnBatch::new(
            "active".to_string(),
            (0..100).map(|i| CoreSochValue::Bool(i % 2 == 0)).collect(),
        );
        let batch = [ids, active];

        // id in [50, 60) AND active
        let predicates = [
            VectorPredicate::IntGe {
                col_idx: 0,
                threshold: 50,
            },
            VectorPredicate::IntLt {
                col_idx: 0,
                threshold: 60,
            },
            VectorPredicate::BoolEq {
                col_idx: 1,
                value: true,
            },
        ];

        let selection = executor.evaluate_batch(&batch, &predicates);

        // Should select: 50, 52, 54, 56, 58 (even numbers in [50, 60))
        assert_eq!(selection.len(), 5);
        assert_eq!(picked(&selection), vec![50, 52, 54, 56, 58]);
    }

    #[test]
    fn test_vectorized_short_circuit() {
        let executor = VectorizedExecutor::new(1024);

        // One hundred rows, every value below zero
        let batch = [int_column("value", std::iter::repeat(-1).take(100))];

        let predicates = [
            // Rejects every row...
            VectorPredicate::IntGt {
                col_idx: 0,
                threshold: 0,
            },
            // ...so these should not even be evaluated due to short-circuit
            VectorPredicate::IntLt {
                col_idx: 0,
                threshold: 100,
            },
            VectorPredicate::IntEq {
                col_idx: 0,
                value: 50,
            },
        ];

        let selection = executor.evaluate_batch(&batch, &predicates);
        assert!(selection.is_empty());
    }

    #[test]
    fn test_vectorized_string_predicates() {
        let executor = VectorizedExecutor::new(1024);

        let names = [
            "Alice", "Bob", "Carol", "Dave", "Alice", "Alice", "Bob", "Carol",
        ];
        let cells: Vec<CoreSochValue> = names
            .iter()
            .map(|name| CoreSochValue::Text(name.to_string()))
            .collect();
        let batch = [ColumnBatch::new("name".to_string(), cells)];

        let is_alice = [VectorPredicate::StrEq {
            col_idx: 0,
            value: "Alice".to_string(),
        }];

        let selection = executor.evaluate_batch(&batch, &is_alice);

        // Should select indices 0, 4, 5 (where name == "Alice")
        assert_eq!(selection.len(), 3);
        assert_eq!(picked(&selection), vec![0, 4, 5]);
    }

    #[test]
    fn test_vectorized_null_handling() {
        let executor = VectorizedExecutor::new(1024);

        let batch = [ColumnBatch::new(
            "value".to_string(),
            vec![
                CoreSochValue::Int(1),
                CoreSochValue::Null,
                CoreSochValue::Int(2),
                CoreSochValue::Null,
                CoreSochValue::Int(3),
            ],
        )];

        let nulls = executor.evaluate_batch(&batch, &[VectorPredicate::IsNull { col_idx: 0 }]);
        assert_eq!(nulls.len(), 2); // indices 1, 3

        let non_nulls =
            executor.evaluate_batch(&batch, &[VectorPredicate::IsNotNull { col_idx: 0 }]);
        assert_eq!(non_nulls.len(), 3); // indices 0, 2, 4
    }

    #[test]
    fn test_row_to_columnar_conversion() {
        let executor = VectorizedExecutor::new(1024);

        let rows: Vec<SochRow> = [(1, "Alice"), (2, "Bob"), (3, "Carol")]
            .iter()
            .map(|&(id, name)| {
                SochRow::new(vec![
                    CoreSochValue::Int(id),
                    CoreSochValue::Text(name.to_string()),
                ])
            })
            .collect();
        let column_names = ["id".to_string(), "name".to_string()];

        let columns = executor.row_to_columnar(&rows, &column_names);

        assert_eq!(columns.len(), 2);
        assert_eq!(columns[0].name, "id");
        assert_eq!(columns[1].name, "name");
        assert_eq!(columns[0].len(), 3);
        assert_eq!(columns[1].len(), 3);
    }

    #[test]
    fn test_materialize_selected_rows() {
        let executor = VectorizedExecutor::new(1024);

        let ids = int_column("id", 1..4);
        let names = ColumnBatch::new(
            "name".to_string(),
            ["Alice", "Bob", "Carol"]
                .iter()
                .map(|name| CoreSochValue::Text(name.to_string()))
                .collect(),
        );

        // Select rows 0 and 2
        let selection = SelectionVector::from_indices(vec![0, 2], 3);

        let rows = executor.materialize(&[ids, names], &selection);

        let expected = [(1, "Alice"), (3, "Carol")];
        assert_eq!(rows.len(), 2);
        for (row, &(id, name)) in rows.iter().zip(expected.iter()) {
            assert_eq!(row.values[0], CoreSochValue::Int(id));
            assert_eq!(row.values[1], CoreSochValue::Text(name.to_string()));
        }
    }
}