use std::ops::Bound;
use kimberlite_store::{Key, TableId};
use crate::parser::{AggregateFunction, HavingCondition};
use crate::schema::{ColumnDef, ColumnName};
use crate::value::Value;
/// Descriptive information about the table a query plan node operates on.
#[derive(Debug, Clone)]
pub struct TableMetadata {
/// Store-level identifier of the table.
pub table_id: TableId,
/// Name of the table; this is the string `QueryPlan::table_name` hands back.
pub table_name: String,
/// Column definitions of the table.
pub columns: Vec<ColumnDef>,
/// Names of the primary-key columns.
/// NOTE(review): presumably listed in key order — confirm against the schema module.
pub primary_key: Vec<ColumnName>,
}
/// One comparison between a column of the left join input and a column of the
/// right join input.
#[derive(Debug, Clone)]
pub struct JoinCondition {
/// Column index on the left side.
/// NOTE(review): presumably an index into the left input's rows — confirm against the join executor.
pub left_col_idx: usize,
/// Column index on the right side (same caveat as `left_col_idx`).
pub right_col_idx: usize,
/// Comparison operator applied between the two columns.
pub op: JoinOp,
}
/// Comparison operators that a [`JoinCondition`] can carry.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JoinOp {
/// Equality (`=`).
Eq,
/// Less than (`<`).
Lt,
/// Less than or equal (`<=`).
Le,
/// Greater than (`>`).
Gt,
/// Greater than or equal (`>=`).
Ge,
}
/// A query plan node.
///
/// Leaf variants (`PointLookup`, `RangeScan`, `IndexScan`, `TableScan`) carry
/// the [`TableMetadata`] of the table they read. `Aggregate`, `Join` and
/// `Materialize` wrap other plans through `Box<QueryPlan>` children.
///
/// Every variant carries `column_names`, which is what
/// [`QueryPlan::column_names`] returns.
#[derive(Debug, Clone)]
pub enum QueryPlan {
/// Access by a single `key`.
PointLookup {
/// Table being read.
metadata: TableMetadata,
/// Key to look up.
key: Key,
/// Column indices returned by `QueryPlan::column_indices`.
/// NOTE(review): presumably the projected columns of the table row — confirm against the executor.
columns: Vec<usize>,
/// Names of the output columns.
column_names: Vec<ColumnName>,
},
/// Scan delimited by `start`/`end` key bounds.
RangeScan {
/// Table being read.
metadata: TableMetadata,
/// Lower key bound.
start: Bound<Key>,
/// Upper key bound.
end: Bound<Key>,
/// Optional row filter.
filter: Option<Filter>,
/// Optional row limit.
limit: Option<usize>,
/// Scan direction.
/// NOTE(review): presumably the direction of the key scan itself, while
/// `order_by` is an additional sort — confirm against the executor.
order: ScanOrder,
/// Optional sort specification.
order_by: Option<SortSpec>,
/// Column indices returned by `QueryPlan::column_indices`.
columns: Vec<usize>,
/// Names of the output columns.
column_names: Vec<ColumnName>,
},
/// Scan delimited by `start`/`end` key bounds that additionally names an index.
IndexScan {
/// Table being read.
metadata: TableMetadata,
/// Identifier of the index.
index_id: u64,
/// Name of the index.
index_name: String,
/// Lower key bound.
start: Bound<Key>,
/// Upper key bound.
end: Bound<Key>,
/// Optional row filter.
filter: Option<Filter>,
/// Optional row limit.
limit: Option<usize>,
/// Scan direction (same caveat as on `RangeScan::order`).
order: ScanOrder,
/// Optional sort specification.
order_by: Option<SortSpec>,
/// Column indices returned by `QueryPlan::column_indices`.
columns: Vec<usize>,
/// Names of the output columns.
column_names: Vec<ColumnName>,
},
/// Scan without key bounds.
TableScan {
/// Table being read.
metadata: TableMetadata,
/// Optional row filter.
filter: Option<Filter>,
/// Optional row limit.
limit: Option<usize>,
/// Optional sort specification.
order: Option<SortSpec>,
/// Column indices returned by `QueryPlan::column_indices`.
columns: Vec<usize>,
/// Names of the output columns.
column_names: Vec<ColumnName>,
},
/// Aggregation over the rows of `source`.
Aggregate {
/// Table metadata reported by `QueryPlan::metadata` / `QueryPlan::table_name`.
metadata: TableMetadata,
/// Plan that produces the input rows.
source: Box<QueryPlan>,
/// Group-by column indices; also what `QueryPlan::column_indices` returns for this variant.
group_by_cols: Vec<usize>,
/// Names of the group-by columns.
group_by_names: Vec<ColumnName>,
/// Aggregate functions to evaluate.
aggregates: Vec<AggregateFunction>,
/// Names of the output columns.
column_names: Vec<ColumnName>,
/// HAVING conditions.
/// NOTE(review): how multiple conditions combine is not visible here — confirm against the executor.
having: Vec<HavingCondition>,
},
/// Join of two sub-plans.
Join {
/// Kind of join, as defined by the parser.
join_type: crate::parser::JoinType,
/// Left input plan; `QueryPlan::table_name` delegates to it.
left: Box<QueryPlan>,
/// Right input plan.
right: Box<QueryPlan>,
/// Column comparisons between the two inputs.
on_conditions: Vec<JoinCondition>,
/// Column indices returned by `QueryPlan::column_indices`.
columns: Vec<usize>,
/// Names of the output columns.
column_names: Vec<ColumnName>,
},
/// Wrapper around `source` carrying a filter, CASE columns, sort and limit.
/// NOTE(review): the order in which these are applied is decided by the executor — not visible here.
Materialize {
/// Plan that produces the input rows; `QueryPlan::table_name` delegates to it.
source: Box<QueryPlan>,
/// Optional row filter.
filter: Option<Filter>,
/// CASE column definitions.
case_columns: Vec<CaseColumnDef>,
/// Optional sort specification.
order: Option<SortSpec>,
/// Optional row limit.
limit: Option<usize>,
/// Names of the output columns.
column_names: Vec<ColumnName>,
},
}
/// Definition of a CASE column carried by `QueryPlan::Materialize`.
#[derive(Debug, Clone)]
pub struct CaseColumnDef {
/// Column name given to the CASE column.
pub alias: ColumnName,
/// The WHEN clauses of the expression.
/// NOTE(review): presumably evaluated in order, first match wins — confirm against the executor.
pub when_clauses: Vec<CaseWhenClause>,
/// Value of the ELSE branch.
/// NOTE(review): presumably used when no WHEN clause matches — confirm against the executor.
pub else_value: crate::value::Value,
}
/// A single WHEN clause of a [`CaseColumnDef`]: a condition paired with a result value.
#[derive(Debug, Clone)]
pub struct CaseWhenClause {
/// Filter deciding whether this clause applies.
pub condition: Filter,
/// Value associated with this clause.
pub result: crate::value::Value,
}
impl QueryPlan {
pub fn column_names(&self) -> &[ColumnName] {
match self {
QueryPlan::PointLookup { column_names, .. }
| QueryPlan::RangeScan { column_names, .. }
| QueryPlan::IndexScan { column_names, .. }
| QueryPlan::TableScan { column_names, .. }
| QueryPlan::Aggregate { column_names, .. }
| QueryPlan::Join { column_names, .. }
| QueryPlan::Materialize { column_names, .. } => column_names,
}
}
#[allow(dead_code)]
pub fn column_indices(&self) -> &[usize] {
match self {
QueryPlan::PointLookup { columns, .. }
| QueryPlan::RangeScan { columns, .. }
| QueryPlan::IndexScan { columns, .. }
| QueryPlan::TableScan { columns, .. }
| QueryPlan::Join { columns, .. } => columns,
QueryPlan::Aggregate { group_by_cols, .. } => group_by_cols,
QueryPlan::Materialize { .. } => &[],
}
}
pub fn table_name(&self) -> &str {
match self {
QueryPlan::PointLookup { metadata, .. }
| QueryPlan::RangeScan { metadata, .. }
| QueryPlan::IndexScan { metadata, .. }
| QueryPlan::TableScan { metadata, .. }
| QueryPlan::Aggregate { metadata, .. } => &metadata.table_name,
QueryPlan::Join { left, .. } | QueryPlan::Materialize { source: left, .. } => {
left.table_name()
}
}
}
pub fn metadata(&self) -> Option<&TableMetadata> {
match self {
QueryPlan::PointLookup { metadata, .. }
| QueryPlan::RangeScan { metadata, .. }
| QueryPlan::IndexScan { metadata, .. }
| QueryPlan::TableScan { metadata, .. }
| QueryPlan::Aggregate { metadata, .. } => Some(metadata),
QueryPlan::Join { .. } | QueryPlan::Materialize { .. } => None,
}
}
}
/// Direction of a scan or of a single sort column. Defaults to `Ascending`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ScanOrder {
/// Ascending order (the default).
#[default]
Ascending,
/// Descending order.
Descending,
}
/// Sort specification: a list of columns, each with its own direction.
#[derive(Debug, Clone)]
pub struct SortSpec {
/// `(column index, direction)` pairs.
/// NOTE(review): presumably ordered from most to least significant sort key — confirm against the executor.
pub columns: Vec<(usize, ScanOrder)>,
}
/// A boolean filter tree evaluated against a row by `Filter::matches`.
#[derive(Debug, Clone)]
pub enum Filter {
/// Leaf: a single column condition.
Condition(FilterCondition),
/// Conjunction: matches when every child filter matches.
And(Vec<Filter>),
/// Disjunction: matches when at least one child filter matches.
Or(Vec<Filter>),
}
impl Filter {
pub fn single(condition: FilterCondition) -> Self {
Filter::Condition(condition)
}
pub fn and(filters: Vec<Filter>) -> Self {
assert!(
!filters.is_empty(),
"AND filter must have at least one condition"
);
if filters.len() == 1 {
return filters
.into_iter()
.next()
.expect("filter list verified to have exactly 1 element");
}
Filter::And(filters)
}
pub fn or(filters: Vec<Filter>) -> Self {
assert!(
!filters.is_empty(),
"OR filter must have at least one condition"
);
if filters.len() == 1 {
return filters
.into_iter()
.next()
.expect("filter list verified to have exactly 1 element");
}
Filter::Or(filters)
}
pub fn matches(&self, row: &[Value]) -> bool {
match self {
Filter::Condition(c) => c.matches(row),
Filter::And(filters) => filters.iter().all(|f| f.matches(row)),
Filter::Or(filters) => filters.iter().any(|f| f.matches(row)),
}
}
}
/// A single predicate over one column of a row.
#[derive(Debug, Clone)]
pub struct FilterCondition {
/// Index into the row of the cell that is tested.
pub column_idx: usize,
/// Operator to apply.
pub op: FilterOp,
/// Right-hand operand for the comparison operators (`Eq`, `Lt`, `Le`, `Gt`, `Ge`);
/// the other operators do not read it.
pub value: Value,
}
impl FilterCondition {
    /// Tests the cell at `column_idx` of `row` against this condition.
    ///
    /// An out-of-range `column_idx` trips a debug assertion; when debug
    /// assertions are disabled the condition simply evaluates to `false`.
    ///
    /// Operator behaviour:
    /// - `Eq` uses `Value` equality; `Lt`/`Le`/`Gt`/`Ge` use `Value::compare`
    ///   and are `false` when it yields `None`.
    /// - `In` succeeds on an equal list element, or on an integer element of
    ///   a different width holding the same number.
    /// - `Like` only ever matches `Value::Text` cells.
    /// - `IsNull` / `IsNotNull` consult `Value::is_null`.
    pub fn matches(&self, row: &[Value]) -> bool {
        use std::cmp::Ordering;

        debug_assert!(
            self.column_idx < row.len(),
            "column index {} must be within row bounds (len={})",
            self.column_idx,
            row.len()
        );
        let cell = match row.get(self.column_idx) {
            Some(cell) => cell,
            None => return false,
        };
        let target = &self.value;

        match &self.op {
            FilterOp::Eq => cell == target,
            FilterOp::Lt => matches!(cell.compare(target), Some(Ordering::Less)),
            FilterOp::Le => matches!(
                cell.compare(target),
                Some(Ordering::Less | Ordering::Equal)
            ),
            FilterOp::Gt => matches!(cell.compare(target), Some(Ordering::Greater)),
            FilterOp::Ge => matches!(
                cell.compare(target),
                Some(Ordering::Greater | Ordering::Equal)
            ),
            FilterOp::In(candidates) => {
                // Exact equality first; otherwise retry with the narrower
                // integer widened to the wider one's type.
                candidates.contains(cell)
                    || candidates.iter().any(|candidate| match (cell, candidate) {
                        (Value::TinyInt(lhs), Value::SmallInt(rhs)) => i16::from(*lhs) == *rhs,
                        (Value::TinyInt(lhs), Value::Integer(rhs)) => i32::from(*lhs) == *rhs,
                        (Value::TinyInt(lhs), Value::BigInt(rhs)) => i64::from(*lhs) == *rhs,
                        (Value::SmallInt(lhs), Value::TinyInt(rhs)) => *lhs == i16::from(*rhs),
                        (Value::SmallInt(lhs), Value::Integer(rhs)) => i32::from(*lhs) == *rhs,
                        (Value::SmallInt(lhs), Value::BigInt(rhs)) => i64::from(*lhs) == *rhs,
                        (Value::Integer(lhs), Value::TinyInt(rhs)) => *lhs == i32::from(*rhs),
                        (Value::Integer(lhs), Value::SmallInt(rhs)) => *lhs == i32::from(*rhs),
                        (Value::Integer(lhs), Value::BigInt(rhs)) => i64::from(*lhs) == *rhs,
                        (Value::BigInt(lhs), Value::TinyInt(rhs)) => *lhs == i64::from(*rhs),
                        (Value::BigInt(lhs), Value::SmallInt(rhs)) => *lhs == i64::from(*rhs),
                        (Value::BigInt(lhs), Value::Integer(rhs)) => *lhs == i64::from(*rhs),
                        _ => false,
                    })
            }
            FilterOp::Like(pattern) => {
                debug_assert!(!pattern.is_empty(), "LIKE pattern must not be empty");
                kimberlite_properties::sometimes!(
                    matches!(cell, Value::Text(_)),
                    "query.like_pattern_evaluated",
                    "LIKE pattern evaluated against a Text value (iterative DP path exercised)"
                );
                if let Value::Text(text) = cell {
                    matches_like_pattern(text, pattern)
                } else {
                    false
                }
            }
            FilterOp::IsNull => cell.is_null(),
            FilterOp::IsNotNull => !cell.is_null(),
        }
    }
}
/// Returns `true` when the whole of `text` matches the SQL `LIKE` `pattern`.
///
/// Pattern syntax:
/// - `%` matches any run of characters, including none.
/// - `_` matches exactly one character.
/// - `\%` and `\_` match a literal `%` / `_`.
/// - A backslash before anything else, or at the end of the pattern, is an
///   ordinary literal character.
///
/// Matching works on `char`s and compares literals exactly, so it is
/// case-sensitive.
///
/// The matcher is an iterative dynamic program over (text prefix, pattern
/// prefix), costing O(text chars × pattern tokens) time. Only two DP rows
/// exist; they are allocated once and swapped for each text character
/// instead of allocating a new row each time.
///
/// # Panics
///
/// In builds with debug assertions enabled, panics if `pattern` is empty.
pub(crate) fn matches_like_pattern(text: &str, pattern: &str) -> bool {
    #[derive(Clone, Copy, PartialEq, Eq)]
    enum Token {
        Literal(char),
        Any,
        One,
    }

    debug_assert!(!pattern.is_empty(), "LIKE pattern must not be empty");

    // Tokenise the pattern, resolving the `\%` and `\_` escapes.
    let mut tokens: Vec<Token> = Vec::with_capacity(pattern.len());
    let mut pattern_chars = pattern.chars().peekable();
    while let Some(ch) = pattern_chars.next() {
        let token = match ch {
            // The backslash only swallows the next char when that char is a
            // wildcard; otherwise it stays a literal backslash and the next
            // char is tokenised on its own.
            '\\' => match pattern_chars.next_if(|&next| next == '%' || next == '_') {
                Some(escaped) => Token::Literal(escaped),
                None => Token::Literal('\\'),
            },
            '%' => Token::Any,
            '_' => Token::One,
            other => Token::Literal(other),
        };
        tokens.push(token);
    }

    let p_len = tokens.len();

    // `prev[j]` is true when the text consumed so far matches the first `j`
    // tokens. `curr` is the row under construction for the next character.
    let mut prev = vec![false; p_len + 1];
    let mut curr = vec![false; p_len + 1];

    // Empty text matches the empty pattern and any leading run of `%`.
    prev[0] = true;
    for (j, token) in tokens.iter().enumerate() {
        if *token != Token::Any {
            break;
        }
        prev[j + 1] = true;
    }

    for ch in text.chars() {
        // Non-empty text never matches the empty pattern prefix. This must be
        // reset because `curr` is a recycled row whose slot 0 may be `true`.
        curr[0] = false;
        for (j, token) in tokens.iter().enumerate() {
            curr[j + 1] = match *token {
                // `%` either absorbs this character (`prev[j + 1]`) or
                // matches nothing (`curr[j]`).
                Token::Any => prev[j + 1] || curr[j],
                Token::One => prev[j],
                Token::Literal(literal) => prev[j] && ch == literal,
            };
        }
        std::mem::swap(&mut prev, &mut curr);
    }

    prev[p_len]
}
/// Operators a [`FilterCondition`] can apply to a cell.
#[derive(Debug, Clone)]
pub enum FilterOp {
/// Cell equals the condition's value.
Eq,
/// Cell compares less than the condition's value.
Lt,
/// Cell compares less than or equal to the condition's value.
Le,
/// Cell compares greater than the condition's value.
Gt,
/// Cell compares greater than or equal to the condition's value.
Ge,
/// Cell is a member of the given list; integers of different widths holding
/// the same number are treated as equal.
In(Vec<Value>),
/// Cell is a `Text` value matching the given SQL LIKE pattern
/// (`%`, `_`, with `\%` / `\_` escapes); non-text cells never match.
Like(String),
/// Cell is null.
IsNull,
/// Cell is not null.
IsNotNull,
}