use std::ops::Bound;
use std::sync::Arc;
use kimberlite_store::{Key, TableId};
use crate::expression::{EvalContext, ScalarExpr, evaluate};
use crate::parser::{AggregateFunction, HavingCondition, ScalarCmpOp};
use crate::schema::{ColumnDef, ColumnName};
use crate::value::Value;
/// Descriptive information about one table that plan nodes carry with them.
#[derive(Debug, Clone)]
pub struct TableMetadata {
/// Store-level identifier of the table.
pub table_id: TableId,
/// Table name; this is what `QueryPlan::table_name` returns for table-backed nodes.
pub table_name: String,
/// Column definitions of the table.
pub columns: Vec<ColumnDef>,
/// Names of the primary-key columns.
// NOTE(review): presumably listed in key order — confirm against the planner.
pub primary_key: Vec<ColumnName>,
}
/// One comparison between a left-side column and a right-side column of a join.
#[derive(Debug, Clone)]
pub struct JoinCondition {
/// Column index on the left side.
// NOTE(review): presumably an index into the left input's row — confirm with the executor.
pub left_col_idx: usize,
/// Column index on the right side.
// NOTE(review): presumably an index into the right input's row — confirm with the executor.
pub right_col_idx: usize,
/// Comparison operator relating the two columns.
pub op: JoinOp,
}
/// Comparison operators usable in a [`JoinCondition`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JoinOp {
/// Equality (`=`).
Eq,
/// Less than (`<`).
Lt,
/// Less than or equal (`<=`).
Le,
/// Greater than (`>`).
Gt,
/// Greater than or equal (`>=`).
Ge,
}
/// A node of a query plan tree.
///
/// `PointLookup`, `RangeScan`, `IndexScan` and `TableScan` carry only a
/// [`TableMetadata`]; `Aggregate` carries metadata plus a boxed `source` plan;
/// `Join` and `Materialize` carry only boxed child plans. Every variant has a
/// `column_names` field, which is what `QueryPlan::column_names` returns.
#[derive(Debug, Clone)]
pub enum QueryPlan {
/// Plan node holding a single `key` for the table described by `metadata`.
PointLookup {
metadata: TableMetadata,
key: Key,
/// Column indices; returned by `column_indices()`.
columns: Vec<usize>,
/// Output column names; returned by `column_names()`.
column_names: Vec<ColumnName>,
},
/// Plan node holding a key range (`start`..`end` bounds) over the table in `metadata`.
RangeScan {
metadata: TableMetadata,
/// Lower bound of the key range.
start: Bound<Key>,
/// Upper bound of the key range.
end: Bound<Key>,
/// Optional row predicate.
filter: Option<Filter>,
limit: Option<usize>,
offset: Option<usize>,
/// Scan direction.
order: ScanOrder,
/// Optional explicit sort specification, separate from the scan direction.
order_by: Option<SortSpec>,
/// Column indices; returned by `column_indices()`.
columns: Vec<usize>,
/// Output column names; returned by `column_names()`.
column_names: Vec<ColumnName>,
},
/// Like `RangeScan`, but additionally identifies an index by id and name.
IndexScan {
metadata: TableMetadata,
index_id: u64,
index_name: String,
/// Lower bound of the key range.
start: Bound<Key>,
/// Upper bound of the key range.
end: Bound<Key>,
/// Optional row predicate.
filter: Option<Filter>,
limit: Option<usize>,
offset: Option<usize>,
/// Scan direction.
order: ScanOrder,
/// Optional explicit sort specification, separate from the scan direction.
order_by: Option<SortSpec>,
/// Column indices; returned by `column_indices()`.
columns: Vec<usize>,
/// Output column names; returned by `column_names()`.
column_names: Vec<ColumnName>,
},
/// Plan node over the table in `metadata` with no key bounds.
TableScan {
metadata: TableMetadata,
/// Optional row predicate.
filter: Option<Filter>,
limit: Option<usize>,
offset: Option<usize>,
/// Optional sort specification (note: a `SortSpec` here, not a `ScanOrder`).
order: Option<SortSpec>,
/// Column indices; returned by `column_indices()`.
columns: Vec<usize>,
/// Output column names; returned by `column_names()`.
column_names: Vec<ColumnName>,
},
/// Aggregation over the rows produced by `source`.
Aggregate {
metadata: TableMetadata,
/// Child plan supplying the input rows.
source: Box<QueryPlan>,
/// Grouping column indices; this is what `column_indices()` returns for this variant.
group_by_cols: Vec<usize>,
group_by_names: Vec<ColumnName>,
aggregates: Vec<AggregateFunction>,
// NOTE(review): presumably parallel to `aggregates` (one optional filter per aggregate) — confirm.
aggregate_filters: Vec<Option<Filter>>,
/// Output column names; returned by `column_names()`.
column_names: Vec<ColumnName>,
having: Vec<HavingCondition>,
},
/// Join of two child plans under `on_conditions`.
Join {
join_type: crate::parser::JoinType,
/// Left child; `table_name()` of a join delegates to this side.
left: Box<QueryPlan>,
right: Box<QueryPlan>,
on_conditions: Vec<JoinCondition>,
/// Column indices; returned by `column_indices()`.
columns: Vec<usize>,
/// Output column names; returned by `column_names()`.
column_names: Vec<ColumnName>,
},
/// Wrapper around `source` carrying a filter, computed columns, ordering and limit/offset.
///
/// `column_indices()` returns an empty slice for this variant.
Materialize {
/// Child plan; `table_name()` delegates to it.
source: Box<QueryPlan>,
/// Optional row predicate.
filter: Option<Filter>,
/// CASE-expression output columns.
case_columns: Vec<CaseColumnDef>,
/// Scalar-expression output columns.
scalar_columns: Vec<ScalarColumnDef>,
order: Option<SortSpec>,
limit: Option<usize>,
offset: Option<usize>,
/// Output column names; returned by `column_names()`.
column_names: Vec<ColumnName>,
},
}
/// Definition of an output column computed from a CASE expression.
#[derive(Debug, Clone)]
pub struct CaseColumnDef {
/// Name of the produced column.
pub alias: ColumnName,
/// The WHEN branches, each pairing a filter with a result value.
// NOTE(review): presumably evaluated in order with the first match winning — confirm in the executor.
pub when_clauses: Vec<CaseWhenClause>,
/// Value for the ELSE branch.
pub else_value: crate::value::Value,
}
/// A single `WHEN <condition> THEN <result>` branch of a [`CaseColumnDef`].
#[derive(Debug, Clone)]
pub struct CaseWhenClause {
/// Predicate guarding this branch.
pub condition: Filter,
/// Value associated with this branch.
pub result: crate::value::Value,
}
/// Definition of an output column computed from a scalar expression.
#[derive(Debug, Clone)]
pub struct ScalarColumnDef {
/// Name of the produced column.
pub output_name: ColumnName,
/// Expression that computes the column's value.
pub expr: ScalarExpr,
/// Shared list of column names.
// NOTE(review): presumably the names the expression resolves column references
// against (same role as `FilterOp::ScalarCmp::columns`) — confirm.
pub columns: Arc<[ColumnName]>,
}
impl QueryPlan {
pub fn column_names(&self) -> &[ColumnName] {
match self {
QueryPlan::PointLookup { column_names, .. }
| QueryPlan::RangeScan { column_names, .. }
| QueryPlan::IndexScan { column_names, .. }
| QueryPlan::TableScan { column_names, .. }
| QueryPlan::Aggregate { column_names, .. }
| QueryPlan::Join { column_names, .. }
| QueryPlan::Materialize { column_names, .. } => column_names,
}
}
#[allow(dead_code)]
pub fn column_indices(&self) -> &[usize] {
match self {
QueryPlan::PointLookup { columns, .. }
| QueryPlan::RangeScan { columns, .. }
| QueryPlan::IndexScan { columns, .. }
| QueryPlan::TableScan { columns, .. }
| QueryPlan::Join { columns, .. } => columns,
QueryPlan::Aggregate { group_by_cols, .. } => group_by_cols,
QueryPlan::Materialize { .. } => &[],
}
}
pub fn table_name(&self) -> &str {
match self {
QueryPlan::PointLookup { metadata, .. }
| QueryPlan::RangeScan { metadata, .. }
| QueryPlan::IndexScan { metadata, .. }
| QueryPlan::TableScan { metadata, .. }
| QueryPlan::Aggregate { metadata, .. } => &metadata.table_name,
QueryPlan::Join { left, .. } | QueryPlan::Materialize { source: left, .. } => {
left.table_name()
}
}
}
pub fn metadata(&self) -> Option<&TableMetadata> {
match self {
QueryPlan::PointLookup { metadata, .. }
| QueryPlan::RangeScan { metadata, .. }
| QueryPlan::IndexScan { metadata, .. }
| QueryPlan::TableScan { metadata, .. }
| QueryPlan::Aggregate { metadata, .. } => Some(metadata),
QueryPlan::Join { .. } | QueryPlan::Materialize { .. } => None,
}
}
}
/// Direction of a scan or of one sort column. Defaults to `Ascending`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ScanOrder {
/// Ascending order (the `Default`).
#[default]
Ascending,
/// Descending order.
Descending,
}
/// Sort specification: a list of columns, each with its own direction.
#[derive(Debug, Clone)]
pub struct SortSpec {
/// `(column index, direction)` pairs.
// NOTE(review): presumably listed in sort-priority order — confirm in the executor.
pub columns: Vec<(usize, ScanOrder)>,
}
/// A boolean predicate over a row, built as a tree of conditions.
#[derive(Debug, Clone)]
pub enum Filter {
/// Leaf: a single column condition.
Condition(FilterCondition),
/// Conjunction: matches when every child filter matches.
And(Vec<Filter>),
/// Disjunction: matches when at least one child filter matches.
Or(Vec<Filter>),
}
impl Filter {
pub fn single(condition: FilterCondition) -> Self {
Filter::Condition(condition)
}
pub fn and(filters: Vec<Filter>) -> Self {
assert!(
!filters.is_empty(),
"AND filter must have at least one condition"
);
if filters.len() == 1 {
return filters
.into_iter()
.next()
.expect("filter list verified to have exactly 1 element");
}
Filter::And(filters)
}
pub fn or(filters: Vec<Filter>) -> Self {
assert!(
!filters.is_empty(),
"OR filter must have at least one condition"
);
if filters.len() == 1 {
return filters
.into_iter()
.next()
.expect("filter list verified to have exactly 1 element");
}
Filter::Or(filters)
}
pub fn matches(&self, row: &[Value]) -> bool {
match self {
Filter::Condition(c) => c.matches(row),
Filter::And(filters) => filters.iter().all(|f| f.matches(row)),
Filter::Or(filters) => filters.iter().any(|f| f.matches(row)),
}
}
}
/// A single predicate applied to one cell of a row.
#[derive(Debug, Clone)]
pub struct FilterCondition {
/// Index into the row of the cell being tested.
pub column_idx: usize,
/// Operator to apply; several operators carry their own operands.
pub op: FilterOp,
/// Comparison operand read by the `Eq`, `Lt`, `Le`, `Gt` and `Ge` operators;
/// the other operators do not consult it when matching.
pub value: Value,
}
impl FilterCondition {
pub fn matches(&self, row: &[Value]) -> bool {
debug_assert!(
self.column_idx < row.len(),
"column index {} must be within row bounds (len={})",
self.column_idx,
row.len()
);
let Some(cell) = row.get(self.column_idx) else {
return false;
};
match &self.op {
FilterOp::Eq => cell == &self.value,
FilterOp::Lt => cell.compare(&self.value) == Some(std::cmp::Ordering::Less),
FilterOp::Le => matches!(
cell.compare(&self.value),
Some(std::cmp::Ordering::Less | std::cmp::Ordering::Equal)
),
FilterOp::Gt => cell.compare(&self.value) == Some(std::cmp::Ordering::Greater),
FilterOp::Ge => matches!(
cell.compare(&self.value),
Some(std::cmp::Ordering::Greater | std::cmp::Ordering::Equal)
),
FilterOp::In(values) => {
if values.contains(cell) {
return true;
}
values.iter().any(|v| match (cell, v) {
(Value::TinyInt(a), Value::SmallInt(b)) => i16::from(*a) == *b,
(Value::TinyInt(a), Value::Integer(b)) => i32::from(*a) == *b,
(Value::TinyInt(a), Value::BigInt(b)) => i64::from(*a) == *b,
(Value::SmallInt(a), Value::TinyInt(b)) => *a == i16::from(*b),
(Value::SmallInt(a), Value::Integer(b)) => i32::from(*a) == *b,
(Value::SmallInt(a), Value::BigInt(b)) => i64::from(*a) == *b,
(Value::Integer(a), Value::TinyInt(b)) => *a == i32::from(*b),
(Value::Integer(a), Value::SmallInt(b)) => *a == i32::from(*b),
(Value::Integer(a), Value::BigInt(b)) => i64::from(*a) == *b,
(Value::BigInt(a), Value::TinyInt(b)) => *a == i64::from(*b),
(Value::BigInt(a), Value::SmallInt(b)) => *a == i64::from(*b),
(Value::BigInt(a), Value::Integer(b)) => *a == i64::from(*b),
_ => false,
})
}
FilterOp::Like(pattern) => {
debug_assert!(!pattern.is_empty(), "LIKE pattern must not be empty");
kimberlite_properties::sometimes!(
matches!(cell, Value::Text(_)),
"query.like_pattern_evaluated",
"LIKE pattern evaluated against a Text value (iterative DP path exercised)"
);
match cell {
Value::Text(s) => matches_like_pattern(s, pattern),
_ => false,
}
}
FilterOp::NotLike(pattern) => {
debug_assert!(!pattern.is_empty(), "NOT LIKE pattern must not be empty");
match cell {
Value::Text(s) => !matches_like_pattern(s, pattern),
_ => false,
}
}
FilterOp::ILike(pattern) => {
debug_assert!(!pattern.is_empty(), "ILIKE pattern must not be empty");
match cell {
Value::Text(s) => {
let s_folded = s.to_lowercase();
let p_folded = pattern.to_lowercase();
matches_like_pattern(&s_folded, &p_folded)
}
_ => false,
}
}
FilterOp::NotILike(pattern) => {
debug_assert!(!pattern.is_empty(), "NOT ILIKE pattern must not be empty");
match cell {
Value::Text(s) => {
let s_folded = s.to_lowercase();
let p_folded = pattern.to_lowercase();
!matches_like_pattern(&s_folded, &p_folded)
}
_ => false,
}
}
FilterOp::IsNull => cell.is_null(),
FilterOp::IsNotNull => !cell.is_null(),
FilterOp::JsonExtractEq {
path,
as_text,
value,
} => match cell {
Value::Json(j) => {
let extracted = j.get(path);
match extracted {
None => value.is_null(),
Some(extracted_json) => {
if *as_text {
let text = match extracted_json {
serde_json::Value::String(s) => s.clone(),
other => other.to_string(),
};
matches!(value, Value::Text(t) if t == &text)
} else {
match value {
Value::Json(v) => extracted_json == v,
_ => false,
}
}
}
}
}
_ => false,
},
FilterOp::JsonContains(target) => match (cell, target) {
(Value::Json(haystack), Value::Json(needle)) => json_contains(haystack, needle),
_ => false,
},
FilterOp::AlwaysTrue => true,
FilterOp::AlwaysFalse => false,
FilterOp::NotIn(values) => {
if cell.is_null() {
return false;
}
if values.contains(cell) {
return false;
}
!values.iter().any(|v| match (cell, v) {
(Value::TinyInt(a), Value::SmallInt(b)) => i16::from(*a) == *b,
(Value::TinyInt(a), Value::Integer(b)) => i32::from(*a) == *b,
(Value::TinyInt(a), Value::BigInt(b)) => i64::from(*a) == *b,
(Value::SmallInt(a), Value::TinyInt(b)) => *a == i16::from(*b),
(Value::SmallInt(a), Value::Integer(b)) => i32::from(*a) == *b,
(Value::SmallInt(a), Value::BigInt(b)) => i64::from(*a) == *b,
(Value::Integer(a), Value::TinyInt(b)) => *a == i32::from(*b),
(Value::Integer(a), Value::SmallInt(b)) => *a == i32::from(*b),
(Value::Integer(a), Value::BigInt(b)) => i64::from(*a) == *b,
(Value::BigInt(a), Value::TinyInt(b)) => *a == i64::from(*b),
(Value::BigInt(a), Value::SmallInt(b)) => *a == i64::from(*b),
(Value::BigInt(a), Value::Integer(b)) => *a == i64::from(*b),
_ => false,
})
}
FilterOp::NotBetween(low, high) => {
if cell.is_null() {
return false;
}
let in_range = matches!(
cell.compare(low),
Some(std::cmp::Ordering::Greater | std::cmp::Ordering::Equal)
) && matches!(
cell.compare(high),
Some(std::cmp::Ordering::Less | std::cmp::Ordering::Equal)
);
!in_range
}
FilterOp::ScalarCmp {
columns,
lhs,
op,
rhs,
} => {
let ctx = EvalContext::new(columns, row);
let Ok(l) = evaluate(lhs, &ctx) else {
return false;
};
let Ok(r) = evaluate(rhs, &ctx) else {
return false;
};
if l.is_null() || r.is_null() {
return false;
}
let widened = numeric_widen(&l, &r);
let (la, ra) = widened.as_ref().map_or((&l, &r), |(a, b)| (a, b));
match op {
ScalarCmpOp::Eq => la.compare(ra) == Some(std::cmp::Ordering::Equal),
ScalarCmpOp::NotEq => la.compare(ra) != Some(std::cmp::Ordering::Equal),
ScalarCmpOp::Lt => la.compare(ra) == Some(std::cmp::Ordering::Less),
ScalarCmpOp::Le => matches!(
la.compare(ra),
Some(std::cmp::Ordering::Less | std::cmp::Ordering::Equal)
),
ScalarCmpOp::Gt => la.compare(ra) == Some(std::cmp::Ordering::Greater),
ScalarCmpOp::Ge => matches!(
la.compare(ra),
Some(std::cmp::Ordering::Greater | std::cmp::Ordering::Equal)
),
}
}
}
}
}
/// Promotes a pair of integers of *different* widths to `Value::BigInt`.
///
/// Returns `None` when either value is not an integer variant, or when both
/// already share the same variant (no widening is needed then).
fn numeric_widen(a: &Value, b: &Value) -> Option<(Value, Value)> {
    let same_variant = std::mem::discriminant(a) == std::mem::discriminant(b);
    match (value_as_i64(a), value_as_i64(b)) {
        (Some(wide_a), Some(wide_b)) if !same_variant => {
            Some((Value::BigInt(wide_a), Value::BigInt(wide_b)))
        }
        _ => None,
    }
}
/// Losslessly converts any integer `Value` variant to `i64`.
///
/// Returns `None` for every non-integer variant.
fn value_as_i64(v: &Value) -> Option<i64> {
    let widened = match v {
        Value::BigInt(n) => *n,
        Value::Integer(n) => i64::from(*n),
        Value::SmallInt(n) => i64::from(*n),
        Value::TinyInt(n) => i64::from(*n),
        _ => return None,
    };
    Some(widened)
}
/// Structural containment test: does `haystack` contain `needle`?
///
/// - Object vs. object: every key of `needle` must exist in `haystack` with a
///   value that (recursively) contains the needle's value.
/// - Array vs. array: every needle element must be contained in at least one
///   haystack element; order and multiplicity are not considered.
/// - Anything else: plain equality.
fn json_contains(haystack: &serde_json::Value, needle: &serde_json::Value) -> bool {
    if let (serde_json::Value::Object(have), serde_json::Value::Object(want)) = (haystack, needle) {
        for (key, wanted) in want.iter() {
            match have.get(key) {
                Some(present) if json_contains(present, wanted) => {}
                _ => return false,
            }
        }
        return true;
    }
    if let (serde_json::Value::Array(have), serde_json::Value::Array(want)) = (haystack, needle) {
        for wanted in want.iter() {
            let found = have.iter().any(|candidate| json_contains(candidate, wanted));
            if !found {
                return false;
            }
        }
        return true;
    }
    haystack == needle
}
/// Tests `text` against a SQL `LIKE` pattern.
///
/// Pattern syntax:
/// - `%` matches any run of characters, including the empty run;
/// - `_` matches exactly one character;
/// - `\%` and `\_` match a literal `%` / `_`;
/// - a backslash followed by anything else (or at the end of the pattern) is
///   an ordinary literal backslash.
///
/// Matching is per Unicode scalar value (`char`) and case-sensitive. The
/// algorithm is an iterative dynamic program over (text prefix, pattern
/// prefix), so running time is proportional to `text length × pattern length`
/// with no recursion or backtracking.
///
/// # Panics
///
/// In debug builds, panics if `pattern` is empty.
// NOTE(review): `\\` is not treated as an escaped backslash, so a pattern
// cannot express "literal backslash followed by a wildcard" — confirm whether
// that is intended before changing it.
pub(crate) fn matches_like_pattern(text: &str, pattern: &str) -> bool {
    debug_assert!(!pattern.is_empty(), "LIKE pattern must not be empty");

    // The token type is local to this function; it is declared after the
    // assertion so the precondition stays the first thing a reader sees.
    #[allow(clippy::items_after_statements)]
    #[derive(Clone, Copy, PartialEq, Eq)]
    enum Token {
        Literal(char),
        Any,
        One,
    }

    // Tokenise the pattern in a single pass, resolving escapes as we go.
    let mut tokens: Vec<Token> = Vec::with_capacity(pattern.len());
    let mut pattern_chars = pattern.chars().peekable();
    while let Some(c) = pattern_chars.next() {
        let token = match c {
            // Consume the next char only when it is an escapable wildcard;
            // otherwise the backslash stands for itself.
            '\\' => match pattern_chars.next_if(|&next| next == '%' || next == '_') {
                Some(escaped) => Token::Literal(escaped),
                None => Token::Literal('\\'),
            },
            '%' => Token::Any,
            '_' => Token::One,
            other => Token::Literal(other),
        };
        tokens.push(token);
    }

    let width = tokens.len();
    // `prev[j]`: do the first `j` tokens match the text consumed so far?
    // `curr` is scratch space for the next row; the two buffers are swapped
    // after each text character instead of allocating a new row every time.
    let mut prev = vec![false; width + 1];
    let mut curr = vec![false; width + 1];

    // Empty text matches the empty pattern and any leading run of `%`.
    prev[0] = true;
    for (j, token) in tokens.iter().enumerate() {
        if *token != Token::Any {
            break;
        }
        prev[j + 1] = true;
    }

    for ch in text.chars() {
        // Non-empty text never matches the empty pattern prefix. This slot
        // must be reset because `curr` is a recycled earlier row.
        curr[0] = false;
        for (j, token) in tokens.iter().enumerate() {
            curr[j + 1] = match *token {
                // `%` either absorbs this character or matches nothing.
                Token::Any => prev[j + 1] || curr[j],
                Token::One => prev[j],
                Token::Literal(lit) => prev[j] && ch == lit,
            };
        }
        std::mem::swap(&mut prev, &mut curr);
    }
    prev[width]
}
/// Operator of a [`FilterCondition`].
///
/// The semantics described on each variant are those implemented by
/// `FilterCondition::matches`, where "cell" is the row value at the
/// condition's `column_idx`.
#[derive(Debug, Clone)]
pub enum FilterOp {
/// Cell equals the condition's `value` (via `PartialEq`).
Eq,
/// Cell compares less than the condition's `value`.
Lt,
/// Cell compares less than or equal to the condition's `value`.
Le,
/// Cell compares greater than the condition's `value`.
Gt,
/// Cell compares greater than or equal to the condition's `value`.
Ge,
/// Cell equals one of the listed values; integers of different widths are
/// considered equal when their numeric values agree.
In(Vec<Value>),
/// Negation of `In`, except that a NULL cell never matches.
NotIn(Vec<Value>),
/// `(low, high)`: cell lies outside the inclusive range; a NULL cell never matches.
NotBetween(Value, Value),
/// Text cell matches the LIKE pattern (`%`, `_`, backslash escapes). Non-text cells never match.
Like(String),
/// Text cell does not match the LIKE pattern. Non-text cells never match.
NotLike(String),
/// Like `Like`, after lower-casing both the cell text and the pattern.
ILike(String),
/// Like `NotLike`, after lower-casing both the cell text and the pattern.
NotILike(String),
/// Cell is NULL.
IsNull,
/// Cell is not NULL.
IsNotNull,
/// Compares the value found at `path` inside a JSON cell with `value`.
///
/// A missing path matches only a NULL `value`. Non-JSON cells never match.
JsonExtractEq {
/// Key passed to the JSON value's `get`.
path: String,
/// When `true`, the extracted JSON is turned into text (strings unquoted,
/// anything else rendered as JSON) and compared against a `Value::Text`;
/// when `false`, it is compared against a `Value::Json`.
as_text: bool,
/// Expected value.
value: Value,
},
/// JSON cell structurally contains the given JSON value; both sides must be `Value::Json`.
JsonContains(Value),
/// Matches every row whose `column_idx` is in bounds.
AlwaysTrue,
/// Matches no row.
AlwaysFalse,
/// Compares two scalar expressions evaluated against the whole row.
///
/// An evaluation error or a NULL on either side yields no match;
/// mixed-width integers are widened before comparison.
ScalarCmp {
/// Column names handed to the expression evaluation context together with the row.
columns: Arc<[ColumnName]>,
/// Left-hand expression.
lhs: ScalarExpr,
/// Comparison operator.
op: ScalarCmpOp,
/// Right-hand expression.
rhs: ScalarExpr,
},
}