#![allow(clippy::ref_option)]
#![allow(clippy::trivially_copy_pass_by_ref)]
#![allow(clippy::items_after_statements)]
use std::cmp::Ordering;
use std::ops::Bound;
/// Over-fetch factor applied to `LIMIT + OFFSET` by range/index scans when an
/// `ORDER BY` will sort the scanned rows afterwards (presumably so the sort
/// sees more candidates than the final page size).
const SCAN_LIMIT_MULTIPLIER_WITH_SORT: usize = 10;
/// Over-fetch factor applied to `LIMIT + OFFSET` by range/index scans when no
/// post-scan sort is needed; the headroom lets the residual filter reject rows.
const SCAN_LIMIT_MULTIPLIER_NO_SORT: usize = 2;
/// Entries requested from the store by range/index scans when the query has
/// no `LIMIT`.
const DEFAULT_SCAN_LIMIT: usize = 10_000;
/// Maximum number of aggregate expressions a single query may evaluate.
const MAX_AGGREGATES_PER_QUERY: usize = 100;
/// Maximum number of rows a JOIN may produce before execution is aborted.
const MAX_JOIN_OUTPUT_ROWS: usize = 1_000_000;
/// Maximum number of distinct GROUP BY keys before execution is aborted.
const MAX_GROUP_COUNT: usize = 100_000;
use bytes::Bytes;
use kimberlite_store::{Key, ProjectionStore, TableId};
use kimberlite_types::Offset;
use crate::error::{QueryError, Result};
use crate::key_encoder::successor_key;
use crate::plan::{QueryPlan, ScanOrder, SortSpec};
use crate::schema::{ColumnName, TableDef};
use crate::value::Value;
/// Tabular output of executing a [`QueryPlan`]: output column names plus rows.
#[derive(Debug, Clone)]
pub struct QueryResult {
    /// Output column names, in the order values appear within each row.
    pub columns: Vec<ColumnName>,
    /// Result rows; each row is expected to hold one value per column.
    pub rows: Vec<Row>,
}

impl QueryResult {
    /// Builds a result with the given columns and no rows.
    pub fn empty(columns: Vec<ColumnName>) -> Self {
        Self {
            rows: Vec::new(),
            columns,
        }
    }

    /// Number of rows in the result.
    pub fn len(&self) -> usize {
        let rows = &self.rows;
        rows.len()
    }

    /// Returns `true` when the result contains no rows.
    pub fn is_empty(&self) -> bool {
        let rows = &self.rows;
        rows.is_empty()
    }
}

/// One result row: a value per output column.
pub type Row = Vec<Value>;
#[allow(clippy::too_many_arguments)]
fn execute_index_scan<S: ProjectionStore>(
store: &mut S,
metadata: &crate::plan::TableMetadata,
index_id: u64,
start: &Bound<Key>,
end: &Bound<Key>,
filter: &Option<crate::plan::Filter>,
limit: &Option<usize>,
offset: &Option<usize>,
order: &ScanOrder,
order_by: &Option<crate::plan::SortSpec>,
columns: &[usize],
column_names: &[ColumnName],
position: Option<Offset>,
) -> Result<QueryResult> {
let (start_key, end_key) = bounds_to_range(start, end);
let limit_plus_offset = limit.map(|l| l.saturating_add(offset.unwrap_or(0)));
let scan_limit = if order_by.is_some() {
limit_plus_offset
.map(|l| l.saturating_mul(SCAN_LIMIT_MULTIPLIER_WITH_SORT))
.unwrap_or(DEFAULT_SCAN_LIMIT)
} else {
limit_plus_offset
.map(|l| l.saturating_mul(SCAN_LIMIT_MULTIPLIER_NO_SORT))
.unwrap_or(DEFAULT_SCAN_LIMIT)
};
debug_assert!(scan_limit > 0, "scan_limit must be positive");
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
let mut hasher = DefaultHasher::new();
metadata.table_id.as_u64().hash(&mut hasher);
index_id.hash(&mut hasher);
let index_table_id = TableId::new(hasher.finish());
let index_pairs = match position {
Some(pos) => store.scan_at(index_table_id, start_key..end_key, scan_limit, pos)?,
None => store.scan(index_table_id, start_key..end_key, scan_limit)?,
};
let mut full_rows = Vec::new();
let index_iter: Box<dyn Iterator<Item = &(Key, Bytes)>> = match order {
ScanOrder::Ascending => Box::new(index_pairs.iter()),
ScanOrder::Descending => Box::new(index_pairs.iter().rev()),
};
for (index_key, _) in index_iter {
let pk_key = extract_pk_from_index_key(index_key, metadata);
let bytes_opt = match position {
Some(pos) => store.get_at(metadata.table_id, &pk_key, pos)?,
None => store.get(metadata.table_id, &pk_key)?,
};
if let Some(bytes) = bytes_opt {
let full_row = decode_row(&bytes, metadata)?;
if let Some(f) = filter {
if !f.matches(&full_row) {
continue;
}
}
full_rows.push(full_row);
if order_by.is_none() {
if let Some(target) = limit_plus_offset {
if full_rows.len() >= target {
break;
}
}
}
}
}
if let Some(sort_spec) = order_by {
sort_rows(&mut full_rows, sort_spec);
}
apply_offset_and_limit(&mut full_rows, *offset, *limit);
let rows: Vec<Row> = full_rows
.iter()
.map(|full_row| project_row(full_row, columns))
.collect();
Ok(QueryResult {
columns: column_names.to_vec(),
rows,
})
}
#[allow(clippy::too_many_arguments)]
fn execute_table_scan<S: ProjectionStore>(
store: &mut S,
metadata: &crate::plan::TableMetadata,
filter: &Option<crate::plan::Filter>,
limit: &Option<usize>,
offset: &Option<usize>,
order: &Option<SortSpec>,
columns: &[usize],
column_names: &[ColumnName],
position: Option<Offset>,
) -> Result<QueryResult> {
let limit_plus_offset = limit.map(|l| l.saturating_add(offset.unwrap_or(0)));
let scan_limit = limit_plus_offset
.map(|l| l.saturating_mul(10))
.unwrap_or(100_000);
let pairs = match position {
Some(pos) => store.scan_at(metadata.table_id, Key::min()..Key::max(), scan_limit, pos)?,
None => store.scan(metadata.table_id, Key::min()..Key::max(), scan_limit)?,
};
let mut full_rows = Vec::new();
for (_, bytes) in &pairs {
let full_row = decode_row(bytes, metadata)?;
if let Some(f) = filter {
if !f.matches(&full_row) {
continue;
}
}
full_rows.push(full_row);
}
if let Some(sort_spec) = order {
sort_rows(&mut full_rows, sort_spec);
}
apply_offset_and_limit(&mut full_rows, *offset, *limit);
let rows: Vec<Row> = full_rows
.iter()
.map(|full_row| project_row(full_row, columns))
.collect();
Ok(QueryResult {
columns: column_names.to_vec(),
rows,
})
}
#[allow(clippy::too_many_arguments)]
fn execute_range_scan<S: ProjectionStore>(
store: &mut S,
metadata: &crate::plan::TableMetadata,
start: &Bound<Key>,
end: &Bound<Key>,
filter: &Option<crate::plan::Filter>,
limit: &Option<usize>,
offset: &Option<usize>,
order: &ScanOrder,
order_by: &Option<crate::plan::SortSpec>,
columns: &[usize],
column_names: &[ColumnName],
position: Option<Offset>,
) -> Result<QueryResult> {
let (start_key, end_key) = bounds_to_range(start, end);
let limit_plus_offset = limit.map(|l| l.saturating_add(offset.unwrap_or(0)));
let scan_limit = if order_by.is_some() {
limit_plus_offset
.map(|l| l.saturating_mul(SCAN_LIMIT_MULTIPLIER_WITH_SORT))
.unwrap_or(DEFAULT_SCAN_LIMIT)
} else {
limit_plus_offset
.map(|l| l.saturating_mul(SCAN_LIMIT_MULTIPLIER_NO_SORT))
.unwrap_or(DEFAULT_SCAN_LIMIT)
};
debug_assert!(scan_limit > 0, "scan_limit must be positive");
let pairs = match position {
Some(pos) => store.scan_at(metadata.table_id, start_key..end_key, scan_limit, pos)?,
None => store.scan(metadata.table_id, start_key..end_key, scan_limit)?,
};
let mut full_rows = Vec::new();
let row_iter: Box<dyn Iterator<Item = &(Key, Bytes)>> = match order {
ScanOrder::Ascending => Box::new(pairs.iter()),
ScanOrder::Descending => Box::new(pairs.iter().rev()),
};
for (_, bytes) in row_iter {
let full_row = decode_row(bytes, metadata)?;
if let Some(f) = filter {
if !f.matches(&full_row) {
continue;
}
}
full_rows.push(full_row);
if order_by.is_none() {
if let Some(target) = limit_plus_offset {
if full_rows.len() >= target {
break;
}
}
}
}
if let Some(sort_spec) = order_by {
sort_rows(&mut full_rows, sort_spec);
}
apply_offset_and_limit(&mut full_rows, *offset, *limit);
let rows: Vec<Row> = full_rows
.iter()
.map(|full_row| project_row(full_row, columns))
.collect();
Ok(QueryResult {
columns: column_names.to_vec(),
rows,
})
}
/// Drops the first `offset` rows (when given), then keeps at most `limit` rows.
///
/// An offset at or past the end leaves `rows` empty; `None` for either bound
/// means "no constraint".
#[inline]
fn apply_offset_and_limit<T>(rows: &mut Vec<T>, offset: Option<usize>, limit: Option<usize>) {
    // Clamp so an oversized offset simply drains everything.
    let skipped = offset.map_or(0, |off| off.min(rows.len()));
    rows.drain(..skipped);
    if let Some(max_rows) = limit {
        rows.truncate(max_rows);
    }
}
/// Fetches a single row by primary key and projects it onto `columns`.
///
/// An absent key yields an empty result that still carries `column_names`.
/// When `position` is set the read is pinned to that log offset.
///
/// # Errors
/// Propagates store read failures and row decoding errors.
fn execute_point_lookup<S: ProjectionStore>(
    store: &mut S,
    metadata: &crate::plan::TableMetadata,
    key: &Key,
    columns: &[usize],
    column_names: &[ColumnName],
    position: Option<Offset>,
) -> Result<QueryResult> {
    let fetched = if let Some(pos) = position {
        store.get_at(metadata.table_id, key, pos)?
    } else {
        store.get(metadata.table_id, key)?
    };
    let Some(bytes) = fetched else {
        return Ok(QueryResult::empty(column_names.to_vec()));
    };
    let projected = decode_and_project(&bytes, columns, metadata)?;
    Ok(QueryResult {
        columns: column_names.to_vec(),
        rows: vec![projected],
    })
}
/// Shared execution entry point used by `execute`, `execute_at`, and the
/// recursive execution of child plans (aggregate, join, materialize sources).
///
/// Runs `plan` through `execute_internal_inner`, reading at `position` when it
/// is set. In `test`/`sim` builds it additionally checks, via
/// `kimberlite_properties::always!`, that the result's column count and every
/// row's width equal `plan.column_names().len()`.
///
/// `_table_def` is forwarded unchanged; the inner dispatcher does not read it.
#[allow(clippy::too_many_lines, clippy::used_underscore_binding)]
fn execute_internal<S: ProjectionStore>(
    store: &mut S,
    plan: &QueryPlan,
    _table_def: &TableDef, position: Option<Offset>,
) -> Result<QueryResult> {
    // Coverage property: records that time-travel (pinned offset) execution
    // is exercised at least sometimes.
    kimberlite_properties::sometimes!(
        position.is_some(),
        "query.time_travel_at_position",
        "query executes at a pinned historical log offset"
    );
    let result = execute_internal_inner(store, plan, _table_def, position)?;
    // Shape invariants are only checked in test/simulation builds.
    #[cfg(any(test, feature = "sim"))]
    {
        let _expected_cols = plan.column_names().len();
        kimberlite_properties::always!(
            result.columns.len() == _expected_cols,
            "query.result_columns_match_plan",
            "query result column count must equal plan-declared schema column count"
        );
        kimberlite_properties::always!(
            result.rows.iter().all(|r| r.len() == _expected_cols),
            "query.row_width_matches_columns",
            "every result row must have width equal to declared column count"
        );
    }
    Ok(result)
}
/// Dispatches `plan` to the executor for its variant.
///
/// `position` is forwarded to every executor, including the ones that run
/// child plans (aggregate, join, materialize), so all store reads of one
/// query use the same pinned offset when it is set.
///
/// `_table_def` is not read here; each plan variant carries its own metadata.
#[allow(clippy::too_many_lines)]
fn execute_internal_inner<S: ProjectionStore>(
    store: &mut S,
    plan: &QueryPlan,
    _table_def: &TableDef,
    position: Option<Offset>,
) -> Result<QueryResult> {
    match plan {
        QueryPlan::PointLookup {
            metadata,
            key,
            columns,
            column_names,
        } => execute_point_lookup(store, metadata, key, columns, column_names, position),
        QueryPlan::RangeScan {
            metadata,
            start,
            end,
            filter,
            limit,
            offset,
            order,
            order_by,
            columns,
            column_names,
        } => execute_range_scan(
            store,
            metadata,
            start,
            end,
            filter,
            limit,
            offset,
            order,
            order_by,
            columns,
            column_names,
            position,
        ),
        // Any remaining IndexScan fields are not needed by the executor.
        QueryPlan::IndexScan {
            metadata,
            index_id,
            start,
            end,
            filter,
            limit,
            offset,
            order,
            order_by,
            columns,
            column_names,
            ..
        } => execute_index_scan(
            store,
            metadata,
            *index_id,
            start,
            end,
            filter,
            limit,
            offset,
            order,
            order_by,
            columns,
            column_names,
            position,
        ),
        QueryPlan::TableScan {
            metadata,
            filter,
            limit,
            offset,
            order,
            columns,
            column_names,
        } => execute_table_scan(
            store,
            metadata,
            filter,
            limit,
            offset,
            order,
            columns,
            column_names,
            position,
        ),
        // `group_by_names` is display-only here; grouping uses `group_by_cols`.
        QueryPlan::Aggregate {
            metadata,
            source,
            group_by_cols,
            group_by_names: _,
            aggregates,
            aggregate_filters,
            column_names,
            having,
        } => execute_aggregate(
            store,
            source,
            group_by_cols,
            aggregates,
            aggregate_filters,
            column_names,
            metadata,
            having,
            position,
        ),
        QueryPlan::Join {
            join_type,
            left,
            right,
            on_conditions,
            columns,
            column_names,
        } => execute_join(
            store,
            join_type,
            left,
            right,
            on_conditions,
            columns,
            column_names,
            position,
        ),
        QueryPlan::Materialize {
            source,
            filter,
            case_columns,
            scalar_columns,
            order,
            limit,
            offset,
            column_names,
        } => execute_materialize(
            store,
            source,
            filter,
            case_columns,
            scalar_columns,
            order,
            limit,
            offset,
            column_names,
            position,
        ),
    }
}
/// Executes a `Materialize` wrapper around `source`.
///
/// The steps run in order:
/// 1. Execute the source plan.
/// 2. Drop rows failing `filter`.
/// 3. Append one value per CASE column, then one per scalar-expression column.
/// 4. Reorder/select columns to match `column_names` when the computed layout
///    differs.
/// 5. Sort by `order`, then apply `offset`/`limit`.
///
/// # Errors
/// Propagates source-plan and scalar-expression errors, and returns
/// `QueryError::ColumnNotFound` when a requested output column is missing from
/// the computed layout.
#[allow(clippy::too_many_arguments)]
fn execute_materialize<S: ProjectionStore>(
    store: &mut S,
    source: &QueryPlan,
    filter: &Option<crate::plan::Filter>,
    case_columns: &[crate::plan::CaseColumnDef],
    scalar_columns: &[crate::plan::ScalarColumnDef],
    order: &Option<SortSpec>,
    limit: &Option<usize>,
    offset: &Option<usize>,
    column_names: &[ColumnName],
    position: Option<Offset>,
) -> Result<QueryResult> {
    // The dispatcher never reads the table definition (plan nodes carry their
    // own metadata), so an empty placeholder suffices.
    let placeholder_def = TableDef {
        table_id: kimberlite_store::TableId::from(0u64),
        columns: vec![],
        primary_key: vec![],
        indexes: vec![],
    };
    let mut source_result = execute_internal(store, source, &placeholder_def, position)?;
    kimberlite_properties::sometimes!(
        filter.is_some() || order.is_some() || limit.is_some() || offset.is_some(),
        "query.materialize_applies_filter_order_limit",
        "Materialize wrapper applies at least one of filter, order, limit, or offset"
    );

    // Residual filter over the source rows.
    if let Some(predicate) = filter {
        source_result.rows.retain(|row| predicate.matches(row));
    }

    // CASE columns are appended in declaration order; each one is evaluated
    // against the row including the CASE values appended before it.
    if !case_columns.is_empty() {
        kimberlite_properties::sometimes!(
            !source_result.rows.is_empty(),
            "query.case_when_evaluated",
            "CASE WHEN computed columns evaluated against at least one row"
        );
        for row in &mut source_result.rows {
            for case_def in case_columns {
                let computed = evaluate_case_column(case_def, row);
                row.push(computed);
            }
        }
    }

    // Scalar expression columns follow the CASE columns.
    for row in &mut source_result.rows {
        for scalar_def in scalar_columns {
            let ctx = crate::expression::EvalContext::new(&scalar_def.columns, row);
            let computed = crate::expression::evaluate(&scalar_def.expr, &ctx)?;
            row.push(computed);
        }
    }

    // Computed layout: source columns, then CASE aliases, then scalar names.
    // The source column list is not needed afterwards, so take it rather than clone.
    let mut layout = std::mem::take(&mut source_result.columns);
    layout.extend(case_columns.iter().map(|c| c.alias.clone()));
    layout.extend(scalar_columns.iter().map(|c| c.output_name.clone()));

    if layout.as_slice() != column_names {
        let index_map = column_names
            .iter()
            .map(|target| {
                layout
                    .iter()
                    .position(|c| c == target)
                    .ok_or_else(|| QueryError::ColumnNotFound {
                        table: String::new(),
                        column: target.to_string(),
                    })
            })
            .collect::<Result<Vec<usize>>>()?;
        for row in &mut source_result.rows {
            let reordered: Vec<Value> = index_map.iter().map(|&i| row[i].clone()).collect();
            *row = reordered;
        }
    }

    if let Some(spec) = order {
        sort_rows(&mut source_result.rows, spec);
    }
    apply_offset_and_limit(&mut source_result.rows, *offset, *limit);
    Ok(QueryResult {
        columns: column_names.to_vec(),
        rows: source_result.rows,
    })
}
/// Evaluates a CASE column for `row`: the result of the first WHEN clause
/// whose condition matches, or the ELSE value when none match.
fn evaluate_case_column(case_col: &crate::plan::CaseColumnDef, row: &[Value]) -> Value {
    case_col
        .when_clauses
        .iter()
        .find(|clause| clause.condition.matches(row))
        .map_or_else(|| case_col.else_value.clone(), |clause| clause.result.clone())
}
/// Executes `plan` against `store` without pinning reads to a log offset.
///
/// `table_def` is passed through to the executor but is not currently read by
/// it; table layout comes from the metadata embedded in `plan`.
///
/// # Errors
///
/// Returns an error if a store read fails, a stored row cannot be decoded, an
/// expression cannot be evaluated, or execution exceeds a built-in cap (JOIN
/// output rows, GROUP BY groups).
pub fn execute<S: ProjectionStore>(
    store: &mut S,
    plan: &QueryPlan,
    table_def: &TableDef,
) -> Result<QueryResult> {
    execute_internal(store, plan, table_def, None)
}
/// Executes `plan` with every store read pinned to the historical log offset
/// `position` (a time-travel query).
///
/// `table_def` is passed through to the executor but is not currently read by
/// it; table layout comes from the metadata embedded in `plan`.
///
/// # Errors
///
/// Returns an error if a store read at `position` fails, a stored row cannot
/// be decoded, an expression cannot be evaluated, or execution exceeds a
/// built-in cap (JOIN output rows, GROUP BY groups).
pub fn execute_at<S: ProjectionStore>(
    store: &mut S,
    plan: &QueryPlan,
    table_def: &TableDef,
    position: Offset,
) -> Result<QueryResult> {
    execute_internal(store, plan, table_def, Some(position))
}
/// Converts a pair of `Bound`s into the `(start, end)` keys of a half-open
/// `start..end` range.
///
/// An exclusive start and an inclusive end are shifted with `successor_key`;
/// unbounded sides map to `Key::min()` / `Key::max()`.
fn bounds_to_range(start: &Bound<Key>, end: &Bound<Key>) -> (Key, Key) {
    let lower = match start {
        Bound::Unbounded => Key::min(),
        Bound::Included(k) => k.clone(),
        Bound::Excluded(k) => successor_key(k),
    };
    let upper = match end {
        Bound::Unbounded => Key::max(),
        Bound::Excluded(k) => k.clone(),
        Bound::Included(k) => successor_key(k),
    };
    (lower, upper)
}
/// Recovers the primary-key `Key` from a secondary-index key.
///
/// The index key is decoded into its component values, and the trailing
/// `metadata.primary_key.len()` values are re-encoded as the primary key.
fn extract_pk_from_index_key(index_key: &Key, metadata: &crate::plan::TableMetadata) -> Key {
    use crate::key_encoder::{decode_key, encode_key};
    let all_values = decode_key(index_key);
    let pk_count = metadata.primary_key.len();
    debug_assert!(pk_count > 0, "primary key columns must be non-empty");
    debug_assert!(
        all_values.len() >= pk_count,
        "index key must contain at least the primary key values"
    );
    // Saturate so a malformed (too short) key cannot underflow. In release
    // builds a plain subtraction would wrap and skip every value.
    let pk_start = all_values.len().saturating_sub(pk_count);
    let pk_values: Vec<Value> = all_values.iter().skip(pk_start).cloned().collect();
    debug_assert_eq!(
        pk_values.len(),
        pk_count,
        "extracted primary key must have correct number of columns"
    );
    encode_key(&pk_values)
}
/// Decodes a stored JSON object into a full row ordered by `metadata.columns`.
///
/// Fields missing from the object are treated as JSON `null` before being
/// converted to the column's declared type.
///
/// # Errors
/// Fails if the bytes are not valid JSON, the JSON is not an object
/// (`QueryError::TypeMismatch`), or a field cannot be converted to its
/// column's type.
fn decode_row(bytes: &Bytes, metadata: &crate::plan::TableMetadata) -> Result<Row> {
    let json: serde_json::Value = serde_json::from_slice(bytes)?;
    let Some(obj) = json.as_object() else {
        return Err(QueryError::TypeMismatch {
            expected: "object".to_string(),
            actual: format!("{json:?}"),
        });
    };
    let missing = serde_json::Value::Null;
    let mut decoded = Vec::with_capacity(metadata.columns.len());
    for col_def in &metadata.columns {
        let field = obj.get(col_def.name.as_str()).unwrap_or(&missing);
        decoded.push(Value::from_json(field, col_def.data_type)?);
    }
    Ok(decoded)
}
/// Decodes a stored row and keeps only the requested column indices (every
/// column when `columns` is empty).
///
/// # Errors
/// Propagates `decode_row` failures.
fn decode_and_project(
    bytes: &Bytes,
    columns: &[usize],
    metadata: &crate::plan::TableMetadata,
) -> Result<Row> {
    decode_row(bytes, metadata).map(|full_row| project_row(&full_row, columns))
}
/// Projects `full_row` onto the column indices in `columns`, in that order.
///
/// An empty `columns` slice means "all columns" and returns a copy of the whole
/// row. The function is generic over the cell type; executor callers use it
/// with `Value`.
///
/// # Panics
/// Panics if any index in `columns` is out of bounds for `full_row`.
fn project_row<T: Clone>(full_row: &[T], columns: &[usize]) -> Vec<T> {
    debug_assert!(
        columns.iter().all(|&idx| idx < full_row.len()),
        "column index out of bounds: columns={:?}, row_len={}",
        columns,
        full_row.len()
    );
    if columns.is_empty() {
        return full_row.to_vec();
    }
    let projected: Vec<T> = columns
        .iter()
        .map(|&idx| {
            full_row.get(idx).cloned().unwrap_or_else(|| {
                panic!(
                    "column index {} out of bounds (row len {})",
                    idx,
                    full_row.len()
                );
            })
        })
        .collect();
    debug_assert_eq!(
        projected.len(),
        columns.len(),
        "projected row length mismatch"
    );
    projected
}
/// Sorts `rows` in place by the `(column index, direction)` keys of `spec`,
/// comparing key columns left to right. The sort is stable.
///
/// Values for which `Value::compare` returns `None` are treated as equal. A
/// missing cell sorts before a present one, and that ordering is reversed for
/// descending keys.
///
/// NOTE(review): treating incomparable values as equal can make the
/// comparator non-transitive (e.g. if `compare` returns `None` for NULL);
/// newer std sort implementations may panic on non-total orders — confirm
/// `Value::compare` semantics.
fn sort_rows(rows: &mut [Row], spec: &SortSpec) {
    rows.sort_by(|lhs, rhs| {
        spec.columns
            .iter()
            .map(|(col_idx, direction)| {
                let natural = match (lhs.get(*col_idx), rhs.get(*col_idx)) {
                    (Some(l), Some(r)) => l.compare(r).unwrap_or(Ordering::Equal),
                    (None, None) => Ordering::Equal,
                    (None, Some(_)) => Ordering::Less,
                    (Some(_), None) => Ordering::Greater,
                };
                match direction {
                    ScanOrder::Ascending => natural,
                    ScanOrder::Descending => natural.reverse(),
                }
            })
            .find(|ord| ord.is_ne())
            .unwrap_or(Ordering::Equal)
    });
}
#[allow(clippy::too_many_arguments)]
fn evaluate_join_conditions(row: &[Value], conditions: &[crate::plan::JoinCondition]) -> bool {
use crate::plan::JoinOp;
conditions.iter().all(|cond| {
let left_val = row.get(cond.left_col_idx);
let right_val = row.get(cond.right_col_idx);
if left_val.is_none() || right_val.is_none() {
return false;
}
let left_val = left_val.unwrap();
let right_val = right_val.unwrap();
match cond.op {
JoinOp::Eq => left_val == right_val,
JoinOp::Lt => left_val.compare(right_val) == Some(std::cmp::Ordering::Less),
JoinOp::Le => matches!(
left_val.compare(right_val),
Some(std::cmp::Ordering::Less | std::cmp::Ordering::Equal)
),
JoinOp::Gt => left_val.compare(right_val) == Some(std::cmp::Ordering::Greater),
JoinOp::Ge => matches!(
left_val.compare(right_val),
Some(std::cmp::Ordering::Greater | std::cmp::Ordering::Equal)
),
}
})
}
fn execute_join<S: ProjectionStore>(
store: &mut S,
join_type: &crate::parser::JoinType,
left: &QueryPlan,
right: &QueryPlan,
on_conditions: &[crate::plan::JoinCondition],
_columns: &[usize],
column_names: &[ColumnName],
position: Option<Offset>,
) -> Result<QueryResult> {
let left_metadata = left.metadata().ok_or_else(|| {
QueryError::UnsupportedFeature("JOIN child plan missing metadata".to_string())
})?;
let right_metadata = right.metadata().ok_or_else(|| {
QueryError::UnsupportedFeature("JOIN child plan missing metadata".to_string())
})?;
let left_table_def = TableDef {
table_id: left_metadata.table_id,
columns: left_metadata.columns.clone(),
primary_key: left_metadata.primary_key.clone(),
indexes: vec![], };
let right_table_def = TableDef {
table_id: right_metadata.table_id,
columns: right_metadata.columns.clone(),
primary_key: right_metadata.primary_key.clone(),
indexes: vec![], };
let left_result = execute_internal(store, left, &left_table_def, position)?;
let right_result = execute_internal(store, right, &right_table_def, position)?;
let mut output_rows = Vec::new();
match join_type {
crate::parser::JoinType::Inner => {
for left_row in &left_result.rows {
for right_row in &right_result.rows {
let combined_row: Vec<Value> =
left_row.iter().chain(right_row.iter()).cloned().collect();
if evaluate_join_conditions(&combined_row, on_conditions) {
output_rows.push(combined_row);
kimberlite_properties::sometimes!(
output_rows.len() > MAX_JOIN_OUTPUT_ROWS,
"query.join_output_row_cap_hit",
"INNER JOIN output hits MAX_JOIN_OUTPUT_ROWS (1M) cap"
);
if output_rows.len() > MAX_JOIN_OUTPUT_ROWS {
return Err(QueryError::UnsupportedFeature(format!(
"JOIN output exceeds maximum of {MAX_JOIN_OUTPUT_ROWS} rows — add a more selective filter"
)));
}
}
}
}
}
crate::parser::JoinType::Left => {
for left_row in &left_result.rows {
let mut matched = false;
for right_row in &right_result.rows {
let combined_row: Vec<Value> =
left_row.iter().chain(right_row.iter()).cloned().collect();
if evaluate_join_conditions(&combined_row, on_conditions) {
output_rows.push(combined_row);
matched = true;
kimberlite_properties::sometimes!(
output_rows.len() > MAX_JOIN_OUTPUT_ROWS,
"query.left_join_output_row_cap_hit",
"LEFT JOIN output hits MAX_JOIN_OUTPUT_ROWS (1M) cap"
);
if output_rows.len() > MAX_JOIN_OUTPUT_ROWS {
return Err(QueryError::UnsupportedFeature(format!(
"JOIN output exceeds maximum of {MAX_JOIN_OUTPUT_ROWS} rows — add a more selective filter"
)));
}
}
}
if !matched {
let right_nulls = vec![Value::Null; right_result.columns.len()];
let combined_row: Vec<Value> = left_row
.iter()
.cloned()
.chain(right_nulls.into_iter())
.collect();
output_rows.push(combined_row);
}
}
}
crate::parser::JoinType::Right => {
for right_row in &right_result.rows {
let mut matched = false;
for left_row in &left_result.rows {
let combined_row: Vec<Value> =
left_row.iter().chain(right_row.iter()).cloned().collect();
if evaluate_join_conditions(&combined_row, on_conditions) {
output_rows.push(combined_row);
matched = true;
if output_rows.len() > MAX_JOIN_OUTPUT_ROWS {
return Err(QueryError::UnsupportedFeature(format!(
"JOIN output exceeds maximum of {MAX_JOIN_OUTPUT_ROWS} rows — add a more selective filter"
)));
}
}
}
if !matched {
let left_nulls = vec![Value::Null; left_result.columns.len()];
let combined_row: Vec<Value> = left_nulls
.into_iter()
.chain(right_row.iter().cloned())
.collect();
output_rows.push(combined_row);
}
}
}
crate::parser::JoinType::Full => {
let mut right_matched = vec![false; right_result.rows.len()];
for left_row in &left_result.rows {
let mut matched = false;
for (rj, right_row) in right_result.rows.iter().enumerate() {
let combined_row: Vec<Value> =
left_row.iter().chain(right_row.iter()).cloned().collect();
if evaluate_join_conditions(&combined_row, on_conditions) {
output_rows.push(combined_row);
matched = true;
right_matched[rj] = true;
if output_rows.len() > MAX_JOIN_OUTPUT_ROWS {
return Err(QueryError::UnsupportedFeature(format!(
"JOIN output exceeds maximum of {MAX_JOIN_OUTPUT_ROWS} rows — add a more selective filter"
)));
}
}
}
if !matched {
let right_nulls = vec![Value::Null; right_result.columns.len()];
let combined_row: Vec<Value> = left_row
.iter()
.cloned()
.chain(right_nulls.into_iter())
.collect();
output_rows.push(combined_row);
}
}
for (rj, right_row) in right_result.rows.iter().enumerate() {
if !right_matched[rj] {
let left_nulls = vec![Value::Null; left_result.columns.len()];
let combined_row: Vec<Value> = left_nulls
.into_iter()
.chain(right_row.iter().cloned())
.collect();
output_rows.push(combined_row);
}
}
}
crate::parser::JoinType::Cross => {
let estimated = left_result
.rows
.len()
.saturating_mul(right_result.rows.len());
if estimated > MAX_JOIN_OUTPUT_ROWS {
return Err(QueryError::UnsupportedFeature(format!(
"CROSS JOIN cardinality {estimated} exceeds maximum of {MAX_JOIN_OUTPUT_ROWS} rows — add a more selective query"
)));
}
for left_row in &left_result.rows {
for right_row in &right_result.rows {
let combined_row: Vec<Value> =
left_row.iter().chain(right_row.iter()).cloned().collect();
output_rows.push(combined_row);
}
}
}
}
kimberlite_properties::sometimes!(
output_rows.len() > 1,
"query.join_produces_multi_row_output",
"join execution produces more than one output row"
);
Ok(QueryResult {
columns: column_names.to_vec(),
rows: output_rows,
})
}
/// Executes a GROUP BY / aggregate plan over the rows produced by `source`.
///
/// Rows are bucketed by their values at `group_by_cols`; missing cells count
/// as `NULL`. Groups are emitted in first-seen order, so output order does not
/// depend on hash-map iteration order and is identical across runs.
///
/// Each output row is the group key followed by the finalized aggregate
/// values, and groups failing `having` are dropped. Without GROUP BY columns
/// the query always forms exactly one group, even over empty input, as SQL
/// requires for scalar aggregates; HAVING applies to that group too.
///
/// # Errors
/// Returns `QueryError::UnsupportedFeature` when more than `MAX_GROUP_COUNT`
/// distinct groups appear. Source-execution and aggregate-update errors are
/// propagated.
#[allow(clippy::too_many_arguments)]
fn execute_aggregate<S: ProjectionStore>(
    store: &mut S,
    source: &QueryPlan,
    group_by_cols: &[usize],
    aggregates: &[crate::parser::AggregateFunction],
    aggregate_filters: &[Option<crate::plan::Filter>],
    column_names: &[ColumnName],
    metadata: &crate::plan::TableMetadata,
    having: &[crate::parser::HavingCondition],
    position: Option<Offset>,
) -> Result<QueryResult> {
    use std::collections::HashMap;
    let dummy_table_def = TableDef {
        table_id: metadata.table_id,
        columns: metadata.columns.clone(),
        primary_key: metadata.primary_key.clone(),
        indexes: vec![],
    };
    let source_result = execute_internal(store, source, &dummy_table_def, position)?;

    // `slot_by_key` maps a group key to its index in `groups`. The Vec keeps
    // insertion order so the output is deterministic.
    let mut slot_by_key: HashMap<Vec<Value>, usize> = HashMap::new();
    let mut groups: Vec<(Vec<Value>, AggregateState)> = Vec::new();
    for row in source_result.rows {
        let group_key: Vec<Value> = group_by_cols
            .iter()
            .map(|&idx| row.get(idx).cloned().unwrap_or(Value::Null))
            .collect();
        let slot = match slot_by_key.get(&group_key).copied() {
            Some(slot) => slot,
            None => {
                kimberlite_properties::sometimes!(
                    groups.len() >= MAX_GROUP_COUNT,
                    "query.group_by_cardinality_cap_hit",
                    "GROUP BY hits MAX_GROUP_COUNT (100k) distinct group cap"
                );
                if groups.len() >= MAX_GROUP_COUNT {
                    return Err(QueryError::UnsupportedFeature(format!(
                        "GROUP BY cardinality exceeds maximum of {MAX_GROUP_COUNT} distinct groups"
                    )));
                }
                slot_by_key.insert(group_key.clone(), groups.len());
                groups.push((group_key, AggregateState::new()));
                groups.len() - 1
            }
        };
        groups[slot]
            .1
            .update(&row, aggregates, aggregate_filters, metadata)?;
    }

    // A scalar aggregate (no GROUP BY) always yields exactly one group, even
    // over empty input; HAVING is then evaluated against it like any other.
    if groups.is_empty() && group_by_cols.is_empty() {
        groups.push((Vec::new(), AggregateState::new()));
    }

    let group_by_count = group_by_cols.len();
    let mut result_rows = Vec::with_capacity(groups.len());
    for (group_key, state) in groups {
        let agg_values = state.finalize(aggregates);
        if !having.is_empty() && !evaluate_having(having, aggregates, &agg_values, group_by_count) {
            continue;
        }
        let mut result_row = group_key;
        result_row.extend(agg_values);
        result_rows.push(result_row);
    }
    Ok(QueryResult {
        columns: column_names.to_vec(),
        rows: result_rows,
    })
}
fn evaluate_having(
having: &[crate::parser::HavingCondition],
aggregates: &[crate::parser::AggregateFunction],
agg_values: &[Value],
_group_by_count: usize,
) -> bool {
having.iter().all(|condition| match condition {
crate::parser::HavingCondition::AggregateComparison {
aggregate,
op,
value,
} => {
let agg_idx = aggregates.iter().position(|a| a == aggregate);
let Some(idx) = agg_idx else {
return false;
};
let Some(agg_value) = agg_values.get(idx) else {
return false;
};
match op {
crate::parser::HavingOp::Eq => agg_value == value,
crate::parser::HavingOp::Lt => agg_value.compare(value) == Some(Ordering::Less),
crate::parser::HavingOp::Le => matches!(
agg_value.compare(value),
Some(Ordering::Less | Ordering::Equal)
),
crate::parser::HavingOp::Gt => agg_value.compare(value) == Some(Ordering::Greater),
crate::parser::HavingOp::Ge => matches!(
agg_value.compare(value),
Some(Ordering::Greater | Ordering::Equal)
),
}
}
})
}
#[derive(Debug, Clone)]
struct AggregateState {
count: i64,
per_agg_counts: Vec<i64>,
non_null_counts: Vec<i64>, sums: Vec<Option<Value>>,
mins: Vec<Option<Value>>,
maxs: Vec<Option<Value>>,
}
impl AggregateState {
fn new() -> Self {
Self {
count: 0,
per_agg_counts: Vec::new(),
non_null_counts: Vec::new(),
sums: Vec::new(),
mins: Vec::new(),
maxs: Vec::new(),
}
}
fn update(
&mut self,
row: &[Value],
aggregates: &[crate::parser::AggregateFunction],
aggregate_filters: &[Option<crate::plan::Filter>],
metadata: &crate::plan::TableMetadata,
) -> Result<()> {
debug_assert!(!row.is_empty(), "row must have at least one column");
assert!(
aggregates.len() <= MAX_AGGREGATES_PER_QUERY,
"too many aggregates ({} > {})",
aggregates.len(),
MAX_AGGREGATES_PER_QUERY
);
let any_filter = aggregate_filters.iter().any(std::option::Option::is_some);
if !any_filter {
self.count += 1;
}
while self.sums.len() < aggregates.len() {
self.non_null_counts.push(0);
self.sums.push(None);
self.mins.push(None);
self.maxs.push(None);
self.per_agg_counts.push(0);
}
debug_assert_eq!(
self.sums.len(),
self.non_null_counts.len(),
"aggregate state vectors out of sync"
);
debug_assert_eq!(self.sums.len(), self.mins.len());
debug_assert_eq!(self.sums.len(), self.maxs.len());
let find_col_idx = |col: &ColumnName| -> usize {
metadata
.columns
.iter()
.position(|c| &c.name == col)
.unwrap_or(0)
};
for (i, agg) in aggregates.iter().enumerate() {
if let Some(Some(filter)) = aggregate_filters.get(i) {
if !filter.matches(row) {
continue;
}
}
self.per_agg_counts[i] += 1;
match agg {
crate::parser::AggregateFunction::CountStar => {
}
crate::parser::AggregateFunction::Count(col) => {
let col_idx = find_col_idx(col);
if let Some(val) = row.get(col_idx) {
if !val.is_null() {
self.non_null_counts[i] += 1;
}
}
}
crate::parser::AggregateFunction::Sum(col) => {
let col_idx = find_col_idx(col);
if let Some(val) = row.get(col_idx) {
if !val.is_null() {
self.sums[i] = Some(add_values(&self.sums[i], val)?);
}
}
}
crate::parser::AggregateFunction::Avg(col) => {
let col_idx = find_col_idx(col);
if let Some(val) = row.get(col_idx) {
if !val.is_null() {
self.sums[i] = Some(add_values(&self.sums[i], val)?);
}
}
}
crate::parser::AggregateFunction::Min(col) => {
let col_idx = find_col_idx(col);
if let Some(val) = row.get(col_idx) {
if !val.is_null() {
self.mins[i] = Some(min_value(&self.mins[i], val));
}
}
}
crate::parser::AggregateFunction::Max(col) => {
let col_idx = find_col_idx(col);
if let Some(val) = row.get(col_idx) {
if !val.is_null() {
self.maxs[i] = Some(max_value(&self.maxs[i], val));
}
}
}
}
}
debug_assert_eq!(
self.sums.len(),
aggregates.len(),
"aggregate state must match aggregate count after update"
);
Ok(())
}
fn finalize(&self, aggregates: &[crate::parser::AggregateFunction]) -> Vec<Value> {
let mut result = Vec::new();
for (i, agg) in aggregates.iter().enumerate() {
let per_agg_count = self.per_agg_counts.get(i).copied().unwrap_or(self.count);
let value = match agg {
crate::parser::AggregateFunction::CountStar => Value::BigInt(per_agg_count),
crate::parser::AggregateFunction::Count(_) => {
Value::BigInt(self.non_null_counts.get(i).copied().unwrap_or(0))
}
crate::parser::AggregateFunction::Sum(_) => self
.sums
.get(i)
.and_then(std::clone::Clone::clone)
.unwrap_or(Value::Null),
crate::parser::AggregateFunction::Avg(_) => {
if per_agg_count == 0 {
Value::Null
} else {
kimberlite_properties::never!(
per_agg_count == 0,
"query.avg_divide_by_zero",
"AVG divide_value must never be reached with per_agg_count == 0"
);
match self.sums.get(i).and_then(|v| v.as_ref()) {
Some(sum) => divide_value(sum, per_agg_count).unwrap_or(Value::Null),
None => Value::Null,
}
}
}
crate::parser::AggregateFunction::Min(_) => self
.mins
.get(i)
.and_then(std::clone::Clone::clone)
.unwrap_or(Value::Null),
crate::parser::AggregateFunction::Max(_) => self
.maxs
.get(i)
.and_then(std::clone::Clone::clone)
.unwrap_or(Value::Null),
};
result.push(value);
}
result
}
}
fn add_values(a: &Option<Value>, b: &Value) -> Result<Value> {
match a {
None => Ok(b.clone()),
Some(a_val) => match (a_val, b) {
(Value::BigInt(x), Value::BigInt(y)) => {
let checked = x.checked_add(*y);
kimberlite_properties::sometimes!(
checked.is_none(),
"query.sum_bigint_overflow_detected",
"SUM(BIGINT) overflow detected by checked_add"
);
if let Some(sum) = checked {
kimberlite_properties::never!(
sum != x.wrapping_add(*y)
|| (*x > 0 && *y > 0 && sum < 0)
|| (*x < 0 && *y < 0 && sum > 0),
"query.sum_bigint_silent_wrap",
"SUM(BIGINT) checked_add returned Some() for an overflowing result"
);
Ok(Value::BigInt(sum))
} else {
Err(QueryError::TypeMismatch {
expected: "BigInt (non-overflowing)".to_string(),
actual: format!("overflow: {x} + {y}"),
})
}
}
(Value::Integer(x), Value::Integer(y)) => x
.checked_add(*y)
.map(Value::Integer)
.ok_or_else(|| QueryError::TypeMismatch {
expected: "Integer (non-overflowing)".to_string(),
actual: format!("overflow: {x} + {y}"),
}),
(Value::SmallInt(x), Value::SmallInt(y)) => x
.checked_add(*y)
.map(Value::SmallInt)
.ok_or_else(|| QueryError::TypeMismatch {
expected: "SmallInt (non-overflowing)".to_string(),
actual: format!("overflow: {x} + {y}"),
}),
(Value::TinyInt(x), Value::TinyInt(y)) => x
.checked_add(*y)
.map(Value::TinyInt)
.ok_or_else(|| QueryError::TypeMismatch {
expected: "TinyInt (non-overflowing)".to_string(),
actual: format!("overflow: {x} + {y}"),
}),
(Value::Real(x), Value::Real(y)) => Ok(Value::Real(x + y)),
(Value::Decimal(x, sx), Value::Decimal(y, sy)) if sx == sy => x
.checked_add(*y)
.map(|sum| Value::Decimal(sum, *sx))
.ok_or_else(|| QueryError::TypeMismatch {
expected: "Decimal (non-overflowing)".to_string(),
actual: format!("overflow: {x} + {y}"),
}),
_ => Err(QueryError::TypeMismatch {
expected: format!("{a_val:?}"),
actual: format!("{b:?}"),
}),
},
}
}
/// Returns the smaller of the running minimum `a` and the candidate `b`.
///
/// With no running minimum, `b` is returned. On ties `b` is returned; when the
/// two values are incomparable the running minimum is kept.
fn min_value(a: &Option<Value>, b: &Value) -> Value {
    let Some(current) = a else {
        return b.clone();
    };
    match current.compare(b) {
        Some(Ordering::Less) | None => current.clone(),
        Some(Ordering::Equal | Ordering::Greater) => b.clone(),
    }
}
/// Returns the larger of the running maximum `a` and the candidate `b`.
///
/// With no running maximum, `b` is returned. On ties `b` is returned; when the
/// two values are incomparable the running maximum is kept.
fn max_value(a: &Option<Value>, b: &Value) -> Value {
    let Some(current) = a else {
        return b.clone();
    };
    match current.compare(b) {
        Some(Ordering::Greater) | None => current.clone(),
        Some(Ordering::Equal | Ordering::Less) => b.clone(),
    }
}
/// Divides a numeric aggregate sum by `count`, producing a `Real`.
///
/// Returns `Some(Value::Null)` when `count` is zero and `None` for
/// non-numeric inputs. Decimals are converted to `f64` using their scale.
#[allow(clippy::cast_precision_loss)]
fn divide_value(val: &Value, count: i64) -> Option<Value> {
    if count == 0 {
        return Some(Value::Null);
    }
    let divisor = count as f64;
    let quotient = match val {
        Value::BigInt(x) => *x as f64 / divisor,
        Value::Integer(x) => f64::from(*x) / divisor,
        Value::SmallInt(x) => f64::from(*x) / divisor,
        Value::TinyInt(x) => f64::from(*x) / divisor,
        Value::Real(x) => x / divisor,
        Value::Decimal(x, scale) => {
            let exponent = u32::from(*scale);
            // 10^exponent overflows i128 for exponent > 38. Fall back to a float
            // power there instead of panicking (debug) or wrapping (release).
            let scale_factor = 10_i128
                .checked_pow(exponent)
                .map_or_else(|| 10_f64.powf(f64::from(exponent)), |d| d as f64);
            *x as f64 / scale_factor / divisor
        }
        _ => return None,
    };
    Some(Value::Real(quotient))
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::plan::Filter;
    use crate::plan::FilterCondition;
    use crate::plan::FilterOp;

    #[test]
    fn test_project_row() {
        let row = vec![
            Value::BigInt(1),
            Value::Text("alice".to_string()),
            Value::BigInt(30),
        ];
        let projected = project_row(&row, &[0, 2]);
        assert_eq!(projected, vec![Value::BigInt(1), Value::BigInt(30)]);
    }

    #[test]
    fn test_project_row_all() {
        let row = vec![Value::BigInt(1), Value::Text("bob".to_string())];
        let projected = project_row(&row, &[]);
        assert_eq!(projected, row);
    }

    #[test]
    fn test_filter_matches() {
        let row = vec![Value::BigInt(42), Value::Text("alice".to_string())];
        let filter = Filter::single(FilterCondition {
            column_idx: 0,
            op: FilterOp::Eq,
            value: Value::BigInt(42),
        });
        assert!(filter.matches(&row));
        let filter_miss = Filter::single(FilterCondition {
            column_idx: 0,
            op: FilterOp::Eq,
            value: Value::BigInt(99),
        });
        assert!(!filter_miss.matches(&row));
    }

    #[test]
    fn test_sort_rows() {
        let mut rows = vec![
            vec![Value::BigInt(3), Value::Text("c".to_string())],
            vec![Value::BigInt(1), Value::Text("a".to_string())],
            vec![Value::BigInt(2), Value::Text("b".to_string())],
        ];
        let spec = SortSpec {
            columns: vec![(0, ScanOrder::Ascending)],
        };
        sort_rows(&mut rows, &spec);
        assert_eq!(rows[0][0], Value::BigInt(1));
        assert_eq!(rows[1][0], Value::BigInt(2));
        assert_eq!(rows[2][0], Value::BigInt(3));
    }

    #[test]
    fn test_sort_rows_descending() {
        let mut rows = vec![
            vec![Value::BigInt(1)],
            vec![Value::BigInt(3)],
            vec![Value::BigInt(2)],
        ];
        let spec = SortSpec {
            columns: vec![(0, ScanOrder::Descending)],
        };
        sort_rows(&mut rows, &spec);
        assert_eq!(rows[0][0], Value::BigInt(3));
        assert_eq!(rows[1][0], Value::BigInt(2));
        assert_eq!(rows[2][0], Value::BigInt(1));
    }

    #[test]
    fn test_sort_rows_multi_column() {
        let mut rows = vec![
            vec![Value::BigInt(1), Value::Text("a".to_string())],
            vec![Value::BigInt(0), Value::Text("z".to_string())],
            vec![Value::BigInt(1), Value::Text("b".to_string())],
        ];
        let spec = SortSpec {
            columns: vec![(0, ScanOrder::Ascending), (1, ScanOrder::Descending)],
        };
        sort_rows(&mut rows, &spec);
        assert_eq!(rows[0][1], Value::Text("z".to_string()));
        assert_eq!(rows[1][1], Value::Text("b".to_string()));
        assert_eq!(rows[2][1], Value::Text("a".to_string()));
    }

    #[test]
    fn test_apply_offset_and_limit() {
        let mut rows = vec![1, 2, 3, 4, 5];
        apply_offset_and_limit(&mut rows, Some(1), Some(2));
        assert_eq!(rows, vec![2, 3]);

        let mut rows = vec![1, 2, 3];
        apply_offset_and_limit(&mut rows, Some(5), None);
        assert!(rows.is_empty());

        let mut rows = vec![1, 2, 3];
        apply_offset_and_limit(&mut rows, None, Some(0));
        assert!(rows.is_empty());

        let mut rows = vec![1, 2, 3];
        apply_offset_and_limit(&mut rows, None, None);
        assert_eq!(rows, vec![1, 2, 3]);
    }

    #[test]
    fn test_add_values_sums_and_detects_overflow() {
        assert_eq!(
            add_values(&None, &Value::BigInt(7)).ok(),
            Some(Value::BigInt(7))
        );
        assert_eq!(
            add_values(&Some(Value::BigInt(2)), &Value::BigInt(3)).ok(),
            Some(Value::BigInt(5))
        );
        assert!(add_values(&Some(Value::BigInt(i64::MAX)), &Value::BigInt(1)).is_err());
    }

    #[test]
    fn test_min_max_value() {
        assert_eq!(min_value(&None, &Value::BigInt(4)), Value::BigInt(4));
        assert_eq!(
            min_value(&Some(Value::BigInt(2)), &Value::BigInt(4)),
            Value::BigInt(2)
        );
        assert_eq!(
            max_value(&Some(Value::BigInt(2)), &Value::BigInt(4)),
            Value::BigInt(4)
        );
        assert_eq!(max_value(&None, &Value::BigInt(9)), Value::BigInt(9));
    }

    #[test]
    fn test_divide_value() {
        assert_eq!(divide_value(&Value::BigInt(10), 4), Some(Value::Real(2.5)));
        assert_eq!(divide_value(&Value::BigInt(10), 0), Some(Value::Null));
        assert_eq!(divide_value(&Value::Null, 3), None);
    }
}