1use crate::soch_ql::{
39 ComparisonOp, LogicalOp, SelectQuery, SochQlParser, SochQuery, SochResult, SochValue,
40 SortDirection, WhereClause,
41};
42#[cfg(test)]
43use crate::soch_ql::{Condition, OrderBy};
44use sochdb_core::{Catalog, Result, SochDBError, SochRow, SochValue as CoreSochValue};
45#[cfg(test)]
46use sochdb_core::{SochSchema, SochType};
47use std::collections::HashMap;
48
/// Logical query plan built by [`SochQlExecutor::plan`] and evaluated by
/// [`SochQlExecutor::execute_plan`].
///
/// Plans form a tree: every non-leaf node owns its `input`.
#[derive(Debug, Clone)]
pub enum QueryPlan {
    /// Reads `columns` from `table`; a `"*"` entry is expanded to the schema's
    /// field names at execution time when the table has a schema.
    TableScan {
        /// Table to scan.
        table: String,
        /// Requested columns (may contain `"*"`).
        columns: Vec<String>,
        /// Pushed-down predicate slot. `plan_select` always leaves this `None`
        /// and `execute_plan` does not read it.
        predicate: Option<Box<QueryPlan>>,
    },
    /// Index lookup over `key_range`.
    /// NOTE(review): `execute_plan` currently returns an empty result for this node.
    IndexSeek { index: String, key_range: KeyRange },
    /// Keeps only the input rows that satisfy `predicate`.
    Filter {
        /// Child plan producing candidate rows.
        input: Box<QueryPlan>,
        /// Conditions each row must satisfy.
        predicate: Predicate,
    },
    /// Restricts/reorders the input to `columns`; names missing from the input
    /// produce `Null` values.
    Project {
        /// Child plan to project.
        input: Box<QueryPlan>,
        /// Output column names, in output order.
        columns: Vec<String>,
    },
    /// Sorts the input rows.
    Sort {
        /// Child plan to sort.
        input: Box<QueryPlan>,
        /// Sort keys as `(column, ascending)`; `true` means ascending.
        order_by: Vec<(String, bool)>, },
    /// Skips `offset` rows, then keeps at most `count` rows.
    Limit {
        /// Child plan to limit.
        input: Box<QueryPlan>,
        /// Maximum number of rows kept (`usize::MAX` when no LIMIT was given).
        count: usize,
        /// Number of leading rows skipped.
        offset: usize,
    },
    /// Produces a result with no columns and no rows.
    Empty,
}
84
/// Key bounds used by [`QueryPlan::IndexSeek`].
///
/// A `None` bound leaves that side of the range open (see [`KeyRange::all`]);
/// the `inclusive_*` flags state whether the bound value itself is included.
#[derive(Debug, Clone)]
pub struct KeyRange {
    /// Lower bound, or `None` for unbounded.
    pub start: Option<SochValue>,
    /// Upper bound, or `None` for unbounded.
    pub end: Option<SochValue>,
    /// Whether `start` itself is part of the range.
    pub inclusive_start: bool,
    /// Whether `end` itself is part of the range.
    pub inclusive_end: bool,
}
93
94impl KeyRange {
95 pub fn all() -> Self {
96 Self {
97 start: None,
98 end: None,
99 inclusive_start: true,
100 inclusive_end: true,
101 }
102 }
103
104 pub fn eq(value: SochValue) -> Self {
105 Self {
106 start: Some(value.clone()),
107 end: Some(value),
108 inclusive_start: true,
109 inclusive_end: true,
110 }
111 }
112}
113
/// A list of conditions joined by a single logical operator
/// (all combined with AND, or all combined with OR).
#[derive(Debug, Clone)]
pub struct Predicate {
    /// Individual `column <op> value` comparisons.
    pub conditions: Vec<PredicateCondition>,
    /// How the condition results are combined.
    pub operator: LogicalOp,
}
120
/// A single `column <operator> value` comparison.
///
/// `value` is held in the core value representation so it can be compared
/// directly with stored row values.
#[derive(Debug, Clone)]
pub struct PredicateCondition {
    /// Column the condition reads.
    pub column: String,
    /// Comparison to perform.
    pub operator: ComparisonOp,
    /// Right-hand operand (for `IN`, typically an `Array`).
    pub value: CoreSochValue,
}
128
129impl PredicateCondition {
130 pub fn from_soch_ql(column: String, operator: ComparisonOp, value: &SochValue) -> Self {
132 Self {
133 column,
134 operator,
135 value: Self::convert_value(value),
136 }
137 }
138
139 fn convert_value(v: &SochValue) -> CoreSochValue {
141 match v {
142 SochValue::Int(i) => CoreSochValue::Int(*i),
143 SochValue::UInt(u) => CoreSochValue::UInt(*u),
144 SochValue::Float(f) => CoreSochValue::Float(*f),
145 SochValue::Text(s) => CoreSochValue::Text(s.clone()),
146 SochValue::Bool(b) => CoreSochValue::Bool(*b),
147 SochValue::Null => CoreSochValue::Null,
148 SochValue::Binary(b) => CoreSochValue::Binary(b.clone()),
149 SochValue::Array(arr) => {
150 CoreSochValue::Array(arr.iter().map(Self::convert_value).collect())
151 }
152 }
153 }
154
155 pub fn evaluate(&self, row: &SochRow, column_idx: usize) -> bool {
157 if column_idx >= row.values.len() {
158 return false;
159 }
160
161 let row_value = &row.values[column_idx];
162
163 match self.operator {
164 ComparisonOp::Eq => row_value == &self.value,
165 ComparisonOp::Ne => row_value != &self.value,
166 ComparisonOp::Lt => {
167 Self::compare(row_value, &self.value) == Some(std::cmp::Ordering::Less)
168 }
169 ComparisonOp::Le => matches!(
170 Self::compare(row_value, &self.value),
171 Some(std::cmp::Ordering::Less | std::cmp::Ordering::Equal)
172 ),
173 ComparisonOp::Gt => {
174 Self::compare(row_value, &self.value) == Some(std::cmp::Ordering::Greater)
175 }
176 ComparisonOp::Ge => matches!(
177 Self::compare(row_value, &self.value),
178 Some(std::cmp::Ordering::Greater | std::cmp::Ordering::Equal)
179 ),
180 ComparisonOp::Like => Self::like_match(row_value, &self.value),
181 ComparisonOp::In => Self::in_match(row_value, &self.value),
182 ComparisonOp::SimilarTo => {
183 Self::like_match(row_value, &self.value)
187 }
188 }
189 }
190
191 fn compare(a: &CoreSochValue, b: &CoreSochValue) -> Option<std::cmp::Ordering> {
192 match (a, b) {
193 (CoreSochValue::Int(a), CoreSochValue::Int(b)) => Some(a.cmp(b)),
194 (CoreSochValue::UInt(a), CoreSochValue::UInt(b)) => Some(a.cmp(b)),
195 (CoreSochValue::Float(a), CoreSochValue::Float(b)) => a.partial_cmp(b),
196 (CoreSochValue::Text(a), CoreSochValue::Text(b)) => Some(a.cmp(b)),
197 _ => None,
198 }
199 }
200
201 fn like_match(value: &CoreSochValue, pattern: &CoreSochValue) -> bool {
202 match (value, pattern) {
203 (CoreSochValue::Text(v), CoreSochValue::Text(p)) => {
204 crate::like::like_match(v, p)
207 }
208 _ => false,
209 }
210 }
211
212 fn in_match(value: &CoreSochValue, list: &CoreSochValue) -> bool {
213 match list {
214 CoreSochValue::Array(values) => values.iter().any(|v| value == v),
215 _ => value == list, }
217 }
218}
219
220impl Predicate {
221 pub fn evaluate(&self, row: &SochRow, column_map: &HashMap<String, usize>) -> bool {
223 let results: Vec<bool> = self
224 .conditions
225 .iter()
226 .map(|cond| {
227 column_map
228 .get(&cond.column)
229 .map(|&idx| cond.evaluate(row, idx))
230 .unwrap_or(false)
231 })
232 .collect();
233
234 match self.operator {
235 LogicalOp::And => results.iter().all(|&r| r),
236 LogicalOp::Or => results.iter().any(|&r| r),
237 }
238 }
239}
240
/// Executes SOCH-QL queries against a [`Catalog`]: parse → validate → plan → execute.
pub struct SochQlExecutor {
    /// Backend used by table scans; when `None`, scans produce no rows.
    storage: Option<std::sync::Arc<dyn crate::optimizer_integration::StorageBackend>>,
}
248
249impl SochQlExecutor {
250 pub fn new() -> Self {
252 Self { storage: None }
253 }
254
255 pub fn with_storage(
257 storage: std::sync::Arc<dyn crate::optimizer_integration::StorageBackend>,
258 ) -> Self {
259 Self {
260 storage: Some(storage),
261 }
262 }
263
264 pub fn execute(&self, query: &str, catalog: &Catalog) -> Result<SochResult> {
266 let parsed = SochQlParser::parse(query)
268 .map_err(|e| SochDBError::InvalidArgument(format!("Parse error: {:?}", e)))?;
269
270 self.validate(&parsed, catalog)?;
272
273 let plan = self.plan(&parsed, catalog)?;
275
276 self.execute_plan(&plan, catalog)
278 }
279
280 pub fn validate(&self, query: &SochQuery, catalog: &Catalog) -> Result<()> {
282 match query {
283 SochQuery::Select(select) => {
284 if catalog.get_table(&select.table).is_none() {
286 return Err(SochDBError::NotFound(format!(
287 "Table '{}' not found",
288 select.table
289 )));
290 }
291
292 if let Some(entry) = catalog.get_table(&select.table)
294 && let Some(schema) = &entry.schema
295 {
296 for col in &select.columns {
297 if col != "*" && !schema.fields.iter().any(|f| &f.name == col) {
298 return Err(SochDBError::InvalidArgument(format!(
299 "Column '{}' not found in table '{}'",
300 col, select.table
301 )));
302 }
303 }
304 }
305
306 Ok(())
307 }
308 SochQuery::Insert(insert) => {
309 if catalog.get_table(&insert.table).is_none() {
311 return Err(SochDBError::NotFound(format!(
312 "Table '{}' not found",
313 insert.table
314 )));
315 }
316 Ok(())
317 }
318 SochQuery::CreateTable(create) => {
319 if catalog.get_table(&create.table).is_some() {
321 return Err(SochDBError::InvalidArgument(format!(
322 "Table '{}' already exists",
323 create.table
324 )));
325 }
326 Ok(())
327 }
328 SochQuery::DropTable { table } => {
329 if catalog.get_table(table).is_none() {
330 return Err(SochDBError::NotFound(format!(
331 "Table '{}' not found",
332 table
333 )));
334 }
335 Ok(())
336 }
337 }
338 }
339
340 pub fn plan(&self, query: &SochQuery, catalog: &Catalog) -> Result<QueryPlan> {
342 match query {
343 SochQuery::Select(select) => self.plan_select(select, catalog),
344 _ => Err(SochDBError::InvalidArgument(
345 "Only SELECT queries can be planned".to_string(),
346 )),
347 }
348 }
349
350 fn plan_select(&self, select: &SelectQuery, _catalog: &Catalog) -> Result<QueryPlan> {
351 let mut plan = QueryPlan::TableScan {
353 table: select.table.clone(),
354 columns: select.columns.clone(),
355 predicate: None,
356 };
357
358 if let Some(where_clause) = &select.where_clause {
360 let predicate = self.build_predicate(where_clause);
361 plan = QueryPlan::Filter {
362 input: Box::new(plan),
363 predicate,
364 };
365 }
366
367 if !select.columns.contains(&"*".to_string()) {
369 plan = QueryPlan::Project {
370 input: Box::new(plan),
371 columns: select.columns.clone(),
372 };
373 }
374
375 if let Some(order_by) = &select.order_by {
377 plan = QueryPlan::Sort {
378 input: Box::new(plan),
379 order_by: vec![(
380 order_by.column.clone(),
381 matches!(order_by.direction, SortDirection::Asc),
382 )],
383 };
384 }
385
386 if select.limit.is_some() || select.offset.is_some() {
388 plan = QueryPlan::Limit {
389 input: Box::new(plan),
390 count: select.limit.unwrap_or(usize::MAX),
391 offset: select.offset.unwrap_or(0),
392 };
393 }
394
395 Ok(plan)
396 }
397
398 fn build_predicate(&self, where_clause: &WhereClause) -> Predicate {
399 Predicate {
400 conditions: where_clause
401 .conditions
402 .iter()
403 .map(|c| PredicateCondition::from_soch_ql(c.column.clone(), c.operator, &c.value))
404 .collect(),
405 operator: where_clause.operator,
406 }
407 }
408
409 #[allow(clippy::only_used_in_recursion)]
414 pub fn execute_plan(&self, plan: &QueryPlan, catalog: &Catalog) -> Result<SochResult> {
415 match plan {
416 QueryPlan::Empty => Ok(SochResult {
417 table: "result".to_string(),
418 columns: vec![],
419 rows: vec![],
420 }),
421 QueryPlan::TableScan { table, columns, .. } => {
422 let schema_columns = if let Some(entry) = catalog.get_table(table) {
424 if let Some(schema) = &entry.schema {
425 if columns.contains(&"*".to_string()) {
426 schema.fields.iter().map(|f| f.name.clone()).collect()
427 } else {
428 columns.clone()
429 }
430 } else {
431 columns.clone()
432 }
433 } else {
434 columns.clone()
435 };
436
437 let rows = if let Some(storage) = &self.storage {
439 let raw_rows = storage.table_scan(table, &schema_columns, None)?;
440 raw_rows
441 .into_iter()
442 .map(|row| {
443 schema_columns
444 .iter()
445 .map(|c| row.get(c).cloned().unwrap_or(SochValue::Null))
446 .collect()
447 })
448 .collect()
449 } else {
450 vec![] };
452
453 Ok(SochResult {
454 table: table.clone(),
455 columns: schema_columns,
456 rows,
457 })
458 }
459 QueryPlan::Filter { input, predicate } => {
460 let mut result = self.execute_plan(input, catalog)?;
461 let col_map: HashMap<String, usize> = result
463 .columns
464 .iter()
465 .enumerate()
466 .map(|(i, c)| (c.clone(), i))
467 .collect();
468 result.rows.retain(|row| {
470 let matches: Vec<bool> = predicate
471 .conditions
472 .iter()
473 .map(|cond| {
474 if let Some(&idx) = col_map.get(&cond.column) {
475 if idx < row.len() {
476 Self::eval_predicate_condition(cond, &row[idx])
477 } else {
478 false
479 }
480 } else {
481 false
482 }
483 })
484 .collect();
485 match predicate.operator {
486 LogicalOp::And => matches.iter().all(|&m| m),
487 LogicalOp::Or => matches.iter().any(|&m| m),
488 }
489 });
490 Ok(result)
491 }
492 QueryPlan::Project { input, columns } => {
493 let mut result = self.execute_plan(input, catalog)?;
494 let col_map: HashMap<String, usize> = result
496 .columns
497 .iter()
498 .enumerate()
499 .map(|(i, c)| (c.clone(), i))
500 .collect();
501 result.rows = result
503 .rows
504 .into_iter()
505 .map(|row| {
506 columns
507 .iter()
508 .map(|c| {
509 col_map
510 .get(c)
511 .and_then(|&i| row.get(i).cloned())
512 .unwrap_or(SochValue::Null)
513 })
514 .collect()
515 })
516 .collect();
517 result.columns = columns.clone();
518 Ok(result)
519 }
520 QueryPlan::Sort { input, order_by } => {
521 let mut result = self.execute_plan(input, catalog)?;
522 let col_map: HashMap<String, usize> = result
523 .columns
524 .iter()
525 .enumerate()
526 .map(|(i, c)| (c.clone(), i))
527 .collect();
528 result.rows.sort_by(|a, b| {
529 for (col, ascending) in order_by {
530 if let Some(&idx) = col_map.get(col) {
531 let va = a.get(idx);
532 let vb = b.get(idx);
533 let cmp = Self::compare_soch_values(va, vb);
534 let cmp = if *ascending { cmp } else { cmp.reverse() };
535 if cmp != std::cmp::Ordering::Equal {
536 return cmp;
537 }
538 }
539 }
540 std::cmp::Ordering::Equal
541 });
542 Ok(result)
543 }
544 QueryPlan::Limit {
545 input,
546 count,
547 offset,
548 } => {
549 let mut result = self.execute_plan(input, catalog)?;
550 result.rows = result.rows.into_iter().skip(*offset).take(*count).collect();
551 Ok(result)
552 }
553 QueryPlan::IndexSeek { .. } => {
554 Ok(SochResult {
556 table: "result".to_string(),
557 columns: vec![],
558 rows: vec![],
559 })
560 }
561 }
562 }
563
564 fn compare_soch_values(a: Option<&SochValue>, b: Option<&SochValue>) -> std::cmp::Ordering {
566 match (a, b) {
567 (None, None) => std::cmp::Ordering::Equal,
568 (None, Some(_)) => std::cmp::Ordering::Less,
569 (Some(_), None) => std::cmp::Ordering::Greater,
570 (Some(a), Some(b)) => match (a, b) {
571 (SochValue::Int(a), SochValue::Int(b)) => a.cmp(b),
572 (SochValue::UInt(a), SochValue::UInt(b)) => a.cmp(b),
573 (SochValue::Float(a), SochValue::Float(b)) => {
574 a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal)
575 }
576 (SochValue::Text(a), SochValue::Text(b)) => a.cmp(b),
577 (SochValue::Bool(a), SochValue::Bool(b)) => a.cmp(b),
578 _ => std::cmp::Ordering::Equal,
579 },
580 }
581 }
582
583 fn eval_predicate_condition(cond: &PredicateCondition, value: &SochValue) -> bool {
585 let core_val = PredicateCondition::convert_value(value);
587 match cond.operator {
588 ComparisonOp::Eq => core_val == cond.value,
589 ComparisonOp::Ne => core_val != cond.value,
590 ComparisonOp::Lt => {
591 PredicateCondition::compare(&core_val, &cond.value)
592 == Some(std::cmp::Ordering::Less)
593 }
594 ComparisonOp::Le => matches!(
595 PredicateCondition::compare(&core_val, &cond.value),
596 Some(std::cmp::Ordering::Less | std::cmp::Ordering::Equal)
597 ),
598 ComparisonOp::Gt => {
599 PredicateCondition::compare(&core_val, &cond.value)
600 == Some(std::cmp::Ordering::Greater)
601 }
602 ComparisonOp::Ge => matches!(
603 PredicateCondition::compare(&core_val, &cond.value),
604 Some(std::cmp::Ordering::Greater | std::cmp::Ordering::Equal)
605 ),
606 ComparisonOp::Like => PredicateCondition::like_match(&core_val, &cond.value),
607 ComparisonOp::In => PredicateCondition::in_match(&core_val, &cond.value),
608 ComparisonOp::SimilarTo => PredicateCondition::like_match(&core_val, &cond.value),
609 }
610 }
611}
612
613impl Default for SochQlExecutor {
614 fn default() -> Self {
615 Self { storage: None }
616 }
617}
618
619pub fn execute_sochql(query: &str, catalog: &Catalog) -> Result<SochResult> {
621 SochQlExecutor::new().execute(query, catalog)
622}
623
624pub fn estimate_token_reduction(result: &SochResult) -> TokenReductionStats {
626 let row_count = result.rows.len();
627 let col_count = result.columns.len();
628
629 if row_count == 0 || col_count == 0 {
630 return TokenReductionStats::default();
631 }
632
633 let avg_col_name_len: usize = result.columns.iter().map(|c| c.len()).sum::<usize>() / col_count;
636 let avg_value_len = 10; let json_tokens = 2 + row_count * (2 + col_count * (avg_col_name_len + 4 + avg_value_len));
640
641 let header_tokens = result.table.len() + 10 + result.columns.join(",").len();
644 let soch_tokens = header_tokens + row_count * (col_count * avg_value_len + col_count);
645
646 let reduction = 1.0 - (soch_tokens as f64 / json_tokens as f64);
647
648 TokenReductionStats {
649 json_tokens,
650 soch_tokens,
651 reduction_percent: (reduction * 100.0) as u32,
652 row_count,
653 col_count,
654 }
655}
656
/// Output of [`estimate_token_reduction`]: heuristic token counts for JSON versus
/// SOCH renderings of a result. All fields are zero by default.
#[derive(Debug, Clone, Default)]
pub struct TokenReductionStats {
    /// Estimated tokens for the JSON rendering.
    pub json_tokens: usize,
    /// Estimated tokens for the SOCH rendering.
    pub soch_tokens: usize,
    /// Estimated saving in percent, truncated toward zero. It is 0 when SOCH is
    /// not smaller, because the float-to-`u32` cast saturates negatives.
    pub reduction_percent: u32,
    /// Number of rows in the analysed result.
    pub row_count: usize,
    /// Number of columns in the analysed result.
    pub col_count: usize,
}
671
/// The set of row indices within a batch that are still selected.
///
/// Produced and narrowed by [`VectorizedExecutor::evaluate_batch`]. Indices
/// produced by `all`/`filter` are in ascending order.
#[derive(Debug, Clone)]
pub struct SelectionVector {
    /// Selected row offsets within the batch.
    indices: Vec<u32>,
    /// Total number of rows in the batch the indices refer to.
    batch_size: usize,
}
687
688impl SelectionVector {
689 pub fn all(batch_size: usize) -> Self {
691 Self {
692 indices: (0..batch_size as u32).collect(),
693 batch_size,
694 }
695 }
696
697 pub fn empty() -> Self {
699 Self {
700 indices: Vec::new(),
701 batch_size: 0,
702 }
703 }
704
705 pub fn from_indices(indices: Vec<u32>, batch_size: usize) -> Self {
707 Self {
708 indices,
709 batch_size,
710 }
711 }
712
713 #[inline]
715 pub fn is_empty(&self) -> bool {
716 self.indices.is_empty()
717 }
718
719 #[inline]
721 pub fn len(&self) -> usize {
722 self.indices.len()
723 }
724
725 #[inline]
727 pub fn batch_size(&self) -> usize {
728 self.batch_size
729 }
730
731 #[inline]
733 pub fn selectivity(&self) -> f64 {
734 if self.batch_size == 0 {
735 0.0
736 } else {
737 self.len() as f64 / self.batch_size as f64
738 }
739 }
740
741 pub fn iter(&self) -> impl Iterator<Item = u32> + '_ {
743 self.indices.iter().copied()
744 }
745
746 pub fn filter<F>(&self, pred: F) -> Self
748 where
749 F: Fn(u32) -> bool,
750 {
751 Self {
752 indices: self.indices.iter().copied().filter(|&i| pred(i)).collect(),
753 batch_size: self.batch_size,
754 }
755 }
756
757 pub fn extend_masked(&mut self, start_idx: usize, mask: u16) {
759 for bit in 0..16 {
760 if (mask >> bit) & 1 == 1 {
761 self.indices.push((start_idx + bit) as u32);
762 }
763 }
764 }
765}
766
/// One named column of values, the columnar input to [`VectorizedExecutor`].
#[derive(Debug, Clone)]
pub struct ColumnBatch {
    /// Column values; index `i` holds row `i` of the batch.
    pub values: Vec<CoreSochValue>,
    /// Column name.
    pub name: String,
}
775
776impl ColumnBatch {
777 pub fn new(name: String, values: Vec<CoreSochValue>) -> Self {
779 Self { values, name }
780 }
781
782 #[inline]
784 pub fn get(&self, idx: usize) -> Option<&CoreSochValue> {
785 self.values.get(idx)
786 }
787
788 #[allow(dead_code)]
790 pub fn as_i64_slice(&self) -> Option<Vec<i64>> {
791 self.values
792 .iter()
793 .map(|v| match v {
794 CoreSochValue::Int(i) => Some(*i),
795 CoreSochValue::UInt(u) => Some(*u as i64),
796 _ => None,
797 })
798 .collect()
799 }
800
801 pub fn len(&self) -> usize {
803 self.values.len()
804 }
805
806 pub fn is_empty(&self) -> bool {
808 self.values.is_empty()
809 }
810}
811
/// A single-column predicate evaluated by [`VectorizedExecutor::evaluate_batch`].
///
/// `col_idx` indexes into the `columns` slice passed to `evaluate_batch`. The
/// integer variants match `Int` and `UInt` values. Values of other types never
/// match a typed predicate.
#[derive(Debug, Clone)]
pub enum VectorPredicate {
    /// Integer value `> threshold`.
    IntGt { col_idx: usize, threshold: i64 },
    /// Integer value `< threshold`.
    IntLt { col_idx: usize, threshold: i64 },
    /// Integer value `== value`.
    IntEq { col_idx: usize, value: i64 },
    /// Integer value `>= threshold`.
    IntGe { col_idx: usize, threshold: i64 },
    /// Integer value `<= threshold`.
    IntLe { col_idx: usize, threshold: i64 },
    /// Text value equal to `value`.
    StrEq { col_idx: usize, value: String },
    /// Text value starting with `prefix`.
    StrPrefix { col_idx: usize, prefix: String },
    /// Boolean value equal to `value`.
    BoolEq { col_idx: usize, value: bool },
    /// Value is `Null`.
    IsNull { col_idx: usize },
    /// Value exists (row index within the column) and is not `Null`.
    IsNotNull { col_idx: usize },
}
836
/// Evaluates [`VectorPredicate`]s column-at-a-time, narrowing a
/// [`SelectionVector`] instead of materializing intermediate rows.
pub struct VectorizedExecutor {
    /// Maximum number of rows considered per `evaluate_batch` call.
    batch_size: usize,
}
856
857impl VectorizedExecutor {
858 pub fn new(batch_size: usize) -> Self {
860 Self { batch_size }
861 }
862
863 pub fn default_batch_size() -> usize {
865 1024
866 }
867
868 pub fn evaluate_batch(
873 &self,
874 columns: &[ColumnBatch],
875 predicates: &[VectorPredicate],
876 ) -> SelectionVector {
877 if columns.is_empty() {
878 return SelectionVector::empty();
879 }
880
881 let batch_size = columns[0].len().min(self.batch_size);
882 let mut selection = SelectionVector::all(batch_size);
883
884 for predicate in predicates {
886 if selection.is_empty() {
887 break; }
889
890 selection = match predicate {
891 VectorPredicate::IntGt { col_idx, threshold } => {
892 self.filter_int_gt(&columns[*col_idx], *threshold, &selection)
893 }
894 VectorPredicate::IntLt { col_idx, threshold } => {
895 self.filter_int_lt(&columns[*col_idx], *threshold, &selection)
896 }
897 VectorPredicate::IntEq { col_idx, value } => {
898 self.filter_int_eq(&columns[*col_idx], *value, &selection)
899 }
900 VectorPredicate::IntGe { col_idx, threshold } => {
901 self.filter_int_ge(&columns[*col_idx], *threshold, &selection)
902 }
903 VectorPredicate::IntLe { col_idx, threshold } => {
904 self.filter_int_le(&columns[*col_idx], *threshold, &selection)
905 }
906 VectorPredicate::StrEq { col_idx, value } => {
907 self.filter_str_eq(&columns[*col_idx], value, &selection)
908 }
909 VectorPredicate::StrPrefix { col_idx, prefix } => {
910 self.filter_str_prefix(&columns[*col_idx], prefix, &selection)
911 }
912 VectorPredicate::BoolEq { col_idx, value } => {
913 self.filter_bool_eq(&columns[*col_idx], *value, &selection)
914 }
915 VectorPredicate::IsNull { col_idx } => {
916 self.filter_is_null(&columns[*col_idx], &selection)
917 }
918 VectorPredicate::IsNotNull { col_idx } => {
919 self.filter_is_not_null(&columns[*col_idx], &selection)
920 }
921 };
922 }
923
924 selection
925 }
926
927 #[inline]
929 fn filter_int_gt(
930 &self,
931 column: &ColumnBatch,
932 threshold: i64,
933 selection: &SelectionVector,
934 ) -> SelectionVector {
935 selection.filter(|idx| match column.get(idx as usize) {
936 Some(CoreSochValue::Int(v)) => *v > threshold,
937 Some(CoreSochValue::UInt(v)) => (*v as i64) > threshold,
938 _ => false,
939 })
940 }
941
942 #[inline]
944 fn filter_int_lt(
945 &self,
946 column: &ColumnBatch,
947 threshold: i64,
948 selection: &SelectionVector,
949 ) -> SelectionVector {
950 selection.filter(|idx| match column.get(idx as usize) {
951 Some(CoreSochValue::Int(v)) => *v < threshold,
952 Some(CoreSochValue::UInt(v)) => (*v as i64) < threshold,
953 _ => false,
954 })
955 }
956
957 #[inline]
959 fn filter_int_eq(
960 &self,
961 column: &ColumnBatch,
962 value: i64,
963 selection: &SelectionVector,
964 ) -> SelectionVector {
965 selection.filter(|idx| match column.get(idx as usize) {
966 Some(CoreSochValue::Int(v)) => *v == value,
967 Some(CoreSochValue::UInt(v)) => (*v as i64) == value,
968 _ => false,
969 })
970 }
971
972 #[inline]
974 fn filter_int_ge(
975 &self,
976 column: &ColumnBatch,
977 threshold: i64,
978 selection: &SelectionVector,
979 ) -> SelectionVector {
980 selection.filter(|idx| match column.get(idx as usize) {
981 Some(CoreSochValue::Int(v)) => *v >= threshold,
982 Some(CoreSochValue::UInt(v)) => (*v as i64) >= threshold,
983 _ => false,
984 })
985 }
986
987 #[inline]
989 fn filter_int_le(
990 &self,
991 column: &ColumnBatch,
992 threshold: i64,
993 selection: &SelectionVector,
994 ) -> SelectionVector {
995 selection.filter(|idx| match column.get(idx as usize) {
996 Some(CoreSochValue::Int(v)) => *v <= threshold,
997 Some(CoreSochValue::UInt(v)) => (*v as i64) <= threshold,
998 _ => false,
999 })
1000 }
1001
1002 #[inline]
1004 fn filter_str_eq(
1005 &self,
1006 column: &ColumnBatch,
1007 value: &str,
1008 selection: &SelectionVector,
1009 ) -> SelectionVector {
1010 selection.filter(|idx| match column.get(idx as usize) {
1011 Some(CoreSochValue::Text(s)) => s == value,
1012 _ => false,
1013 })
1014 }
1015
1016 #[inline]
1018 fn filter_str_prefix(
1019 &self,
1020 column: &ColumnBatch,
1021 prefix: &str,
1022 selection: &SelectionVector,
1023 ) -> SelectionVector {
1024 selection.filter(|idx| match column.get(idx as usize) {
1025 Some(CoreSochValue::Text(s)) => s.starts_with(prefix),
1026 _ => false,
1027 })
1028 }
1029
1030 #[inline]
1032 fn filter_bool_eq(
1033 &self,
1034 column: &ColumnBatch,
1035 value: bool,
1036 selection: &SelectionVector,
1037 ) -> SelectionVector {
1038 selection.filter(|idx| match column.get(idx as usize) {
1039 Some(CoreSochValue::Bool(b)) => *b == value,
1040 _ => false,
1041 })
1042 }
1043
1044 #[inline]
1046 fn filter_is_null(&self, column: &ColumnBatch, selection: &SelectionVector) -> SelectionVector {
1047 selection.filter(|idx| matches!(column.get(idx as usize), Some(CoreSochValue::Null)))
1048 }
1049
1050 #[inline]
1052 fn filter_is_not_null(
1053 &self,
1054 column: &ColumnBatch,
1055 selection: &SelectionVector,
1056 ) -> SelectionVector {
1057 selection
1058 .filter(|idx| !matches!(column.get(idx as usize), Some(CoreSochValue::Null) | None))
1059 }
1060
1061 pub fn materialize(
1063 &self,
1064 columns: &[ColumnBatch],
1065 selection: &SelectionVector,
1066 ) -> Vec<SochRow> {
1067 selection
1068 .iter()
1069 .map(|idx| {
1070 let values: Vec<CoreSochValue> = columns
1071 .iter()
1072 .map(|col| {
1073 col.get(idx as usize)
1074 .cloned()
1075 .unwrap_or(CoreSochValue::Null)
1076 })
1077 .collect();
1078 SochRow::new(values)
1079 })
1080 .collect()
1081 }
1082
1083 pub fn row_to_columnar(&self, rows: &[SochRow], column_names: &[String]) -> Vec<ColumnBatch> {
1085 if rows.is_empty() || column_names.is_empty() {
1086 return vec![];
1087 }
1088
1089 let num_cols = column_names.len().min(rows[0].values.len());
1090
1091 (0..num_cols)
1092 .map(|col_idx| {
1093 let values: Vec<CoreSochValue> = rows
1094 .iter()
1095 .map(|row| {
1096 row.values
1097 .get(col_idx)
1098 .cloned()
1099 .unwrap_or(CoreSochValue::Null)
1100 })
1101 .collect();
1102 ColumnBatch::new(column_names[col_idx].clone(), values)
1103 })
1104 .collect()
1105 }
1106}
1107
1108impl Default for VectorizedExecutor {
1109 fn default() -> Self {
1110 Self::new(Self::default_batch_size())
1111 }
1112}
1113
/// Counters describing a vectorized evaluation run.
///
/// NOTE(review): nothing in this module populates these counters; callers are
/// presumably expected to fill them in.
#[derive(Debug, Clone, Default)]
pub struct VectorizedStats {
    /// Rows examined.
    pub rows_processed: usize,
    /// Rows that survived all predicates.
    pub rows_selected: usize,
    /// Number of predicate evaluations performed.
    pub predicates_evaluated: usize,
    /// Number of times evaluation stopped early.
    pub short_circuits: usize,
    /// Elapsed time in microseconds.
    pub time_us: u64,
}
1128
1129impl VectorizedStats {
1130 pub fn selectivity(&self) -> f64 {
1132 if self.rows_processed == 0 {
1133 0.0
1134 } else {
1135 self.rows_selected as f64 / self.rows_processed as f64
1136 }
1137 }
1138
1139 pub fn rows_per_sec(&self) -> f64 {
1141 if self.time_us == 0 {
1142 0.0
1143 } else {
1144 self.rows_processed as f64 / (self.time_us as f64 / 1_000_000.0)
1145 }
1146 }
1147}
1148
#[cfg(test)]
mod tests {
    use super::*;

    /// Catalog with a single `users(id UInt, name Text, score Float)` table.
    fn test_catalog() -> Catalog {
        let mut catalog = Catalog::new("test_db");

        let schema = SochSchema::new("users")
            .field("id", SochType::UInt)
            .field("name", SochType::Text)
            .field("score", SochType::Float);

        catalog.create_table(schema, 1).unwrap();
        catalog
    }

    #[test]
    fn test_validate_select() {
        let catalog = test_catalog();
        let executor = SochQlExecutor::new();

        let query = SochQuery::Select(SelectQuery {
            columns: vec!["id".to_string(), "name".to_string()],
            table: "users".to_string(),
            where_clause: None,
            order_by: None,
            limit: None,
            offset: None,
        });

        assert!(executor.validate(&query, &catalog).is_ok());
    }

    #[test]
    fn test_validate_nonexistent_table() {
        let catalog = test_catalog();
        let executor = SochQlExecutor::new();

        let query = SochQuery::Select(SelectQuery {
            columns: vec!["*".to_string()],
            table: "nonexistent".to_string(),
            where_clause: None,
            order_by: None,
            limit: None,
            offset: None,
        });

        assert!(executor.validate(&query, &catalog).is_err());
    }

    #[test]
    fn test_plan_select() {
        let catalog = test_catalog();
        let executor = SochQlExecutor::new();

        let select = SelectQuery {
            columns: vec!["id".to_string(), "name".to_string()],
            table: "users".to_string(),
            where_clause: Some(WhereClause {
                conditions: vec![Condition {
                    column: "score".to_string(),
                    operator: ComparisonOp::Gt,
                    value: SochValue::Float(80.0),
                }],
                operator: LogicalOp::And,
            }),
            order_by: Some(OrderBy {
                column: "score".to_string(),
                direction: SortDirection::Desc,
            }),
            limit: Some(10),
            offset: None,
        };

        let plan = executor.plan_select(&select, &catalog).unwrap();

        // Expected shape: Limit(Sort(Project(Filter(TableScan)))).
        match plan {
            QueryPlan::Limit { input, count, .. } => {
                assert_eq!(count, 10);
                match *input {
                    QueryPlan::Sort { input, order_by } => {
                        assert_eq!(order_by[0].0, "score");
                        // DESC is encoded as `ascending == false`.
                        assert!(!order_by[0].1);
                        match *input {
                            QueryPlan::Project { input, columns } => {
                                assert_eq!(columns, vec!["id", "name"]);
                                match *input {
                                    QueryPlan::Filter { predicate, .. } => {
                                        assert_eq!(predicate.conditions.len(), 1);
                                    }
                                    _ => panic!("Expected Filter"),
                                }
                            }
                            _ => panic!("Expected Project"),
                        }
                    }
                    _ => panic!("Expected Sort"),
                }
            }
            _ => panic!("Expected Limit"),
        }
    }

    #[test]
    fn test_predicate_evaluation() {
        let cond = PredicateCondition {
            column: "score".to_string(),
            operator: ComparisonOp::Gt,
            value: CoreSochValue::Float(80.0),
        };

        let row_pass = SochRow::new(vec![
            CoreSochValue::UInt(1),
            CoreSochValue::Text("Alice".to_string()),
            CoreSochValue::Float(95.0),
        ]);

        let row_fail = SochRow::new(vec![
            CoreSochValue::UInt(2),
            CoreSochValue::Text("Bob".to_string()),
            CoreSochValue::Float(75.0),
        ]);

        assert!(cond.evaluate(&row_pass, 2));
        assert!(!cond.evaluate(&row_fail, 2));
    }

    #[test]
    fn test_token_reduction() {
        let result = SochResult {
            table: "user_statistics".to_string(),
            columns: vec![
                "user_id".to_string(),
                "full_name".to_string(),
                "email_address".to_string(),
                "registration_date".to_string(),
                "last_login".to_string(),
            ],
            rows: (0..20)
                .map(|i| {
                    vec![
                        SochValue::UInt(i as u64),
                        SochValue::Text(format!("User Number {}", i)),
                        SochValue::Text(format!("user{}@example.com", i)),
                        SochValue::Text("2024-01-15".to_string()),
                        SochValue::Text("2024-03-20".to_string()),
                    ]
                })
                .collect(),
        };

        let stats = estimate_token_reduction(&result);

        println!("JSON tokens: {}", stats.json_tokens);
        println!("TOON tokens: {}", stats.soch_tokens);
        println!("Reduction: {}%", stats.reduction_percent);

        assert!(stats.soch_tokens < stats.json_tokens);
        assert!(stats.reduction_percent > 0);
    }

    #[test]
    fn test_selection_vector_basic() {
        let sel = SelectionVector::all(100);
        assert_eq!(sel.len(), 100);
        assert!(!sel.is_empty());
        assert_eq!(sel.selectivity(), 1.0);

        let empty = SelectionVector::empty();
        assert!(empty.is_empty());
        assert_eq!(empty.selectivity(), 0.0);
    }

    #[test]
    fn test_selection_vector_filter() {
        let sel = SelectionVector::all(10);

        let filtered = sel.filter(|i| i % 2 == 0);
        assert_eq!(filtered.len(), 5);

        let indices: Vec<u32> = filtered.iter().collect();
        assert_eq!(indices, vec![0, 2, 4, 6, 8]);
    }

    #[test]
    fn test_vectorized_int_filter() {
        let executor = VectorizedExecutor::new(1024);

        let column = ColumnBatch::new(
            "value".to_string(),
            (0..10).map(CoreSochValue::Int).collect(),
        );

        let predicates = vec![VectorPredicate::IntGt {
            col_idx: 0,
            threshold: 5,
        }];

        let selection = executor.evaluate_batch(&[column], &predicates);

        assert_eq!(selection.len(), 4);
        let indices: Vec<u32> = selection.iter().collect();
        assert_eq!(indices, vec![6, 7, 8, 9]);
    }

    #[test]
    fn test_vectorized_multiple_predicates() {
        let executor = VectorizedExecutor::new(1024);

        let id_col = ColumnBatch::new("id".to_string(), (0..100).map(CoreSochValue::Int).collect());

        let status_col = ColumnBatch::new(
            "active".to_string(),
            (0..100).map(|i| CoreSochValue::Bool(i % 2 == 0)).collect(),
        );

        let predicates = vec![
            VectorPredicate::IntGe {
                col_idx: 0,
                threshold: 50,
            },
            VectorPredicate::IntLt {
                col_idx: 0,
                threshold: 60,
            },
            VectorPredicate::BoolEq {
                col_idx: 1,
                value: true,
            },
        ];

        let selection = executor.evaluate_batch(&[id_col, status_col], &predicates);

        assert_eq!(selection.len(), 5);
        let indices: Vec<u32> = selection.iter().collect();
        assert_eq!(indices, vec![50, 52, 54, 56, 58]);
    }

    #[test]
    fn test_vectorized_short_circuit() {
        let executor = VectorizedExecutor::new(1024);

        let column = ColumnBatch::new(
            "value".to_string(),
            (0..100).map(|_| CoreSochValue::Int(-1)).collect(),
        );

        let predicates = vec![
            // Rejects every row, so the remaining predicates are skipped.
            VectorPredicate::IntGt {
                col_idx: 0,
                threshold: 0,
            },
            VectorPredicate::IntLt {
                col_idx: 0,
                threshold: 100,
            },
            VectorPredicate::IntEq {
                col_idx: 0,
                value: 50,
            },
        ];

        let selection = executor.evaluate_batch(&[column], &predicates);
        assert!(selection.is_empty());
    }

    #[test]
    fn test_vectorized_string_predicates() {
        let executor = VectorizedExecutor::new(1024);

        let names = [
            "Alice", "Bob", "Carol", "Dave", "Alice", "Alice", "Bob", "Carol",
        ];
        let column = ColumnBatch::new(
            "name".to_string(),
            names
                .iter()
                .map(|s| CoreSochValue::Text(s.to_string()))
                .collect(),
        );

        let predicates = vec![VectorPredicate::StrEq {
            col_idx: 0,
            value: "Alice".to_string(),
        }];

        let selection = executor.evaluate_batch(&[column], &predicates);

        assert_eq!(selection.len(), 3);
        let indices: Vec<u32> = selection.iter().collect();
        assert_eq!(indices, vec![0, 4, 5]);
    }

    #[test]
    fn test_vectorized_null_handling() {
        let executor = VectorizedExecutor::new(1024);

        let values = vec![
            CoreSochValue::Int(1),
            CoreSochValue::Null,
            CoreSochValue::Int(2),
            CoreSochValue::Null,
            CoreSochValue::Int(3),
        ];
        let column = ColumnBatch::new("value".to_string(), values);

        let predicates = vec![VectorPredicate::IsNull { col_idx: 0 }];
        let null_selection = executor.evaluate_batch(std::slice::from_ref(&column), &predicates);
        assert_eq!(null_selection.len(), 2);

        let not_null_predicates = vec![VectorPredicate::IsNotNull { col_idx: 0 }];
        let not_null_selection = executor.evaluate_batch(&[column], &not_null_predicates);
        assert_eq!(not_null_selection.len(), 3);
    }

    #[test]
    fn test_row_to_columnar_conversion() {
        let executor = VectorizedExecutor::new(1024);

        let rows = vec![
            SochRow::new(vec![
                CoreSochValue::Int(1),
                CoreSochValue::Text("Alice".to_string()),
            ]),
            SochRow::new(vec![
                CoreSochValue::Int(2),
                CoreSochValue::Text("Bob".to_string()),
            ]),
            SochRow::new(vec![
                CoreSochValue::Int(3),
                CoreSochValue::Text("Carol".to_string()),
            ]),
        ];

        let column_names = vec!["id".to_string(), "name".to_string()];
        let columns = executor.row_to_columnar(&rows, &column_names);

        assert_eq!(columns.len(), 2);
        assert_eq!(columns[0].name, "id");
        assert_eq!(columns[1].name, "name");
        assert_eq!(columns[0].len(), 3);
        assert_eq!(columns[1].len(), 3);
    }

    #[test]
    fn test_materialize_selected_rows() {
        let executor = VectorizedExecutor::new(1024);

        let id_col = ColumnBatch::new(
            "id".to_string(),
            vec![
                CoreSochValue::Int(1),
                CoreSochValue::Int(2),
                CoreSochValue::Int(3),
            ],
        );
        let name_col = ColumnBatch::new(
            "name".to_string(),
            vec![
                CoreSochValue::Text("Alice".to_string()),
                CoreSochValue::Text("Bob".to_string()),
                CoreSochValue::Text("Carol".to_string()),
            ],
        );

        let selection = SelectionVector::from_indices(vec![0, 2], 3);

        let rows = executor.materialize(&[id_col, name_col], &selection);

        assert_eq!(rows.len(), 2);
        assert_eq!(rows[0].values[0], CoreSochValue::Int(1));
        assert_eq!(rows[0].values[1], CoreSochValue::Text("Alice".to_string()));
        assert_eq!(rows[1].values[0], CoreSochValue::Int(3));
        assert_eq!(rows[1].values[1], CoreSochValue::Text("Carol".to_string()));
    }
}