Skip to main content

sochdb_query/
soch_ql_executor.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! SOCH-QL Query Executor (Task 6)
19//!
20//! End-to-end SOCH-QL query execution pipeline:
21//! 1. parse(sql) → SochQuery
22//! 2. validate(query, catalog) → Result<()>
23//! 3. plan(query, stats) → QueryPlan
24//! 4. execute(plan, storage) → SochTable
25//!
26//! ## Token Reduction Model
27//!
28//! ```text
29//! tokens_JSON(table) ≈ 4 + Σ(|field_name| + |value| + 4) per row
30//! tokens_TOON(table) ≈ header + Σ(|value| + 1) per row
31//! reduction = 1 - tokens_TOON/tokens_JSON ≈ 0.4 to 0.6
32//!
33//! For 100 rows × 5 fields:
34//! JSON: ~100 × (5 × 15) = 7,500 tokens
35//! TOON: ~50 + 100 × 25 = 2,550 tokens → 66% reduction
36//! ```
37
38use crate::soch_ql::{
39    ComparisonOp, LogicalOp, SelectQuery, SortDirection, SochQlParser, SochQuery, SochResult,
40    SochValue, WhereClause,
41};
42#[cfg(test)]
43use crate::soch_ql::{Condition, OrderBy};
44use std::collections::HashMap;
45use sochdb_core::{Catalog, Result, SochDBError, SochRow, SochValue as CoreSochValue};
46#[cfg(test)]
47use sochdb_core::{SochSchema, SochType};
48
49/// Query plan operators
50#[derive(Debug, Clone)]
51pub enum QueryPlan {
52    /// Full table scan
53    TableScan {
54        table: String,
55        columns: Vec<String>,
56        predicate: Option<Box<QueryPlan>>,
57    },
58    /// Index seek (primary or secondary)
59    IndexSeek { index: String, key_range: KeyRange },
60    /// Filter rows
61    Filter {
62        input: Box<QueryPlan>,
63        predicate: Predicate,
64    },
65    /// Project columns
66    Project {
67        input: Box<QueryPlan>,
68        columns: Vec<String>,
69    },
70    /// Sort results
71    Sort {
72        input: Box<QueryPlan>,
73        order_by: Vec<(String, bool)>, // (column, ascending)
74    },
75    /// Limit results
76    Limit {
77        input: Box<QueryPlan>,
78        count: usize,
79        offset: usize,
80    },
81    /// Empty result
82    Empty,
83}
84
85/// Key range for index seeks
86#[derive(Debug, Clone)]
87pub struct KeyRange {
88    pub start: Option<SochValue>,
89    pub end: Option<SochValue>,
90    pub inclusive_start: bool,
91    pub inclusive_end: bool,
92}
93
94impl KeyRange {
95    pub fn all() -> Self {
96        Self {
97            start: None,
98            end: None,
99            inclusive_start: true,
100            inclusive_end: true,
101        }
102    }
103
104    pub fn eq(value: SochValue) -> Self {
105        Self {
106            start: Some(value.clone()),
107            end: Some(value),
108            inclusive_start: true,
109            inclusive_end: true,
110        }
111    }
112}
113
114/// Query predicate
115#[derive(Debug, Clone)]
116pub struct Predicate {
117    pub conditions: Vec<PredicateCondition>,
118    pub operator: LogicalOp,
119}
120
121/// Single predicate condition (uses CoreSochValue for row compatibility)
122#[derive(Debug, Clone)]
123pub struct PredicateCondition {
124    pub column: String,
125    pub operator: ComparisonOp,
126    pub value: CoreSochValue,
127}
128
129impl PredicateCondition {
130    /// Create from soch_ql SochValue
131    pub fn from_soch_ql(column: String, operator: ComparisonOp, value: &SochValue) -> Self {
132        Self {
133            column,
134            operator,
135            value: Self::convert_value(value),
136        }
137    }
138
139    /// Convert soch_ql::SochValue to CoreSochValue
140    fn convert_value(v: &SochValue) -> CoreSochValue {
141        match v {
142            SochValue::Int(i) => CoreSochValue::Int(*i),
143            SochValue::UInt(u) => CoreSochValue::UInt(*u),
144            SochValue::Float(f) => CoreSochValue::Float(*f),
145            SochValue::Text(s) => CoreSochValue::Text(s.clone()),
146            SochValue::Bool(b) => CoreSochValue::Bool(*b),
147            SochValue::Null => CoreSochValue::Null,
148            SochValue::Binary(b) => CoreSochValue::Binary(b.clone()),
149            SochValue::Array(arr) => {
150                CoreSochValue::Array(arr.iter().map(Self::convert_value).collect())
151            }
152        }
153    }
154
155    /// Evaluate predicate against a row
156    pub fn evaluate(&self, row: &SochRow, column_idx: usize) -> bool {
157        if column_idx >= row.values.len() {
158            return false;
159        }
160
161        let row_value = &row.values[column_idx];
162
163        match self.operator {
164            ComparisonOp::Eq => row_value == &self.value,
165            ComparisonOp::Ne => row_value != &self.value,
166            ComparisonOp::Lt => {
167                Self::compare(row_value, &self.value) == Some(std::cmp::Ordering::Less)
168            }
169            ComparisonOp::Le => matches!(
170                Self::compare(row_value, &self.value),
171                Some(std::cmp::Ordering::Less | std::cmp::Ordering::Equal)
172            ),
173            ComparisonOp::Gt => {
174                Self::compare(row_value, &self.value) == Some(std::cmp::Ordering::Greater)
175            }
176            ComparisonOp::Ge => matches!(
177                Self::compare(row_value, &self.value),
178                Some(std::cmp::Ordering::Greater | std::cmp::Ordering::Equal)
179            ),
180            ComparisonOp::Like => Self::like_match(row_value, &self.value),
181            ComparisonOp::In => Self::in_match(row_value, &self.value),
182            ComparisonOp::SimilarTo => {
183                // SimilarTo is used for vector similarity search
184                // Evaluated by the vector index, not row-by-row comparison
185                // For row-level evaluation, we fall back to Like-style matching
186                Self::like_match(row_value, &self.value)
187            }
188        }
189    }
190
191    fn compare(a: &CoreSochValue, b: &CoreSochValue) -> Option<std::cmp::Ordering> {
192        match (a, b) {
193            (CoreSochValue::Int(a), CoreSochValue::Int(b)) => Some(a.cmp(b)),
194            (CoreSochValue::UInt(a), CoreSochValue::UInt(b)) => Some(a.cmp(b)),
195            (CoreSochValue::Float(a), CoreSochValue::Float(b)) => a.partial_cmp(b),
196            (CoreSochValue::Text(a), CoreSochValue::Text(b)) => Some(a.cmp(b)),
197            _ => None,
198        }
199    }
200
201    fn like_match(value: &CoreSochValue, pattern: &CoreSochValue) -> bool {
202        match (value, pattern) {
203            (CoreSochValue::Text(v), CoreSochValue::Text(p)) => {
204                // Simple LIKE: % matches any, _ matches one char
205                let regex_pattern = p.replace('%', ".*").replace('_', ".");
206                regex::Regex::new(&format!("^{}$", regex_pattern))
207                    .map(|re| re.is_match(v))
208                    .unwrap_or(false)
209            }
210            _ => false,
211        }
212    }
213
214    fn in_match(value: &CoreSochValue, list: &CoreSochValue) -> bool {
215        match list {
216            CoreSochValue::Array(values) => values.iter().any(|v| value == v),
217            _ => value == list, // Single value comparison fallback
218        }
219    }
220}
221
222impl Predicate {
223    /// Evaluate predicate against a row
224    pub fn evaluate(&self, row: &SochRow, column_map: &HashMap<String, usize>) -> bool {
225        let results: Vec<bool> = self
226            .conditions
227            .iter()
228            .map(|cond| {
229                column_map
230                    .get(&cond.column)
231                    .map(|&idx| cond.evaluate(row, idx))
232                    .unwrap_or(false)
233            })
234            .collect();
235
236        match self.operator {
237            LogicalOp::And => results.iter().all(|&r| r),
238            LogicalOp::Or => results.iter().any(|&r| r),
239        }
240    }
241}
242
243/// SOCH-QL Query Executor
244pub struct SochQlExecutor;
245
246impl SochQlExecutor {
247    /// Create a new executor
248    pub fn new() -> Self {
249        Self
250    }
251
252    /// Execute a SOCH-QL query string
253    pub fn execute(&self, query: &str, catalog: &Catalog) -> Result<SochResult> {
254        // Parse
255        let parsed = SochQlParser::parse(query)
256            .map_err(|e| SochDBError::InvalidArgument(format!("Parse error: {:?}", e)))?;
257
258        // Validate
259        self.validate(&parsed, catalog)?;
260
261        // Plan
262        let plan = self.plan(&parsed, catalog)?;
263
264        // Execute
265        self.execute_plan(&plan, catalog)
266    }
267
268    /// Validate a parsed query against the catalog
269    pub fn validate(&self, query: &SochQuery, catalog: &Catalog) -> Result<()> {
270        match query {
271            SochQuery::Select(select) => {
272                // Check table exists
273                if catalog.get_table(&select.table).is_none() {
274                    return Err(SochDBError::NotFound(format!(
275                        "Table '{}' not found",
276                        select.table
277                    )));
278                }
279
280                // Check columns exist (if not *)
281                if let Some(entry) = catalog.get_table(&select.table)
282                    && let Some(schema) = &entry.schema
283                {
284                    for col in &select.columns {
285                        if col != "*" && !schema.fields.iter().any(|f| &f.name == col) {
286                            return Err(SochDBError::InvalidArgument(format!(
287                                "Column '{}' not found in table '{}'",
288                                col, select.table
289                            )));
290                        }
291                    }
292                }
293
294                Ok(())
295            }
296            SochQuery::Insert(insert) => {
297                // Check table exists
298                if catalog.get_table(&insert.table).is_none() {
299                    return Err(SochDBError::NotFound(format!(
300                        "Table '{}' not found",
301                        insert.table
302                    )));
303                }
304                Ok(())
305            }
306            SochQuery::CreateTable(create) => {
307                // Check table doesn't exist
308                if catalog.get_table(&create.table).is_some() {
309                    return Err(SochDBError::InvalidArgument(format!(
310                        "Table '{}' already exists",
311                        create.table
312                    )));
313                }
314                Ok(())
315            }
316            SochQuery::DropTable { table } => {
317                if catalog.get_table(table).is_none() {
318                    return Err(SochDBError::NotFound(format!(
319                        "Table '{}' not found",
320                        table
321                    )));
322                }
323                Ok(())
324            }
325        }
326    }
327
328    /// Generate a query plan
329    pub fn plan(&self, query: &SochQuery, catalog: &Catalog) -> Result<QueryPlan> {
330        match query {
331            SochQuery::Select(select) => self.plan_select(select, catalog),
332            _ => Err(SochDBError::InvalidArgument(
333                "Only SELECT queries can be planned".to_string(),
334            )),
335        }
336    }
337
338    fn plan_select(&self, select: &SelectQuery, _catalog: &Catalog) -> Result<QueryPlan> {
339        // Start with table scan
340        let mut plan = QueryPlan::TableScan {
341            table: select.table.clone(),
342            columns: select.columns.clone(),
343            predicate: None,
344        };
345
346        // Add filter if WHERE clause present
347        if let Some(where_clause) = &select.where_clause {
348            let predicate = self.build_predicate(where_clause);
349            plan = QueryPlan::Filter {
350                input: Box::new(plan),
351                predicate,
352            };
353        }
354
355        // Add projection (column selection)
356        if !select.columns.contains(&"*".to_string()) {
357            plan = QueryPlan::Project {
358                input: Box::new(plan),
359                columns: select.columns.clone(),
360            };
361        }
362
363        // Add sort if ORDER BY present
364        if let Some(order_by) = &select.order_by {
365            plan = QueryPlan::Sort {
366                input: Box::new(plan),
367                order_by: vec![(
368                    order_by.column.clone(),
369                    matches!(order_by.direction, SortDirection::Asc),
370                )],
371            };
372        }
373
374        // Add limit if present
375        if select.limit.is_some() || select.offset.is_some() {
376            plan = QueryPlan::Limit {
377                input: Box::new(plan),
378                count: select.limit.unwrap_or(usize::MAX),
379                offset: select.offset.unwrap_or(0),
380            };
381        }
382
383        Ok(plan)
384    }
385
386    fn build_predicate(&self, where_clause: &WhereClause) -> Predicate {
387        Predicate {
388            conditions: where_clause
389                .conditions
390                .iter()
391                .map(|c| PredicateCondition::from_soch_ql(c.column.clone(), c.operator, &c.value))
392                .collect(),
393            operator: where_clause.operator,
394        }
395    }
396
397    /// Execute a query plan
398    #[allow(clippy::only_used_in_recursion)]
399    pub fn execute_plan(&self, plan: &QueryPlan, catalog: &Catalog) -> Result<SochResult> {
400        // For now, return empty results (storage integration pending)
401        // This is the interface that will connect to StorageEngine
402        match plan {
403            QueryPlan::Empty => Ok(SochResult {
404                table: "result".to_string(),
405                columns: vec![],
406                rows: vec![],
407            }),
408            QueryPlan::TableScan { table, columns, .. } => {
409                // Get schema from catalog
410                let schema_columns = if let Some(entry) = catalog.get_table(table) {
411                    if let Some(schema) = &entry.schema {
412                        if columns.contains(&"*".to_string()) {
413                            schema.fields.iter().map(|f| f.name.clone()).collect()
414                        } else {
415                            columns.clone()
416                        }
417                    } else {
418                        columns.clone()
419                    }
420                } else {
421                    columns.clone()
422                };
423
424                Ok(SochResult {
425                    table: table.clone(),
426                    columns: schema_columns,
427                    rows: vec![], // Storage integration will populate this
428                })
429            }
430            QueryPlan::Filter { input, .. } => self.execute_plan(input, catalog),
431            QueryPlan::Project { input, columns } => {
432                let mut result = self.execute_plan(input, catalog)?;
433                result.columns = columns.clone();
434                Ok(result)
435            }
436            QueryPlan::Sort { input, .. } => self.execute_plan(input, catalog),
437            QueryPlan::Limit {
438                input,
439                count,
440                offset,
441            } => {
442                let mut result = self.execute_plan(input, catalog)?;
443                result.rows = result.rows.into_iter().skip(*offset).take(*count).collect();
444                Ok(result)
445            }
446            QueryPlan::IndexSeek { .. } => Ok(SochResult {
447                table: "result".to_string(),
448                columns: vec![],
449                rows: vec![],
450            }),
451        }
452    }
453}
454
455impl Default for SochQlExecutor {
456    fn default() -> Self {
457        Self::new()
458    }
459}
460
461/// Execute a SOCH-QL query (convenience function)
462pub fn execute_sochql(query: &str, catalog: &Catalog) -> Result<SochResult> {
463    SochQlExecutor::new().execute(query, catalog)
464}
465
466/// Estimate token reduction for a TOON result vs JSON
467pub fn estimate_token_reduction(result: &SochResult) -> TokenReductionStats {
468    let row_count = result.rows.len();
469    let col_count = result.columns.len();
470
471    if row_count == 0 || col_count == 0 {
472        return TokenReductionStats::default();
473    }
474
475    // Estimate JSON tokens
476    // Format: [{"col1": "val1", "col2": "val2"}, ...]
477    let avg_col_name_len: usize = result.columns.iter().map(|c| c.len()).sum::<usize>() / col_count;
478    let avg_value_len = 10; // Rough estimate
479
480    // JSON: 2 (brackets) + row_count * (2 + col_count * (col_name + 4 + value))
481    let json_tokens = 2 + row_count * (2 + col_count * (avg_col_name_len + 4 + avg_value_len));
482
483    // TOON: header + row_count * (col_count * value + col_count)
484    // Header: table[count]{col1,col2,...}:
485    let header_tokens = result.table.len() + 10 + result.columns.join(",").len();
486    let soch_tokens = header_tokens + row_count * (col_count * avg_value_len + col_count);
487
488    let reduction = 1.0 - (soch_tokens as f64 / json_tokens as f64);
489
490    TokenReductionStats {
491        json_tokens,
492        soch_tokens,
493        reduction_percent: (reduction * 100.0) as u32,
494        row_count,
495        col_count,
496    }
497}
498
/// Outcome of a JSON-vs-TOON token estimate.
#[derive(Debug, Clone, Default)]
pub struct TokenReductionStats {
    /// Estimated token count of the JSON rendering.
    pub json_tokens: usize,
    /// Estimated token count of the TOON rendering.
    pub soch_tokens: usize,
    /// Estimated saving, as a whole-number percentage.
    pub reduction_percent: u32,
    /// Number of rows in the measured result.
    pub row_count: usize,
    /// Number of columns in the measured result.
    pub col_count: usize,
}
513
514// ============================================================================
515// Task 12: Vectorized Predicate Evaluation
516// ============================================================================
517
/// Selection vector for batch predicate evaluation.
///
/// Holds the indices of the rows that have passed every predicate applied so
/// far, so evaluation can stop for the whole batch once it becomes empty.
#[derive(Debug, Clone)]
pub struct SelectionVector {
    /// Indices of the selected rows (expected to be sorted).
    indices: Vec<u32>,
    /// Size of the batch the indices refer to.
    batch_size: usize,
}

impl SelectionVector {
    /// Selection containing every row index `0..batch_size`.
    pub fn all(batch_size: usize) -> Self {
        let upper = batch_size as u32;
        Self::from_indices((0..upper).collect(), batch_size)
    }

    /// Selection with no rows and a batch size of zero.
    pub fn empty() -> Self {
        Self::from_indices(Vec::new(), 0)
    }

    /// Selection made of the given indices; they are stored as provided.
    pub fn from_indices(indices: Vec<u32>, batch_size: usize) -> Self {
        Self {
            indices,
            batch_size,
        }
    }

    /// True when no row is selected (the short-circuit condition).
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Count of selected rows.
    #[inline]
    pub fn len(&self) -> usize {
        self.indices.len()
    }

    /// Size of the batch this selection was created for.
    #[inline]
    pub fn batch_size(&self) -> usize {
        self.batch_size
    }

    /// Selected rows divided by batch size; `0.0` for a zero-sized batch.
    #[inline]
    pub fn selectivity(&self) -> f64 {
        match self.batch_size {
            0 => 0.0,
            total => self.len() as f64 / total as f64,
        }
    }

    /// Yields the selected indices in stored order.
    pub fn iter(&self) -> impl Iterator<Item = u32> + '_ {
        self.indices.iter().copied()
    }

    /// Returns a new selection holding only the indices for which `pred`
    /// is true; the batch size is carried over.
    pub fn filter<F>(&self, pred: F) -> Self
    where
        F: Fn(u32) -> bool,
    {
        let mut kept = Vec::new();
        for &row in &self.indices {
            if pred(row) {
                kept.push(row);
            }
        }
        Self::from_indices(kept, self.batch_size)
    }

    /// Appends `start_idx + bit` for every set bit of the 16-bit `mask`,
    /// lowest bit first (intended for SIMD comparison results). The batch
    /// size is left unchanged.
    pub fn extend_masked(&mut self, start_idx: usize, mask: u16) {
        let selected = (0..16usize)
            .filter(|&bit| (mask >> bit) & 1 == 1)
            .map(|bit| (start_idx + bit) as u32);
        self.indices.extend(selected);
    }
}
608
609/// Columnar batch for vectorized processing
610#[derive(Debug, Clone)]
611pub struct ColumnBatch {
612    /// Column values
613    pub values: Vec<CoreSochValue>,
614    /// Column name
615    pub name: String,
616}
617
618impl ColumnBatch {
619    /// Create from column data
620    pub fn new(name: String, values: Vec<CoreSochValue>) -> Self {
621        Self { values, name }
622    }
623
624    /// Get value at index
625    #[inline]
626    pub fn get(&self, idx: usize) -> Option<&CoreSochValue> {
627        self.values.get(idx)
628    }
629
630    /// Get raw integer data pointer (for SIMD)
631    #[allow(dead_code)]
632    pub fn as_i64_slice(&self) -> Option<Vec<i64>> {
633        self.values
634            .iter()
635            .map(|v| match v {
636                CoreSochValue::Int(i) => Some(*i),
637                CoreSochValue::UInt(u) => Some(*u as i64),
638                _ => None,
639            })
640            .collect()
641    }
642
643    /// Batch size
644    pub fn len(&self) -> usize {
645        self.values.len()
646    }
647
648    /// Is empty
649    pub fn is_empty(&self) -> bool {
650        self.values.is_empty()
651    }
652}
653
/// Predicate that can be evaluated over a whole column batch at once.
///
/// `col_idx` is always the position of the target column in the slice of
/// column batches handed to the executor.
#[derive(Debug, Clone)]
pub enum VectorPredicate {
    /// Integer column `>` threshold.
    IntGt { col_idx: usize, threshold: i64 },
    /// Integer column `<` threshold.
    IntLt { col_idx: usize, threshold: i64 },
    /// Integer column `==` value.
    IntEq { col_idx: usize, value: i64 },
    /// Integer column `>=` threshold.
    IntGe { col_idx: usize, threshold: i64 },
    /// Integer column `<=` threshold.
    IntLe { col_idx: usize, threshold: i64 },
    /// Text column equal to `value`.
    StrEq { col_idx: usize, value: String },
    /// Text column starting with `prefix`.
    StrPrefix { col_idx: usize, prefix: String },
    /// Boolean column equal to `value`.
    BoolEq { col_idx: usize, value: bool },
    /// Column value is null.
    IsNull { col_idx: usize },
    /// Column value is present and not null.
    IsNotNull { col_idx: usize },
}
678
679/// Vectorized executor for batch predicate evaluation
680///
681/// ## Performance Characteristics
682///
683/// - Traditional row-at-a-time: ~100M rows/sec (branch misprediction bound)
684/// - Vectorized with selection vector: ~1B rows/sec (branch-free)
685///
686/// ## Usage
687///
688/// ```ignore
689/// let executor = VectorizedExecutor::new(1024);
690/// let columns = vec![/* column batches */];
691/// let predicates = vec![/* predicates */];
692/// let selection = executor.evaluate_batch(&columns, &predicates);
693/// ```
694pub struct VectorizedExecutor {
695    /// Batch size for processing
696    batch_size: usize,
697}
698
699impl VectorizedExecutor {
700    /// Create with specified batch size
701    pub fn new(batch_size: usize) -> Self {
702        Self { batch_size }
703    }
704
705    /// Default batch size (1024 rows)
706    pub fn default_batch_size() -> usize {
707        1024
708    }
709
710    /// Evaluate predicates on columnar batch
711    ///
712    /// Returns selection vector of rows that pass all predicates.
713    /// Short-circuits at batch level when selection becomes empty.
714    pub fn evaluate_batch(
715        &self,
716        columns: &[ColumnBatch],
717        predicates: &[VectorPredicate],
718    ) -> SelectionVector {
719        if columns.is_empty() {
720            return SelectionVector::empty();
721        }
722
723        let batch_size = columns[0].len().min(self.batch_size);
724        let mut selection = SelectionVector::all(batch_size);
725
726        // Process predicates with short-circuit
727        for predicate in predicates {
728            if selection.is_empty() {
729                break; // Batch-level short-circuit
730            }
731
732            selection = match predicate {
733                VectorPredicate::IntGt { col_idx, threshold } => {
734                    self.filter_int_gt(&columns[*col_idx], *threshold, &selection)
735                }
736                VectorPredicate::IntLt { col_idx, threshold } => {
737                    self.filter_int_lt(&columns[*col_idx], *threshold, &selection)
738                }
739                VectorPredicate::IntEq { col_idx, value } => {
740                    self.filter_int_eq(&columns[*col_idx], *value, &selection)
741                }
742                VectorPredicate::IntGe { col_idx, threshold } => {
743                    self.filter_int_ge(&columns[*col_idx], *threshold, &selection)
744                }
745                VectorPredicate::IntLe { col_idx, threshold } => {
746                    self.filter_int_le(&columns[*col_idx], *threshold, &selection)
747                }
748                VectorPredicate::StrEq { col_idx, value } => {
749                    self.filter_str_eq(&columns[*col_idx], value, &selection)
750                }
751                VectorPredicate::StrPrefix { col_idx, prefix } => {
752                    self.filter_str_prefix(&columns[*col_idx], prefix, &selection)
753                }
754                VectorPredicate::BoolEq { col_idx, value } => {
755                    self.filter_bool_eq(&columns[*col_idx], *value, &selection)
756                }
757                VectorPredicate::IsNull { col_idx } => {
758                    self.filter_is_null(&columns[*col_idx], &selection)
759                }
760                VectorPredicate::IsNotNull { col_idx } => {
761                    self.filter_is_not_null(&columns[*col_idx], &selection)
762                }
763            };
764        }
765
766        selection
767    }
768
769    /// Filter: column > threshold
770    #[inline]
771    fn filter_int_gt(
772        &self,
773        column: &ColumnBatch,
774        threshold: i64,
775        selection: &SelectionVector,
776    ) -> SelectionVector {
777        selection.filter(|idx| match column.get(idx as usize) {
778            Some(CoreSochValue::Int(v)) => *v > threshold,
779            Some(CoreSochValue::UInt(v)) => (*v as i64) > threshold,
780            _ => false,
781        })
782    }
783
784    /// Filter: column < threshold
785    #[inline]
786    fn filter_int_lt(
787        &self,
788        column: &ColumnBatch,
789        threshold: i64,
790        selection: &SelectionVector,
791    ) -> SelectionVector {
792        selection.filter(|idx| match column.get(idx as usize) {
793            Some(CoreSochValue::Int(v)) => *v < threshold,
794            Some(CoreSochValue::UInt(v)) => (*v as i64) < threshold,
795            _ => false,
796        })
797    }
798
799    /// Filter: column == value
800    #[inline]
801    fn filter_int_eq(
802        &self,
803        column: &ColumnBatch,
804        value: i64,
805        selection: &SelectionVector,
806    ) -> SelectionVector {
807        selection.filter(|idx| match column.get(idx as usize) {
808            Some(CoreSochValue::Int(v)) => *v == value,
809            Some(CoreSochValue::UInt(v)) => (*v as i64) == value,
810            _ => false,
811        })
812    }
813
814    /// Filter: column >= threshold
815    #[inline]
816    fn filter_int_ge(
817        &self,
818        column: &ColumnBatch,
819        threshold: i64,
820        selection: &SelectionVector,
821    ) -> SelectionVector {
822        selection.filter(|idx| match column.get(idx as usize) {
823            Some(CoreSochValue::Int(v)) => *v >= threshold,
824            Some(CoreSochValue::UInt(v)) => (*v as i64) >= threshold,
825            _ => false,
826        })
827    }
828
829    /// Filter: column <= threshold
830    #[inline]
831    fn filter_int_le(
832        &self,
833        column: &ColumnBatch,
834        threshold: i64,
835        selection: &SelectionVector,
836    ) -> SelectionVector {
837        selection.filter(|idx| match column.get(idx as usize) {
838            Some(CoreSochValue::Int(v)) => *v <= threshold,
839            Some(CoreSochValue::UInt(v)) => (*v as i64) <= threshold,
840            _ => false,
841        })
842    }
843
844    /// Filter: column == value (string)
845    #[inline]
846    fn filter_str_eq(
847        &self,
848        column: &ColumnBatch,
849        value: &str,
850        selection: &SelectionVector,
851    ) -> SelectionVector {
852        selection.filter(|idx| match column.get(idx as usize) {
853            Some(CoreSochValue::Text(s)) => s == value,
854            _ => false,
855        })
856    }
857
858    /// Filter: column starts with prefix
859    #[inline]
860    fn filter_str_prefix(
861        &self,
862        column: &ColumnBatch,
863        prefix: &str,
864        selection: &SelectionVector,
865    ) -> SelectionVector {
866        selection.filter(|idx| match column.get(idx as usize) {
867            Some(CoreSochValue::Text(s)) => s.starts_with(prefix),
868            _ => false,
869        })
870    }
871
872    /// Filter: column == value (bool)
873    #[inline]
874    fn filter_bool_eq(
875        &self,
876        column: &ColumnBatch,
877        value: bool,
878        selection: &SelectionVector,
879    ) -> SelectionVector {
880        selection.filter(|idx| match column.get(idx as usize) {
881            Some(CoreSochValue::Bool(b)) => *b == value,
882            _ => false,
883        })
884    }
885
886    /// Filter: column IS NULL
887    #[inline]
888    fn filter_is_null(&self, column: &ColumnBatch, selection: &SelectionVector) -> SelectionVector {
889        selection.filter(|idx| matches!(column.get(idx as usize), Some(CoreSochValue::Null)))
890    }
891
892    /// Filter: column IS NOT NULL
893    #[inline]
894    fn filter_is_not_null(
895        &self,
896        column: &ColumnBatch,
897        selection: &SelectionVector,
898    ) -> SelectionVector {
899        selection
900            .filter(|idx| !matches!(column.get(idx as usize), Some(CoreSochValue::Null) | None))
901    }
902
903    /// Materialize selected rows from columnar data
904    pub fn materialize(
905        &self,
906        columns: &[ColumnBatch],
907        selection: &SelectionVector,
908    ) -> Vec<SochRow> {
909        selection
910            .iter()
911            .map(|idx| {
912                let values: Vec<CoreSochValue> = columns
913                    .iter()
914                    .map(|col| {
915                        col.get(idx as usize)
916                            .cloned()
917                            .unwrap_or(CoreSochValue::Null)
918                    })
919                    .collect();
920                SochRow::new(values)
921            })
922            .collect()
923    }
924
925    /// Convert row-oriented data to columnar batches
926    pub fn row_to_columnar(&self, rows: &[SochRow], column_names: &[String]) -> Vec<ColumnBatch> {
927        if rows.is_empty() || column_names.is_empty() {
928            return vec![];
929        }
930
931        let num_cols = column_names.len().min(rows[0].values.len());
932
933        (0..num_cols)
934            .map(|col_idx| {
935                let values: Vec<CoreSochValue> = rows
936                    .iter()
937                    .map(|row| {
938                        row.values
939                            .get(col_idx)
940                            .cloned()
941                            .unwrap_or(CoreSochValue::Null)
942                    })
943                    .collect();
944                ColumnBatch::new(column_names[col_idx].clone(), values)
945            })
946            .collect()
947    }
948}
949
950impl Default for VectorizedExecutor {
951    fn default() -> Self {
952        Self::new(Self::default_batch_size())
953    }
954}
955
/// Counters collected for a vectorized execution.
///
/// All fields start at zero via `Default`.
#[derive(Debug, Clone, Default)]
pub struct VectorizedStats {
    /// Number of rows processed.
    pub rows_processed: usize,
    /// Number of rows selected.
    pub rows_selected: usize,
    /// Number of predicates evaluated.
    pub predicates_evaluated: usize,
    /// Number of short-circuits triggered.
    pub short_circuits: usize,
    /// Processing time, in microseconds.
    pub time_us: u64,
}
970
971impl VectorizedStats {
972    /// Selectivity ratio
973    pub fn selectivity(&self) -> f64 {
974        if self.rows_processed == 0 {
975            0.0
976        } else {
977            self.rows_selected as f64 / self.rows_processed as f64
978        }
979    }
980
981    /// Rows per second
982    pub fn rows_per_sec(&self) -> f64 {
983        if self.time_us == 0 {
984            0.0
985        } else {
986            self.rows_processed as f64 / (self.time_us as f64 / 1_000_000.0)
987        }
988    }
989}
990
991#[cfg(test)]
992mod tests {
993    use super::*;
994
995    fn test_catalog() -> Catalog {
996        let mut catalog = Catalog::new("test_db");
997
998        let schema = SochSchema::new("users")
999            .field("id", SochType::UInt)
1000            .field("name", SochType::Text)
1001            .field("score", SochType::Float);
1002
1003        catalog.create_table(schema, 1).unwrap();
1004        catalog
1005    }
1006
1007    #[test]
1008    fn test_validate_select() {
1009        let catalog = test_catalog();
1010        let executor = SochQlExecutor::new();
1011
1012        let query = SochQuery::Select(SelectQuery {
1013            columns: vec!["id".to_string(), "name".to_string()],
1014            table: "users".to_string(),
1015            where_clause: None,
1016            order_by: None,
1017            limit: None,
1018            offset: None,
1019        });
1020
1021        assert!(executor.validate(&query, &catalog).is_ok());
1022    }
1023
1024    #[test]
1025    fn test_validate_nonexistent_table() {
1026        let catalog = test_catalog();
1027        let executor = SochQlExecutor::new();
1028
1029        let query = SochQuery::Select(SelectQuery {
1030            columns: vec!["*".to_string()],
1031            table: "nonexistent".to_string(),
1032            where_clause: None,
1033            order_by: None,
1034            limit: None,
1035            offset: None,
1036        });
1037
1038        assert!(executor.validate(&query, &catalog).is_err());
1039    }
1040
1041    #[test]
1042    fn test_plan_select() {
1043        let catalog = test_catalog();
1044        let executor = SochQlExecutor::new();
1045
1046        let select = SelectQuery {
1047            columns: vec!["id".to_string(), "name".to_string()],
1048            table: "users".to_string(),
1049            where_clause: Some(WhereClause {
1050                conditions: vec![Condition {
1051                    column: "score".to_string(),
1052                    operator: ComparisonOp::Gt,
1053                    value: SochValue::Float(80.0),
1054                }],
1055                operator: LogicalOp::And,
1056            }),
1057            order_by: Some(OrderBy {
1058                column: "score".to_string(),
1059                direction: SortDirection::Desc,
1060            }),
1061            limit: Some(10),
1062            offset: None,
1063        };
1064
1065        let plan = executor.plan_select(&select, &catalog).unwrap();
1066
1067        // Should be: Limit(Sort(Project(Filter(TableScan))))
1068        match plan {
1069            QueryPlan::Limit { input, count, .. } => {
1070                assert_eq!(count, 10);
1071                match *input {
1072                    QueryPlan::Sort { input, order_by } => {
1073                        assert_eq!(order_by[0].0, "score");
1074                        assert!(!order_by[0].1); // Descending = false
1075                        match *input {
1076                            QueryPlan::Project { input, columns } => {
1077                                assert_eq!(columns, vec!["id", "name"]);
1078                                match *input {
1079                                    QueryPlan::Filter { predicate, .. } => {
1080                                        assert_eq!(predicate.conditions.len(), 1);
1081                                    }
1082                                    _ => panic!("Expected Filter"),
1083                                }
1084                            }
1085                            _ => panic!("Expected Project"),
1086                        }
1087                    }
1088                    _ => panic!("Expected Sort"),
1089                }
1090            }
1091            _ => panic!("Expected Limit"),
1092        }
1093    }
1094
1095    #[test]
1096    fn test_predicate_evaluation() {
1097        let cond = PredicateCondition {
1098            column: "score".to_string(),
1099            operator: ComparisonOp::Gt,
1100            value: CoreSochValue::Float(80.0),
1101        };
1102
1103        let row_pass = SochRow::new(vec![
1104            CoreSochValue::UInt(1),
1105            CoreSochValue::Text("Alice".to_string()),
1106            CoreSochValue::Float(95.0),
1107        ]);
1108
1109        let row_fail = SochRow::new(vec![
1110            CoreSochValue::UInt(2),
1111            CoreSochValue::Text("Bob".to_string()),
1112            CoreSochValue::Float(75.0),
1113        ]);
1114
1115        assert!(cond.evaluate(&row_pass, 2));
1116        assert!(!cond.evaluate(&row_fail, 2));
1117    }
1118
1119    #[test]
1120    fn test_token_reduction() {
1121        // Use more rows and longer field names to show real reduction
1122        let result = SochResult {
1123            table: "user_statistics".to_string(),
1124            columns: vec![
1125                "user_id".to_string(),
1126                "full_name".to_string(),
1127                "email_address".to_string(),
1128                "registration_date".to_string(),
1129                "last_login".to_string(),
1130            ],
1131            rows: (0..20)
1132                .map(|i| {
1133                    vec![
1134                        SochValue::UInt(i as u64),
1135                        SochValue::Text(format!("User Number {}", i)),
1136                        SochValue::Text(format!("user{}@example.com", i)),
1137                        SochValue::Text("2024-01-15".to_string()),
1138                        SochValue::Text("2024-03-20".to_string()),
1139                    ]
1140                })
1141                .collect(),
1142        };
1143
1144        let stats = estimate_token_reduction(&result);
1145
1146        println!("JSON tokens: {}", stats.json_tokens);
1147        println!("TOON tokens: {}", stats.soch_tokens);
1148        println!("Reduction: {}%", stats.reduction_percent);
1149
1150        // With many rows and repeated column names, TOON should be more efficient
1151        assert!(stats.soch_tokens < stats.json_tokens);
1152        assert!(stats.reduction_percent > 0); // Any reduction is valuable
1153    }
1154
1155    // ========================================================================
1156    // Task 12: Vectorized Predicate Evaluation Tests
1157    // ========================================================================
1158
1159    #[test]
1160    fn test_selection_vector_basic() {
1161        let sel = SelectionVector::all(100);
1162        assert_eq!(sel.len(), 100);
1163        assert!(!sel.is_empty());
1164        assert_eq!(sel.selectivity(), 1.0);
1165
1166        let empty = SelectionVector::empty();
1167        assert!(empty.is_empty());
1168        assert_eq!(empty.selectivity(), 0.0);
1169    }
1170
1171    #[test]
1172    fn test_selection_vector_filter() {
1173        let sel = SelectionVector::all(10);
1174
1175        // Keep only even indices
1176        let filtered = sel.filter(|i| i % 2 == 0);
1177        assert_eq!(filtered.len(), 5);
1178
1179        let indices: Vec<u32> = filtered.iter().collect();
1180        assert_eq!(indices, vec![0, 2, 4, 6, 8]);
1181    }
1182
1183    #[test]
1184    fn test_vectorized_int_filter() {
1185        let executor = VectorizedExecutor::new(1024);
1186
1187        // Create a column with values 0-9
1188        let column = ColumnBatch::new(
1189            "value".to_string(),
1190            (0..10).map(CoreSochValue::Int).collect(),
1191        );
1192
1193        let predicates = vec![VectorPredicate::IntGt {
1194            col_idx: 0,
1195            threshold: 5,
1196        }];
1197
1198        let selection = executor.evaluate_batch(&[column], &predicates);
1199
1200        // Should select 6, 7, 8, 9 (4 values > 5)
1201        assert_eq!(selection.len(), 4);
1202        let indices: Vec<u32> = selection.iter().collect();
1203        assert_eq!(indices, vec![6, 7, 8, 9]);
1204    }
1205
1206    #[test]
1207    fn test_vectorized_multiple_predicates() {
1208        let executor = VectorizedExecutor::new(1024);
1209
1210        // Create columns
1211        let id_col = ColumnBatch::new("id".to_string(), (0..100).map(CoreSochValue::Int).collect());
1212
1213        let status_col = ColumnBatch::new(
1214            "active".to_string(),
1215            (0..100).map(|i| CoreSochValue::Bool(i % 2 == 0)).collect(),
1216        );
1217
1218        let predicates = vec![
1219            VectorPredicate::IntGe {
1220                col_idx: 0,
1221                threshold: 50,
1222            },
1223            VectorPredicate::IntLt {
1224                col_idx: 0,
1225                threshold: 60,
1226            },
1227            VectorPredicate::BoolEq {
1228                col_idx: 1,
1229                value: true,
1230            },
1231        ];
1232
1233        let selection = executor.evaluate_batch(&[id_col, status_col], &predicates);
1234
1235        // Should select: 50, 52, 54, 56, 58 (even numbers in [50, 60))
1236        assert_eq!(selection.len(), 5);
1237        let indices: Vec<u32> = selection.iter().collect();
1238        assert_eq!(indices, vec![50, 52, 54, 56, 58]);
1239    }
1240
1241    #[test]
1242    fn test_vectorized_short_circuit() {
1243        let executor = VectorizedExecutor::new(1024);
1244
1245        // Create a column with all values < 0
1246        let column = ColumnBatch::new(
1247            "value".to_string(),
1248            (0..100).map(|_| CoreSochValue::Int(-1)).collect(),
1249        );
1250
1251        // First predicate eliminates everything
1252        let predicates = vec![
1253            VectorPredicate::IntGt {
1254                col_idx: 0,
1255                threshold: 0,
1256            },
1257            // These should not even be evaluated due to short-circuit
1258            VectorPredicate::IntLt {
1259                col_idx: 0,
1260                threshold: 100,
1261            },
1262            VectorPredicate::IntEq {
1263                col_idx: 0,
1264                value: 50,
1265            },
1266        ];
1267
1268        let selection = executor.evaluate_batch(&[column], &predicates);
1269        assert!(selection.is_empty());
1270    }
1271
1272    #[test]
1273    fn test_vectorized_string_predicates() {
1274        let executor = VectorizedExecutor::new(1024);
1275
1276        let names = [
1277            "Alice", "Bob", "Carol", "Dave", "Alice", "Alice", "Bob", "Carol",
1278        ];
1279        let column = ColumnBatch::new(
1280            "name".to_string(),
1281            names
1282                .iter()
1283                .map(|s| CoreSochValue::Text(s.to_string()))
1284                .collect(),
1285        );
1286
1287        let predicates = vec![VectorPredicate::StrEq {
1288            col_idx: 0,
1289            value: "Alice".to_string(),
1290        }];
1291
1292        let selection = executor.evaluate_batch(&[column], &predicates);
1293
1294        // Should select indices 0, 4, 5 (where name == "Alice")
1295        assert_eq!(selection.len(), 3);
1296        let indices: Vec<u32> = selection.iter().collect();
1297        assert_eq!(indices, vec![0, 4, 5]);
1298    }
1299
1300    #[test]
1301    fn test_vectorized_null_handling() {
1302        let executor = VectorizedExecutor::new(1024);
1303
1304        let values = vec![
1305            CoreSochValue::Int(1),
1306            CoreSochValue::Null,
1307            CoreSochValue::Int(2),
1308            CoreSochValue::Null,
1309            CoreSochValue::Int(3),
1310        ];
1311        let column = ColumnBatch::new("value".to_string(), values);
1312
1313        let predicates = vec![VectorPredicate::IsNull { col_idx: 0 }];
1314        let null_selection = executor.evaluate_batch(std::slice::from_ref(&column), &predicates);
1315        assert_eq!(null_selection.len(), 2); // indices 1, 3
1316
1317        let not_null_predicates = vec![VectorPredicate::IsNotNull { col_idx: 0 }];
1318        let not_null_selection = executor.evaluate_batch(&[column], &not_null_predicates);
1319        assert_eq!(not_null_selection.len(), 3); // indices 0, 2, 4
1320    }
1321
1322    #[test]
1323    fn test_row_to_columnar_conversion() {
1324        let executor = VectorizedExecutor::new(1024);
1325
1326        let rows = vec![
1327            SochRow::new(vec![
1328                CoreSochValue::Int(1),
1329                CoreSochValue::Text("Alice".to_string()),
1330            ]),
1331            SochRow::new(vec![
1332                CoreSochValue::Int(2),
1333                CoreSochValue::Text("Bob".to_string()),
1334            ]),
1335            SochRow::new(vec![
1336                CoreSochValue::Int(3),
1337                CoreSochValue::Text("Carol".to_string()),
1338            ]),
1339        ];
1340
1341        let column_names = vec!["id".to_string(), "name".to_string()];
1342        let columns = executor.row_to_columnar(&rows, &column_names);
1343
1344        assert_eq!(columns.len(), 2);
1345        assert_eq!(columns[0].name, "id");
1346        assert_eq!(columns[1].name, "name");
1347        assert_eq!(columns[0].len(), 3);
1348        assert_eq!(columns[1].len(), 3);
1349    }
1350
1351    #[test]
1352    fn test_materialize_selected_rows() {
1353        let executor = VectorizedExecutor::new(1024);
1354
1355        let id_col = ColumnBatch::new(
1356            "id".to_string(),
1357            vec![
1358                CoreSochValue::Int(1),
1359                CoreSochValue::Int(2),
1360                CoreSochValue::Int(3),
1361            ],
1362        );
1363        let name_col = ColumnBatch::new(
1364            "name".to_string(),
1365            vec![
1366                CoreSochValue::Text("Alice".to_string()),
1367                CoreSochValue::Text("Bob".to_string()),
1368                CoreSochValue::Text("Carol".to_string()),
1369            ],
1370        );
1371
1372        // Select rows 0 and 2
1373        let selection = SelectionVector::from_indices(vec![0, 2], 3);
1374
1375        let rows = executor.materialize(&[id_col, name_col], &selection);
1376
1377        assert_eq!(rows.len(), 2);
1378        assert_eq!(rows[0].values[0], CoreSochValue::Int(1));
1379        assert_eq!(rows[0].values[1], CoreSochValue::Text("Alice".to_string()));
1380        assert_eq!(rows[1].values[0], CoreSochValue::Int(3));
1381        assert_eq!(rows[1].values[1], CoreSochValue::Text("Carol".to_string()));
1382    }
1383}