use crate::soch_ql::{
ComparisonOp, LogicalOp, SelectQuery, SochQlParser, SochQuery, SochResult, SochValue,
SortDirection, WhereClause,
};
#[cfg(test)]
use crate::soch_ql::{Condition, OrderBy};
use sochdb_core::{Catalog, Result, SochDBError, SochRow, SochValue as CoreSochValue};
#[cfg(test)]
use sochdb_core::{SochSchema, SochType};
use std::collections::HashMap;
/// A node in a query execution plan.
///
/// Plans are trees: every variant that carries an `input` wraps the plan
/// producing the rows it operates on.
#[derive(Debug, Clone)]
pub enum QueryPlan {
    /// Read the listed `columns` of `table`. A `"*"` entry is expanded to
    /// the schema's field names at execution time when a schema is known.
    TableScan {
        table: String,
        columns: Vec<String>,
        // The planner in this file always sets this to `None`, and the
        // executor ignores it when running a scan.
        predicate: Option<Box<QueryPlan>>,
    },
    /// Look up rows through an index over a key range.
    // NOTE(review): the executor in this file returns an empty result for
    // this variant — looks like a not-yet-implemented stub; confirm.
    IndexSeek { index: String, key_range: KeyRange },
    /// Keep only the rows of `input` that satisfy `predicate`.
    Filter {
        input: Box<QueryPlan>,
        predicate: Predicate,
    },
    /// Narrow the rows of `input` to the named `columns`, in that order.
    Project {
        input: Box<QueryPlan>,
        columns: Vec<String>,
    },
    /// Sort the rows of `input`.
    Sort {
        input: Box<QueryPlan>,
        // `(column, ascending)` pairs; `true` means ascending order.
        order_by: Vec<(String, bool)>, },
    /// Skip `offset` rows of `input`, then yield at most `count` rows.
    Limit {
        input: Box<QueryPlan>,
        count: usize,
        offset: usize,
    },
    /// Produces no columns and no rows.
    Empty,
}
/// A range of index keys with optional, individually inclusive bounds.
///
/// A bound of `None` leaves that side of the range open.
#[derive(Debug, Clone)]
pub struct KeyRange {
    pub start: Option<SochValue>,
    pub end: Option<SochValue>,
    pub inclusive_start: bool,
    pub inclusive_end: bool,
}

impl KeyRange {
    /// Returns the unbounded range: no start, no end, both sides inclusive.
    pub fn all() -> Self {
        let (start, end) = (None, None);
        Self {
            start,
            end,
            inclusive_start: true,
            inclusive_end: true,
        }
    }

    /// Returns the single-key range `[value, value]`.
    pub fn eq(value: SochValue) -> Self {
        let start = Some(value.clone());
        let end = Some(value);
        // Both inclusive flags come from `all()`, which sets them to `true`.
        Self {
            start,
            end,
            ..Self::all()
        }
    }
}
/// A flat list of column conditions joined by a single logical operator.
#[derive(Debug, Clone)]
pub struct Predicate {
    /// The individual column comparisons.
    pub conditions: Vec<PredicateCondition>,
    /// How the conditions combine: `And` requires all of them to hold,
    /// `Or` requires at least one.
    pub operator: LogicalOp,
}
/// A single `column <operator> value` comparison.
#[derive(Debug, Clone)]
pub struct PredicateCondition {
    pub column: String,
    pub operator: ComparisonOp,
    pub value: CoreSochValue,
}

impl PredicateCondition {
    /// Builds a condition from parsed SochQL parts, converting the literal
    /// into the core value representation.
    pub fn from_soch_ql(column: String, operator: ComparisonOp, value: &SochValue) -> Self {
        let value = Self::convert_value(value);
        Self {
            column,
            operator,
            value,
        }
    }

    /// Maps a query-language value onto the matching core value variant,
    /// recursing into arrays.
    fn convert_value(v: &SochValue) -> CoreSochValue {
        match v {
            SochValue::Null => CoreSochValue::Null,
            SochValue::Bool(flag) => CoreSochValue::Bool(*flag),
            SochValue::Int(n) => CoreSochValue::Int(*n),
            SochValue::UInt(n) => CoreSochValue::UInt(*n),
            SochValue::Float(x) => CoreSochValue::Float(*x),
            SochValue::Text(text) => CoreSochValue::Text(text.clone()),
            SochValue::Binary(bytes) => CoreSochValue::Binary(bytes.clone()),
            SochValue::Array(items) => {
                let converted = items.iter().map(Self::convert_value).collect();
                CoreSochValue::Array(converted)
            }
        }
    }

    /// Tests this condition against the value at `column_idx` of `row`.
    ///
    /// Returns `false` when `column_idx` is past the end of the row.
    pub fn evaluate(&self, row: &SochRow, column_idx: usize) -> bool {
        use std::cmp::Ordering;

        let Some(actual) = row.values.get(column_idx) else {
            return false;
        };
        let expected = &self.value;
        match self.operator {
            ComparisonOp::Eq => actual == expected,
            ComparisonOp::Ne => actual != expected,
            ComparisonOp::Lt => Self::compare(actual, expected) == Some(Ordering::Less),
            ComparisonOp::Le => {
                Self::compare(actual, expected).map_or(false, |ord| ord != Ordering::Greater)
            }
            ComparisonOp::Gt => Self::compare(actual, expected) == Some(Ordering::Greater),
            ComparisonOp::Ge => {
                Self::compare(actual, expected).map_or(false, |ord| ord != Ordering::Less)
            }
            // `SimilarTo` is evaluated with the same matcher as `Like`.
            ComparisonOp::Like | ComparisonOp::SimilarTo => Self::like_match(actual, expected),
            ComparisonOp::In => Self::in_match(actual, expected),
        }
    }

    /// Orders two values of the same comparable variant (`Int`, `UInt`,
    /// `Float`, `Text`); any other pairing — including mixed variants —
    /// yields `None`.
    fn compare(a: &CoreSochValue, b: &CoreSochValue) -> Option<std::cmp::Ordering> {
        match (a, b) {
            (CoreSochValue::Int(lhs), CoreSochValue::Int(rhs)) => Some(lhs.cmp(rhs)),
            (CoreSochValue::UInt(lhs), CoreSochValue::UInt(rhs)) => Some(lhs.cmp(rhs)),
            (CoreSochValue::Float(lhs), CoreSochValue::Float(rhs)) => lhs.partial_cmp(rhs),
            (CoreSochValue::Text(lhs), CoreSochValue::Text(rhs)) => Some(lhs.cmp(rhs)),
            _ => None,
        }
    }

    /// Applies the crate's LIKE matcher when both sides are text; anything
    /// else never matches.
    fn like_match(value: &CoreSochValue, pattern: &CoreSochValue) -> bool {
        if let (CoreSochValue::Text(text), CoreSochValue::Text(pat)) = (value, pattern) {
            crate::like::like_match(text, pat)
        } else {
            false
        }
    }

    /// Membership test: `list` is normally an array; a non-array `list` is
    /// treated as a single candidate and compared for equality.
    fn in_match(value: &CoreSochValue, list: &CoreSochValue) -> bool {
        if let CoreSochValue::Array(candidates) = list {
            candidates.iter().any(|candidate| value == candidate)
        } else {
            value == list
        }
    }
}
impl Predicate {
pub fn evaluate(&self, row: &SochRow, column_map: &HashMap<String, usize>) -> bool {
let results: Vec<bool> = self
.conditions
.iter()
.map(|cond| {
column_map
.get(&cond.column)
.map(|&idx| cond.evaluate(row, idx))
.unwrap_or(false)
})
.collect();
match self.operator {
LogicalOp::And => results.iter().all(|&r| r),
LogicalOp::Or => results.iter().any(|&r| r),
}
}
}
pub struct SochQlExecutor {
storage: Option<std::sync::Arc<dyn crate::optimizer_integration::StorageBackend>>,
}
impl SochQlExecutor {
pub fn new() -> Self {
Self { storage: None }
}
pub fn with_storage(
storage: std::sync::Arc<dyn crate::optimizer_integration::StorageBackend>,
) -> Self {
Self {
storage: Some(storage),
}
}
pub fn execute(&self, query: &str, catalog: &Catalog) -> Result<SochResult> {
let parsed = SochQlParser::parse(query)
.map_err(|e| SochDBError::InvalidArgument(format!("Parse error: {:?}", e)))?;
self.validate(&parsed, catalog)?;
let plan = self.plan(&parsed, catalog)?;
self.execute_plan(&plan, catalog)
}
pub fn validate(&self, query: &SochQuery, catalog: &Catalog) -> Result<()> {
match query {
SochQuery::Select(select) => {
if catalog.get_table(&select.table).is_none() {
return Err(SochDBError::NotFound(format!(
"Table '{}' not found",
select.table
)));
}
if let Some(entry) = catalog.get_table(&select.table)
&& let Some(schema) = &entry.schema
{
for col in &select.columns {
if col != "*" && !schema.fields.iter().any(|f| &f.name == col) {
return Err(SochDBError::InvalidArgument(format!(
"Column '{}' not found in table '{}'",
col, select.table
)));
}
}
}
Ok(())
}
SochQuery::Insert(insert) => {
if catalog.get_table(&insert.table).is_none() {
return Err(SochDBError::NotFound(format!(
"Table '{}' not found",
insert.table
)));
}
Ok(())
}
SochQuery::CreateTable(create) => {
if catalog.get_table(&create.table).is_some() {
return Err(SochDBError::InvalidArgument(format!(
"Table '{}' already exists",
create.table
)));
}
Ok(())
}
SochQuery::DropTable { table } => {
if catalog.get_table(table).is_none() {
return Err(SochDBError::NotFound(format!(
"Table '{}' not found",
table
)));
}
Ok(())
}
}
}
pub fn plan(&self, query: &SochQuery, catalog: &Catalog) -> Result<QueryPlan> {
match query {
SochQuery::Select(select) => self.plan_select(select, catalog),
_ => Err(SochDBError::InvalidArgument(
"Only SELECT queries can be planned".to_string(),
)),
}
}
fn plan_select(&self, select: &SelectQuery, _catalog: &Catalog) -> Result<QueryPlan> {
let mut plan = QueryPlan::TableScan {
table: select.table.clone(),
columns: select.columns.clone(),
predicate: None,
};
if let Some(where_clause) = &select.where_clause {
let predicate = self.build_predicate(where_clause);
plan = QueryPlan::Filter {
input: Box::new(plan),
predicate,
};
}
if !select.columns.contains(&"*".to_string()) {
plan = QueryPlan::Project {
input: Box::new(plan),
columns: select.columns.clone(),
};
}
if let Some(order_by) = &select.order_by {
plan = QueryPlan::Sort {
input: Box::new(plan),
order_by: vec![(
order_by.column.clone(),
matches!(order_by.direction, SortDirection::Asc),
)],
};
}
if select.limit.is_some() || select.offset.is_some() {
plan = QueryPlan::Limit {
input: Box::new(plan),
count: select.limit.unwrap_or(usize::MAX),
offset: select.offset.unwrap_or(0),
};
}
Ok(plan)
}
fn build_predicate(&self, where_clause: &WhereClause) -> Predicate {
Predicate {
conditions: where_clause
.conditions
.iter()
.map(|c| PredicateCondition::from_soch_ql(c.column.clone(), c.operator, &c.value))
.collect(),
operator: where_clause.operator,
}
}
#[allow(clippy::only_used_in_recursion)]
pub fn execute_plan(&self, plan: &QueryPlan, catalog: &Catalog) -> Result<SochResult> {
match plan {
QueryPlan::Empty => Ok(SochResult {
table: "result".to_string(),
columns: vec![],
rows: vec![],
}),
QueryPlan::TableScan { table, columns, .. } => {
let schema_columns = if let Some(entry) = catalog.get_table(table) {
if let Some(schema) = &entry.schema {
if columns.contains(&"*".to_string()) {
schema.fields.iter().map(|f| f.name.clone()).collect()
} else {
columns.clone()
}
} else {
columns.clone()
}
} else {
columns.clone()
};
let rows = if let Some(storage) = &self.storage {
let raw_rows = storage.table_scan(table, &schema_columns, None)?;
raw_rows
.into_iter()
.map(|row| {
schema_columns
.iter()
.map(|c| row.get(c).cloned().unwrap_or(SochValue::Null))
.collect()
})
.collect()
} else {
vec![] };
Ok(SochResult {
table: table.clone(),
columns: schema_columns,
rows,
})
}
QueryPlan::Filter { input, predicate } => {
let mut result = self.execute_plan(input, catalog)?;
let col_map: HashMap<String, usize> = result
.columns
.iter()
.enumerate()
.map(|(i, c)| (c.clone(), i))
.collect();
result.rows.retain(|row| {
let matches: Vec<bool> = predicate
.conditions
.iter()
.map(|cond| {
if let Some(&idx) = col_map.get(&cond.column) {
if idx < row.len() {
Self::eval_predicate_condition(cond, &row[idx])
} else {
false
}
} else {
false
}
})
.collect();
match predicate.operator {
LogicalOp::And => matches.iter().all(|&m| m),
LogicalOp::Or => matches.iter().any(|&m| m),
}
});
Ok(result)
}
QueryPlan::Project { input, columns } => {
let mut result = self.execute_plan(input, catalog)?;
let col_map: HashMap<String, usize> = result
.columns
.iter()
.enumerate()
.map(|(i, c)| (c.clone(), i))
.collect();
result.rows = result
.rows
.into_iter()
.map(|row| {
columns
.iter()
.map(|c| {
col_map
.get(c)
.and_then(|&i| row.get(i).cloned())
.unwrap_or(SochValue::Null)
})
.collect()
})
.collect();
result.columns = columns.clone();
Ok(result)
}
QueryPlan::Sort { input, order_by } => {
let mut result = self.execute_plan(input, catalog)?;
let col_map: HashMap<String, usize> = result
.columns
.iter()
.enumerate()
.map(|(i, c)| (c.clone(), i))
.collect();
result.rows.sort_by(|a, b| {
for (col, ascending) in order_by {
if let Some(&idx) = col_map.get(col) {
let va = a.get(idx);
let vb = b.get(idx);
let cmp = Self::compare_soch_values(va, vb);
let cmp = if *ascending { cmp } else { cmp.reverse() };
if cmp != std::cmp::Ordering::Equal {
return cmp;
}
}
}
std::cmp::Ordering::Equal
});
Ok(result)
}
QueryPlan::Limit {
input,
count,
offset,
} => {
let mut result = self.execute_plan(input, catalog)?;
result.rows = result.rows.into_iter().skip(*offset).take(*count).collect();
Ok(result)
}
QueryPlan::IndexSeek { .. } => {
Ok(SochResult {
table: "result".to_string(),
columns: vec![],
rows: vec![],
})
}
}
}
fn compare_soch_values(a: Option<&SochValue>, b: Option<&SochValue>) -> std::cmp::Ordering {
match (a, b) {
(None, None) => std::cmp::Ordering::Equal,
(None, Some(_)) => std::cmp::Ordering::Less,
(Some(_), None) => std::cmp::Ordering::Greater,
(Some(a), Some(b)) => match (a, b) {
(SochValue::Int(a), SochValue::Int(b)) => a.cmp(b),
(SochValue::UInt(a), SochValue::UInt(b)) => a.cmp(b),
(SochValue::Float(a), SochValue::Float(b)) => {
a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal)
}
(SochValue::Text(a), SochValue::Text(b)) => a.cmp(b),
(SochValue::Bool(a), SochValue::Bool(b)) => a.cmp(b),
_ => std::cmp::Ordering::Equal,
},
}
}
fn eval_predicate_condition(cond: &PredicateCondition, value: &SochValue) -> bool {
let core_val = PredicateCondition::convert_value(value);
match cond.operator {
ComparisonOp::Eq => core_val == cond.value,
ComparisonOp::Ne => core_val != cond.value,
ComparisonOp::Lt => {
PredicateCondition::compare(&core_val, &cond.value)
== Some(std::cmp::Ordering::Less)
}
ComparisonOp::Le => matches!(
PredicateCondition::compare(&core_val, &cond.value),
Some(std::cmp::Ordering::Less | std::cmp::Ordering::Equal)
),
ComparisonOp::Gt => {
PredicateCondition::compare(&core_val, &cond.value)
== Some(std::cmp::Ordering::Greater)
}
ComparisonOp::Ge => matches!(
PredicateCondition::compare(&core_val, &cond.value),
Some(std::cmp::Ordering::Greater | std::cmp::Ordering::Equal)
),
ComparisonOp::Like => PredicateCondition::like_match(&core_val, &cond.value),
ComparisonOp::In => PredicateCondition::in_match(&core_val, &cond.value),
ComparisonOp::SimilarTo => PredicateCondition::like_match(&core_val, &cond.value),
}
}
}
impl Default for SochQlExecutor {
fn default() -> Self {
Self { storage: None }
}
}
/// Convenience wrapper: runs `query` against `catalog` with a fresh
/// executor that has no storage backend attached.
pub fn execute_sochql(query: &str, catalog: &Catalog) -> Result<SochResult> {
    let executor = SochQlExecutor::new();
    executor.execute(query, catalog)
}
/// Estimates how much smaller `result` is in the compact tabular encoding
/// than in a JSON encoding.
///
/// This is a size heuristic built from name lengths and a fixed
/// per-value length, not a real tokenizer count. An empty result (no rows
/// or no columns) yields all-zero statistics.
pub fn estimate_token_reduction(result: &SochResult) -> TokenReductionStats {
    // Assumed length of one rendered value.
    const AVG_VALUE_LEN: usize = 10;

    let row_count = result.rows.len();
    let col_count = result.columns.len();
    if row_count == 0 || col_count == 0 {
        return TokenReductionStats::default();
    }

    let total_name_len: usize = result.columns.iter().map(String::len).sum();
    let avg_col_name_len = total_name_len / col_count;

    // JSON repeats every column name (plus punctuation) in every row.
    let json_row = 2 + col_count * (avg_col_name_len + 4 + AVG_VALUE_LEN);
    let json_tokens = 2 + row_count * json_row;

    // The compact form names the columns once in a header; the joined
    // header is every name plus one comma between neighbours.
    let joined_names_len = total_name_len + (col_count - 1);
    let header_tokens = result.table.len() + 10 + joined_names_len;
    let soch_row = col_count * AVG_VALUE_LEN + col_count;
    let soch_tokens = header_tokens + row_count * soch_row;

    let reduction = 1.0 - (soch_tokens as f64 / json_tokens as f64);
    // The float-to-int cast saturates, so a negative reduction becomes 0.
    let reduction_percent = (reduction * 100.0) as u32;

    TokenReductionStats {
        json_tokens,
        soch_tokens,
        reduction_percent,
        row_count,
        col_count,
    }
}

/// Output of [`estimate_token_reduction`].
#[derive(Debug, Clone, Default)]
pub struct TokenReductionStats {
    /// Estimated size of the JSON encoding.
    pub json_tokens: usize,
    /// Estimated size of the compact encoding.
    pub soch_tokens: usize,
    /// Whole-percent reduction of the compact form relative to JSON.
    pub reduction_percent: u32,
    pub row_count: usize,
    pub col_count: usize,
}
/// The row indices of a batch that are still selected, together with the
/// size of the batch they were drawn from.
#[derive(Debug, Clone)]
pub struct SelectionVector {
    indices: Vec<u32>,
    batch_size: usize,
}

impl SelectionVector {
    /// Selects every row `0..batch_size`.
    pub fn all(batch_size: usize) -> Self {
        let upper = batch_size as u32;
        Self::from_indices((0..upper).collect(), batch_size)
    }

    /// A selection with no rows and a batch size of zero.
    pub fn empty() -> Self {
        Self::from_indices(Vec::new(), 0)
    }

    /// Wraps an explicit index list for a batch of `batch_size` rows.
    pub fn from_indices(indices: Vec<u32>, batch_size: usize) -> Self {
        Self {
            indices,
            batch_size,
        }
    }

    /// `true` when no row is selected.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Number of selected rows.
    #[inline]
    pub fn len(&self) -> usize {
        self.indices.len()
    }

    /// Size of the batch this selection refers to.
    #[inline]
    pub fn batch_size(&self) -> usize {
        self.batch_size
    }

    /// Fraction of the batch that is selected; `0.0` for a zero-size batch.
    #[inline]
    pub fn selectivity(&self) -> f64 {
        match self.batch_size {
            0 => 0.0,
            total => self.len() as f64 / total as f64,
        }
    }

    /// Iterates over the selected row indices in stored order.
    pub fn iter(&self) -> impl Iterator<Item = u32> + '_ {
        self.indices.iter().copied()
    }

    /// Returns a new selection keeping only the indices accepted by `pred`;
    /// the batch size is carried over unchanged.
    pub fn filter<F>(&self, pred: F) -> Self
    where
        F: Fn(u32) -> bool,
    {
        let mut kept = self.indices.clone();
        kept.retain(|&idx| pred(idx));
        Self::from_indices(kept, self.batch_size)
    }

    /// Appends `start_idx + bit` for every set bit of `mask`, lowest bit
    /// first. The batch size is left untouched.
    pub fn extend_masked(&mut self, start_idx: usize, mask: u16) {
        let selected = (0..16usize)
            .filter(|&bit| mask & (1u16 << bit) != 0)
            .map(|bit| (start_idx + bit) as u32);
        self.indices.extend(selected);
    }
}
#[derive(Debug, Clone)]
pub struct ColumnBatch {
pub values: Vec<CoreSochValue>,
pub name: String,
}
impl ColumnBatch {
pub fn new(name: String, values: Vec<CoreSochValue>) -> Self {
Self { values, name }
}
#[inline]
pub fn get(&self, idx: usize) -> Option<&CoreSochValue> {
self.values.get(idx)
}
#[allow(dead_code)]
pub fn as_i64_slice(&self) -> Option<Vec<i64>> {
self.values
.iter()
.map(|v| match v {
CoreSochValue::Int(i) => Some(*i),
CoreSochValue::UInt(u) => Some(*u as i64),
_ => None,
})
.collect()
}
pub fn len(&self) -> usize {
self.values.len()
}
pub fn is_empty(&self) -> bool {
self.values.is_empty()
}
}
/// A single-column predicate evaluated over a whole batch at once.
///
/// `col_idx` is the position of the column in the slice of column batches
/// handed to the executor. Rows whose value has a different variant than
/// the predicate expects never match.
#[derive(Debug, Clone)]
pub enum VectorPredicate {
    /// Integer value strictly greater than `threshold`.
    IntGt { col_idx: usize, threshold: i64 },
    /// Integer value strictly less than `threshold`.
    IntLt { col_idx: usize, threshold: i64 },
    /// Integer value equal to `value`.
    IntEq { col_idx: usize, value: i64 },
    /// Integer value greater than or equal to `threshold`.
    IntGe { col_idx: usize, threshold: i64 },
    /// Integer value less than or equal to `threshold`.
    IntLe { col_idx: usize, threshold: i64 },
    /// Text value exactly equal to `value`.
    StrEq { col_idx: usize, value: String },
    /// Text value starting with `prefix`.
    StrPrefix { col_idx: usize, prefix: String },
    /// Boolean value equal to `value`.
    BoolEq { col_idx: usize, value: bool },
    /// Value is null.
    IsNull { col_idx: usize },
    /// Value is present and not null.
    IsNotNull { col_idx: usize },
}
pub struct VectorizedExecutor {
batch_size: usize,
}
impl VectorizedExecutor {
pub fn new(batch_size: usize) -> Self {
Self { batch_size }
}
pub fn default_batch_size() -> usize {
1024
}
pub fn evaluate_batch(
&self,
columns: &[ColumnBatch],
predicates: &[VectorPredicate],
) -> SelectionVector {
if columns.is_empty() {
return SelectionVector::empty();
}
let batch_size = columns[0].len().min(self.batch_size);
let mut selection = SelectionVector::all(batch_size);
for predicate in predicates {
if selection.is_empty() {
break; }
selection = match predicate {
VectorPredicate::IntGt { col_idx, threshold } => {
self.filter_int_gt(&columns[*col_idx], *threshold, &selection)
}
VectorPredicate::IntLt { col_idx, threshold } => {
self.filter_int_lt(&columns[*col_idx], *threshold, &selection)
}
VectorPredicate::IntEq { col_idx, value } => {
self.filter_int_eq(&columns[*col_idx], *value, &selection)
}
VectorPredicate::IntGe { col_idx, threshold } => {
self.filter_int_ge(&columns[*col_idx], *threshold, &selection)
}
VectorPredicate::IntLe { col_idx, threshold } => {
self.filter_int_le(&columns[*col_idx], *threshold, &selection)
}
VectorPredicate::StrEq { col_idx, value } => {
self.filter_str_eq(&columns[*col_idx], value, &selection)
}
VectorPredicate::StrPrefix { col_idx, prefix } => {
self.filter_str_prefix(&columns[*col_idx], prefix, &selection)
}
VectorPredicate::BoolEq { col_idx, value } => {
self.filter_bool_eq(&columns[*col_idx], *value, &selection)
}
VectorPredicate::IsNull { col_idx } => {
self.filter_is_null(&columns[*col_idx], &selection)
}
VectorPredicate::IsNotNull { col_idx } => {
self.filter_is_not_null(&columns[*col_idx], &selection)
}
};
}
selection
}
#[inline]
fn filter_int_gt(
&self,
column: &ColumnBatch,
threshold: i64,
selection: &SelectionVector,
) -> SelectionVector {
selection.filter(|idx| match column.get(idx as usize) {
Some(CoreSochValue::Int(v)) => *v > threshold,
Some(CoreSochValue::UInt(v)) => (*v as i64) > threshold,
_ => false,
})
}
#[inline]
fn filter_int_lt(
&self,
column: &ColumnBatch,
threshold: i64,
selection: &SelectionVector,
) -> SelectionVector {
selection.filter(|idx| match column.get(idx as usize) {
Some(CoreSochValue::Int(v)) => *v < threshold,
Some(CoreSochValue::UInt(v)) => (*v as i64) < threshold,
_ => false,
})
}
#[inline]
fn filter_int_eq(
&self,
column: &ColumnBatch,
value: i64,
selection: &SelectionVector,
) -> SelectionVector {
selection.filter(|idx| match column.get(idx as usize) {
Some(CoreSochValue::Int(v)) => *v == value,
Some(CoreSochValue::UInt(v)) => (*v as i64) == value,
_ => false,
})
}
#[inline]
fn filter_int_ge(
&self,
column: &ColumnBatch,
threshold: i64,
selection: &SelectionVector,
) -> SelectionVector {
selection.filter(|idx| match column.get(idx as usize) {
Some(CoreSochValue::Int(v)) => *v >= threshold,
Some(CoreSochValue::UInt(v)) => (*v as i64) >= threshold,
_ => false,
})
}
#[inline]
fn filter_int_le(
&self,
column: &ColumnBatch,
threshold: i64,
selection: &SelectionVector,
) -> SelectionVector {
selection.filter(|idx| match column.get(idx as usize) {
Some(CoreSochValue::Int(v)) => *v <= threshold,
Some(CoreSochValue::UInt(v)) => (*v as i64) <= threshold,
_ => false,
})
}
#[inline]
fn filter_str_eq(
&self,
column: &ColumnBatch,
value: &str,
selection: &SelectionVector,
) -> SelectionVector {
selection.filter(|idx| match column.get(idx as usize) {
Some(CoreSochValue::Text(s)) => s == value,
_ => false,
})
}
#[inline]
fn filter_str_prefix(
&self,
column: &ColumnBatch,
prefix: &str,
selection: &SelectionVector,
) -> SelectionVector {
selection.filter(|idx| match column.get(idx as usize) {
Some(CoreSochValue::Text(s)) => s.starts_with(prefix),
_ => false,
})
}
#[inline]
fn filter_bool_eq(
&self,
column: &ColumnBatch,
value: bool,
selection: &SelectionVector,
) -> SelectionVector {
selection.filter(|idx| match column.get(idx as usize) {
Some(CoreSochValue::Bool(b)) => *b == value,
_ => false,
})
}
#[inline]
fn filter_is_null(&self, column: &ColumnBatch, selection: &SelectionVector) -> SelectionVector {
selection.filter(|idx| matches!(column.get(idx as usize), Some(CoreSochValue::Null)))
}
#[inline]
fn filter_is_not_null(
&self,
column: &ColumnBatch,
selection: &SelectionVector,
) -> SelectionVector {
selection
.filter(|idx| !matches!(column.get(idx as usize), Some(CoreSochValue::Null) | None))
}
pub fn materialize(
&self,
columns: &[ColumnBatch],
selection: &SelectionVector,
) -> Vec<SochRow> {
selection
.iter()
.map(|idx| {
let values: Vec<CoreSochValue> = columns
.iter()
.map(|col| {
col.get(idx as usize)
.cloned()
.unwrap_or(CoreSochValue::Null)
})
.collect();
SochRow::new(values)
})
.collect()
}
pub fn row_to_columnar(&self, rows: &[SochRow], column_names: &[String]) -> Vec<ColumnBatch> {
if rows.is_empty() || column_names.is_empty() {
return vec![];
}
let num_cols = column_names.len().min(rows[0].values.len());
(0..num_cols)
.map(|col_idx| {
let values: Vec<CoreSochValue> = rows
.iter()
.map(|row| {
row.values
.get(col_idx)
.cloned()
.unwrap_or(CoreSochValue::Null)
})
.collect();
ColumnBatch::new(column_names[col_idx].clone(), values)
})
.collect()
}
}
impl Default for VectorizedExecutor {
fn default() -> Self {
Self::new(Self::default_batch_size())
}
}
/// Counters describing one vectorized evaluation run.
#[derive(Debug, Clone, Default)]
pub struct VectorizedStats {
    pub rows_processed: usize,
    pub rows_selected: usize,
    pub predicates_evaluated: usize,
    pub short_circuits: usize,
    /// Elapsed time in microseconds.
    pub time_us: u64,
}

impl VectorizedStats {
    /// Fraction of processed rows that were selected; `0.0` when nothing
    /// was processed.
    pub fn selectivity(&self) -> f64 {
        if self.rows_processed == 0 {
            return 0.0;
        }
        let selected = self.rows_selected as f64;
        selected / self.rows_processed as f64
    }

    /// Processed rows per second; `0.0` when no time was recorded.
    pub fn rows_per_sec(&self) -> f64 {
        if self.time_us == 0 {
            return 0.0;
        }
        let elapsed_secs = self.time_us as f64 / 1_000_000.0;
        self.rows_processed as f64 / elapsed_secs
    }
}
/// Unit tests for validation, planning, predicate evaluation, the token
/// estimate and the vectorized executor.
#[cfg(test)]
mod tests {
    use super::*;

    /// Catalog containing a single `users(id, name, score)` table.
    fn test_catalog() -> Catalog {
        let mut catalog = Catalog::new("test_db");
        let schema = SochSchema::new("users")
            .field("id", SochType::UInt)
            .field("name", SochType::Text)
            .field("score", SochType::Float);
        catalog.create_table(schema, 1).unwrap();
        catalog
    }

    #[test]
    fn test_validate_select() {
        let catalog = test_catalog();
        let executor = SochQlExecutor::new();
        let query = SochQuery::Select(SelectQuery {
            columns: vec!["id".to_string(), "name".to_string()],
            table: "users".to_string(),
            where_clause: None,
            order_by: None,
            limit: None,
            offset: None,
        });
        assert!(executor.validate(&query, &catalog).is_ok());
    }

    #[test]
    fn test_validate_nonexistent_table() {
        let catalog = test_catalog();
        let executor = SochQlExecutor::new();
        let query = SochQuery::Select(SelectQuery {
            columns: vec!["*".to_string()],
            table: "nonexistent".to_string(),
            where_clause: None,
            order_by: None,
            limit: None,
            offset: None,
        });
        assert!(executor.validate(&query, &catalog).is_err());
    }

    /// The planner nests stages as Limit -> Sort -> Project -> Filter.
    #[test]
    fn test_plan_select() {
        let catalog = test_catalog();
        let executor = SochQlExecutor::new();
        let select = SelectQuery {
            columns: vec!["id".to_string(), "name".to_string()],
            table: "users".to_string(),
            where_clause: Some(WhereClause {
                conditions: vec![Condition {
                    column: "score".to_string(),
                    operator: ComparisonOp::Gt,
                    value: SochValue::Float(80.0),
                }],
                operator: LogicalOp::And,
            }),
            order_by: Some(OrderBy {
                column: "score".to_string(),
                direction: SortDirection::Desc,
            }),
            limit: Some(10),
            offset: None,
        };
        let plan = executor.plan_select(&select, &catalog).unwrap();
        match plan {
            QueryPlan::Limit { input, count, .. } => {
                assert_eq!(count, 10);
                match *input {
                    QueryPlan::Sort { input, order_by } => {
                        assert_eq!(order_by[0].0, "score");
                        // `false` means descending.
                        assert!(!order_by[0].1);
                        match *input {
                            QueryPlan::Project { input, columns } => {
                                assert_eq!(columns, vec!["id", "name"]);
                                match *input {
                                    QueryPlan::Filter { predicate, .. } => {
                                        assert_eq!(predicate.conditions.len(), 1);
                                    }
                                    _ => panic!("Expected Filter"),
                                }
                            }
                            _ => panic!("Expected Project"),
                        }
                    }
                    _ => panic!("Expected Sort"),
                }
            }
            _ => panic!("Expected Limit"),
        }
    }

    #[test]
    fn test_predicate_evaluation() {
        let cond = PredicateCondition {
            column: "score".to_string(),
            operator: ComparisonOp::Gt,
            value: CoreSochValue::Float(80.0),
        };
        let row_pass = SochRow::new(vec![
            CoreSochValue::UInt(1),
            CoreSochValue::Text("Alice".to_string()),
            CoreSochValue::Float(95.0),
        ]);
        let row_fail = SochRow::new(vec![
            CoreSochValue::UInt(2),
            CoreSochValue::Text("Bob".to_string()),
            CoreSochValue::Float(75.0),
        ]);
        assert!(cond.evaluate(&row_pass, 2));
        assert!(!cond.evaluate(&row_fail, 2));
    }

    #[test]
    fn test_token_reduction() {
        let result = SochResult {
            table: "user_statistics".to_string(),
            columns: vec![
                "user_id".to_string(),
                "full_name".to_string(),
                "email_address".to_string(),
                "registration_date".to_string(),
                "last_login".to_string(),
            ],
            rows: (0..20)
                .map(|i| {
                    vec![
                        SochValue::UInt(i as u64),
                        SochValue::Text(format!("User Number {}", i)),
                        SochValue::Text(format!("user{}@example.com", i)),
                        SochValue::Text("2024-01-15".to_string()),
                        SochValue::Text("2024-03-20".to_string()),
                    ]
                })
                .collect(),
        };
        let stats = estimate_token_reduction(&result);
        println!("JSON tokens: {}", stats.json_tokens);
        println!("TOON tokens: {}", stats.soch_tokens);
        println!("Reduction: {}%", stats.reduction_percent);
        assert!(stats.soch_tokens < stats.json_tokens);
        assert!(stats.reduction_percent > 0);
    }

    #[test]
    fn test_selection_vector_basic() {
        let sel = SelectionVector::all(100);
        assert_eq!(sel.len(), 100);
        assert!(!sel.is_empty());
        assert_eq!(sel.selectivity(), 1.0);
        let empty = SelectionVector::empty();
        assert!(empty.is_empty());
        assert_eq!(empty.selectivity(), 0.0);
    }

    #[test]
    fn test_selection_vector_filter() {
        let sel = SelectionVector::all(10);
        let filtered = sel.filter(|i| i % 2 == 0);
        assert_eq!(filtered.len(), 5);
        let indices: Vec<u32> = filtered.iter().collect();
        assert_eq!(indices, vec![0, 2, 4, 6, 8]);
    }

    #[test]
    fn test_vectorized_int_filter() {
        let executor = VectorizedExecutor::new(1024);
        let column = ColumnBatch::new(
            "value".to_string(),
            (0..10).map(CoreSochValue::Int).collect(),
        );
        let predicates = vec![VectorPredicate::IntGt {
            col_idx: 0,
            threshold: 5,
        }];
        let selection = executor.evaluate_batch(&[column], &predicates);
        assert_eq!(selection.len(), 4);
        let indices: Vec<u32> = selection.iter().collect();
        assert_eq!(indices, vec![6, 7, 8, 9]);
    }

    #[test]
    fn test_vectorized_multiple_predicates() {
        let executor = VectorizedExecutor::new(1024);
        let id_col = ColumnBatch::new("id".to_string(), (0..100).map(CoreSochValue::Int).collect());
        let status_col = ColumnBatch::new(
            "active".to_string(),
            (0..100).map(|i| CoreSochValue::Bool(i % 2 == 0)).collect(),
        );
        let predicates = vec![
            VectorPredicate::IntGe {
                col_idx: 0,
                threshold: 50,
            },
            VectorPredicate::IntLt {
                col_idx: 0,
                threshold: 60,
            },
            VectorPredicate::BoolEq {
                col_idx: 1,
                value: true,
            },
        ];
        let selection = executor.evaluate_batch(&[id_col, status_col], &predicates);
        assert_eq!(selection.len(), 5);
        let indices: Vec<u32> = selection.iter().collect();
        assert_eq!(indices, vec![50, 52, 54, 56, 58]);
    }

    /// The first predicate rejects every row, so the rest are skipped.
    #[test]
    fn test_vectorized_short_circuit() {
        let executor = VectorizedExecutor::new(1024);
        let column = ColumnBatch::new(
            "value".to_string(),
            (0..100).map(|_| CoreSochValue::Int(-1)).collect(),
        );
        let predicates = vec![
            VectorPredicate::IntGt {
                col_idx: 0,
                threshold: 0,
            },
            VectorPredicate::IntLt {
                col_idx: 0,
                threshold: 100,
            },
            VectorPredicate::IntEq {
                col_idx: 0,
                value: 50,
            },
        ];
        let selection = executor.evaluate_batch(&[column], &predicates);
        assert!(selection.is_empty());
    }

    #[test]
    fn test_vectorized_string_predicates() {
        let executor = VectorizedExecutor::new(1024);
        let names = [
            "Alice", "Bob", "Carol", "Dave", "Alice", "Alice", "Bob", "Carol",
        ];
        let column = ColumnBatch::new(
            "name".to_string(),
            names
                .iter()
                .map(|s| CoreSochValue::Text(s.to_string()))
                .collect(),
        );
        let predicates = vec![VectorPredicate::StrEq {
            col_idx: 0,
            value: "Alice".to_string(),
        }];
        let selection = executor.evaluate_batch(&[column], &predicates);
        assert_eq!(selection.len(), 3);
        let indices: Vec<u32> = selection.iter().collect();
        assert_eq!(indices, vec![0, 4, 5]);
    }

    #[test]
    fn test_vectorized_null_handling() {
        let executor = VectorizedExecutor::new(1024);
        let values = vec![
            CoreSochValue::Int(1),
            CoreSochValue::Null,
            CoreSochValue::Int(2),
            CoreSochValue::Null,
            CoreSochValue::Int(3),
        ];
        let column = ColumnBatch::new("value".to_string(), values);
        let predicates = vec![VectorPredicate::IsNull { col_idx: 0 }];
        let null_selection = executor.evaluate_batch(std::slice::from_ref(&column), &predicates);
        assert_eq!(null_selection.len(), 2);
        let not_null_predicates = vec![VectorPredicate::IsNotNull { col_idx: 0 }];
        let not_null_selection = executor.evaluate_batch(&[column], &not_null_predicates);
        assert_eq!(not_null_selection.len(), 3);
    }

    #[test]
    fn test_row_to_columnar_conversion() {
        let executor = VectorizedExecutor::new(1024);
        let rows = vec![
            SochRow::new(vec![
                CoreSochValue::Int(1),
                CoreSochValue::Text("Alice".to_string()),
            ]),
            SochRow::new(vec![
                CoreSochValue::Int(2),
                CoreSochValue::Text("Bob".to_string()),
            ]),
            SochRow::new(vec![
                CoreSochValue::Int(3),
                CoreSochValue::Text("Carol".to_string()),
            ]),
        ];
        let column_names = vec!["id".to_string(), "name".to_string()];
        let columns = executor.row_to_columnar(&rows, &column_names);
        assert_eq!(columns.len(), 2);
        assert_eq!(columns[0].name, "id");
        assert_eq!(columns[1].name, "name");
        assert_eq!(columns[0].len(), 3);
        assert_eq!(columns[1].len(), 3);
    }

    #[test]
    fn test_materialize_selected_rows() {
        let executor = VectorizedExecutor::new(1024);
        let id_col = ColumnBatch::new(
            "id".to_string(),
            vec![
                CoreSochValue::Int(1),
                CoreSochValue::Int(2),
                CoreSochValue::Int(3),
            ],
        );
        let name_col = ColumnBatch::new(
            "name".to_string(),
            vec![
                CoreSochValue::Text("Alice".to_string()),
                CoreSochValue::Text("Bob".to_string()),
                CoreSochValue::Text("Carol".to_string()),
            ],
        );
        let selection = SelectionVector::from_indices(vec![0, 2], 3);
        let rows = executor.materialize(&[id_col, name_col], &selection);
        assert_eq!(rows.len(), 2);
        assert_eq!(rows[0].values[0], CoreSochValue::Int(1));
        assert_eq!(rows[0].values[1], CoreSochValue::Text("Alice".to_string()));
        assert_eq!(rows[1].values[0], CoreSochValue::Int(3));
        assert_eq!(rows[1].values[1], CoreSochValue::Text("Carol".to_string()));
    }
}