1use crate::soch_ql::{
39 ComparisonOp, LogicalOp, SelectQuery, SortDirection, SochQlParser, SochQuery, SochResult,
40 SochValue, WhereClause,
41};
42#[cfg(test)]
43use crate::soch_ql::{Condition, OrderBy};
44use std::collections::HashMap;
45use sochdb_core::{Catalog, Result, SochDBError, SochRow, SochValue as CoreSochValue};
46#[cfg(test)]
47use sochdb_core::{SochSchema, SochType};
48
49#[derive(Debug, Clone)]
51pub enum QueryPlan {
52 TableScan {
54 table: String,
55 columns: Vec<String>,
56 predicate: Option<Box<QueryPlan>>,
57 },
58 IndexSeek { index: String, key_range: KeyRange },
60 Filter {
62 input: Box<QueryPlan>,
63 predicate: Predicate,
64 },
65 Project {
67 input: Box<QueryPlan>,
68 columns: Vec<String>,
69 },
70 Sort {
72 input: Box<QueryPlan>,
73 order_by: Vec<(String, bool)>, },
75 Limit {
77 input: Box<QueryPlan>,
78 count: usize,
79 offset: usize,
80 },
81 Empty,
83}
84
85#[derive(Debug, Clone)]
87pub struct KeyRange {
88 pub start: Option<SochValue>,
89 pub end: Option<SochValue>,
90 pub inclusive_start: bool,
91 pub inclusive_end: bool,
92}
93
94impl KeyRange {
95 pub fn all() -> Self {
96 Self {
97 start: None,
98 end: None,
99 inclusive_start: true,
100 inclusive_end: true,
101 }
102 }
103
104 pub fn eq(value: SochValue) -> Self {
105 Self {
106 start: Some(value.clone()),
107 end: Some(value),
108 inclusive_start: true,
109 inclusive_end: true,
110 }
111 }
112}
113
114#[derive(Debug, Clone)]
116pub struct Predicate {
117 pub conditions: Vec<PredicateCondition>,
118 pub operator: LogicalOp,
119}
120
121#[derive(Debug, Clone)]
123pub struct PredicateCondition {
124 pub column: String,
125 pub operator: ComparisonOp,
126 pub value: CoreSochValue,
127}
128
129impl PredicateCondition {
130 pub fn from_soch_ql(column: String, operator: ComparisonOp, value: &SochValue) -> Self {
132 Self {
133 column,
134 operator,
135 value: Self::convert_value(value),
136 }
137 }
138
139 fn convert_value(v: &SochValue) -> CoreSochValue {
141 match v {
142 SochValue::Int(i) => CoreSochValue::Int(*i),
143 SochValue::UInt(u) => CoreSochValue::UInt(*u),
144 SochValue::Float(f) => CoreSochValue::Float(*f),
145 SochValue::Text(s) => CoreSochValue::Text(s.clone()),
146 SochValue::Bool(b) => CoreSochValue::Bool(*b),
147 SochValue::Null => CoreSochValue::Null,
148 SochValue::Binary(b) => CoreSochValue::Binary(b.clone()),
149 SochValue::Array(arr) => {
150 CoreSochValue::Array(arr.iter().map(Self::convert_value).collect())
151 }
152 }
153 }
154
155 pub fn evaluate(&self, row: &SochRow, column_idx: usize) -> bool {
157 if column_idx >= row.values.len() {
158 return false;
159 }
160
161 let row_value = &row.values[column_idx];
162
163 match self.operator {
164 ComparisonOp::Eq => row_value == &self.value,
165 ComparisonOp::Ne => row_value != &self.value,
166 ComparisonOp::Lt => {
167 Self::compare(row_value, &self.value) == Some(std::cmp::Ordering::Less)
168 }
169 ComparisonOp::Le => matches!(
170 Self::compare(row_value, &self.value),
171 Some(std::cmp::Ordering::Less | std::cmp::Ordering::Equal)
172 ),
173 ComparisonOp::Gt => {
174 Self::compare(row_value, &self.value) == Some(std::cmp::Ordering::Greater)
175 }
176 ComparisonOp::Ge => matches!(
177 Self::compare(row_value, &self.value),
178 Some(std::cmp::Ordering::Greater | std::cmp::Ordering::Equal)
179 ),
180 ComparisonOp::Like => Self::like_match(row_value, &self.value),
181 ComparisonOp::In => Self::in_match(row_value, &self.value),
182 ComparisonOp::SimilarTo => {
183 Self::like_match(row_value, &self.value)
187 }
188 }
189 }
190
191 fn compare(a: &CoreSochValue, b: &CoreSochValue) -> Option<std::cmp::Ordering> {
192 match (a, b) {
193 (CoreSochValue::Int(a), CoreSochValue::Int(b)) => Some(a.cmp(b)),
194 (CoreSochValue::UInt(a), CoreSochValue::UInt(b)) => Some(a.cmp(b)),
195 (CoreSochValue::Float(a), CoreSochValue::Float(b)) => a.partial_cmp(b),
196 (CoreSochValue::Text(a), CoreSochValue::Text(b)) => Some(a.cmp(b)),
197 _ => None,
198 }
199 }
200
201 fn like_match(value: &CoreSochValue, pattern: &CoreSochValue) -> bool {
202 match (value, pattern) {
203 (CoreSochValue::Text(v), CoreSochValue::Text(p)) => {
204 let regex_pattern = p.replace('%', ".*").replace('_', ".");
206 regex::Regex::new(&format!("^{}$", regex_pattern))
207 .map(|re| re.is_match(v))
208 .unwrap_or(false)
209 }
210 _ => false,
211 }
212 }
213
214 fn in_match(value: &CoreSochValue, list: &CoreSochValue) -> bool {
215 match list {
216 CoreSochValue::Array(values) => values.iter().any(|v| value == v),
217 _ => value == list, }
219 }
220}
221
222impl Predicate {
223 pub fn evaluate(&self, row: &SochRow, column_map: &HashMap<String, usize>) -> bool {
225 let results: Vec<bool> = self
226 .conditions
227 .iter()
228 .map(|cond| {
229 column_map
230 .get(&cond.column)
231 .map(|&idx| cond.evaluate(row, idx))
232 .unwrap_or(false)
233 })
234 .collect();
235
236 match self.operator {
237 LogicalOp::And => results.iter().all(|&r| r),
238 LogicalOp::Or => results.iter().any(|&r| r),
239 }
240 }
241}
242
243pub struct SochQlExecutor {
248 storage: Option<std::sync::Arc<dyn crate::optimizer_integration::StorageBackend>>,
249}
250
251impl SochQlExecutor {
252 pub fn new() -> Self {
254 Self { storage: None }
255 }
256
257 pub fn with_storage(
259 storage: std::sync::Arc<dyn crate::optimizer_integration::StorageBackend>,
260 ) -> Self {
261 Self {
262 storage: Some(storage),
263 }
264 }
265
266 pub fn execute(&self, query: &str, catalog: &Catalog) -> Result<SochResult> {
268 let parsed = SochQlParser::parse(query)
270 .map_err(|e| SochDBError::InvalidArgument(format!("Parse error: {:?}", e)))?;
271
272 self.validate(&parsed, catalog)?;
274
275 let plan = self.plan(&parsed, catalog)?;
277
278 self.execute_plan(&plan, catalog)
280 }
281
282 pub fn validate(&self, query: &SochQuery, catalog: &Catalog) -> Result<()> {
284 match query {
285 SochQuery::Select(select) => {
286 if catalog.get_table(&select.table).is_none() {
288 return Err(SochDBError::NotFound(format!(
289 "Table '{}' not found",
290 select.table
291 )));
292 }
293
294 if let Some(entry) = catalog.get_table(&select.table)
296 && let Some(schema) = &entry.schema
297 {
298 for col in &select.columns {
299 if col != "*" && !schema.fields.iter().any(|f| &f.name == col) {
300 return Err(SochDBError::InvalidArgument(format!(
301 "Column '{}' not found in table '{}'",
302 col, select.table
303 )));
304 }
305 }
306 }
307
308 Ok(())
309 }
310 SochQuery::Insert(insert) => {
311 if catalog.get_table(&insert.table).is_none() {
313 return Err(SochDBError::NotFound(format!(
314 "Table '{}' not found",
315 insert.table
316 )));
317 }
318 Ok(())
319 }
320 SochQuery::CreateTable(create) => {
321 if catalog.get_table(&create.table).is_some() {
323 return Err(SochDBError::InvalidArgument(format!(
324 "Table '{}' already exists",
325 create.table
326 )));
327 }
328 Ok(())
329 }
330 SochQuery::DropTable { table } => {
331 if catalog.get_table(table).is_none() {
332 return Err(SochDBError::NotFound(format!(
333 "Table '{}' not found",
334 table
335 )));
336 }
337 Ok(())
338 }
339 }
340 }
341
342 pub fn plan(&self, query: &SochQuery, catalog: &Catalog) -> Result<QueryPlan> {
344 match query {
345 SochQuery::Select(select) => self.plan_select(select, catalog),
346 _ => Err(SochDBError::InvalidArgument(
347 "Only SELECT queries can be planned".to_string(),
348 )),
349 }
350 }
351
352 fn plan_select(&self, select: &SelectQuery, _catalog: &Catalog) -> Result<QueryPlan> {
353 let mut plan = QueryPlan::TableScan {
355 table: select.table.clone(),
356 columns: select.columns.clone(),
357 predicate: None,
358 };
359
360 if let Some(where_clause) = &select.where_clause {
362 let predicate = self.build_predicate(where_clause);
363 plan = QueryPlan::Filter {
364 input: Box::new(plan),
365 predicate,
366 };
367 }
368
369 if !select.columns.contains(&"*".to_string()) {
371 plan = QueryPlan::Project {
372 input: Box::new(plan),
373 columns: select.columns.clone(),
374 };
375 }
376
377 if let Some(order_by) = &select.order_by {
379 plan = QueryPlan::Sort {
380 input: Box::new(plan),
381 order_by: vec![(
382 order_by.column.clone(),
383 matches!(order_by.direction, SortDirection::Asc),
384 )],
385 };
386 }
387
388 if select.limit.is_some() || select.offset.is_some() {
390 plan = QueryPlan::Limit {
391 input: Box::new(plan),
392 count: select.limit.unwrap_or(usize::MAX),
393 offset: select.offset.unwrap_or(0),
394 };
395 }
396
397 Ok(plan)
398 }
399
400 fn build_predicate(&self, where_clause: &WhereClause) -> Predicate {
401 Predicate {
402 conditions: where_clause
403 .conditions
404 .iter()
405 .map(|c| PredicateCondition::from_soch_ql(c.column.clone(), c.operator, &c.value))
406 .collect(),
407 operator: where_clause.operator,
408 }
409 }
410
411 #[allow(clippy::only_used_in_recursion)]
416 pub fn execute_plan(&self, plan: &QueryPlan, catalog: &Catalog) -> Result<SochResult> {
417 match plan {
418 QueryPlan::Empty => Ok(SochResult {
419 table: "result".to_string(),
420 columns: vec![],
421 rows: vec![],
422 }),
423 QueryPlan::TableScan { table, columns, .. } => {
424 let schema_columns = if let Some(entry) = catalog.get_table(table) {
426 if let Some(schema) = &entry.schema {
427 if columns.contains(&"*".to_string()) {
428 schema.fields.iter().map(|f| f.name.clone()).collect()
429 } else {
430 columns.clone()
431 }
432 } else {
433 columns.clone()
434 }
435 } else {
436 columns.clone()
437 };
438
439 let rows = if let Some(storage) = &self.storage {
441 let raw_rows = storage.table_scan(table, &schema_columns, None)?;
442 raw_rows
443 .into_iter()
444 .map(|row| {
445 schema_columns
446 .iter()
447 .map(|c| row.get(c).cloned().unwrap_or(SochValue::Null))
448 .collect()
449 })
450 .collect()
451 } else {
452 vec![] };
454
455 Ok(SochResult {
456 table: table.clone(),
457 columns: schema_columns,
458 rows,
459 })
460 }
461 QueryPlan::Filter { input, predicate } => {
462 let mut result = self.execute_plan(input, catalog)?;
463 let col_map: HashMap<String, usize> = result
465 .columns
466 .iter()
467 .enumerate()
468 .map(|(i, c)| (c.clone(), i))
469 .collect();
470 result.rows.retain(|row| {
472 let matches: Vec<bool> = predicate
473 .conditions
474 .iter()
475 .map(|cond| {
476 if let Some(&idx) = col_map.get(&cond.column) {
477 if idx < row.len() {
478 Self::eval_predicate_condition(cond, &row[idx])
479 } else {
480 false
481 }
482 } else {
483 false
484 }
485 })
486 .collect();
487 match predicate.operator {
488 LogicalOp::And => matches.iter().all(|&m| m),
489 LogicalOp::Or => matches.iter().any(|&m| m),
490 }
491 });
492 Ok(result)
493 }
494 QueryPlan::Project { input, columns } => {
495 let mut result = self.execute_plan(input, catalog)?;
496 let col_map: HashMap<String, usize> = result
498 .columns
499 .iter()
500 .enumerate()
501 .map(|(i, c)| (c.clone(), i))
502 .collect();
503 result.rows = result
505 .rows
506 .into_iter()
507 .map(|row| {
508 columns
509 .iter()
510 .map(|c| {
511 col_map
512 .get(c)
513 .and_then(|&i| row.get(i).cloned())
514 .unwrap_or(SochValue::Null)
515 })
516 .collect()
517 })
518 .collect();
519 result.columns = columns.clone();
520 Ok(result)
521 }
522 QueryPlan::Sort { input, order_by } => {
523 let mut result = self.execute_plan(input, catalog)?;
524 let col_map: HashMap<String, usize> = result
525 .columns
526 .iter()
527 .enumerate()
528 .map(|(i, c)| (c.clone(), i))
529 .collect();
530 result.rows.sort_by(|a, b| {
531 for (col, ascending) in order_by {
532 if let Some(&idx) = col_map.get(col) {
533 let va = a.get(idx);
534 let vb = b.get(idx);
535 let cmp = Self::compare_soch_values(va, vb);
536 let cmp = if *ascending { cmp } else { cmp.reverse() };
537 if cmp != std::cmp::Ordering::Equal {
538 return cmp;
539 }
540 }
541 }
542 std::cmp::Ordering::Equal
543 });
544 Ok(result)
545 }
546 QueryPlan::Limit {
547 input,
548 count,
549 offset,
550 } => {
551 let mut result = self.execute_plan(input, catalog)?;
552 result.rows = result.rows.into_iter().skip(*offset).take(*count).collect();
553 Ok(result)
554 }
555 QueryPlan::IndexSeek { .. } => {
556 Ok(SochResult {
558 table: "result".to_string(),
559 columns: vec![],
560 rows: vec![],
561 })
562 }
563 }
564 }
565
566 fn compare_soch_values(a: Option<&SochValue>, b: Option<&SochValue>) -> std::cmp::Ordering {
568 match (a, b) {
569 (None, None) => std::cmp::Ordering::Equal,
570 (None, Some(_)) => std::cmp::Ordering::Less,
571 (Some(_), None) => std::cmp::Ordering::Greater,
572 (Some(a), Some(b)) => match (a, b) {
573 (SochValue::Int(a), SochValue::Int(b)) => a.cmp(b),
574 (SochValue::UInt(a), SochValue::UInt(b)) => a.cmp(b),
575 (SochValue::Float(a), SochValue::Float(b)) => {
576 a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal)
577 }
578 (SochValue::Text(a), SochValue::Text(b)) => a.cmp(b),
579 (SochValue::Bool(a), SochValue::Bool(b)) => a.cmp(b),
580 _ => std::cmp::Ordering::Equal,
581 },
582 }
583 }
584
585 fn eval_predicate_condition(cond: &PredicateCondition, value: &SochValue) -> bool {
587 let core_val = PredicateCondition::convert_value(value);
589 match cond.operator {
590 ComparisonOp::Eq => core_val == cond.value,
591 ComparisonOp::Ne => core_val != cond.value,
592 ComparisonOp::Lt => {
593 PredicateCondition::compare(&core_val, &cond.value)
594 == Some(std::cmp::Ordering::Less)
595 }
596 ComparisonOp::Le => matches!(
597 PredicateCondition::compare(&core_val, &cond.value),
598 Some(std::cmp::Ordering::Less | std::cmp::Ordering::Equal)
599 ),
600 ComparisonOp::Gt => {
601 PredicateCondition::compare(&core_val, &cond.value)
602 == Some(std::cmp::Ordering::Greater)
603 }
604 ComparisonOp::Ge => matches!(
605 PredicateCondition::compare(&core_val, &cond.value),
606 Some(std::cmp::Ordering::Greater | std::cmp::Ordering::Equal)
607 ),
608 ComparisonOp::Like => PredicateCondition::like_match(&core_val, &cond.value),
609 ComparisonOp::In => PredicateCondition::in_match(&core_val, &cond.value),
610 ComparisonOp::SimilarTo => PredicateCondition::like_match(&core_val, &cond.value),
611 }
612 }
613}
614
615impl Default for SochQlExecutor {
616 fn default() -> Self {
617 Self { storage: None }
618 }
619}
620
621pub fn execute_sochql(query: &str, catalog: &Catalog) -> Result<SochResult> {
623 SochQlExecutor::new().execute(query, catalog)
624}
625
626pub fn estimate_token_reduction(result: &SochResult) -> TokenReductionStats {
628 let row_count = result.rows.len();
629 let col_count = result.columns.len();
630
631 if row_count == 0 || col_count == 0 {
632 return TokenReductionStats::default();
633 }
634
635 let avg_col_name_len: usize = result.columns.iter().map(|c| c.len()).sum::<usize>() / col_count;
638 let avg_value_len = 10; let json_tokens = 2 + row_count * (2 + col_count * (avg_col_name_len + 4 + avg_value_len));
642
643 let header_tokens = result.table.len() + 10 + result.columns.join(",").len();
646 let soch_tokens = header_tokens + row_count * (col_count * avg_value_len + col_count);
647
648 let reduction = 1.0 - (soch_tokens as f64 / json_tokens as f64);
649
650 TokenReductionStats {
651 json_tokens,
652 soch_tokens,
653 reduction_percent: (reduction * 100.0) as u32,
654 row_count,
655 col_count,
656 }
657}
658
/// Size estimates produced by [`estimate_token_reduction`].
#[derive(Default, Clone, Debug)]
pub struct TokenReductionStats {
    /// Estimated size of the result rendered as JSON.
    pub json_tokens: usize,
    /// Estimated size of the result in the compact rendering.
    pub soch_tokens: usize,
    /// `(1 - soch_tokens / json_tokens) * 100`, truncated toward zero (0 if negative).
    pub reduction_percent: u32,
    /// Number of rows that were measured.
    pub row_count: usize,
    /// Number of columns that were measured.
    pub col_count: usize,
}
673
/// The positions of the selected rows within a batch.
///
/// `batch_size` records how many rows the batch held, so that
/// [`SelectionVector::selectivity`] can report the selected fraction.
#[derive(Clone, Debug)]
pub struct SelectionVector {
    /// Selected row positions, in the order they were added.
    indices: Vec<u32>,
    /// Total number of rows in the batch these positions refer to.
    batch_size: usize,
}

impl SelectionVector {
    /// Selects every row `0..batch_size`.
    pub fn all(batch_size: usize) -> Self {
        let upper = batch_size as u32;
        let indices: Vec<u32> = (0..upper).collect();
        Self {
            indices,
            batch_size,
        }
    }

    /// An empty selection over an empty batch.
    pub fn empty() -> Self {
        Self::from_indices(Vec::new(), 0)
    }

    /// Wraps precomputed positions for a batch of `batch_size` rows.
    pub fn from_indices(indices: Vec<u32>, batch_size: usize) -> Self {
        Self {
            indices,
            batch_size,
        }
    }

    /// True when no rows are selected.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.indices.is_empty()
    }

    /// Number of selected rows.
    #[inline]
    pub fn len(&self) -> usize {
        self.indices.len()
    }

    /// Number of rows in the underlying batch.
    #[inline]
    pub fn batch_size(&self) -> usize {
        self.batch_size
    }

    /// Fraction of the batch that is selected; `0.0` for an empty batch.
    #[inline]
    pub fn selectivity(&self) -> f64 {
        match self.batch_size {
            0 => 0.0,
            total => self.indices.len() as f64 / total as f64,
        }
    }

    /// Iterates over the selected positions.
    pub fn iter(&self) -> impl Iterator<Item = u32> + '_ {
        self.indices.iter().copied()
    }

    /// Returns a new selection holding only the positions that satisfy `pred`.
    pub fn filter<F>(&self, pred: F) -> Self
    where
        F: Fn(u32) -> bool,
    {
        let kept: Vec<u32> = self.iter().filter(|&idx| pred(idx)).collect();
        Self::from_indices(kept, self.batch_size)
    }

    /// Appends `start_idx + bit` for every set bit of `mask`, lowest bit first.
    pub fn extend_masked(&mut self, start_idx: usize, mask: u16) {
        let mut remaining = mask;
        while remaining != 0 {
            let bit = remaining.trailing_zeros() as usize;
            self.indices.push((start_idx + bit) as u32);
            // Clear the lowest set bit.
            remaining &= remaining - 1;
        }
    }
}
768
769#[derive(Debug, Clone)]
771pub struct ColumnBatch {
772 pub values: Vec<CoreSochValue>,
774 pub name: String,
776}
777
778impl ColumnBatch {
779 pub fn new(name: String, values: Vec<CoreSochValue>) -> Self {
781 Self { values, name }
782 }
783
784 #[inline]
786 pub fn get(&self, idx: usize) -> Option<&CoreSochValue> {
787 self.values.get(idx)
788 }
789
790 #[allow(dead_code)]
792 pub fn as_i64_slice(&self) -> Option<Vec<i64>> {
793 self.values
794 .iter()
795 .map(|v| match v {
796 CoreSochValue::Int(i) => Some(*i),
797 CoreSochValue::UInt(u) => Some(*u as i64),
798 _ => None,
799 })
800 .collect()
801 }
802
803 pub fn len(&self) -> usize {
805 self.values.len()
806 }
807
808 pub fn is_empty(&self) -> bool {
810 self.values.is_empty()
811 }
812}
813
/// A single-column predicate evaluated by `VectorizedExecutor::evaluate_batch`.
///
/// `col_idx` is the position of the tested column in the batch's column slice.
/// Integer variants accept `Int` and `UInt` cells; cells of any other kind do
/// not match.
#[derive(Clone, Debug)]
pub enum VectorPredicate {
    /// Value `> threshold`.
    IntGt { col_idx: usize, threshold: i64 },
    /// Value `< threshold`.
    IntLt { col_idx: usize, threshold: i64 },
    /// Value `== value`.
    IntEq { col_idx: usize, value: i64 },
    /// Value `>= threshold`.
    IntGe { col_idx: usize, threshold: i64 },
    /// Value `<= threshold`.
    IntLe { col_idx: usize, threshold: i64 },
    /// Text cell equal to `value`.
    StrEq { col_idx: usize, value: String },
    /// Text cell starting with `prefix`.
    StrPrefix { col_idx: usize, prefix: String },
    /// Bool cell equal to `value`.
    BoolEq { col_idx: usize, value: bool },
    /// Cell is `Null`.
    IsNull { col_idx: usize },
    /// Cell is present and not `Null`.
    IsNotNull { col_idx: usize },
}
838
839pub struct VectorizedExecutor {
855 batch_size: usize,
857}
858
859impl VectorizedExecutor {
860 pub fn new(batch_size: usize) -> Self {
862 Self { batch_size }
863 }
864
865 pub fn default_batch_size() -> usize {
867 1024
868 }
869
870 pub fn evaluate_batch(
875 &self,
876 columns: &[ColumnBatch],
877 predicates: &[VectorPredicate],
878 ) -> SelectionVector {
879 if columns.is_empty() {
880 return SelectionVector::empty();
881 }
882
883 let batch_size = columns[0].len().min(self.batch_size);
884 let mut selection = SelectionVector::all(batch_size);
885
886 for predicate in predicates {
888 if selection.is_empty() {
889 break; }
891
892 selection = match predicate {
893 VectorPredicate::IntGt { col_idx, threshold } => {
894 self.filter_int_gt(&columns[*col_idx], *threshold, &selection)
895 }
896 VectorPredicate::IntLt { col_idx, threshold } => {
897 self.filter_int_lt(&columns[*col_idx], *threshold, &selection)
898 }
899 VectorPredicate::IntEq { col_idx, value } => {
900 self.filter_int_eq(&columns[*col_idx], *value, &selection)
901 }
902 VectorPredicate::IntGe { col_idx, threshold } => {
903 self.filter_int_ge(&columns[*col_idx], *threshold, &selection)
904 }
905 VectorPredicate::IntLe { col_idx, threshold } => {
906 self.filter_int_le(&columns[*col_idx], *threshold, &selection)
907 }
908 VectorPredicate::StrEq { col_idx, value } => {
909 self.filter_str_eq(&columns[*col_idx], value, &selection)
910 }
911 VectorPredicate::StrPrefix { col_idx, prefix } => {
912 self.filter_str_prefix(&columns[*col_idx], prefix, &selection)
913 }
914 VectorPredicate::BoolEq { col_idx, value } => {
915 self.filter_bool_eq(&columns[*col_idx], *value, &selection)
916 }
917 VectorPredicate::IsNull { col_idx } => {
918 self.filter_is_null(&columns[*col_idx], &selection)
919 }
920 VectorPredicate::IsNotNull { col_idx } => {
921 self.filter_is_not_null(&columns[*col_idx], &selection)
922 }
923 };
924 }
925
926 selection
927 }
928
929 #[inline]
931 fn filter_int_gt(
932 &self,
933 column: &ColumnBatch,
934 threshold: i64,
935 selection: &SelectionVector,
936 ) -> SelectionVector {
937 selection.filter(|idx| match column.get(idx as usize) {
938 Some(CoreSochValue::Int(v)) => *v > threshold,
939 Some(CoreSochValue::UInt(v)) => (*v as i64) > threshold,
940 _ => false,
941 })
942 }
943
944 #[inline]
946 fn filter_int_lt(
947 &self,
948 column: &ColumnBatch,
949 threshold: i64,
950 selection: &SelectionVector,
951 ) -> SelectionVector {
952 selection.filter(|idx| match column.get(idx as usize) {
953 Some(CoreSochValue::Int(v)) => *v < threshold,
954 Some(CoreSochValue::UInt(v)) => (*v as i64) < threshold,
955 _ => false,
956 })
957 }
958
959 #[inline]
961 fn filter_int_eq(
962 &self,
963 column: &ColumnBatch,
964 value: i64,
965 selection: &SelectionVector,
966 ) -> SelectionVector {
967 selection.filter(|idx| match column.get(idx as usize) {
968 Some(CoreSochValue::Int(v)) => *v == value,
969 Some(CoreSochValue::UInt(v)) => (*v as i64) == value,
970 _ => false,
971 })
972 }
973
974 #[inline]
976 fn filter_int_ge(
977 &self,
978 column: &ColumnBatch,
979 threshold: i64,
980 selection: &SelectionVector,
981 ) -> SelectionVector {
982 selection.filter(|idx| match column.get(idx as usize) {
983 Some(CoreSochValue::Int(v)) => *v >= threshold,
984 Some(CoreSochValue::UInt(v)) => (*v as i64) >= threshold,
985 _ => false,
986 })
987 }
988
989 #[inline]
991 fn filter_int_le(
992 &self,
993 column: &ColumnBatch,
994 threshold: i64,
995 selection: &SelectionVector,
996 ) -> SelectionVector {
997 selection.filter(|idx| match column.get(idx as usize) {
998 Some(CoreSochValue::Int(v)) => *v <= threshold,
999 Some(CoreSochValue::UInt(v)) => (*v as i64) <= threshold,
1000 _ => false,
1001 })
1002 }
1003
1004 #[inline]
1006 fn filter_str_eq(
1007 &self,
1008 column: &ColumnBatch,
1009 value: &str,
1010 selection: &SelectionVector,
1011 ) -> SelectionVector {
1012 selection.filter(|idx| match column.get(idx as usize) {
1013 Some(CoreSochValue::Text(s)) => s == value,
1014 _ => false,
1015 })
1016 }
1017
1018 #[inline]
1020 fn filter_str_prefix(
1021 &self,
1022 column: &ColumnBatch,
1023 prefix: &str,
1024 selection: &SelectionVector,
1025 ) -> SelectionVector {
1026 selection.filter(|idx| match column.get(idx as usize) {
1027 Some(CoreSochValue::Text(s)) => s.starts_with(prefix),
1028 _ => false,
1029 })
1030 }
1031
1032 #[inline]
1034 fn filter_bool_eq(
1035 &self,
1036 column: &ColumnBatch,
1037 value: bool,
1038 selection: &SelectionVector,
1039 ) -> SelectionVector {
1040 selection.filter(|idx| match column.get(idx as usize) {
1041 Some(CoreSochValue::Bool(b)) => *b == value,
1042 _ => false,
1043 })
1044 }
1045
1046 #[inline]
1048 fn filter_is_null(&self, column: &ColumnBatch, selection: &SelectionVector) -> SelectionVector {
1049 selection.filter(|idx| matches!(column.get(idx as usize), Some(CoreSochValue::Null)))
1050 }
1051
1052 #[inline]
1054 fn filter_is_not_null(
1055 &self,
1056 column: &ColumnBatch,
1057 selection: &SelectionVector,
1058 ) -> SelectionVector {
1059 selection
1060 .filter(|idx| !matches!(column.get(idx as usize), Some(CoreSochValue::Null) | None))
1061 }
1062
1063 pub fn materialize(
1065 &self,
1066 columns: &[ColumnBatch],
1067 selection: &SelectionVector,
1068 ) -> Vec<SochRow> {
1069 selection
1070 .iter()
1071 .map(|idx| {
1072 let values: Vec<CoreSochValue> = columns
1073 .iter()
1074 .map(|col| {
1075 col.get(idx as usize)
1076 .cloned()
1077 .unwrap_or(CoreSochValue::Null)
1078 })
1079 .collect();
1080 SochRow::new(values)
1081 })
1082 .collect()
1083 }
1084
1085 pub fn row_to_columnar(&self, rows: &[SochRow], column_names: &[String]) -> Vec<ColumnBatch> {
1087 if rows.is_empty() || column_names.is_empty() {
1088 return vec![];
1089 }
1090
1091 let num_cols = column_names.len().min(rows[0].values.len());
1092
1093 (0..num_cols)
1094 .map(|col_idx| {
1095 let values: Vec<CoreSochValue> = rows
1096 .iter()
1097 .map(|row| {
1098 row.values
1099 .get(col_idx)
1100 .cloned()
1101 .unwrap_or(CoreSochValue::Null)
1102 })
1103 .collect();
1104 ColumnBatch::new(column_names[col_idx].clone(), values)
1105 })
1106 .collect()
1107 }
1108}
1109
1110impl Default for VectorizedExecutor {
1111 fn default() -> Self {
1112 Self::new(Self::default_batch_size())
1113 }
1114}
1115
/// Counters describing a vectorized evaluation run.
///
/// NOTE(review): nothing in this file populates these counters; the field
/// meanings below follow their names and the derived metrics.
#[derive(Default, Clone, Debug)]
pub struct VectorizedStats {
    /// Rows examined (denominator of `selectivity` and `rows_per_sec`).
    pub rows_processed: usize,
    /// Rows that passed all predicates (numerator of `selectivity`).
    pub rows_selected: usize,
    /// Number of predicate evaluations performed.
    pub predicates_evaluated: usize,
    /// Number of early exits taken.
    pub short_circuits: usize,
    /// Elapsed time in microseconds.
    pub time_us: u64,
}

impl VectorizedStats {
    /// `rows_selected / rows_processed`, or `0.0` when nothing was processed.
    pub fn selectivity(&self) -> f64 {
        match self.rows_processed {
            0 => 0.0,
            processed => self.rows_selected as f64 / processed as f64,
        }
    }

    /// Throughput in rows per second, or `0.0` when no time was recorded.
    pub fn rows_per_sec(&self) -> f64 {
        if self.time_us == 0 {
            return 0.0;
        }
        let seconds = self.time_us as f64 / 1_000_000.0;
        self.rows_processed as f64 / seconds
    }
}
1150
#[cfg(test)]
mod tests {
    use super::*;

    /// Catalog with a single `users(id UInt, name Text, score Float)` table.
    fn test_catalog() -> Catalog {
        let mut catalog = Catalog::new("test_db");

        let schema = SochSchema::new("users")
            .field("id", SochType::UInt)
            .field("name", SochType::Text)
            .field("score", SochType::Float);

        catalog.create_table(schema, 1).unwrap();
        catalog
    }

    #[test]
    fn test_validate_select() {
        let catalog = test_catalog();
        let executor = SochQlExecutor::new();

        let query = SochQuery::Select(SelectQuery {
            columns: vec!["id".to_string(), "name".to_string()],
            table: "users".to_string(),
            where_clause: None,
            order_by: None,
            limit: None,
            offset: None,
        });

        assert!(executor.validate(&query, &catalog).is_ok());
    }

    #[test]
    fn test_validate_nonexistent_table() {
        let catalog = test_catalog();
        let executor = SochQlExecutor::new();

        let query = SochQuery::Select(SelectQuery {
            columns: vec!["*".to_string()],
            table: "nonexistent".to_string(),
            where_clause: None,
            order_by: None,
            limit: None,
            offset: None,
        });

        assert!(executor.validate(&query, &catalog).is_err());
    }

    #[test]
    fn test_plan_select() {
        let catalog = test_catalog();
        let executor = SochQlExecutor::new();

        let select = SelectQuery {
            columns: vec!["id".to_string(), "name".to_string()],
            table: "users".to_string(),
            where_clause: Some(WhereClause {
                conditions: vec![Condition {
                    column: "score".to_string(),
                    operator: ComparisonOp::Gt,
                    value: SochValue::Float(80.0),
                }],
                operator: LogicalOp::And,
            }),
            order_by: Some(OrderBy {
                column: "score".to_string(),
                direction: SortDirection::Desc,
            }),
            limit: Some(10),
            offset: None,
        };

        let plan = executor.plan_select(&select, &catalog).unwrap();

        // Expected shape: Limit -> Sort -> Project -> Filter -> TableScan.
        match plan {
            QueryPlan::Limit { input, count, .. } => {
                assert_eq!(count, 10);
                match *input {
                    QueryPlan::Sort { input, order_by } => {
                        assert_eq!(order_by[0].0, "score");
                        // DESC => ascending flag is false.
                        assert!(!order_by[0].1);
                        match *input {
                            QueryPlan::Project { input, columns } => {
                                assert_eq!(columns, vec!["id", "name"]);
                                match *input {
                                    QueryPlan::Filter { predicate, .. } => {
                                        assert_eq!(predicate.conditions.len(), 1);
                                    }
                                    _ => panic!("Expected Filter"),
                                }
                            }
                            _ => panic!("Expected Project"),
                        }
                    }
                    _ => panic!("Expected Sort"),
                }
            }
            _ => panic!("Expected Limit"),
        }
    }

    #[test]
    fn test_predicate_evaluation() {
        let cond = PredicateCondition {
            column: "score".to_string(),
            operator: ComparisonOp::Gt,
            value: CoreSochValue::Float(80.0),
        };

        let row_pass = SochRow::new(vec![
            CoreSochValue::UInt(1),
            CoreSochValue::Text("Alice".to_string()),
            CoreSochValue::Float(95.0),
        ]);

        let row_fail = SochRow::new(vec![
            CoreSochValue::UInt(2),
            CoreSochValue::Text("Bob".to_string()),
            CoreSochValue::Float(75.0),
        ]);

        assert!(cond.evaluate(&row_pass, 2));
        assert!(!cond.evaluate(&row_fail, 2));
    }

    #[test]
    fn test_token_reduction() {
        let result = SochResult {
            table: "user_statistics".to_string(),
            columns: vec![
                "user_id".to_string(),
                "full_name".to_string(),
                "email_address".to_string(),
                "registration_date".to_string(),
                "last_login".to_string(),
            ],
            rows: (0..20)
                .map(|i| {
                    vec![
                        SochValue::UInt(i as u64),
                        SochValue::Text(format!("User Number {}", i)),
                        SochValue::Text(format!("user{}@example.com", i)),
                        SochValue::Text("2024-01-15".to_string()),
                        SochValue::Text("2024-03-20".to_string()),
                    ]
                })
                .collect(),
        };

        let stats = estimate_token_reduction(&result);

        println!("JSON tokens: {}", stats.json_tokens);
        println!("TOON tokens: {}", stats.soch_tokens);
        println!("Reduction: {}%", stats.reduction_percent);

        assert!(stats.soch_tokens < stats.json_tokens);
        assert!(stats.reduction_percent > 0);
    }

    #[test]
    fn test_selection_vector_basic() {
        let sel = SelectionVector::all(100);
        assert_eq!(sel.len(), 100);
        assert!(!sel.is_empty());
        assert_eq!(sel.selectivity(), 1.0);

        let empty = SelectionVector::empty();
        assert!(empty.is_empty());
        assert_eq!(empty.selectivity(), 0.0);
    }

    #[test]
    fn test_selection_vector_filter() {
        let sel = SelectionVector::all(10);

        let filtered = sel.filter(|i| i % 2 == 0);
        assert_eq!(filtered.len(), 5);

        let indices: Vec<u32> = filtered.iter().collect();
        assert_eq!(indices, vec![0, 2, 4, 6, 8]);
    }

    #[test]
    fn test_vectorized_int_filter() {
        let executor = VectorizedExecutor::new(1024);

        let column = ColumnBatch::new(
            "value".to_string(),
            (0..10).map(CoreSochValue::Int).collect(),
        );

        let predicates = vec![VectorPredicate::IntGt {
            col_idx: 0,
            threshold: 5,
        }];

        let selection = executor.evaluate_batch(&[column], &predicates);

        assert_eq!(selection.len(), 4);
        let indices: Vec<u32> = selection.iter().collect();
        assert_eq!(indices, vec![6, 7, 8, 9]);
    }

    #[test]
    fn test_vectorized_multiple_predicates() {
        let executor = VectorizedExecutor::new(1024);

        let id_col = ColumnBatch::new("id".to_string(), (0..100).map(CoreSochValue::Int).collect());

        let status_col = ColumnBatch::new(
            "active".to_string(),
            (0..100).map(|i| CoreSochValue::Bool(i % 2 == 0)).collect(),
        );

        let predicates = vec![
            VectorPredicate::IntGe {
                col_idx: 0,
                threshold: 50,
            },
            VectorPredicate::IntLt {
                col_idx: 0,
                threshold: 60,
            },
            VectorPredicate::BoolEq {
                col_idx: 1,
                value: true,
            },
        ];

        let selection = executor.evaluate_batch(&[id_col, status_col], &predicates);

        assert_eq!(selection.len(), 5);
        let indices: Vec<u32> = selection.iter().collect();
        assert_eq!(indices, vec![50, 52, 54, 56, 58]);
    }

    #[test]
    fn test_vectorized_short_circuit() {
        let executor = VectorizedExecutor::new(1024);

        let column = ColumnBatch::new(
            "value".to_string(),
            (0..100).map(|_| CoreSochValue::Int(-1)).collect(),
        );

        // The first predicate already rejects every row.
        let predicates = vec![
            VectorPredicate::IntGt {
                col_idx: 0,
                threshold: 0,
            },
            VectorPredicate::IntLt {
                col_idx: 0,
                threshold: 100,
            },
            VectorPredicate::IntEq {
                col_idx: 0,
                value: 50,
            },
        ];

        let selection = executor.evaluate_batch(&[column], &predicates);
        assert!(selection.is_empty());
    }

    #[test]
    fn test_vectorized_string_predicates() {
        let executor = VectorizedExecutor::new(1024);

        let names = [
            "Alice", "Bob", "Carol", "Dave", "Alice", "Alice", "Bob", "Carol",
        ];
        let column = ColumnBatch::new(
            "name".to_string(),
            names
                .iter()
                .map(|s| CoreSochValue::Text(s.to_string()))
                .collect(),
        );

        let predicates = vec![VectorPredicate::StrEq {
            col_idx: 0,
            value: "Alice".to_string(),
        }];

        let selection = executor.evaluate_batch(&[column], &predicates);

        assert_eq!(selection.len(), 3);
        let indices: Vec<u32> = selection.iter().collect();
        assert_eq!(indices, vec![0, 4, 5]);
    }

    #[test]
    fn test_vectorized_null_handling() {
        let executor = VectorizedExecutor::new(1024);

        let values = vec![
            CoreSochValue::Int(1),
            CoreSochValue::Null,
            CoreSochValue::Int(2),
            CoreSochValue::Null,
            CoreSochValue::Int(3),
        ];
        let column = ColumnBatch::new("value".to_string(), values);

        let predicates = vec![VectorPredicate::IsNull { col_idx: 0 }];
        let null_selection = executor.evaluate_batch(std::slice::from_ref(&column), &predicates);
        assert_eq!(null_selection.len(), 2);

        let not_null_predicates = vec![VectorPredicate::IsNotNull { col_idx: 0 }];
        let not_null_selection = executor.evaluate_batch(&[column], &not_null_predicates);
        assert_eq!(not_null_selection.len(), 3);
    }

    #[test]
    fn test_row_to_columnar_conversion() {
        let executor = VectorizedExecutor::new(1024);

        let rows = vec![
            SochRow::new(vec![
                CoreSochValue::Int(1),
                CoreSochValue::Text("Alice".to_string()),
            ]),
            SochRow::new(vec![
                CoreSochValue::Int(2),
                CoreSochValue::Text("Bob".to_string()),
            ]),
            SochRow::new(vec![
                CoreSochValue::Int(3),
                CoreSochValue::Text("Carol".to_string()),
            ]),
        ];

        let column_names = vec!["id".to_string(), "name".to_string()];
        let columns = executor.row_to_columnar(&rows, &column_names);

        assert_eq!(columns.len(), 2);
        assert_eq!(columns[0].name, "id");
        assert_eq!(columns[1].name, "name");
        assert_eq!(columns[0].len(), 3);
        assert_eq!(columns[1].len(), 3);
    }

    #[test]
    fn test_materialize_selected_rows() {
        let executor = VectorizedExecutor::new(1024);

        let id_col = ColumnBatch::new(
            "id".to_string(),
            vec![
                CoreSochValue::Int(1),
                CoreSochValue::Int(2),
                CoreSochValue::Int(3),
            ],
        );
        let name_col = ColumnBatch::new(
            "name".to_string(),
            vec![
                CoreSochValue::Text("Alice".to_string()),
                CoreSochValue::Text("Bob".to_string()),
                CoreSochValue::Text("Carol".to_string()),
            ],
        );

        // Select the first and last rows.
        let selection = SelectionVector::from_indices(vec![0, 2], 3);

        let rows = executor.materialize(&[id_col, name_col], &selection);

        assert_eq!(rows.len(), 2);
        assert_eq!(rows[0].values[0], CoreSochValue::Int(1));
        assert_eq!(rows[0].values[1], CoreSochValue::Text("Alice".to_string()));
        assert_eq!(rows[1].values[0], CoreSochValue::Int(3));
        assert_eq!(rows[1].values[1], CoreSochValue::Text("Carol".to_string()));
    }
}