sochdb_query/
soch_ql_executor.rs

1// Copyright 2025 Sushanth (https://github.com/sushanthpy)
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! SOCH-QL Query Executor (Task 6)
16//!
17//! End-to-end SOCH-QL query execution pipeline:
//! 1. parse(sql) → SochQuery
//! 2. validate(query, catalog) → Result<()>
//! 3. plan(query, catalog) → Result<QueryPlan>
//! 4. execute_plan(plan, catalog) → Result<SochResult> (rows stay empty until storage is wired in)
22//!
23//! ## Token Reduction Model
24//!
25//! ```text
26//! tokens_JSON(table) ≈ 4 + Σ(|field_name| + |value| + 4) per row
27//! tokens_TOON(table) ≈ header + Σ(|value| + 1) per row
28//! reduction = 1 - tokens_TOON/tokens_JSON ≈ 0.4 to 0.6
29//!
30//! For 100 rows × 5 fields:
31//! JSON: ~100 × (5 × 15) = 7,500 tokens
32//! TOON: ~50 + 100 × 25 = 2,550 tokens → 66% reduction
33//! ```
34
35use crate::soch_ql::{
36    ComparisonOp, LogicalOp, SelectQuery, SortDirection, SochQlParser, SochQuery, SochResult,
37    SochValue, WhereClause,
38};
39#[cfg(test)]
40use crate::soch_ql::{Condition, OrderBy};
41use std::collections::HashMap;
42use sochdb_core::{Catalog, Result, SochDBError, SochRow, SochValue as CoreSochValue};
43#[cfg(test)]
44use sochdb_core::{SochSchema, SochType};
45
/// Query plan operators
///
/// A plan is a tree: every operator except `TableScan`, `IndexSeek` and
/// `Empty` wraps exactly one boxed `input` plan. `SochQlExecutor::plan_select`
/// builds the tree bottom-up starting from a `TableScan`, and
/// `SochQlExecutor::execute_plan` walks it recursively.
#[derive(Debug, Clone)]
pub enum QueryPlan {
    /// Full table scan
    TableScan {
        /// Name of the table to scan.
        table: String,
        /// Requested column names; may contain `"*"` meaning "all columns".
        columns: Vec<String>,
        /// Optional nested plan attached to the scan. The planner in this
        /// file always sets it to `None` and the executor ignores it.
        predicate: Option<Box<QueryPlan>>,
    },
    /// Index seek (primary or secondary)
    ///
    /// Not produced by the planner in this file; the executor returns an
    /// empty result for it.
    IndexSeek { index: String, key_range: KeyRange },
    /// Filter rows
    Filter {
        /// Plan producing the rows to filter.
        input: Box<QueryPlan>,
        /// Conditions a row must satisfy to be kept.
        predicate: Predicate,
    },
    /// Project columns
    Project {
        /// Plan producing the rows to project.
        input: Box<QueryPlan>,
        /// Column names to keep in the output.
        columns: Vec<String>,
    },
    /// Sort results
    Sort {
        /// Plan producing the rows to sort.
        input: Box<QueryPlan>,
        /// Sort keys as `(column, ascending)` pairs; `true` means ascending.
        order_by: Vec<(String, bool)>, // (column, ascending)
    },
    /// Limit results
    Limit {
        /// Plan producing the rows to limit.
        input: Box<QueryPlan>,
        /// Maximum number of rows to return (`usize::MAX` when only an
        /// offset was given).
        count: usize,
        /// Number of leading rows to skip before taking `count` rows.
        offset: usize,
    },
    /// Empty result
    Empty,
}
81
82/// Key range for index seeks
83#[derive(Debug, Clone)]
84pub struct KeyRange {
85    pub start: Option<SochValue>,
86    pub end: Option<SochValue>,
87    pub inclusive_start: bool,
88    pub inclusive_end: bool,
89}
90
91impl KeyRange {
92    pub fn all() -> Self {
93        Self {
94            start: None,
95            end: None,
96            inclusive_start: true,
97            inclusive_end: true,
98        }
99    }
100
101    pub fn eq(value: SochValue) -> Self {
102        Self {
103            start: Some(value.clone()),
104            end: Some(value),
105            inclusive_start: true,
106            inclusive_end: true,
107        }
108    }
109}
110
/// Query predicate
///
/// A flat list of conditions joined by a single logical operator: with
/// `LogicalOp::And` every condition must hold, with `LogicalOp::Or` at least
/// one must hold. Nested or mixed AND/OR expressions cannot be represented.
#[derive(Debug, Clone)]
pub struct Predicate {
    /// Individual column comparisons.
    pub conditions: Vec<PredicateCondition>,
    /// How the results of `conditions` are combined.
    pub operator: LogicalOp,
}
117
118/// Single predicate condition (uses CoreSochValue for row compatibility)
119#[derive(Debug, Clone)]
120pub struct PredicateCondition {
121    pub column: String,
122    pub operator: ComparisonOp,
123    pub value: CoreSochValue,
124}
125
126impl PredicateCondition {
127    /// Create from soch_ql SochValue
128    pub fn from_soch_ql(column: String, operator: ComparisonOp, value: &SochValue) -> Self {
129        Self {
130            column,
131            operator,
132            value: Self::convert_value(value),
133        }
134    }
135
136    /// Convert soch_ql::SochValue to CoreSochValue
137    fn convert_value(v: &SochValue) -> CoreSochValue {
138        match v {
139            SochValue::Int(i) => CoreSochValue::Int(*i),
140            SochValue::UInt(u) => CoreSochValue::UInt(*u),
141            SochValue::Float(f) => CoreSochValue::Float(*f),
142            SochValue::Text(s) => CoreSochValue::Text(s.clone()),
143            SochValue::Bool(b) => CoreSochValue::Bool(*b),
144            SochValue::Null => CoreSochValue::Null,
145            SochValue::Binary(b) => CoreSochValue::Binary(b.clone()),
146            SochValue::Array(arr) => {
147                CoreSochValue::Array(arr.iter().map(Self::convert_value).collect())
148            }
149        }
150    }
151
152    /// Evaluate predicate against a row
153    pub fn evaluate(&self, row: &SochRow, column_idx: usize) -> bool {
154        if column_idx >= row.values.len() {
155            return false;
156        }
157
158        let row_value = &row.values[column_idx];
159
160        match self.operator {
161            ComparisonOp::Eq => row_value == &self.value,
162            ComparisonOp::Ne => row_value != &self.value,
163            ComparisonOp::Lt => {
164                Self::compare(row_value, &self.value) == Some(std::cmp::Ordering::Less)
165            }
166            ComparisonOp::Le => matches!(
167                Self::compare(row_value, &self.value),
168                Some(std::cmp::Ordering::Less | std::cmp::Ordering::Equal)
169            ),
170            ComparisonOp::Gt => {
171                Self::compare(row_value, &self.value) == Some(std::cmp::Ordering::Greater)
172            }
173            ComparisonOp::Ge => matches!(
174                Self::compare(row_value, &self.value),
175                Some(std::cmp::Ordering::Greater | std::cmp::Ordering::Equal)
176            ),
177            ComparisonOp::Like => Self::like_match(row_value, &self.value),
178            ComparisonOp::In => Self::in_match(row_value, &self.value),
179            ComparisonOp::SimilarTo => {
180                // SimilarTo is used for vector similarity search
181                // Evaluated by the vector index, not row-by-row comparison
182                // For row-level evaluation, we fall back to Like-style matching
183                Self::like_match(row_value, &self.value)
184            }
185        }
186    }
187
188    fn compare(a: &CoreSochValue, b: &CoreSochValue) -> Option<std::cmp::Ordering> {
189        match (a, b) {
190            (CoreSochValue::Int(a), CoreSochValue::Int(b)) => Some(a.cmp(b)),
191            (CoreSochValue::UInt(a), CoreSochValue::UInt(b)) => Some(a.cmp(b)),
192            (CoreSochValue::Float(a), CoreSochValue::Float(b)) => a.partial_cmp(b),
193            (CoreSochValue::Text(a), CoreSochValue::Text(b)) => Some(a.cmp(b)),
194            _ => None,
195        }
196    }
197
198    fn like_match(value: &CoreSochValue, pattern: &CoreSochValue) -> bool {
199        match (value, pattern) {
200            (CoreSochValue::Text(v), CoreSochValue::Text(p)) => {
201                // Simple LIKE: % matches any, _ matches one char
202                let regex_pattern = p.replace('%', ".*").replace('_', ".");
203                regex::Regex::new(&format!("^{}$", regex_pattern))
204                    .map(|re| re.is_match(v))
205                    .unwrap_or(false)
206            }
207            _ => false,
208        }
209    }
210
211    fn in_match(value: &CoreSochValue, list: &CoreSochValue) -> bool {
212        match list {
213            CoreSochValue::Array(values) => values.iter().any(|v| value == v),
214            _ => value == list, // Single value comparison fallback
215        }
216    }
217}
218
219impl Predicate {
220    /// Evaluate predicate against a row
221    pub fn evaluate(&self, row: &SochRow, column_map: &HashMap<String, usize>) -> bool {
222        let results: Vec<bool> = self
223            .conditions
224            .iter()
225            .map(|cond| {
226                column_map
227                    .get(&cond.column)
228                    .map(|&idx| cond.evaluate(row, idx))
229                    .unwrap_or(false)
230            })
231            .collect();
232
233        match self.operator {
234            LogicalOp::And => results.iter().all(|&r| r),
235            LogicalOp::Or => results.iter().any(|&r| r),
236        }
237    }
238}
239
240/// SOCH-QL Query Executor
241pub struct SochQlExecutor;
242
243impl SochQlExecutor {
244    /// Create a new executor
245    pub fn new() -> Self {
246        Self
247    }
248
249    /// Execute a SOCH-QL query string
250    pub fn execute(&self, query: &str, catalog: &Catalog) -> Result<SochResult> {
251        // Parse
252        let parsed = SochQlParser::parse(query)
253            .map_err(|e| SochDBError::InvalidArgument(format!("Parse error: {:?}", e)))?;
254
255        // Validate
256        self.validate(&parsed, catalog)?;
257
258        // Plan
259        let plan = self.plan(&parsed, catalog)?;
260
261        // Execute
262        self.execute_plan(&plan, catalog)
263    }
264
265    /// Validate a parsed query against the catalog
266    pub fn validate(&self, query: &SochQuery, catalog: &Catalog) -> Result<()> {
267        match query {
268            SochQuery::Select(select) => {
269                // Check table exists
270                if catalog.get_table(&select.table).is_none() {
271                    return Err(SochDBError::NotFound(format!(
272                        "Table '{}' not found",
273                        select.table
274                    )));
275                }
276
277                // Check columns exist (if not *)
278                if let Some(entry) = catalog.get_table(&select.table)
279                    && let Some(schema) = &entry.schema
280                {
281                    for col in &select.columns {
282                        if col != "*" && !schema.fields.iter().any(|f| &f.name == col) {
283                            return Err(SochDBError::InvalidArgument(format!(
284                                "Column '{}' not found in table '{}'",
285                                col, select.table
286                            )));
287                        }
288                    }
289                }
290
291                Ok(())
292            }
293            SochQuery::Insert(insert) => {
294                // Check table exists
295                if catalog.get_table(&insert.table).is_none() {
296                    return Err(SochDBError::NotFound(format!(
297                        "Table '{}' not found",
298                        insert.table
299                    )));
300                }
301                Ok(())
302            }
303            SochQuery::CreateTable(create) => {
304                // Check table doesn't exist
305                if catalog.get_table(&create.table).is_some() {
306                    return Err(SochDBError::InvalidArgument(format!(
307                        "Table '{}' already exists",
308                        create.table
309                    )));
310                }
311                Ok(())
312            }
313            SochQuery::DropTable { table } => {
314                if catalog.get_table(table).is_none() {
315                    return Err(SochDBError::NotFound(format!(
316                        "Table '{}' not found",
317                        table
318                    )));
319                }
320                Ok(())
321            }
322        }
323    }
324
325    /// Generate a query plan
326    pub fn plan(&self, query: &SochQuery, catalog: &Catalog) -> Result<QueryPlan> {
327        match query {
328            SochQuery::Select(select) => self.plan_select(select, catalog),
329            _ => Err(SochDBError::InvalidArgument(
330                "Only SELECT queries can be planned".to_string(),
331            )),
332        }
333    }
334
335    fn plan_select(&self, select: &SelectQuery, _catalog: &Catalog) -> Result<QueryPlan> {
336        // Start with table scan
337        let mut plan = QueryPlan::TableScan {
338            table: select.table.clone(),
339            columns: select.columns.clone(),
340            predicate: None,
341        };
342
343        // Add filter if WHERE clause present
344        if let Some(where_clause) = &select.where_clause {
345            let predicate = self.build_predicate(where_clause);
346            plan = QueryPlan::Filter {
347                input: Box::new(plan),
348                predicate,
349            };
350        }
351
352        // Add projection (column selection)
353        if !select.columns.contains(&"*".to_string()) {
354            plan = QueryPlan::Project {
355                input: Box::new(plan),
356                columns: select.columns.clone(),
357            };
358        }
359
360        // Add sort if ORDER BY present
361        if let Some(order_by) = &select.order_by {
362            plan = QueryPlan::Sort {
363                input: Box::new(plan),
364                order_by: vec![(
365                    order_by.column.clone(),
366                    matches!(order_by.direction, SortDirection::Asc),
367                )],
368            };
369        }
370
371        // Add limit if present
372        if select.limit.is_some() || select.offset.is_some() {
373            plan = QueryPlan::Limit {
374                input: Box::new(plan),
375                count: select.limit.unwrap_or(usize::MAX),
376                offset: select.offset.unwrap_or(0),
377            };
378        }
379
380        Ok(plan)
381    }
382
383    fn build_predicate(&self, where_clause: &WhereClause) -> Predicate {
384        Predicate {
385            conditions: where_clause
386                .conditions
387                .iter()
388                .map(|c| PredicateCondition::from_soch_ql(c.column.clone(), c.operator, &c.value))
389                .collect(),
390            operator: where_clause.operator,
391        }
392    }
393
394    /// Execute a query plan
395    #[allow(clippy::only_used_in_recursion)]
396    pub fn execute_plan(&self, plan: &QueryPlan, catalog: &Catalog) -> Result<SochResult> {
397        // For now, return empty results (storage integration pending)
398        // This is the interface that will connect to StorageEngine
399        match plan {
400            QueryPlan::Empty => Ok(SochResult {
401                table: "result".to_string(),
402                columns: vec![],
403                rows: vec![],
404            }),
405            QueryPlan::TableScan { table, columns, .. } => {
406                // Get schema from catalog
407                let schema_columns = if let Some(entry) = catalog.get_table(table) {
408                    if let Some(schema) = &entry.schema {
409                        if columns.contains(&"*".to_string()) {
410                            schema.fields.iter().map(|f| f.name.clone()).collect()
411                        } else {
412                            columns.clone()
413                        }
414                    } else {
415                        columns.clone()
416                    }
417                } else {
418                    columns.clone()
419                };
420
421                Ok(SochResult {
422                    table: table.clone(),
423                    columns: schema_columns,
424                    rows: vec![], // Storage integration will populate this
425                })
426            }
427            QueryPlan::Filter { input, .. } => self.execute_plan(input, catalog),
428            QueryPlan::Project { input, columns } => {
429                let mut result = self.execute_plan(input, catalog)?;
430                result.columns = columns.clone();
431                Ok(result)
432            }
433            QueryPlan::Sort { input, .. } => self.execute_plan(input, catalog),
434            QueryPlan::Limit {
435                input,
436                count,
437                offset,
438            } => {
439                let mut result = self.execute_plan(input, catalog)?;
440                result.rows = result.rows.into_iter().skip(*offset).take(*count).collect();
441                Ok(result)
442            }
443            QueryPlan::IndexSeek { .. } => Ok(SochResult {
444                table: "result".to_string(),
445                columns: vec![],
446                rows: vec![],
447            }),
448        }
449    }
450}
451
452impl Default for SochQlExecutor {
453    fn default() -> Self {
454        Self::new()
455    }
456}
457
458/// Execute a SOCH-QL query (convenience function)
459pub fn execute_sochql(query: &str, catalog: &Catalog) -> Result<SochResult> {
460    SochQlExecutor::new().execute(query, catalog)
461}
462
463/// Estimate token reduction for a TOON result vs JSON
464pub fn estimate_token_reduction(result: &SochResult) -> TokenReductionStats {
465    let row_count = result.rows.len();
466    let col_count = result.columns.len();
467
468    if row_count == 0 || col_count == 0 {
469        return TokenReductionStats::default();
470    }
471
472    // Estimate JSON tokens
473    // Format: [{"col1": "val1", "col2": "val2"}, ...]
474    let avg_col_name_len: usize = result.columns.iter().map(|c| c.len()).sum::<usize>() / col_count;
475    let avg_value_len = 10; // Rough estimate
476
477    // JSON: 2 (brackets) + row_count * (2 + col_count * (col_name + 4 + value))
478    let json_tokens = 2 + row_count * (2 + col_count * (avg_col_name_len + 4 + avg_value_len));
479
480    // TOON: header + row_count * (col_count * value + col_count)
481    // Header: table[count]{col1,col2,...}:
482    let header_tokens = result.table.len() + 10 + result.columns.join(",").len();
483    let soch_tokens = header_tokens + row_count * (col_count * avg_value_len + col_count);
484
485    let reduction = 1.0 - (soch_tokens as f64 / json_tokens as f64);
486
487    TokenReductionStats {
488        json_tokens,
489        soch_tokens,
490        reduction_percent: (reduction * 100.0) as u32,
491        row_count,
492        col_count,
493    }
494}
495
/// Token reduction statistics
///
/// Produced by `estimate_token_reduction`. The `Default` value (all zeros) is
/// what that function returns for a result with no rows or no columns.
#[derive(Debug, Clone, Default)]
pub struct TokenReductionStats {
    /// Estimated JSON tokens
    pub json_tokens: usize,
    /// Estimated TOON tokens
    pub soch_tokens: usize,
    /// Reduction percentage (`(1 - soch_tokens / json_tokens) * 100`,
    /// truncated to an integer)
    pub reduction_percent: u32,
    /// Row count
    pub row_count: usize,
    /// Column count
    pub col_count: usize,
}
510
511// ============================================================================
512// Task 12: Vectorized Predicate Evaluation
513// ============================================================================
514
/// Selection vector for batch predicate evaluation
///
/// Holds the indices of the rows in a batch that are still "alive" after the
/// predicates applied so far, together with the size of the batch they were
/// drawn from. Once the vector is empty, the remaining predicates can be
/// skipped for the whole batch.
#[derive(Debug, Clone)]
pub struct SelectionVector {
    /// Indices of the selected rows.
    indices: Vec<u32>,
    /// Number of rows in the batch the indices refer to.
    batch_size: usize,
}

impl SelectionVector {
    /// Select every row of a batch: indices `0..batch_size`.
    pub fn all(batch_size: usize) -> Self {
        let upper = batch_size as u32;
        Self::from_indices((0..upper).collect(), batch_size)
    }

    /// Selection with no rows and a batch size of zero.
    pub fn empty() -> Self {
        Self::from_indices(Vec::new(), 0)
    }

    /// Wrap an explicit index list; the indices are stored as given.
    pub fn from_indices(indices: Vec<u32>, batch_size: usize) -> Self {
        Self {
            indices,
            batch_size,
        }
    }

    /// `true` when no row is selected (the short-circuit condition).
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// How many rows are currently selected.
    #[inline]
    pub fn len(&self) -> usize {
        self.indices.len()
    }

    /// Size of the batch this selection was created for.
    #[inline]
    pub fn batch_size(&self) -> usize {
        self.batch_size
    }

    /// Fraction of the batch that is selected; `0.0` for a zero-sized batch.
    #[inline]
    pub fn selectivity(&self) -> f64 {
        match self.batch_size {
            0 => 0.0,
            total => self.len() as f64 / total as f64,
        }
    }

    /// Iterate over the selected row indices in stored order.
    pub fn iter(&self) -> impl Iterator<Item = u32> + '_ {
        self.indices.iter().copied()
    }

    /// Build a new selection containing only the indices accepted by `pred`.
    ///
    /// The relative order of the surviving indices and the batch size are
    /// carried over unchanged.
    pub fn filter<F>(&self, pred: F) -> Self
    where
        F: Fn(u32) -> bool,
    {
        let mut kept = Vec::new();
        for &idx in &self.indices {
            if pred(idx) {
                kept.push(idx);
            }
        }
        Self {
            indices: kept,
            batch_size: self.batch_size,
        }
    }

    /// Append `start_idx + bit` for every set bit of the 16-bit `mask`,
    /// lowest bit first (intended for SIMD comparison results).
    pub fn extend_masked(&mut self, start_idx: usize, mask: u16) {
        let set_bits = (0..16usize).filter(|&bit| mask & (1u16 << bit) != 0);
        self.indices
            .extend(set_bits.map(|bit| (start_idx + bit) as u32));
    }
}
605
606/// Columnar batch for vectorized processing
607#[derive(Debug, Clone)]
608pub struct ColumnBatch {
609    /// Column values
610    pub values: Vec<CoreSochValue>,
611    /// Column name
612    pub name: String,
613}
614
615impl ColumnBatch {
616    /// Create from column data
617    pub fn new(name: String, values: Vec<CoreSochValue>) -> Self {
618        Self { values, name }
619    }
620
621    /// Get value at index
622    #[inline]
623    pub fn get(&self, idx: usize) -> Option<&CoreSochValue> {
624        self.values.get(idx)
625    }
626
627    /// Get raw integer data pointer (for SIMD)
628    #[allow(dead_code)]
629    pub fn as_i64_slice(&self) -> Option<Vec<i64>> {
630        self.values
631            .iter()
632            .map(|v| match v {
633                CoreSochValue::Int(i) => Some(*i),
634                CoreSochValue::UInt(u) => Some(*u as i64),
635                _ => None,
636            })
637            .collect()
638    }
639
640    /// Batch size
641    pub fn len(&self) -> usize {
642        self.values.len()
643    }
644
645    /// Is empty
646    pub fn is_empty(&self) -> bool {
647        self.values.is_empty()
648    }
649}
650
/// Vectorized predicate for batch evaluation
///
/// Every variant names the column it applies to by position (`col_idx` is an
/// index into the `&[ColumnBatch]` slice handed to
/// `VectorizedExecutor::evaluate_batch`) plus the constant to compare with.
/// The integer variants are applied to both `Int` and `UInt` column values;
/// a row whose value has a different type never passes.
#[derive(Debug, Clone)]
pub enum VectorPredicate {
    /// Integer greater than
    IntGt { col_idx: usize, threshold: i64 },
    /// Integer less than
    IntLt { col_idx: usize, threshold: i64 },
    /// Integer equals
    IntEq { col_idx: usize, value: i64 },
    /// Integer greater or equal
    IntGe { col_idx: usize, threshold: i64 },
    /// Integer less or equal
    IntLe { col_idx: usize, threshold: i64 },
    /// String equals
    StrEq { col_idx: usize, value: String },
    /// String prefix match
    StrPrefix { col_idx: usize, prefix: String },
    /// Boolean equals
    BoolEq { col_idx: usize, value: bool },
    /// Is null check
    IsNull { col_idx: usize },
    /// Is not null check
    IsNotNull { col_idx: usize },
}
675
676/// Vectorized executor for batch predicate evaluation
677///
678/// ## Performance Characteristics
679///
680/// - Traditional row-at-a-time: ~100M rows/sec (branch misprediction bound)
681/// - Vectorized with selection vector: ~1B rows/sec (branch-free)
682///
683/// ## Usage
684///
685/// ```ignore
686/// let executor = VectorizedExecutor::new(1024);
687/// let columns = vec![/* column batches */];
688/// let predicates = vec![/* predicates */];
689/// let selection = executor.evaluate_batch(&columns, &predicates);
690/// ```
691pub struct VectorizedExecutor {
692    /// Batch size for processing
693    batch_size: usize,
694}
695
696impl VectorizedExecutor {
697    /// Create with specified batch size
698    pub fn new(batch_size: usize) -> Self {
699        Self { batch_size }
700    }
701
702    /// Default batch size (1024 rows)
703    pub fn default_batch_size() -> usize {
704        1024
705    }
706
707    /// Evaluate predicates on columnar batch
708    ///
709    /// Returns selection vector of rows that pass all predicates.
710    /// Short-circuits at batch level when selection becomes empty.
711    pub fn evaluate_batch(
712        &self,
713        columns: &[ColumnBatch],
714        predicates: &[VectorPredicate],
715    ) -> SelectionVector {
716        if columns.is_empty() {
717            return SelectionVector::empty();
718        }
719
720        let batch_size = columns[0].len().min(self.batch_size);
721        let mut selection = SelectionVector::all(batch_size);
722
723        // Process predicates with short-circuit
724        for predicate in predicates {
725            if selection.is_empty() {
726                break; // Batch-level short-circuit
727            }
728
729            selection = match predicate {
730                VectorPredicate::IntGt { col_idx, threshold } => {
731                    self.filter_int_gt(&columns[*col_idx], *threshold, &selection)
732                }
733                VectorPredicate::IntLt { col_idx, threshold } => {
734                    self.filter_int_lt(&columns[*col_idx], *threshold, &selection)
735                }
736                VectorPredicate::IntEq { col_idx, value } => {
737                    self.filter_int_eq(&columns[*col_idx], *value, &selection)
738                }
739                VectorPredicate::IntGe { col_idx, threshold } => {
740                    self.filter_int_ge(&columns[*col_idx], *threshold, &selection)
741                }
742                VectorPredicate::IntLe { col_idx, threshold } => {
743                    self.filter_int_le(&columns[*col_idx], *threshold, &selection)
744                }
745                VectorPredicate::StrEq { col_idx, value } => {
746                    self.filter_str_eq(&columns[*col_idx], value, &selection)
747                }
748                VectorPredicate::StrPrefix { col_idx, prefix } => {
749                    self.filter_str_prefix(&columns[*col_idx], prefix, &selection)
750                }
751                VectorPredicate::BoolEq { col_idx, value } => {
752                    self.filter_bool_eq(&columns[*col_idx], *value, &selection)
753                }
754                VectorPredicate::IsNull { col_idx } => {
755                    self.filter_is_null(&columns[*col_idx], &selection)
756                }
757                VectorPredicate::IsNotNull { col_idx } => {
758                    self.filter_is_not_null(&columns[*col_idx], &selection)
759                }
760            };
761        }
762
763        selection
764    }
765
766    /// Filter: column > threshold
767    #[inline]
768    fn filter_int_gt(
769        &self,
770        column: &ColumnBatch,
771        threshold: i64,
772        selection: &SelectionVector,
773    ) -> SelectionVector {
774        selection.filter(|idx| match column.get(idx as usize) {
775            Some(CoreSochValue::Int(v)) => *v > threshold,
776            Some(CoreSochValue::UInt(v)) => (*v as i64) > threshold,
777            _ => false,
778        })
779    }
780
781    /// Filter: column < threshold
782    #[inline]
783    fn filter_int_lt(
784        &self,
785        column: &ColumnBatch,
786        threshold: i64,
787        selection: &SelectionVector,
788    ) -> SelectionVector {
789        selection.filter(|idx| match column.get(idx as usize) {
790            Some(CoreSochValue::Int(v)) => *v < threshold,
791            Some(CoreSochValue::UInt(v)) => (*v as i64) < threshold,
792            _ => false,
793        })
794    }
795
796    /// Filter: column == value
797    #[inline]
798    fn filter_int_eq(
799        &self,
800        column: &ColumnBatch,
801        value: i64,
802        selection: &SelectionVector,
803    ) -> SelectionVector {
804        selection.filter(|idx| match column.get(idx as usize) {
805            Some(CoreSochValue::Int(v)) => *v == value,
806            Some(CoreSochValue::UInt(v)) => (*v as i64) == value,
807            _ => false,
808        })
809    }
810
811    /// Filter: column >= threshold
812    #[inline]
813    fn filter_int_ge(
814        &self,
815        column: &ColumnBatch,
816        threshold: i64,
817        selection: &SelectionVector,
818    ) -> SelectionVector {
819        selection.filter(|idx| match column.get(idx as usize) {
820            Some(CoreSochValue::Int(v)) => *v >= threshold,
821            Some(CoreSochValue::UInt(v)) => (*v as i64) >= threshold,
822            _ => false,
823        })
824    }
825
826    /// Filter: column <= threshold
827    #[inline]
828    fn filter_int_le(
829        &self,
830        column: &ColumnBatch,
831        threshold: i64,
832        selection: &SelectionVector,
833    ) -> SelectionVector {
834        selection.filter(|idx| match column.get(idx as usize) {
835            Some(CoreSochValue::Int(v)) => *v <= threshold,
836            Some(CoreSochValue::UInt(v)) => (*v as i64) <= threshold,
837            _ => false,
838        })
839    }
840
841    /// Filter: column == value (string)
842    #[inline]
843    fn filter_str_eq(
844        &self,
845        column: &ColumnBatch,
846        value: &str,
847        selection: &SelectionVector,
848    ) -> SelectionVector {
849        selection.filter(|idx| match column.get(idx as usize) {
850            Some(CoreSochValue::Text(s)) => s == value,
851            _ => false,
852        })
853    }
854
855    /// Filter: column starts with prefix
856    #[inline]
857    fn filter_str_prefix(
858        &self,
859        column: &ColumnBatch,
860        prefix: &str,
861        selection: &SelectionVector,
862    ) -> SelectionVector {
863        selection.filter(|idx| match column.get(idx as usize) {
864            Some(CoreSochValue::Text(s)) => s.starts_with(prefix),
865            _ => false,
866        })
867    }
868
869    /// Filter: column == value (bool)
870    #[inline]
871    fn filter_bool_eq(
872        &self,
873        column: &ColumnBatch,
874        value: bool,
875        selection: &SelectionVector,
876    ) -> SelectionVector {
877        selection.filter(|idx| match column.get(idx as usize) {
878            Some(CoreSochValue::Bool(b)) => *b == value,
879            _ => false,
880        })
881    }
882
883    /// Filter: column IS NULL
884    #[inline]
885    fn filter_is_null(&self, column: &ColumnBatch, selection: &SelectionVector) -> SelectionVector {
886        selection.filter(|idx| matches!(column.get(idx as usize), Some(CoreSochValue::Null)))
887    }
888
889    /// Filter: column IS NOT NULL
890    #[inline]
891    fn filter_is_not_null(
892        &self,
893        column: &ColumnBatch,
894        selection: &SelectionVector,
895    ) -> SelectionVector {
896        selection
897            .filter(|idx| !matches!(column.get(idx as usize), Some(CoreSochValue::Null) | None))
898    }
899
900    /// Materialize selected rows from columnar data
901    pub fn materialize(
902        &self,
903        columns: &[ColumnBatch],
904        selection: &SelectionVector,
905    ) -> Vec<SochRow> {
906        selection
907            .iter()
908            .map(|idx| {
909                let values: Vec<CoreSochValue> = columns
910                    .iter()
911                    .map(|col| {
912                        col.get(idx as usize)
913                            .cloned()
914                            .unwrap_or(CoreSochValue::Null)
915                    })
916                    .collect();
917                SochRow::new(values)
918            })
919            .collect()
920    }
921
922    /// Convert row-oriented data to columnar batches
923    pub fn row_to_columnar(&self, rows: &[SochRow], column_names: &[String]) -> Vec<ColumnBatch> {
924        if rows.is_empty() || column_names.is_empty() {
925            return vec![];
926        }
927
928        let num_cols = column_names.len().min(rows[0].values.len());
929
930        (0..num_cols)
931            .map(|col_idx| {
932                let values: Vec<CoreSochValue> = rows
933                    .iter()
934                    .map(|row| {
935                        row.values
936                            .get(col_idx)
937                            .cloned()
938                            .unwrap_or(CoreSochValue::Null)
939                    })
940                    .collect();
941                ColumnBatch::new(column_names[col_idx].clone(), values)
942            })
943            .collect()
944    }
945}
946
947impl Default for VectorizedExecutor {
948    fn default() -> Self {
949        Self::new(Self::default_batch_size())
950    }
951}
952
/// Counters describing one run of vectorized execution.
#[derive(Debug, Clone, Default)]
pub struct VectorizedStats {
    /// Number of rows processed
    pub rows_processed: usize,
    /// Number of rows selected
    pub rows_selected: usize,
    /// Number of predicates evaluated
    pub predicates_evaluated: usize,
    /// Number of short-circuits triggered
    pub short_circuits: usize,
    /// Processing time, in microseconds
    pub time_us: u64,
}

impl VectorizedStats {
    /// Fraction of processed rows that were selected
    /// (`rows_selected / rows_processed`).
    ///
    /// Returns `0.0` when no rows were processed.
    pub fn selectivity(&self) -> f64 {
        match self.rows_processed {
            0 => 0.0,
            processed => self.rows_selected as f64 / processed as f64,
        }
    }

    /// Processed rows per second, derived from `time_us`.
    ///
    /// Returns `0.0` when the recorded time is zero.
    pub fn rows_per_sec(&self) -> f64 {
        if self.time_us == 0 {
            return 0.0;
        }
        let elapsed_secs = self.time_us as f64 / 1_000_000.0;
        self.rows_processed as f64 / elapsed_secs
    }
}
987
988#[cfg(test)]
989mod tests {
990    use super::*;
991
992    fn test_catalog() -> Catalog {
993        let mut catalog = Catalog::new("test_db");
994
995        let schema = SochSchema::new("users")
996            .field("id", SochType::UInt)
997            .field("name", SochType::Text)
998            .field("score", SochType::Float);
999
1000        catalog.create_table(schema, 1).unwrap();
1001        catalog
1002    }
1003
1004    #[test]
1005    fn test_validate_select() {
1006        let catalog = test_catalog();
1007        let executor = SochQlExecutor::new();
1008
1009        let query = SochQuery::Select(SelectQuery {
1010            columns: vec!["id".to_string(), "name".to_string()],
1011            table: "users".to_string(),
1012            where_clause: None,
1013            order_by: None,
1014            limit: None,
1015            offset: None,
1016        });
1017
1018        assert!(executor.validate(&query, &catalog).is_ok());
1019    }
1020
1021    #[test]
1022    fn test_validate_nonexistent_table() {
1023        let catalog = test_catalog();
1024        let executor = SochQlExecutor::new();
1025
1026        let query = SochQuery::Select(SelectQuery {
1027            columns: vec!["*".to_string()],
1028            table: "nonexistent".to_string(),
1029            where_clause: None,
1030            order_by: None,
1031            limit: None,
1032            offset: None,
1033        });
1034
1035        assert!(executor.validate(&query, &catalog).is_err());
1036    }
1037
1038    #[test]
1039    fn test_plan_select() {
1040        let catalog = test_catalog();
1041        let executor = SochQlExecutor::new();
1042
1043        let select = SelectQuery {
1044            columns: vec!["id".to_string(), "name".to_string()],
1045            table: "users".to_string(),
1046            where_clause: Some(WhereClause {
1047                conditions: vec![Condition {
1048                    column: "score".to_string(),
1049                    operator: ComparisonOp::Gt,
1050                    value: SochValue::Float(80.0),
1051                }],
1052                operator: LogicalOp::And,
1053            }),
1054            order_by: Some(OrderBy {
1055                column: "score".to_string(),
1056                direction: SortDirection::Desc,
1057            }),
1058            limit: Some(10),
1059            offset: None,
1060        };
1061
1062        let plan = executor.plan_select(&select, &catalog).unwrap();
1063
1064        // Should be: Limit(Sort(Project(Filter(TableScan))))
1065        match plan {
1066            QueryPlan::Limit { input, count, .. } => {
1067                assert_eq!(count, 10);
1068                match *input {
1069                    QueryPlan::Sort { input, order_by } => {
1070                        assert_eq!(order_by[0].0, "score");
1071                        assert!(!order_by[0].1); // Descending = false
1072                        match *input {
1073                            QueryPlan::Project { input, columns } => {
1074                                assert_eq!(columns, vec!["id", "name"]);
1075                                match *input {
1076                                    QueryPlan::Filter { predicate, .. } => {
1077                                        assert_eq!(predicate.conditions.len(), 1);
1078                                    }
1079                                    _ => panic!("Expected Filter"),
1080                                }
1081                            }
1082                            _ => panic!("Expected Project"),
1083                        }
1084                    }
1085                    _ => panic!("Expected Sort"),
1086                }
1087            }
1088            _ => panic!("Expected Limit"),
1089        }
1090    }
1091
1092    #[test]
1093    fn test_predicate_evaluation() {
1094        let cond = PredicateCondition {
1095            column: "score".to_string(),
1096            operator: ComparisonOp::Gt,
1097            value: CoreSochValue::Float(80.0),
1098        };
1099
1100        let row_pass = SochRow::new(vec![
1101            CoreSochValue::UInt(1),
1102            CoreSochValue::Text("Alice".to_string()),
1103            CoreSochValue::Float(95.0),
1104        ]);
1105
1106        let row_fail = SochRow::new(vec![
1107            CoreSochValue::UInt(2),
1108            CoreSochValue::Text("Bob".to_string()),
1109            CoreSochValue::Float(75.0),
1110        ]);
1111
1112        assert!(cond.evaluate(&row_pass, 2));
1113        assert!(!cond.evaluate(&row_fail, 2));
1114    }
1115
1116    #[test]
1117    fn test_token_reduction() {
1118        // Use more rows and longer field names to show real reduction
1119        let result = SochResult {
1120            table: "user_statistics".to_string(),
1121            columns: vec![
1122                "user_id".to_string(),
1123                "full_name".to_string(),
1124                "email_address".to_string(),
1125                "registration_date".to_string(),
1126                "last_login".to_string(),
1127            ],
1128            rows: (0..20)
1129                .map(|i| {
1130                    vec![
1131                        SochValue::UInt(i as u64),
1132                        SochValue::Text(format!("User Number {}", i)),
1133                        SochValue::Text(format!("user{}@example.com", i)),
1134                        SochValue::Text("2024-01-15".to_string()),
1135                        SochValue::Text("2024-03-20".to_string()),
1136                    ]
1137                })
1138                .collect(),
1139        };
1140
1141        let stats = estimate_token_reduction(&result);
1142
1143        println!("JSON tokens: {}", stats.json_tokens);
1144        println!("TOON tokens: {}", stats.soch_tokens);
1145        println!("Reduction: {}%", stats.reduction_percent);
1146
1147        // With many rows and repeated column names, TOON should be more efficient
1148        assert!(stats.soch_tokens < stats.json_tokens);
1149        assert!(stats.reduction_percent > 0); // Any reduction is valuable
1150    }
1151
1152    // ========================================================================
1153    // Task 12: Vectorized Predicate Evaluation Tests
1154    // ========================================================================
1155
1156    #[test]
1157    fn test_selection_vector_basic() {
1158        let sel = SelectionVector::all(100);
1159        assert_eq!(sel.len(), 100);
1160        assert!(!sel.is_empty());
1161        assert_eq!(sel.selectivity(), 1.0);
1162
1163        let empty = SelectionVector::empty();
1164        assert!(empty.is_empty());
1165        assert_eq!(empty.selectivity(), 0.0);
1166    }
1167
1168    #[test]
1169    fn test_selection_vector_filter() {
1170        let sel = SelectionVector::all(10);
1171
1172        // Keep only even indices
1173        let filtered = sel.filter(|i| i % 2 == 0);
1174        assert_eq!(filtered.len(), 5);
1175
1176        let indices: Vec<u32> = filtered.iter().collect();
1177        assert_eq!(indices, vec![0, 2, 4, 6, 8]);
1178    }
1179
1180    #[test]
1181    fn test_vectorized_int_filter() {
1182        let executor = VectorizedExecutor::new(1024);
1183
1184        // Create a column with values 0-9
1185        let column = ColumnBatch::new(
1186            "value".to_string(),
1187            (0..10).map(CoreSochValue::Int).collect(),
1188        );
1189
1190        let predicates = vec![VectorPredicate::IntGt {
1191            col_idx: 0,
1192            threshold: 5,
1193        }];
1194
1195        let selection = executor.evaluate_batch(&[column], &predicates);
1196
1197        // Should select 6, 7, 8, 9 (4 values > 5)
1198        assert_eq!(selection.len(), 4);
1199        let indices: Vec<u32> = selection.iter().collect();
1200        assert_eq!(indices, vec![6, 7, 8, 9]);
1201    }
1202
1203    #[test]
1204    fn test_vectorized_multiple_predicates() {
1205        let executor = VectorizedExecutor::new(1024);
1206
1207        // Create columns
1208        let id_col = ColumnBatch::new("id".to_string(), (0..100).map(CoreSochValue::Int).collect());
1209
1210        let status_col = ColumnBatch::new(
1211            "active".to_string(),
1212            (0..100).map(|i| CoreSochValue::Bool(i % 2 == 0)).collect(),
1213        );
1214
1215        let predicates = vec![
1216            VectorPredicate::IntGe {
1217                col_idx: 0,
1218                threshold: 50,
1219            },
1220            VectorPredicate::IntLt {
1221                col_idx: 0,
1222                threshold: 60,
1223            },
1224            VectorPredicate::BoolEq {
1225                col_idx: 1,
1226                value: true,
1227            },
1228        ];
1229
1230        let selection = executor.evaluate_batch(&[id_col, status_col], &predicates);
1231
1232        // Should select: 50, 52, 54, 56, 58 (even numbers in [50, 60))
1233        assert_eq!(selection.len(), 5);
1234        let indices: Vec<u32> = selection.iter().collect();
1235        assert_eq!(indices, vec![50, 52, 54, 56, 58]);
1236    }
1237
1238    #[test]
1239    fn test_vectorized_short_circuit() {
1240        let executor = VectorizedExecutor::new(1024);
1241
1242        // Create a column with all values < 0
1243        let column = ColumnBatch::new(
1244            "value".to_string(),
1245            (0..100).map(|_| CoreSochValue::Int(-1)).collect(),
1246        );
1247
1248        // First predicate eliminates everything
1249        let predicates = vec![
1250            VectorPredicate::IntGt {
1251                col_idx: 0,
1252                threshold: 0,
1253            },
1254            // These should not even be evaluated due to short-circuit
1255            VectorPredicate::IntLt {
1256                col_idx: 0,
1257                threshold: 100,
1258            },
1259            VectorPredicate::IntEq {
1260                col_idx: 0,
1261                value: 50,
1262            },
1263        ];
1264
1265        let selection = executor.evaluate_batch(&[column], &predicates);
1266        assert!(selection.is_empty());
1267    }
1268
1269    #[test]
1270    fn test_vectorized_string_predicates() {
1271        let executor = VectorizedExecutor::new(1024);
1272
1273        let names = [
1274            "Alice", "Bob", "Carol", "Dave", "Alice", "Alice", "Bob", "Carol",
1275        ];
1276        let column = ColumnBatch::new(
1277            "name".to_string(),
1278            names
1279                .iter()
1280                .map(|s| CoreSochValue::Text(s.to_string()))
1281                .collect(),
1282        );
1283
1284        let predicates = vec![VectorPredicate::StrEq {
1285            col_idx: 0,
1286            value: "Alice".to_string(),
1287        }];
1288
1289        let selection = executor.evaluate_batch(&[column], &predicates);
1290
1291        // Should select indices 0, 4, 5 (where name == "Alice")
1292        assert_eq!(selection.len(), 3);
1293        let indices: Vec<u32> = selection.iter().collect();
1294        assert_eq!(indices, vec![0, 4, 5]);
1295    }
1296
1297    #[test]
1298    fn test_vectorized_null_handling() {
1299        let executor = VectorizedExecutor::new(1024);
1300
1301        let values = vec![
1302            CoreSochValue::Int(1),
1303            CoreSochValue::Null,
1304            CoreSochValue::Int(2),
1305            CoreSochValue::Null,
1306            CoreSochValue::Int(3),
1307        ];
1308        let column = ColumnBatch::new("value".to_string(), values);
1309
1310        let predicates = vec![VectorPredicate::IsNull { col_idx: 0 }];
1311        let null_selection = executor.evaluate_batch(std::slice::from_ref(&column), &predicates);
1312        assert_eq!(null_selection.len(), 2); // indices 1, 3
1313
1314        let not_null_predicates = vec![VectorPredicate::IsNotNull { col_idx: 0 }];
1315        let not_null_selection = executor.evaluate_batch(&[column], &not_null_predicates);
1316        assert_eq!(not_null_selection.len(), 3); // indices 0, 2, 4
1317    }
1318
1319    #[test]
1320    fn test_row_to_columnar_conversion() {
1321        let executor = VectorizedExecutor::new(1024);
1322
1323        let rows = vec![
1324            SochRow::new(vec![
1325                CoreSochValue::Int(1),
1326                CoreSochValue::Text("Alice".to_string()),
1327            ]),
1328            SochRow::new(vec![
1329                CoreSochValue::Int(2),
1330                CoreSochValue::Text("Bob".to_string()),
1331            ]),
1332            SochRow::new(vec![
1333                CoreSochValue::Int(3),
1334                CoreSochValue::Text("Carol".to_string()),
1335            ]),
1336        ];
1337
1338        let column_names = vec!["id".to_string(), "name".to_string()];
1339        let columns = executor.row_to_columnar(&rows, &column_names);
1340
1341        assert_eq!(columns.len(), 2);
1342        assert_eq!(columns[0].name, "id");
1343        assert_eq!(columns[1].name, "name");
1344        assert_eq!(columns[0].len(), 3);
1345        assert_eq!(columns[1].len(), 3);
1346    }
1347
1348    #[test]
1349    fn test_materialize_selected_rows() {
1350        let executor = VectorizedExecutor::new(1024);
1351
1352        let id_col = ColumnBatch::new(
1353            "id".to_string(),
1354            vec![
1355                CoreSochValue::Int(1),
1356                CoreSochValue::Int(2),
1357                CoreSochValue::Int(3),
1358            ],
1359        );
1360        let name_col = ColumnBatch::new(
1361            "name".to_string(),
1362            vec![
1363                CoreSochValue::Text("Alice".to_string()),
1364                CoreSochValue::Text("Bob".to_string()),
1365                CoreSochValue::Text("Carol".to_string()),
1366            ],
1367        );
1368
1369        // Select rows 0 and 2
1370        let selection = SelectionVector::from_indices(vec![0, 2], 3);
1371
1372        let rows = executor.materialize(&[id_col, name_col], &selection);
1373
1374        assert_eq!(rows.len(), 2);
1375        assert_eq!(rows[0].values[0], CoreSochValue::Int(1));
1376        assert_eq!(rows[0].values[1], CoreSochValue::Text("Alice".to_string()));
1377        assert_eq!(rows[1].values[0], CoreSochValue::Int(3));
1378        assert_eq!(rows[1].values[1], CoreSochValue::Text("Carol".to_string()));
1379    }
1380}