Skip to main content

sochdb_query/
soch_ql_executor.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! SOCH-QL Query Executor (Task 6)
19//!
20//! End-to-end SOCH-QL query execution pipeline:
21//! 1. parse(sql) → SochQuery
22//! 2. validate(query, catalog) → Result<()>
23//! 3. plan(query, stats) → QueryPlan
24//! 4. execute(plan, storage) → SochTable
25//!
26//! ## Token Reduction Model
27//!
28//! ```text
29//! tokens_JSON(table) ≈ 4 + Σ(|field_name| + |value| + 4) per row
30//! tokens_TOON(table) ≈ header + Σ(|value| + 1) per row
31//! reduction = 1 - tokens_TOON/tokens_JSON ≈ 0.4 to 0.6
32//!
33//! For 100 rows × 5 fields:
34//! JSON: ~100 × (5 × 15) = 7,500 tokens
35//! TOON: ~50 + 100 × 25 = 2,550 tokens → 66% reduction
36//! ```
37
38use crate::soch_ql::{
39    ComparisonOp, LogicalOp, SelectQuery, SortDirection, SochQlParser, SochQuery, SochResult,
40    SochValue, WhereClause,
41};
42#[cfg(test)]
43use crate::soch_ql::{Condition, OrderBy};
44use std::collections::HashMap;
45use sochdb_core::{Catalog, Result, SochDBError, SochRow, SochValue as CoreSochValue};
46#[cfg(test)]
47use sochdb_core::{SochSchema, SochType};
48
49/// Query plan operators
50#[derive(Debug, Clone)]
51pub enum QueryPlan {
52    /// Full table scan
53    TableScan {
54        table: String,
55        columns: Vec<String>,
56        predicate: Option<Box<QueryPlan>>,
57    },
58    /// Index seek (primary or secondary)
59    IndexSeek { index: String, key_range: KeyRange },
60    /// Filter rows
61    Filter {
62        input: Box<QueryPlan>,
63        predicate: Predicate,
64    },
65    /// Project columns
66    Project {
67        input: Box<QueryPlan>,
68        columns: Vec<String>,
69    },
70    /// Sort results
71    Sort {
72        input: Box<QueryPlan>,
73        order_by: Vec<(String, bool)>, // (column, ascending)
74    },
75    /// Limit results
76    Limit {
77        input: Box<QueryPlan>,
78        count: usize,
79        offset: usize,
80    },
81    /// Empty result
82    Empty,
83}
84
85/// Key range for index seeks
86#[derive(Debug, Clone)]
87pub struct KeyRange {
88    pub start: Option<SochValue>,
89    pub end: Option<SochValue>,
90    pub inclusive_start: bool,
91    pub inclusive_end: bool,
92}
93
94impl KeyRange {
95    pub fn all() -> Self {
96        Self {
97            start: None,
98            end: None,
99            inclusive_start: true,
100            inclusive_end: true,
101        }
102    }
103
104    pub fn eq(value: SochValue) -> Self {
105        Self {
106            start: Some(value.clone()),
107            end: Some(value),
108            inclusive_start: true,
109            inclusive_end: true,
110        }
111    }
112}
113
114/// Query predicate
115#[derive(Debug, Clone)]
116pub struct Predicate {
117    pub conditions: Vec<PredicateCondition>,
118    pub operator: LogicalOp,
119}
120
121/// Single predicate condition (uses CoreSochValue for row compatibility)
122#[derive(Debug, Clone)]
123pub struct PredicateCondition {
124    pub column: String,
125    pub operator: ComparisonOp,
126    pub value: CoreSochValue,
127}
128
129impl PredicateCondition {
130    /// Create from soch_ql SochValue
131    pub fn from_soch_ql(column: String, operator: ComparisonOp, value: &SochValue) -> Self {
132        Self {
133            column,
134            operator,
135            value: Self::convert_value(value),
136        }
137    }
138
139    /// Convert soch_ql::SochValue to CoreSochValue
140    fn convert_value(v: &SochValue) -> CoreSochValue {
141        match v {
142            SochValue::Int(i) => CoreSochValue::Int(*i),
143            SochValue::UInt(u) => CoreSochValue::UInt(*u),
144            SochValue::Float(f) => CoreSochValue::Float(*f),
145            SochValue::Text(s) => CoreSochValue::Text(s.clone()),
146            SochValue::Bool(b) => CoreSochValue::Bool(*b),
147            SochValue::Null => CoreSochValue::Null,
148            SochValue::Binary(b) => CoreSochValue::Binary(b.clone()),
149            SochValue::Array(arr) => {
150                CoreSochValue::Array(arr.iter().map(Self::convert_value).collect())
151            }
152        }
153    }
154
155    /// Evaluate predicate against a row
156    pub fn evaluate(&self, row: &SochRow, column_idx: usize) -> bool {
157        if column_idx >= row.values.len() {
158            return false;
159        }
160
161        let row_value = &row.values[column_idx];
162
163        match self.operator {
164            ComparisonOp::Eq => row_value == &self.value,
165            ComparisonOp::Ne => row_value != &self.value,
166            ComparisonOp::Lt => {
167                Self::compare(row_value, &self.value) == Some(std::cmp::Ordering::Less)
168            }
169            ComparisonOp::Le => matches!(
170                Self::compare(row_value, &self.value),
171                Some(std::cmp::Ordering::Less | std::cmp::Ordering::Equal)
172            ),
173            ComparisonOp::Gt => {
174                Self::compare(row_value, &self.value) == Some(std::cmp::Ordering::Greater)
175            }
176            ComparisonOp::Ge => matches!(
177                Self::compare(row_value, &self.value),
178                Some(std::cmp::Ordering::Greater | std::cmp::Ordering::Equal)
179            ),
180            ComparisonOp::Like => Self::like_match(row_value, &self.value),
181            ComparisonOp::In => Self::in_match(row_value, &self.value),
182            ComparisonOp::SimilarTo => {
183                // SimilarTo is used for vector similarity search
184                // Evaluated by the vector index, not row-by-row comparison
185                // For row-level evaluation, we fall back to Like-style matching
186                Self::like_match(row_value, &self.value)
187            }
188        }
189    }
190
191    fn compare(a: &CoreSochValue, b: &CoreSochValue) -> Option<std::cmp::Ordering> {
192        match (a, b) {
193            (CoreSochValue::Int(a), CoreSochValue::Int(b)) => Some(a.cmp(b)),
194            (CoreSochValue::UInt(a), CoreSochValue::UInt(b)) => Some(a.cmp(b)),
195            (CoreSochValue::Float(a), CoreSochValue::Float(b)) => a.partial_cmp(b),
196            (CoreSochValue::Text(a), CoreSochValue::Text(b)) => Some(a.cmp(b)),
197            _ => None,
198        }
199    }
200
201    fn like_match(value: &CoreSochValue, pattern: &CoreSochValue) -> bool {
202        match (value, pattern) {
203            (CoreSochValue::Text(v), CoreSochValue::Text(p)) => {
204                // Simple LIKE: % matches any, _ matches one char
205                let regex_pattern = p.replace('%', ".*").replace('_', ".");
206                regex::Regex::new(&format!("^{}$", regex_pattern))
207                    .map(|re| re.is_match(v))
208                    .unwrap_or(false)
209            }
210            _ => false,
211        }
212    }
213
214    fn in_match(value: &CoreSochValue, list: &CoreSochValue) -> bool {
215        match list {
216            CoreSochValue::Array(values) => values.iter().any(|v| value == v),
217            _ => value == list, // Single value comparison fallback
218        }
219    }
220}
221
222impl Predicate {
223    /// Evaluate predicate against a row
224    pub fn evaluate(&self, row: &SochRow, column_map: &HashMap<String, usize>) -> bool {
225        let results: Vec<bool> = self
226            .conditions
227            .iter()
228            .map(|cond| {
229                column_map
230                    .get(&cond.column)
231                    .map(|&idx| cond.evaluate(row, idx))
232                    .unwrap_or(false)
233            })
234            .collect();
235
236        match self.operator {
237            LogicalOp::And => results.iter().all(|&r| r),
238            LogicalOp::Or => results.iter().any(|&r| r),
239        }
240    }
241}
242
243/// SOCH-QL Query Executor
244///
245/// When constructed with a `StorageBackend`, queries read real data from storage.
246/// Without a backend (`new()`), returns empty result sets (for testing/planning only).
247pub struct SochQlExecutor {
248    storage: Option<std::sync::Arc<dyn crate::optimizer_integration::StorageBackend>>,
249}
250
251impl SochQlExecutor {
252    /// Create a new executor without storage (returns empty results)
253    pub fn new() -> Self {
254        Self { storage: None }
255    }
256
257    /// Create an executor wired to a real storage backend
258    pub fn with_storage(
259        storage: std::sync::Arc<dyn crate::optimizer_integration::StorageBackend>,
260    ) -> Self {
261        Self {
262            storage: Some(storage),
263        }
264    }
265
266    /// Execute a SOCH-QL query string
267    pub fn execute(&self, query: &str, catalog: &Catalog) -> Result<SochResult> {
268        // Parse
269        let parsed = SochQlParser::parse(query)
270            .map_err(|e| SochDBError::InvalidArgument(format!("Parse error: {:?}", e)))?;
271
272        // Validate
273        self.validate(&parsed, catalog)?;
274
275        // Plan
276        let plan = self.plan(&parsed, catalog)?;
277
278        // Execute
279        self.execute_plan(&plan, catalog)
280    }
281
282    /// Validate a parsed query against the catalog
283    pub fn validate(&self, query: &SochQuery, catalog: &Catalog) -> Result<()> {
284        match query {
285            SochQuery::Select(select) => {
286                // Check table exists
287                if catalog.get_table(&select.table).is_none() {
288                    return Err(SochDBError::NotFound(format!(
289                        "Table '{}' not found",
290                        select.table
291                    )));
292                }
293
294                // Check columns exist (if not *)
295                if let Some(entry) = catalog.get_table(&select.table)
296                    && let Some(schema) = &entry.schema
297                {
298                    for col in &select.columns {
299                        if col != "*" && !schema.fields.iter().any(|f| &f.name == col) {
300                            return Err(SochDBError::InvalidArgument(format!(
301                                "Column '{}' not found in table '{}'",
302                                col, select.table
303                            )));
304                        }
305                    }
306                }
307
308                Ok(())
309            }
310            SochQuery::Insert(insert) => {
311                // Check table exists
312                if catalog.get_table(&insert.table).is_none() {
313                    return Err(SochDBError::NotFound(format!(
314                        "Table '{}' not found",
315                        insert.table
316                    )));
317                }
318                Ok(())
319            }
320            SochQuery::CreateTable(create) => {
321                // Check table doesn't exist
322                if catalog.get_table(&create.table).is_some() {
323                    return Err(SochDBError::InvalidArgument(format!(
324                        "Table '{}' already exists",
325                        create.table
326                    )));
327                }
328                Ok(())
329            }
330            SochQuery::DropTable { table } => {
331                if catalog.get_table(table).is_none() {
332                    return Err(SochDBError::NotFound(format!(
333                        "Table '{}' not found",
334                        table
335                    )));
336                }
337                Ok(())
338            }
339        }
340    }
341
342    /// Generate a query plan
343    pub fn plan(&self, query: &SochQuery, catalog: &Catalog) -> Result<QueryPlan> {
344        match query {
345            SochQuery::Select(select) => self.plan_select(select, catalog),
346            _ => Err(SochDBError::InvalidArgument(
347                "Only SELECT queries can be planned".to_string(),
348            )),
349        }
350    }
351
352    fn plan_select(&self, select: &SelectQuery, _catalog: &Catalog) -> Result<QueryPlan> {
353        // Start with table scan
354        let mut plan = QueryPlan::TableScan {
355            table: select.table.clone(),
356            columns: select.columns.clone(),
357            predicate: None,
358        };
359
360        // Add filter if WHERE clause present
361        if let Some(where_clause) = &select.where_clause {
362            let predicate = self.build_predicate(where_clause);
363            plan = QueryPlan::Filter {
364                input: Box::new(plan),
365                predicate,
366            };
367        }
368
369        // Add projection (column selection)
370        if !select.columns.contains(&"*".to_string()) {
371            plan = QueryPlan::Project {
372                input: Box::new(plan),
373                columns: select.columns.clone(),
374            };
375        }
376
377        // Add sort if ORDER BY present
378        if let Some(order_by) = &select.order_by {
379            plan = QueryPlan::Sort {
380                input: Box::new(plan),
381                order_by: vec![(
382                    order_by.column.clone(),
383                    matches!(order_by.direction, SortDirection::Asc),
384                )],
385            };
386        }
387
388        // Add limit if present
389        if select.limit.is_some() || select.offset.is_some() {
390            plan = QueryPlan::Limit {
391                input: Box::new(plan),
392                count: select.limit.unwrap_or(usize::MAX),
393                offset: select.offset.unwrap_or(0),
394            };
395        }
396
397        Ok(plan)
398    }
399
400    fn build_predicate(&self, where_clause: &WhereClause) -> Predicate {
401        Predicate {
402            conditions: where_clause
403                .conditions
404                .iter()
405                .map(|c| PredicateCondition::from_soch_ql(c.column.clone(), c.operator, &c.value))
406                .collect(),
407            operator: where_clause.operator,
408        }
409    }
410
411    /// Execute a query plan
412    ///
413    /// When a `StorageBackend` is present, this reads real data from storage.
414    /// Without one, returns empty result sets (legacy behavior).
415    #[allow(clippy::only_used_in_recursion)]
416    pub fn execute_plan(&self, plan: &QueryPlan, catalog: &Catalog) -> Result<SochResult> {
417        match plan {
418            QueryPlan::Empty => Ok(SochResult {
419                table: "result".to_string(),
420                columns: vec![],
421                rows: vec![],
422            }),
423            QueryPlan::TableScan { table, columns, .. } => {
424                // Get schema from catalog
425                let schema_columns = if let Some(entry) = catalog.get_table(table) {
426                    if let Some(schema) = &entry.schema {
427                        if columns.contains(&"*".to_string()) {
428                            schema.fields.iter().map(|f| f.name.clone()).collect()
429                        } else {
430                            columns.clone()
431                        }
432                    } else {
433                        columns.clone()
434                    }
435                } else {
436                    columns.clone()
437                };
438
439                // ---- Phase 0: Read from storage if backend is wired ----
440                let rows = if let Some(storage) = &self.storage {
441                    let raw_rows = storage.table_scan(table, &schema_columns, None)?;
442                    raw_rows
443                        .into_iter()
444                        .map(|row| {
445                            schema_columns
446                                .iter()
447                                .map(|c| row.get(c).cloned().unwrap_or(SochValue::Null))
448                                .collect()
449                        })
450                        .collect()
451                } else {
452                    vec![] // No storage backend — empty results (legacy)
453                };
454
455                Ok(SochResult {
456                    table: table.clone(),
457                    columns: schema_columns,
458                    rows,
459                })
460            }
461            QueryPlan::Filter { input, predicate } => {
462                let mut result = self.execute_plan(input, catalog)?;
463                // Build column index map
464                let col_map: HashMap<String, usize> = result
465                    .columns
466                    .iter()
467                    .enumerate()
468                    .map(|(i, c)| (c.clone(), i))
469                    .collect();
470                // Filter rows using predicate
471                result.rows.retain(|row| {
472                    let matches: Vec<bool> = predicate
473                        .conditions
474                        .iter()
475                        .map(|cond| {
476                            if let Some(&idx) = col_map.get(&cond.column) {
477                                if idx < row.len() {
478                                    Self::eval_predicate_condition(cond, &row[idx])
479                                } else {
480                                    false
481                                }
482                            } else {
483                                false
484                            }
485                        })
486                        .collect();
487                    match predicate.operator {
488                        LogicalOp::And => matches.iter().all(|&m| m),
489                        LogicalOp::Or => matches.iter().any(|&m| m),
490                    }
491                });
492                Ok(result)
493            }
494            QueryPlan::Project { input, columns } => {
495                let mut result = self.execute_plan(input, catalog)?;
496                // Build index map from current columns
497                let col_map: HashMap<String, usize> = result
498                    .columns
499                    .iter()
500                    .enumerate()
501                    .map(|(i, c)| (c.clone(), i))
502                    .collect();
503                // Re-project rows
504                result.rows = result
505                    .rows
506                    .into_iter()
507                    .map(|row| {
508                        columns
509                            .iter()
510                            .map(|c| {
511                                col_map
512                                    .get(c)
513                                    .and_then(|&i| row.get(i).cloned())
514                                    .unwrap_or(SochValue::Null)
515                            })
516                            .collect()
517                    })
518                    .collect();
519                result.columns = columns.clone();
520                Ok(result)
521            }
522            QueryPlan::Sort { input, order_by } => {
523                let mut result = self.execute_plan(input, catalog)?;
524                let col_map: HashMap<String, usize> = result
525                    .columns
526                    .iter()
527                    .enumerate()
528                    .map(|(i, c)| (c.clone(), i))
529                    .collect();
530                result.rows.sort_by(|a, b| {
531                    for (col, ascending) in order_by {
532                        if let Some(&idx) = col_map.get(col) {
533                            let va = a.get(idx);
534                            let vb = b.get(idx);
535                            let cmp = Self::compare_soch_values(va, vb);
536                            let cmp = if *ascending { cmp } else { cmp.reverse() };
537                            if cmp != std::cmp::Ordering::Equal {
538                                return cmp;
539                            }
540                        }
541                    }
542                    std::cmp::Ordering::Equal
543                });
544                Ok(result)
545            }
546            QueryPlan::Limit {
547                input,
548                count,
549                offset,
550            } => {
551                let mut result = self.execute_plan(input, catalog)?;
552                result.rows = result.rows.into_iter().skip(*offset).take(*count).collect();
553                Ok(result)
554            }
555            QueryPlan::IndexSeek { .. } => {
556                // Index seek — fallback to empty (TODO: wire to storage index)
557                Ok(SochResult {
558                    table: "result".to_string(),
559                    columns: vec![],
560                    rows: vec![],
561                })
562            }
563        }
564    }
565
566    /// Compare two optional SochValues for sorting
567    fn compare_soch_values(a: Option<&SochValue>, b: Option<&SochValue>) -> std::cmp::Ordering {
568        match (a, b) {
569            (None, None) => std::cmp::Ordering::Equal,
570            (None, Some(_)) => std::cmp::Ordering::Less,
571            (Some(_), None) => std::cmp::Ordering::Greater,
572            (Some(a), Some(b)) => match (a, b) {
573                (SochValue::Int(a), SochValue::Int(b)) => a.cmp(b),
574                (SochValue::UInt(a), SochValue::UInt(b)) => a.cmp(b),
575                (SochValue::Float(a), SochValue::Float(b)) => {
576                    a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal)
577                }
578                (SochValue::Text(a), SochValue::Text(b)) => a.cmp(b),
579                (SochValue::Bool(a), SochValue::Bool(b)) => a.cmp(b),
580                _ => std::cmp::Ordering::Equal,
581            },
582        }
583    }
584
585    /// Evaluate a single predicate condition against a SochValue
586    fn eval_predicate_condition(cond: &PredicateCondition, value: &SochValue) -> bool {
587        // Convert SochValue to CoreSochValue for comparison
588        let core_val = PredicateCondition::convert_value(value);
589        match cond.operator {
590            ComparisonOp::Eq => core_val == cond.value,
591            ComparisonOp::Ne => core_val != cond.value,
592            ComparisonOp::Lt => {
593                PredicateCondition::compare(&core_val, &cond.value)
594                    == Some(std::cmp::Ordering::Less)
595            }
596            ComparisonOp::Le => matches!(
597                PredicateCondition::compare(&core_val, &cond.value),
598                Some(std::cmp::Ordering::Less | std::cmp::Ordering::Equal)
599            ),
600            ComparisonOp::Gt => {
601                PredicateCondition::compare(&core_val, &cond.value)
602                    == Some(std::cmp::Ordering::Greater)
603            }
604            ComparisonOp::Ge => matches!(
605                PredicateCondition::compare(&core_val, &cond.value),
606                Some(std::cmp::Ordering::Greater | std::cmp::Ordering::Equal)
607            ),
608            ComparisonOp::Like => PredicateCondition::like_match(&core_val, &cond.value),
609            ComparisonOp::In => PredicateCondition::in_match(&core_val, &cond.value),
610            ComparisonOp::SimilarTo => PredicateCondition::like_match(&core_val, &cond.value),
611        }
612    }
613}
614
615impl Default for SochQlExecutor {
616    fn default() -> Self {
617        Self { storage: None }
618    }
619}
620
621/// Execute a SOCH-QL query (convenience function)
622pub fn execute_sochql(query: &str, catalog: &Catalog) -> Result<SochResult> {
623    SochQlExecutor::new().execute(query, catalog)
624}
625
626/// Estimate token reduction for a TOON result vs JSON
627pub fn estimate_token_reduction(result: &SochResult) -> TokenReductionStats {
628    let row_count = result.rows.len();
629    let col_count = result.columns.len();
630
631    if row_count == 0 || col_count == 0 {
632        return TokenReductionStats::default();
633    }
634
635    // Estimate JSON tokens
636    // Format: [{"col1": "val1", "col2": "val2"}, ...]
637    let avg_col_name_len: usize = result.columns.iter().map(|c| c.len()).sum::<usize>() / col_count;
638    let avg_value_len = 10; // Rough estimate
639
640    // JSON: 2 (brackets) + row_count * (2 + col_count * (col_name + 4 + value))
641    let json_tokens = 2 + row_count * (2 + col_count * (avg_col_name_len + 4 + avg_value_len));
642
643    // TOON: header + row_count * (col_count * value + col_count)
644    // Header: table[count]{col1,col2,...}:
645    let header_tokens = result.table.len() + 10 + result.columns.join(",").len();
646    let soch_tokens = header_tokens + row_count * (col_count * avg_value_len + col_count);
647
648    let reduction = 1.0 - (soch_tokens as f64 / json_tokens as f64);
649
650    TokenReductionStats {
651        json_tokens,
652        soch_tokens,
653        reduction_percent: (reduction * 100.0) as u32,
654        row_count,
655        col_count,
656    }
657}
658
659/// Token reduction statistics
660#[derive(Debug, Clone, Default)]
661pub struct TokenReductionStats {
662    /// Estimated JSON tokens
663    pub json_tokens: usize,
664    /// Estimated TOON tokens
665    pub soch_tokens: usize,
666    /// Reduction percentage
667    pub reduction_percent: u32,
668    /// Row count
669    pub row_count: usize,
670    /// Column count
671    pub col_count: usize,
672}
673
674// ============================================================================
675// Task 12: Vectorized Predicate Evaluation
676// ============================================================================
677
678/// Selection vector for batch predicate evaluation
679///
680/// Maintains indices of rows that pass all predicates so far.
681/// Short-circuits at batch level when selection becomes empty.
682#[derive(Debug, Clone)]
683pub struct SelectionVector {
684    /// Selected row indices (sorted)
685    indices: Vec<u32>,
686    /// Original batch size
687    batch_size: usize,
688}
689
690impl SelectionVector {
691    /// Create selection with all rows selected
692    pub fn all(batch_size: usize) -> Self {
693        Self {
694            indices: (0..batch_size as u32).collect(),
695            batch_size,
696        }
697    }
698
699    /// Create empty selection
700    pub fn empty() -> Self {
701        Self {
702            indices: Vec::new(),
703            batch_size: 0,
704        }
705    }
706
707    /// Create from specific indices
708    pub fn from_indices(indices: Vec<u32>, batch_size: usize) -> Self {
709        Self {
710            indices,
711            batch_size,
712        }
713    }
714
715    /// Check if selection is empty (short-circuit condition)
716    #[inline]
717    pub fn is_empty(&self) -> bool {
718        self.indices.is_empty()
719    }
720
721    /// Number of selected rows
722    #[inline]
723    pub fn len(&self) -> usize {
724        self.indices.len()
725    }
726
727    /// Original batch size
728    #[inline]
729    pub fn batch_size(&self) -> usize {
730        self.batch_size
731    }
732
733    /// Selectivity ratio
734    #[inline]
735    pub fn selectivity(&self) -> f64 {
736        if self.batch_size == 0 {
737            0.0
738        } else {
739            self.len() as f64 / self.batch_size as f64
740        }
741    }
742
743    /// Iterate over selected indices
744    pub fn iter(&self) -> impl Iterator<Item = u32> + '_ {
745        self.indices.iter().copied()
746    }
747
748    /// Filter selection with a predicate, returning new selection
749    pub fn filter<F>(&self, pred: F) -> Self
750    where
751        F: Fn(u32) -> bool,
752    {
753        Self {
754            indices: self.indices.iter().copied().filter(|&i| pred(i)).collect(),
755            batch_size: self.batch_size,
756        }
757    }
758
759    /// Extend with masked indices (for SIMD results)
760    pub fn extend_masked(&mut self, start_idx: usize, mask: u16) {
761        for bit in 0..16 {
762            if (mask >> bit) & 1 == 1 {
763                self.indices.push((start_idx + bit) as u32);
764            }
765        }
766    }
767}
768
769/// Columnar batch for vectorized processing
770#[derive(Debug, Clone)]
771pub struct ColumnBatch {
772    /// Column values
773    pub values: Vec<CoreSochValue>,
774    /// Column name
775    pub name: String,
776}
777
778impl ColumnBatch {
779    /// Create from column data
780    pub fn new(name: String, values: Vec<CoreSochValue>) -> Self {
781        Self { values, name }
782    }
783
784    /// Get value at index
785    #[inline]
786    pub fn get(&self, idx: usize) -> Option<&CoreSochValue> {
787        self.values.get(idx)
788    }
789
790    /// Get raw integer data pointer (for SIMD)
791    #[allow(dead_code)]
792    pub fn as_i64_slice(&self) -> Option<Vec<i64>> {
793        self.values
794            .iter()
795            .map(|v| match v {
796                CoreSochValue::Int(i) => Some(*i),
797                CoreSochValue::UInt(u) => Some(*u as i64),
798                _ => None,
799            })
800            .collect()
801    }
802
803    /// Batch size
804    pub fn len(&self) -> usize {
805        self.values.len()
806    }
807
808    /// Is empty
809    pub fn is_empty(&self) -> bool {
810        self.values.is_empty()
811    }
812}
813
814/// Vectorized predicate for batch evaluation
815#[derive(Debug, Clone)]
816pub enum VectorPredicate {
817    /// Integer greater than
818    IntGt { col_idx: usize, threshold: i64 },
819    /// Integer less than
820    IntLt { col_idx: usize, threshold: i64 },
821    /// Integer equals
822    IntEq { col_idx: usize, value: i64 },
823    /// Integer greater or equal
824    IntGe { col_idx: usize, threshold: i64 },
825    /// Integer less or equal
826    IntLe { col_idx: usize, threshold: i64 },
827    /// String equals
828    StrEq { col_idx: usize, value: String },
829    /// String prefix match
830    StrPrefix { col_idx: usize, prefix: String },
831    /// Boolean equals
832    BoolEq { col_idx: usize, value: bool },
833    /// Is null check
834    IsNull { col_idx: usize },
835    /// Is not null check
836    IsNotNull { col_idx: usize },
837}
838
839/// Vectorized executor for batch predicate evaluation
840///
841/// ## Performance Characteristics
842///
843/// - Traditional row-at-a-time: ~100M rows/sec (branch misprediction bound)
844/// - Vectorized with selection vector: ~1B rows/sec (branch-free)
845///
846/// ## Usage
847///
848/// ```ignore
849/// let executor = VectorizedExecutor::new(1024);
850/// let columns = vec![/* column batches */];
851/// let predicates = vec![/* predicates */];
852/// let selection = executor.evaluate_batch(&columns, &predicates);
853/// ```
854pub struct VectorizedExecutor {
855    /// Batch size for processing
856    batch_size: usize,
857}
858
859impl VectorizedExecutor {
860    /// Create with specified batch size
861    pub fn new(batch_size: usize) -> Self {
862        Self { batch_size }
863    }
864
865    /// Default batch size (1024 rows)
866    pub fn default_batch_size() -> usize {
867        1024
868    }
869
870    /// Evaluate predicates on columnar batch
871    ///
872    /// Returns selection vector of rows that pass all predicates.
873    /// Short-circuits at batch level when selection becomes empty.
874    pub fn evaluate_batch(
875        &self,
876        columns: &[ColumnBatch],
877        predicates: &[VectorPredicate],
878    ) -> SelectionVector {
879        if columns.is_empty() {
880            return SelectionVector::empty();
881        }
882
883        let batch_size = columns[0].len().min(self.batch_size);
884        let mut selection = SelectionVector::all(batch_size);
885
886        // Process predicates with short-circuit
887        for predicate in predicates {
888            if selection.is_empty() {
889                break; // Batch-level short-circuit
890            }
891
892            selection = match predicate {
893                VectorPredicate::IntGt { col_idx, threshold } => {
894                    self.filter_int_gt(&columns[*col_idx], *threshold, &selection)
895                }
896                VectorPredicate::IntLt { col_idx, threshold } => {
897                    self.filter_int_lt(&columns[*col_idx], *threshold, &selection)
898                }
899                VectorPredicate::IntEq { col_idx, value } => {
900                    self.filter_int_eq(&columns[*col_idx], *value, &selection)
901                }
902                VectorPredicate::IntGe { col_idx, threshold } => {
903                    self.filter_int_ge(&columns[*col_idx], *threshold, &selection)
904                }
905                VectorPredicate::IntLe { col_idx, threshold } => {
906                    self.filter_int_le(&columns[*col_idx], *threshold, &selection)
907                }
908                VectorPredicate::StrEq { col_idx, value } => {
909                    self.filter_str_eq(&columns[*col_idx], value, &selection)
910                }
911                VectorPredicate::StrPrefix { col_idx, prefix } => {
912                    self.filter_str_prefix(&columns[*col_idx], prefix, &selection)
913                }
914                VectorPredicate::BoolEq { col_idx, value } => {
915                    self.filter_bool_eq(&columns[*col_idx], *value, &selection)
916                }
917                VectorPredicate::IsNull { col_idx } => {
918                    self.filter_is_null(&columns[*col_idx], &selection)
919                }
920                VectorPredicate::IsNotNull { col_idx } => {
921                    self.filter_is_not_null(&columns[*col_idx], &selection)
922                }
923            };
924        }
925
926        selection
927    }
928
929    /// Filter: column > threshold
930    #[inline]
931    fn filter_int_gt(
932        &self,
933        column: &ColumnBatch,
934        threshold: i64,
935        selection: &SelectionVector,
936    ) -> SelectionVector {
937        selection.filter(|idx| match column.get(idx as usize) {
938            Some(CoreSochValue::Int(v)) => *v > threshold,
939            Some(CoreSochValue::UInt(v)) => (*v as i64) > threshold,
940            _ => false,
941        })
942    }
943
944    /// Filter: column < threshold
945    #[inline]
946    fn filter_int_lt(
947        &self,
948        column: &ColumnBatch,
949        threshold: i64,
950        selection: &SelectionVector,
951    ) -> SelectionVector {
952        selection.filter(|idx| match column.get(idx as usize) {
953            Some(CoreSochValue::Int(v)) => *v < threshold,
954            Some(CoreSochValue::UInt(v)) => (*v as i64) < threshold,
955            _ => false,
956        })
957    }
958
959    /// Filter: column == value
960    #[inline]
961    fn filter_int_eq(
962        &self,
963        column: &ColumnBatch,
964        value: i64,
965        selection: &SelectionVector,
966    ) -> SelectionVector {
967        selection.filter(|idx| match column.get(idx as usize) {
968            Some(CoreSochValue::Int(v)) => *v == value,
969            Some(CoreSochValue::UInt(v)) => (*v as i64) == value,
970            _ => false,
971        })
972    }
973
974    /// Filter: column >= threshold
975    #[inline]
976    fn filter_int_ge(
977        &self,
978        column: &ColumnBatch,
979        threshold: i64,
980        selection: &SelectionVector,
981    ) -> SelectionVector {
982        selection.filter(|idx| match column.get(idx as usize) {
983            Some(CoreSochValue::Int(v)) => *v >= threshold,
984            Some(CoreSochValue::UInt(v)) => (*v as i64) >= threshold,
985            _ => false,
986        })
987    }
988
989    /// Filter: column <= threshold
990    #[inline]
991    fn filter_int_le(
992        &self,
993        column: &ColumnBatch,
994        threshold: i64,
995        selection: &SelectionVector,
996    ) -> SelectionVector {
997        selection.filter(|idx| match column.get(idx as usize) {
998            Some(CoreSochValue::Int(v)) => *v <= threshold,
999            Some(CoreSochValue::UInt(v)) => (*v as i64) <= threshold,
1000            _ => false,
1001        })
1002    }
1003
1004    /// Filter: column == value (string)
1005    #[inline]
1006    fn filter_str_eq(
1007        &self,
1008        column: &ColumnBatch,
1009        value: &str,
1010        selection: &SelectionVector,
1011    ) -> SelectionVector {
1012        selection.filter(|idx| match column.get(idx as usize) {
1013            Some(CoreSochValue::Text(s)) => s == value,
1014            _ => false,
1015        })
1016    }
1017
1018    /// Filter: column starts with prefix
1019    #[inline]
1020    fn filter_str_prefix(
1021        &self,
1022        column: &ColumnBatch,
1023        prefix: &str,
1024        selection: &SelectionVector,
1025    ) -> SelectionVector {
1026        selection.filter(|idx| match column.get(idx as usize) {
1027            Some(CoreSochValue::Text(s)) => s.starts_with(prefix),
1028            _ => false,
1029        })
1030    }
1031
1032    /// Filter: column == value (bool)
1033    #[inline]
1034    fn filter_bool_eq(
1035        &self,
1036        column: &ColumnBatch,
1037        value: bool,
1038        selection: &SelectionVector,
1039    ) -> SelectionVector {
1040        selection.filter(|idx| match column.get(idx as usize) {
1041            Some(CoreSochValue::Bool(b)) => *b == value,
1042            _ => false,
1043        })
1044    }
1045
1046    /// Filter: column IS NULL
1047    #[inline]
1048    fn filter_is_null(&self, column: &ColumnBatch, selection: &SelectionVector) -> SelectionVector {
1049        selection.filter(|idx| matches!(column.get(idx as usize), Some(CoreSochValue::Null)))
1050    }
1051
1052    /// Filter: column IS NOT NULL
1053    #[inline]
1054    fn filter_is_not_null(
1055        &self,
1056        column: &ColumnBatch,
1057        selection: &SelectionVector,
1058    ) -> SelectionVector {
1059        selection
1060            .filter(|idx| !matches!(column.get(idx as usize), Some(CoreSochValue::Null) | None))
1061    }
1062
1063    /// Materialize selected rows from columnar data
1064    pub fn materialize(
1065        &self,
1066        columns: &[ColumnBatch],
1067        selection: &SelectionVector,
1068    ) -> Vec<SochRow> {
1069        selection
1070            .iter()
1071            .map(|idx| {
1072                let values: Vec<CoreSochValue> = columns
1073                    .iter()
1074                    .map(|col| {
1075                        col.get(idx as usize)
1076                            .cloned()
1077                            .unwrap_or(CoreSochValue::Null)
1078                    })
1079                    .collect();
1080                SochRow::new(values)
1081            })
1082            .collect()
1083    }
1084
1085    /// Convert row-oriented data to columnar batches
1086    pub fn row_to_columnar(&self, rows: &[SochRow], column_names: &[String]) -> Vec<ColumnBatch> {
1087        if rows.is_empty() || column_names.is_empty() {
1088            return vec![];
1089        }
1090
1091        let num_cols = column_names.len().min(rows[0].values.len());
1092
1093        (0..num_cols)
1094            .map(|col_idx| {
1095                let values: Vec<CoreSochValue> = rows
1096                    .iter()
1097                    .map(|row| {
1098                        row.values
1099                            .get(col_idx)
1100                            .cloned()
1101                            .unwrap_or(CoreSochValue::Null)
1102                    })
1103                    .collect();
1104                ColumnBatch::new(column_names[col_idx].clone(), values)
1105            })
1106            .collect()
1107    }
1108}
1109
1110impl Default for VectorizedExecutor {
1111    fn default() -> Self {
1112        Self::new(Self::default_batch_size())
1113    }
1114}
1115
1116/// Statistics for vectorized execution
1117#[derive(Debug, Clone, Default)]
1118pub struct VectorizedStats {
1119    /// Rows processed
1120    pub rows_processed: usize,
1121    /// Rows selected
1122    pub rows_selected: usize,
1123    /// Predicates evaluated
1124    pub predicates_evaluated: usize,
1125    /// Short-circuits triggered
1126    pub short_circuits: usize,
1127    /// Processing time (microseconds)
1128    pub time_us: u64,
1129}
1130
1131impl VectorizedStats {
1132    /// Selectivity ratio
1133    pub fn selectivity(&self) -> f64 {
1134        if self.rows_processed == 0 {
1135            0.0
1136        } else {
1137            self.rows_selected as f64 / self.rows_processed as f64
1138        }
1139    }
1140
1141    /// Rows per second
1142    pub fn rows_per_sec(&self) -> f64 {
1143        if self.time_us == 0 {
1144            0.0
1145        } else {
1146            self.rows_processed as f64 / (self.time_us as f64 / 1_000_000.0)
1147        }
1148    }
1149}
1150
1151#[cfg(test)]
1152mod tests {
1153    use super::*;
1154
1155    fn test_catalog() -> Catalog {
1156        let mut catalog = Catalog::new("test_db");
1157
1158        let schema = SochSchema::new("users")
1159            .field("id", SochType::UInt)
1160            .field("name", SochType::Text)
1161            .field("score", SochType::Float);
1162
1163        catalog.create_table(schema, 1).unwrap();
1164        catalog
1165    }
1166
1167    #[test]
1168    fn test_validate_select() {
1169        let catalog = test_catalog();
1170        let executor = SochQlExecutor::new();
1171
1172        let query = SochQuery::Select(SelectQuery {
1173            columns: vec!["id".to_string(), "name".to_string()],
1174            table: "users".to_string(),
1175            where_clause: None,
1176            order_by: None,
1177            limit: None,
1178            offset: None,
1179        });
1180
1181        assert!(executor.validate(&query, &catalog).is_ok());
1182    }
1183
1184    #[test]
1185    fn test_validate_nonexistent_table() {
1186        let catalog = test_catalog();
1187        let executor = SochQlExecutor::new();
1188
1189        let query = SochQuery::Select(SelectQuery {
1190            columns: vec!["*".to_string()],
1191            table: "nonexistent".to_string(),
1192            where_clause: None,
1193            order_by: None,
1194            limit: None,
1195            offset: None,
1196        });
1197
1198        assert!(executor.validate(&query, &catalog).is_err());
1199    }
1200
1201    #[test]
1202    fn test_plan_select() {
1203        let catalog = test_catalog();
1204        let executor = SochQlExecutor::new();
1205
1206        let select = SelectQuery {
1207            columns: vec!["id".to_string(), "name".to_string()],
1208            table: "users".to_string(),
1209            where_clause: Some(WhereClause {
1210                conditions: vec![Condition {
1211                    column: "score".to_string(),
1212                    operator: ComparisonOp::Gt,
1213                    value: SochValue::Float(80.0),
1214                }],
1215                operator: LogicalOp::And,
1216            }),
1217            order_by: Some(OrderBy {
1218                column: "score".to_string(),
1219                direction: SortDirection::Desc,
1220            }),
1221            limit: Some(10),
1222            offset: None,
1223        };
1224
1225        let plan = executor.plan_select(&select, &catalog).unwrap();
1226
1227        // Should be: Limit(Sort(Project(Filter(TableScan))))
1228        match plan {
1229            QueryPlan::Limit { input, count, .. } => {
1230                assert_eq!(count, 10);
1231                match *input {
1232                    QueryPlan::Sort { input, order_by } => {
1233                        assert_eq!(order_by[0].0, "score");
1234                        assert!(!order_by[0].1); // Descending = false
1235                        match *input {
1236                            QueryPlan::Project { input, columns } => {
1237                                assert_eq!(columns, vec!["id", "name"]);
1238                                match *input {
1239                                    QueryPlan::Filter { predicate, .. } => {
1240                                        assert_eq!(predicate.conditions.len(), 1);
1241                                    }
1242                                    _ => panic!("Expected Filter"),
1243                                }
1244                            }
1245                            _ => panic!("Expected Project"),
1246                        }
1247                    }
1248                    _ => panic!("Expected Sort"),
1249                }
1250            }
1251            _ => panic!("Expected Limit"),
1252        }
1253    }
1254
1255    #[test]
1256    fn test_predicate_evaluation() {
1257        let cond = PredicateCondition {
1258            column: "score".to_string(),
1259            operator: ComparisonOp::Gt,
1260            value: CoreSochValue::Float(80.0),
1261        };
1262
1263        let row_pass = SochRow::new(vec![
1264            CoreSochValue::UInt(1),
1265            CoreSochValue::Text("Alice".to_string()),
1266            CoreSochValue::Float(95.0),
1267        ]);
1268
1269        let row_fail = SochRow::new(vec![
1270            CoreSochValue::UInt(2),
1271            CoreSochValue::Text("Bob".to_string()),
1272            CoreSochValue::Float(75.0),
1273        ]);
1274
1275        assert!(cond.evaluate(&row_pass, 2));
1276        assert!(!cond.evaluate(&row_fail, 2));
1277    }
1278
1279    #[test]
1280    fn test_token_reduction() {
1281        // Use more rows and longer field names to show real reduction
1282        let result = SochResult {
1283            table: "user_statistics".to_string(),
1284            columns: vec![
1285                "user_id".to_string(),
1286                "full_name".to_string(),
1287                "email_address".to_string(),
1288                "registration_date".to_string(),
1289                "last_login".to_string(),
1290            ],
1291            rows: (0..20)
1292                .map(|i| {
1293                    vec![
1294                        SochValue::UInt(i as u64),
1295                        SochValue::Text(format!("User Number {}", i)),
1296                        SochValue::Text(format!("user{}@example.com", i)),
1297                        SochValue::Text("2024-01-15".to_string()),
1298                        SochValue::Text("2024-03-20".to_string()),
1299                    ]
1300                })
1301                .collect(),
1302        };
1303
1304        let stats = estimate_token_reduction(&result);
1305
1306        println!("JSON tokens: {}", stats.json_tokens);
1307        println!("TOON tokens: {}", stats.soch_tokens);
1308        println!("Reduction: {}%", stats.reduction_percent);
1309
1310        // With many rows and repeated column names, TOON should be more efficient
1311        assert!(stats.soch_tokens < stats.json_tokens);
1312        assert!(stats.reduction_percent > 0); // Any reduction is valuable
1313    }
1314
1315    // ========================================================================
1316    // Task 12: Vectorized Predicate Evaluation Tests
1317    // ========================================================================
1318
1319    #[test]
1320    fn test_selection_vector_basic() {
1321        let sel = SelectionVector::all(100);
1322        assert_eq!(sel.len(), 100);
1323        assert!(!sel.is_empty());
1324        assert_eq!(sel.selectivity(), 1.0);
1325
1326        let empty = SelectionVector::empty();
1327        assert!(empty.is_empty());
1328        assert_eq!(empty.selectivity(), 0.0);
1329    }
1330
1331    #[test]
1332    fn test_selection_vector_filter() {
1333        let sel = SelectionVector::all(10);
1334
1335        // Keep only even indices
1336        let filtered = sel.filter(|i| i % 2 == 0);
1337        assert_eq!(filtered.len(), 5);
1338
1339        let indices: Vec<u32> = filtered.iter().collect();
1340        assert_eq!(indices, vec![0, 2, 4, 6, 8]);
1341    }
1342
1343    #[test]
1344    fn test_vectorized_int_filter() {
1345        let executor = VectorizedExecutor::new(1024);
1346
1347        // Create a column with values 0-9
1348        let column = ColumnBatch::new(
1349            "value".to_string(),
1350            (0..10).map(CoreSochValue::Int).collect(),
1351        );
1352
1353        let predicates = vec![VectorPredicate::IntGt {
1354            col_idx: 0,
1355            threshold: 5,
1356        }];
1357
1358        let selection = executor.evaluate_batch(&[column], &predicates);
1359
1360        // Should select 6, 7, 8, 9 (4 values > 5)
1361        assert_eq!(selection.len(), 4);
1362        let indices: Vec<u32> = selection.iter().collect();
1363        assert_eq!(indices, vec![6, 7, 8, 9]);
1364    }
1365
1366    #[test]
1367    fn test_vectorized_multiple_predicates() {
1368        let executor = VectorizedExecutor::new(1024);
1369
1370        // Create columns
1371        let id_col = ColumnBatch::new("id".to_string(), (0..100).map(CoreSochValue::Int).collect());
1372
1373        let status_col = ColumnBatch::new(
1374            "active".to_string(),
1375            (0..100).map(|i| CoreSochValue::Bool(i % 2 == 0)).collect(),
1376        );
1377
1378        let predicates = vec![
1379            VectorPredicate::IntGe {
1380                col_idx: 0,
1381                threshold: 50,
1382            },
1383            VectorPredicate::IntLt {
1384                col_idx: 0,
1385                threshold: 60,
1386            },
1387            VectorPredicate::BoolEq {
1388                col_idx: 1,
1389                value: true,
1390            },
1391        ];
1392
1393        let selection = executor.evaluate_batch(&[id_col, status_col], &predicates);
1394
1395        // Should select: 50, 52, 54, 56, 58 (even numbers in [50, 60))
1396        assert_eq!(selection.len(), 5);
1397        let indices: Vec<u32> = selection.iter().collect();
1398        assert_eq!(indices, vec![50, 52, 54, 56, 58]);
1399    }
1400
1401    #[test]
1402    fn test_vectorized_short_circuit() {
1403        let executor = VectorizedExecutor::new(1024);
1404
1405        // Create a column with all values < 0
1406        let column = ColumnBatch::new(
1407            "value".to_string(),
1408            (0..100).map(|_| CoreSochValue::Int(-1)).collect(),
1409        );
1410
1411        // First predicate eliminates everything
1412        let predicates = vec![
1413            VectorPredicate::IntGt {
1414                col_idx: 0,
1415                threshold: 0,
1416            },
1417            // These should not even be evaluated due to short-circuit
1418            VectorPredicate::IntLt {
1419                col_idx: 0,
1420                threshold: 100,
1421            },
1422            VectorPredicate::IntEq {
1423                col_idx: 0,
1424                value: 50,
1425            },
1426        ];
1427
1428        let selection = executor.evaluate_batch(&[column], &predicates);
1429        assert!(selection.is_empty());
1430    }
1431
1432    #[test]
1433    fn test_vectorized_string_predicates() {
1434        let executor = VectorizedExecutor::new(1024);
1435
1436        let names = [
1437            "Alice", "Bob", "Carol", "Dave", "Alice", "Alice", "Bob", "Carol",
1438        ];
1439        let column = ColumnBatch::new(
1440            "name".to_string(),
1441            names
1442                .iter()
1443                .map(|s| CoreSochValue::Text(s.to_string()))
1444                .collect(),
1445        );
1446
1447        let predicates = vec![VectorPredicate::StrEq {
1448            col_idx: 0,
1449            value: "Alice".to_string(),
1450        }];
1451
1452        let selection = executor.evaluate_batch(&[column], &predicates);
1453
1454        // Should select indices 0, 4, 5 (where name == "Alice")
1455        assert_eq!(selection.len(), 3);
1456        let indices: Vec<u32> = selection.iter().collect();
1457        assert_eq!(indices, vec![0, 4, 5]);
1458    }
1459
1460    #[test]
1461    fn test_vectorized_null_handling() {
1462        let executor = VectorizedExecutor::new(1024);
1463
1464        let values = vec![
1465            CoreSochValue::Int(1),
1466            CoreSochValue::Null,
1467            CoreSochValue::Int(2),
1468            CoreSochValue::Null,
1469            CoreSochValue::Int(3),
1470        ];
1471        let column = ColumnBatch::new("value".to_string(), values);
1472
1473        let predicates = vec![VectorPredicate::IsNull { col_idx: 0 }];
1474        let null_selection = executor.evaluate_batch(std::slice::from_ref(&column), &predicates);
1475        assert_eq!(null_selection.len(), 2); // indices 1, 3
1476
1477        let not_null_predicates = vec![VectorPredicate::IsNotNull { col_idx: 0 }];
1478        let not_null_selection = executor.evaluate_batch(&[column], &not_null_predicates);
1479        assert_eq!(not_null_selection.len(), 3); // indices 0, 2, 4
1480    }
1481
1482    #[test]
1483    fn test_row_to_columnar_conversion() {
1484        let executor = VectorizedExecutor::new(1024);
1485
1486        let rows = vec![
1487            SochRow::new(vec![
1488                CoreSochValue::Int(1),
1489                CoreSochValue::Text("Alice".to_string()),
1490            ]),
1491            SochRow::new(vec![
1492                CoreSochValue::Int(2),
1493                CoreSochValue::Text("Bob".to_string()),
1494            ]),
1495            SochRow::new(vec![
1496                CoreSochValue::Int(3),
1497                CoreSochValue::Text("Carol".to_string()),
1498            ]),
1499        ];
1500
1501        let column_names = vec!["id".to_string(), "name".to_string()];
1502        let columns = executor.row_to_columnar(&rows, &column_names);
1503
1504        assert_eq!(columns.len(), 2);
1505        assert_eq!(columns[0].name, "id");
1506        assert_eq!(columns[1].name, "name");
1507        assert_eq!(columns[0].len(), 3);
1508        assert_eq!(columns[1].len(), 3);
1509    }
1510
1511    #[test]
1512    fn test_materialize_selected_rows() {
1513        let executor = VectorizedExecutor::new(1024);
1514
1515        let id_col = ColumnBatch::new(
1516            "id".to_string(),
1517            vec![
1518                CoreSochValue::Int(1),
1519                CoreSochValue::Int(2),
1520                CoreSochValue::Int(3),
1521            ],
1522        );
1523        let name_col = ColumnBatch::new(
1524            "name".to_string(),
1525            vec![
1526                CoreSochValue::Text("Alice".to_string()),
1527                CoreSochValue::Text("Bob".to_string()),
1528                CoreSochValue::Text("Carol".to_string()),
1529            ],
1530        );
1531
1532        // Select rows 0 and 2
1533        let selection = SelectionVector::from_indices(vec![0, 2], 3);
1534
1535        let rows = executor.materialize(&[id_col, name_col], &selection);
1536
1537        assert_eq!(rows.len(), 2);
1538        assert_eq!(rows[0].values[0], CoreSochValue::Int(1));
1539        assert_eq!(rows[0].values[1], CoreSochValue::Text("Alice".to_string()));
1540        assert_eq!(rows[1].values[0], CoreSochValue::Int(3));
1541        assert_eq!(rows[1].values[1], CoreSochValue::Text("Carol".to_string()));
1542    }
1543}