use alloc::borrow::Cow;
use alloc::vec::Vec;
use spg_sql::ast::{BinOp, Expr, Literal, SelectStatement};
use spg_storage::{Catalog, ColumnSchema, IndexKey, Row, Table, Value};
use crate::eval::{self, EvalContext};
use crate::{EngineError, QueryResult, apply_offset_and_limit, build_projection};
/// Tries to answer an `ORDER BY <distance> LIMIT n` query from an NSW index.
///
/// The fast path is taken only when all of the following hold; any other
/// statement shape yields `None`:
///
/// * the statement is not `DISTINCT`;
/// * `LIMIT` is a literal that fits in `usize` and is non-zero;
/// * there is exactly one `ORDER BY` term and it is ascending;
/// * that term is `<column> <op> <vector literal>` (either operand order) with
///   `<op>` being the L2, inner-product or cosine distance operator;
/// * the column, if qualified, is qualified with `table_alias`, exists in
///   `schema_cols`, and `spg_storage::nsw_index_on` finds an index on it.
///
/// Returns positions into `table.rows()` in the order produced by
/// `spg_storage::nsw_query`. When a `WHERE` clause is present the index is
/// over-fetched and candidates are kept only if the predicate evaluates to
/// `Value::Bool(true)`; a predicate evaluation error yields `None`.
///
/// Up to `LIMIT + OFFSET` positions are returned. `materialise_in_order` hands
/// both literals to `apply_offset_and_limit`, so returning only `LIMIT`
/// positions would leave too few rows once the offset has been removed.
/// NOTE(review): assumes callers feed the result to `materialise_in_order` and
/// that `offset_literal()` yields an integer like `limit_literal()` — confirm.
pub(crate) fn try_nsw_knn(
    stmt: &SelectStatement,
    table: &Table,
    schema_cols: &[ColumnSchema],
    table_alias: &str,
) -> Option<Vec<usize>> {
    if stmt.distinct {
        return None;
    }
    let limit = usize::try_from(stmt.limit_literal()?).ok()?;
    if limit == 0 {
        return None;
    }
    let offset = match stmt.offset_literal() {
        Some(raw) => usize::try_from(raw).ok()?,
        None => 0,
    };
    // Number of rows that must survive so that OFFSET can be skipped and
    // LIMIT rows still remain.
    let wanted = limit.saturating_add(offset);

    if stmt.order_by.len() != 1 {
        return None;
    }
    let order = &stmt.order_by[0];
    if order.desc {
        return None;
    }
    let Expr::Binary { lhs, op, rhs } = &order.expr else {
        return None;
    };
    let metric = match op {
        BinOp::L2Distance => spg_storage::NswMetric::L2,
        BinOp::InnerProduct => spg_storage::NswMetric::InnerProduct,
        BinOp::CosineDistance => spg_storage::NswMetric::Cosine,
        _ => return None,
    };
    // Accept both `col <op> literal` and `literal <op> col`.
    let ((Expr::Column(col), literal) | (literal, Expr::Column(col))) =
        (lhs.as_ref(), rhs.as_ref())
    else {
        return None;
    };
    if let Some(q) = &col.qualifier
        && q != table_alias
    {
        return None;
    }
    let col_pos = schema_cols.iter().position(|s| s.name == col.name)?;
    let query = literal_to_vector(literal)?;
    let idx = spg_storage::nsw_index_on(table, col_pos)?;

    let Some(where_expr) = &stmt.where_ else {
        return Some(spg_storage::nsw_query(
            table, &idx.name, &query, wanted, metric,
        ));
    };

    // With a filter, ask the index for more candidates than needed because
    // some of them will be rejected by the predicate.
    let over_fetch = wanted.saturating_mul(10).max(NSW_OVER_FETCH_FLOOR);
    let candidates = spg_storage::nsw_query(table, &idx.name, &query, over_fetch, metric);
    let ctx = EvalContext::new(schema_cols, Some(table_alias));
    // Bound the allocation by the candidate count: `wanted` is derived from
    // user-supplied LIMIT/OFFSET and may be arbitrarily large.
    let mut kept: Vec<usize> = Vec::with_capacity(wanted.min(candidates.len()));
    for i in candidates {
        // A position that no longer resolves abandons the fast path instead
        // of panicking.
        let row = table.rows().get(i)?;
        let cond = eval::eval_expr(where_expr, row, &ctx).ok()?;
        if matches!(cond, Value::Bool(true)) {
            kept.push(i);
            if kept.len() >= wanted {
                break;
            }
        }
    }
    Some(kept)
}
/// Lower bound on the number of candidates `try_nsw_knn` requests from the NSW
/// index when the statement has a `WHERE` clause (the request is ten times the
/// needed row count, but never fewer than this).
const NSW_OVER_FETCH_FLOOR: usize = 32;
/// Extracts a vector literal from `e`, looking through any number of
/// enclosing `CAST` expressions.
///
/// Returns a clone of the literal's elements, or `None` when the innermost
/// expression is not a vector literal.
pub(crate) fn literal_to_vector(e: &Expr) -> Option<Vec<f32>> {
    let mut current = e;
    loop {
        match current {
            Expr::Literal(Literal::Vector(elements)) => return Some(elements.clone()),
            // Peel one cast layer and look at what it wraps.
            Expr::Cast { expr, .. } => current = &**expr,
            _ => return None,
        }
    }
}
/// Projects the rows at `ordered_rows` (positions into `table.rows()`), in
/// the given order, into a `QueryResult::Rows`.
///
/// The statement's select items are resolved with `build_projection`, every
/// projection expression is evaluated per row, and the statement's OFFSET and
/// LIMIT literals are then applied via `apply_offset_and_limit`.
///
/// # Errors
///
/// Propagates failures from `build_projection` and from evaluating any
/// projection expression.
///
/// # Panics
///
/// Positions are used with direct indexing, so an entry of `ordered_rows`
/// outside `table.rows()` panics.
pub(crate) fn materialise_in_order(
    stmt: &SelectStatement,
    table: &Table,
    schema_cols: &[ColumnSchema],
    table_alias: &str,
    ordered_rows: &[usize],
) -> Result<QueryResult, EngineError> {
    let ctx = EvalContext::new(schema_cols, Some(table_alias));
    let projection = build_projection(&stmt.items, schema_cols, table_alias)?;

    let mut output_rows = ordered_rows
        .iter()
        .map(|&position| -> Result<Row, EngineError> {
            let source = &table.rows()[position];
            let values = projection
                .iter()
                .map(|p| eval::eval_expr(&p.expr, source, &ctx))
                .collect::<Result<Vec<_>, _>>()?;
            Ok(Row::new(values))
        })
        .collect::<Result<Vec<Row>, EngineError>>()?;

    apply_offset_and_limit(
        &mut output_rows,
        stmt.offset_literal(),
        stmt.limit_literal(),
    );

    let mut columns: Vec<ColumnSchema> = Vec::with_capacity(projection.len());
    for p in projection {
        columns.push(ColumnSchema::new(p.output_name, p.ty, p.nullable));
    }

    Ok(QueryResult::Rows {
        columns,
        rows: output_rows,
    })
}
/// Resolves a `WHERE` expression to hot-row positions via an equality index.
///
/// * `a AND b` — tries `a` first and falls back to `b`; the positions returned
///   satisfy only the conjunct that was seekable.
/// * `col = literal` / `literal = col` — looks the literal up in the index on
///   that column.
///
/// Returns `None` for any other expression shape, when no index or index key
/// is available, or when any matching locator is `Cold` (cold rows have no
/// position in `table.rows()`).
pub(crate) fn try_index_seek_positions(
    where_expr: &Expr,
    schema_cols: &[ColumnSchema],
    table: &Table,
    table_alias: &str,
) -> Option<Vec<usize>> {
    match where_expr {
        Expr::Binary {
            lhs,
            op: BinOp::And,
            rhs,
        } => try_index_seek_positions(lhs, schema_cols, table, table_alias)
            .or_else(|| try_index_seek_positions(rhs, schema_cols, table, table_alias)),
        Expr::Binary {
            lhs,
            op: BinOp::Eq,
            rhs,
        } => {
            let direct = resolve_col_literal_pair(lhs, rhs, schema_cols, table_alias);
            let (col_pos, value) = match direct {
                Some(pair) => pair,
                None => resolve_col_literal_pair(rhs, lhs, schema_cols, table_alias)?,
            };
            let idx = table.index_on(col_pos)?;
            let key = IndexKey::from_value(&value)?;
            // Collecting into `Option` stops at the first cold locator and
            // turns the whole result into `None`.
            idx.lookup_eq(&key)
                .into_iter()
                .map(|loc| match *loc {
                    spg_storage::RowLocator::Hot(position) => Some(position),
                    spg_storage::RowLocator::Cold { .. } => None,
                })
                .collect()
        }
        _ => None,
    }
}
/// Resolves a `WHERE` expression to candidate rows via an equality index.
///
/// * `a AND b` — tries `a` first and falls back to `b`; the rows returned
///   satisfy only the conjunct that was seekable.
/// * `col IN (…)` — delegated to `try_inlist_seek`.
/// * `col = literal` / `literal = col` — looks the literal up in the index on
///   that column.
///
/// Hot locators are borrowed from `table.rows()`; cold locators are resolved
/// through `catalog.resolve_cold_locator` and returned owned. Locators that
/// do not resolve to a row are skipped. Returns `None` when the expression
/// shape is unsupported or no index / index key is available.
pub(crate) fn try_index_seek<'a>(
    where_expr: &Expr,
    schema_cols: &[ColumnSchema],
    catalog: &'a Catalog,
    table: &'a Table,
    table_alias: &str,
) -> Option<Vec<Cow<'a, Row>>> {
    if let Expr::Binary {
        lhs,
        op: BinOp::And,
        rhs,
    } = where_expr
    {
        return try_index_seek(lhs, schema_cols, catalog, table, table_alias)
            .or_else(|| try_index_seek(rhs, schema_cols, catalog, table, table_alias));
    }

    let via_in_list = try_inlist_seek(where_expr, schema_cols, catalog, table, table_alias);
    if via_in_list.is_some() {
        return via_in_list;
    }

    let Expr::Binary {
        lhs,
        op: BinOp::Eq,
        rhs,
    } = where_expr
    else {
        return None;
    };
    let direct = resolve_col_literal_pair(lhs, rhs, schema_cols, table_alias);
    let (col_pos, value) = match direct {
        Some(pair) => pair,
        None => resolve_col_literal_pair(rhs, lhs, schema_cols, table_alias)?,
    };
    let idx = table.index_on(col_pos)?;
    let key = IndexKey::from_value(&value)?;
    let table_name = table.schema().name.as_str();

    let matched: Vec<Cow<'a, Row>> = idx
        .lookup_eq(&key)
        .into_iter()
        .filter_map(|loc| -> Option<Cow<'a, Row>> {
            match *loc {
                spg_storage::RowLocator::Hot(position) => {
                    table.rows().get(position).map(Cow::Borrowed)
                }
                spg_storage::RowLocator::Cold { segment_id, .. } => catalog
                    .resolve_cold_locator(table_name, segment_id, &key)
                    .map(Cow::Owned),
            }
        })
        .collect();
    Some(matched)
}
/// Resolves a non-negated `col IN (literal, …)` predicate to candidate rows
/// via the equality index on `col`.
///
/// Returns `None` when the expression is not a non-negated `IN` list over a
/// plain column, when the column is qualified with something other than
/// `table_alias` (compared ASCII case-insensitively), when the column has no
/// index, or when any list element is not a literal convertible to an
/// `IndexKey`.
///
/// Each distinct key is looked up once: a literal repeated in the list
/// (`x IN (1, 1)`) must not make the matching rows appear more than once.
/// Hot locators are borrowed from `table.rows()`; cold locators are resolved
/// through `catalog.resolve_cold_locator` and returned owned. Locators that
/// do not resolve to a row are skipped.
fn try_inlist_seek<'a>(
    where_expr: &Expr,
    schema_cols: &[ColumnSchema],
    catalog: &'a Catalog,
    table: &'a Table,
    table_alias: &str,
) -> Option<Vec<Cow<'a, Row>>> {
    let Expr::InList {
        expr,
        list,
        negated: false,
    } = where_expr
    else {
        return None;
    };
    let Expr::Column(c) = expr.as_ref() else {
        return None;
    };
    if !c
        .qualifier
        .as_deref()
        .is_none_or(|q| q.eq_ignore_ascii_case(table_alias))
    {
        return None;
    }
    let col_pos = schema_cols.iter().position(|s| s.name == c.name)?;
    let idx = table.index_on(col_pos)?;

    // Collect the distinct keys in first-occurrence order. IN lists are
    // expected to be short, so a linear membership test is sufficient.
    let mut keys: Vec<IndexKey> = Vec::with_capacity(list.len());
    for e in list {
        let Expr::Literal(l) = e else {
            return None;
        };
        let key = IndexKey::from_value(&eval::literal_to_value(l))?;
        if !keys.contains(&key) {
            keys.push(key);
        }
    }

    let table_name = table.schema().name.as_str();
    let mut out: Vec<Cow<'a, Row>> = Vec::new();
    for key in &keys {
        for loc in idx.lookup_eq(key) {
            match *loc {
                spg_storage::RowLocator::Hot(i) => {
                    if let Some(row) = table.rows().get(i) {
                        out.push(Cow::Borrowed(row));
                    }
                }
                spg_storage::RowLocator::Cold { segment_id, .. } => {
                    if let Some(row) = catalog.resolve_cold_locator(table_name, segment_id, key) {
                        out.push(Cow::Owned(row));
                    }
                }
            }
        }
    }
    Some(out)
}
pub(crate) fn try_gin_seek<'a>(
where_expr: &Expr,
schema_cols: &[ColumnSchema],
catalog: &'a Catalog,
table: &'a Table,
table_alias: &str,
ctx: &eval::EvalContext<'_>,
) -> Option<Vec<Cow<'a, Row>>> {
if let Expr::Binary {
lhs,
op: BinOp::And,
rhs,
} = where_expr
{
if let Some(rows) = try_gin_seek(lhs, schema_cols, catalog, table, table_alias, ctx) {
return Some(rows);
}
return try_gin_seek(rhs, schema_cols, catalog, table, table_alias, ctx);
}
if let Expr::Binary {
lhs,
op: BinOp::Or,
rhs,
} = where_expr
{
let left = try_gin_seek(lhs, schema_cols, catalog, table, table_alias, ctx)?;
let right = try_gin_seek(rhs, schema_cols, catalog, table, table_alias, ctx)?;
let mut out: Vec<Cow<'a, Row>> = Vec::with_capacity(left.len() + right.len());
out.extend(left);
out.extend(right);
return Some(out);
}
let Expr::Binary {
lhs,
op: BinOp::TsMatch,
rhs,
} = where_expr
else {
return None;
};
let (col_pos, query) = resolve_gin_col_query(lhs, rhs, schema_cols, table_alias, ctx)
.or_else(|| resolve_gin_col_query(rhs, lhs, schema_cols, table_alias, ctx))?;
let idx = table
.indices()
.iter()
.find(|i| i.column_position == col_pos && (i.is_gin() || i.is_gin_fulltext()))?;
let candidates = gin_query_candidates(idx, &query)?;
let _ = catalog; let mut out: Vec<Cow<'a, Row>> = Vec::with_capacity(candidates.len());
for loc in candidates {
match loc {
spg_storage::RowLocator::Hot(i) => {
if let Some(row) = table.rows().get(i) {
out.push(Cow::Borrowed(row));
}
}
spg_storage::RowLocator::Cold { .. } => {}
}
}
Some(out)
}
pub(crate) fn try_trgm_seek<'a>(
where_expr: &Expr,
schema_cols: &[ColumnSchema],
table: &'a Table,
table_alias: &str,
) -> Option<Vec<Cow<'a, Row>>> {
if let Expr::Binary {
lhs,
op: BinOp::And,
rhs,
} = where_expr
{
if let Some(rows) = try_trgm_seek(lhs, schema_cols, table, table_alias) {
return Some(rows);
}
return try_trgm_seek(rhs, schema_cols, table, table_alias);
}
let Expr::Like { expr, pattern, .. } = where_expr else {
return None;
};
let Expr::Column(c) = expr.as_ref() else {
return None;
};
if let Some(q) = &c.qualifier
&& q != table_alias
{
return None;
}
let col_pos = schema_cols
.iter()
.position(|s| s.name.eq_ignore_ascii_case(&c.name))?;
let idx = table
.indices()
.iter()
.find(|i| i.column_position == col_pos && i.is_gin_trgm())?;
let Expr::Literal(spg_sql::ast::Literal::String(pat)) = pattern.as_ref() else {
return None;
};
let trigrams = spg_storage::trgm::trigrams_from_like_pattern(pat)?;
let mut iter = trigrams.iter();
let first = iter.next()?;
let mut acc: Vec<spg_storage::RowLocator> = {
let mut v = idx.gin_trgm_lookup(first).to_vec();
v.sort_by_key(locator_sort_key);
v.dedup_by_key(|l| locator_sort_key(l));
v
};
for tri in iter {
let mut next: Vec<spg_storage::RowLocator> = idx.gin_trgm_lookup(tri).to_vec();
next.sort_by_key(locator_sort_key);
next.dedup_by_key(|l| locator_sort_key(l));
let mut merged: Vec<spg_storage::RowLocator> =
Vec::with_capacity(acc.len().min(next.len()));
let (mut i, mut j) = (0usize, 0usize);
while i < acc.len() && j < next.len() {
let lk = locator_sort_key(&acc[i]);
let rk = locator_sort_key(&next[j]);
match lk.cmp(&rk) {
core::cmp::Ordering::Less => i += 1,
core::cmp::Ordering::Greater => j += 1,
core::cmp::Ordering::Equal => {
merged.push(acc[i]);
i += 1;
j += 1;
}
}
}
acc = merged;
if acc.is_empty() {
break;
}
}
let mut out: Vec<Cow<'a, Row>> = Vec::with_capacity(acc.len());
for loc in acc {
if let spg_storage::RowLocator::Hot(i) = loc
&& let Some(row) = table.rows().get(i)
{
out.push(Cow::Borrowed(row));
}
}
Some(out)
}
/// Interprets `col_side` as an indexed text-search column and `query_side`
/// as a tsquery, returning the column's schema position and the query AST.
///
/// `col_side` may be a bare column or a `to_tsvector(…)` call (name matched
/// ASCII case-insensitively) whose last argument is a column. `query_side`
/// is evaluated against an empty row and must produce `Value::TsQuery`.
///
/// Returns `None` when either side has a different shape, the column is
/// qualified with something other than `table_alias`, the column is not in
/// `schema_cols`, or evaluating the query side fails.
pub(crate) fn resolve_gin_col_query(
    col_side: &Expr,
    query_side: &Expr,
    schema_cols: &[ColumnSchema],
    table_alias: &str,
    ctx: &eval::EvalContext<'_>,
) -> Option<(usize, spg_storage::TsQueryAst)> {
    let column = match col_side {
        Expr::Column(direct) => direct,
        Expr::FunctionCall { name, args } if name.eq_ignore_ascii_case("to_tsvector") => {
            // An empty argument list has no last element and is rejected too.
            match args.last() {
                Some(Expr::Column(wrapped)) => wrapped,
                _ => return None,
            }
        }
        _ => return None,
    };
    if column.qualifier.as_ref().is_some_and(|q| q != table_alias) {
        return None;
    }
    let pos = schema_cols.iter().position(|s| s.name == column.name)?;

    // The query side is evaluated without row data.
    let no_columns = Row::new(Vec::new());
    match eval::eval_expr(query_side, &no_columns, ctx).ok()? {
        Value::TsQuery(query) => Some((pos, query)),
        _ => None,
    }
}
/// Evaluates a tsquery against a GIN index and returns the matching locators
/// ordered by `locator_sort_key`.
///
/// * `Term` — the word's posting list, sorted and de-duplicated.
/// * `And` — intersection of both operands' candidates.
/// * `Or` — union of both operands' candidates, sorted and de-duplicated.
/// * `Not` / `Phrase` — not answerable from the index; yields `None`, which
///   also propagates out of any enclosing `And` / `Or`.
pub(crate) fn gin_query_candidates(
    idx: &spg_storage::Index,
    query: &spg_storage::TsQueryAst,
) -> Option<Vec<spg_storage::RowLocator>> {
    use spg_storage::TsQueryAst;
    match query {
        TsQueryAst::Term { word, .. } => {
            let mut hits: Vec<spg_storage::RowLocator> = idx.gin_lookup_word(word).to_vec();
            hits.sort_by_key(locator_sort_key);
            hits.dedup_by_key(|l| locator_sort_key(l));
            Some(hits)
        }
        TsQueryAst::And(l, r) => {
            let mut lhs_hits = gin_query_candidates(idx, l)?;
            let mut rhs_hits = gin_query_candidates(idx, r)?;
            lhs_hits.sort_by_key(locator_sort_key);
            rhs_hits.sort_by_key(locator_sort_key);

            // Walk the left list while advancing a cursor over the right one.
            let mut both: Vec<spg_storage::RowLocator> = Vec::new();
            let mut rhs_cursor = rhs_hits.iter().peekable();
            for candidate in &lhs_hits {
                let key = locator_sort_key(candidate);
                // Drop right-hand entries that sort before the current key.
                while rhs_cursor
                    .next_if(|other| locator_sort_key(other) < key)
                    .is_some()
                {}
                // A match consumes the right-hand entry as well.
                if rhs_cursor
                    .next_if(|other| locator_sort_key(other) == key)
                    .is_some()
                {
                    both.push(*candidate);
                }
            }
            Some(both)
        }
        TsQueryAst::Or(l, r) => {
            let mut either = gin_query_candidates(idx, l)?;
            let rhs_hits = gin_query_candidates(idx, r)?;
            either.extend(rhs_hits);
            either.sort_by_key(locator_sort_key);
            either.dedup_by_key(|l| locator_sort_key(l));
            Some(either)
        }
        TsQueryAst::Not(_) | TsQueryAst::Phrase { .. } => None,
    }
}
pub(crate) fn locator_sort_key(l: &spg_storage::RowLocator) -> (u8, u64, u64) {
match *l {
spg_storage::RowLocator::Hot(i) => (0, i as u64, 0),
spg_storage::RowLocator::Cold {
segment_id,
page_offset,
} => (1, u64::from(segment_id), u64::from(page_offset)),
}
}
/// Recognises a `col = literal` (or `literal = col`) predicate and returns
/// the column's schema position together with the literal as an `IndexKey`.
///
/// Returns `None` for any other expression, when the column/literal pair
/// cannot be resolved, or when the literal has no `IndexKey` representation.
pub(crate) fn try_pk_predicate(
    where_expr: &Expr,
    schema_cols: &[ColumnSchema],
    table_alias: &str,
) -> Option<(usize, IndexKey)> {
    match where_expr {
        Expr::Binary {
            lhs,
            op: BinOp::Eq,
            rhs,
        } => {
            // Try `col = lit` first, then the mirrored `lit = col`.
            let direct = resolve_col_literal_pair(lhs, rhs, schema_cols, table_alias);
            let (col_pos, value) = match direct {
                Some(pair) => pair,
                None => resolve_col_literal_pair(rhs, lhs, schema_cols, table_alias)?,
            };
            IndexKey::from_value(&value).map(|key| (col_pos, key))
        }
        _ => None,
    }
}
/// Interprets `col_side` as a column of the table and `lit_side` as a scalar
/// literal, returning the column's schema position and the literal's value.
///
/// Integer literals become `Value::Int` when they fit in `i32` and
/// `Value::BigInt` otherwise. Vector, interval and array literals are not
/// supported.
///
/// Returns `None` when `col_side` is not a column, is qualified with
/// something other than `table_alias`, or is not in `schema_cols`, and when
/// `lit_side` is not a supported literal.
pub(crate) fn resolve_col_literal_pair(
    col_side: &Expr,
    lit_side: &Expr,
    schema_cols: &[ColumnSchema],
    table_alias: &str,
) -> Option<(usize, Value)> {
    let Expr::Column(column) = col_side else {
        return None;
    };
    let qualifier_matches = match &column.qualifier {
        Some(q) => q == table_alias,
        None => true,
    };
    if !qualifier_matches {
        return None;
    }
    let pos = schema_cols.iter().position(|s| s.name == column.name)?;
    let Expr::Literal(literal) = lit_side else {
        return None;
    };
    let value = match literal {
        Literal::Integer(n) => i32::try_from(*n).map_or(Value::BigInt(*n), Value::Int),
        Literal::Float(x) => Value::Float(*x),
        Literal::String(s) => Value::Text(s.clone()),
        Literal::Bool(b) => Value::Bool(*b),
        Literal::Null => Value::Null,
        Literal::Vector(_)
        | Literal::Interval { .. }
        | Literal::TextArray(_)
        | Literal::IntArray(_)
        | Literal::BigIntArray(_) => return None,
    };
    Some((pos, value))
}