#![no_std]
extern crate alloc;
pub mod aggregate;
pub mod describe;
pub mod eval;
pub mod json;
pub mod memoize;
pub mod plan_cache;
pub mod publications;
pub mod query_stats;
pub mod reorder;
pub mod selectivity;
pub mod statistics;
pub mod subscriptions;
pub mod users;
pub use crate::users::{Role, ScramSecrets, UserError, UserStore};
use alloc::borrow::Cow;
use alloc::boxed::Box;
use alloc::collections::BTreeMap;
use alloc::string::{String, ToString};
use alloc::vec::Vec;
use core::fmt;
use spg_sql::ast::{
BinOp, ColumnDef, ColumnName, ColumnTypeName, CreateIndexStatement,
CreatePublicationStatement, CreateSubscriptionStatement, CreateTableStatement,
CreateUserStatement, Expr, FrameBound, FrameKind, FromClause, IndexMethod, InsertStatement,
JoinKind, Literal, OrderBy, SelectItem, SelectStatement, Statement, UnOp, UnionKind,
VecEncoding as SqlVecEncoding, WindowFrame,
};
use spg_sql::parser::{self, ParseError};
use spg_storage::{
Catalog, ColumnSchema, CompactReport, DataType, IndexKey, IndexKind, Row, StorageError, Table,
TableSchema, Value, VecEncoding,
};
use crate::eval::{EvalContext, EvalError};
/// Outcome of executing one SQL statement.
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
pub enum QueryResult {
/// A statement that does not return rows (DDL, DML, transaction control, ...).
CommandOk {
/// Number of objects/rows the statement affected (e.g. 1 for a successful
/// CREATE PUBLICATION, 0 or 1 for DROP PUBLICATION depending on whether it existed).
affected: usize,
/// Whether the statement changed engine state that callers may need to persist;
/// e.g. `false` for CREATE EXTENSION, `true` for a successful CREATE PUBLICATION.
modified_catalog: bool,
},
/// A row-returning statement: the output column schemas and the rows themselves.
Rows {
columns: Vec<ColumnSchema>,
rows: Vec<Row>,
},
}
/// Every error the engine's execution entry points can return.
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
pub enum EngineError {
/// The SQL text failed to parse.
Parse(ParseError),
/// The storage layer (catalog, tables, cold segments, snapshot decoding) reported an error.
Storage(StorageError),
/// Expression evaluation failed.
Eval(EvalError),
/// A feature or statement is not supported in this context. Also used to wrap restore
/// failures of the auxiliary snapshot sections and publication/subscription errors.
Unsupported(String),
/// NOTE(review): presumably returned by BEGIN while a transaction is open (handler not in view).
TransactionAlreadyOpen,
/// NOTE(review): presumably returned by COMMIT/ROLLBACK without an open transaction.
NoActiveTransaction,
/// A statement that mutates state was sent through the read-only entry point.
WriteRequired,
/// A row-returning result exceeded the configured cap; carries that cap.
RowLimitExceeded(usize),
/// The caller's [`CancelToken`] was set while the statement was running.
Cancelled,
}
/// Human-readable rendering: wrapped errors get a short prefix naming the layer they came
/// from, and fixed conditions render as a fixed sentence.
impl fmt::Display for EngineError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let fixed: &str = match self {
            Self::Parse(inner) => return write!(f, "parse: {inner}"),
            Self::Storage(inner) => return write!(f, "storage: {inner}"),
            Self::Eval(inner) => return write!(f, "eval: {inner}"),
            Self::Unsupported(what) => return write!(f, "unsupported: {what}"),
            Self::RowLimitExceeded(cap) => return write!(f, "query exceeded max_query_rows={cap}"),
            Self::TransactionAlreadyOpen => "a transaction is already open",
            Self::NoActiveTransaction => "no active transaction",
            Self::WriteRequired => {
                "statement requires a write lock (use execute, not execute_readonly)"
            }
            Self::Cancelled => "query cancelled (timeout or client request)",
        };
        f.write_str(fixed)
    }
}
impl From<ParseError> for EngineError {
fn from(e: ParseError) -> Self {
Self::Parse(e)
}
}
impl From<StorageError> for EngineError {
fn from(e: StorageError) -> Self {
Self::Storage(e)
}
}
impl From<EvalError> for EngineError {
fn from(e: EvalError) -> Self {
Self::Eval(e)
}
}
/// Host-supplied time source. The engine treats its readings as microseconds: they feed
/// clock-call rewriting (`now_micros`) and query timing (`*_us` statistics).
pub type ClockFn = fn() -> i64;
/// Host-supplied source of SCRAM salts, used by `Engine::create_user` when set.
pub type SaltFn = fn() -> [u8; 16];
/// Cooperative cancellation handle.
///
/// Optionally borrows an `AtomicBool` that another party can set; engine operations poll it
/// through [`CancelToken::check`]. A token with no flag never reports cancellation.
#[derive(Debug, Clone, Copy)]
pub struct CancelToken<'a> {
flag: Option<&'a core::sync::atomic::AtomicBool>,
}
impl<'a> CancelToken<'a> {
    /// A token that can never be cancelled.
    #[must_use]
    pub const fn none() -> Self {
        Self { flag: None }
    }

    /// A token that reports cancellation once `f` has been set to `true`.
    #[must_use]
    pub const fn from_flag(f: &'a core::sync::atomic::AtomicBool) -> Self {
        Self { flag: Some(f) }
    }

    /// Polls the flag (relaxed load); always `false` for a token built with [`Self::none`].
    #[must_use]
    pub fn is_cancelled(self) -> bool {
        match self.flag {
            Some(shared) => shared.load(core::sync::atomic::Ordering::Relaxed),
            None => false,
        }
    }

    /// Converts the cancellation state into a `Result` so callers can bail out with `?`.
    ///
    /// # Errors
    /// [`EngineError::Cancelled`] when the flag is set.
    #[inline]
    pub fn check(self) -> Result<(), EngineError> {
        if !self.is_cancelled() {
            return Ok(());
        }
        Err(EngineError::Cancelled)
    }
}
/// Leading bytes that distinguish a snapshot envelope from a bare serialized catalog.
const ENVELOPE_MAGIC: &[u8; 8] = b"SPGENV01";
// Envelope format versions, as accepted by `split_envelope`:
// V1: catalog + users sections, no trailer.
const ENVELOPE_VERSION_V1: u8 = 1;
// V2: adds a trailing CRC32 over all preceding bytes.
const ENVELOPE_VERSION_V2: u8 = 2;
// V3: adds a publications section.
const ENVELOPE_VERSION_V3: u8 = 3;
// V4: adds a subscriptions section.
const ENVELOPE_VERSION_V4: u8 = 4;
// V5: adds a statistics section; this is the version `build_envelope` writes.
const ENVELOPE_VERSION_V5: u8 = 5;
/// Serializes the five snapshot sections into a version-5 envelope.
///
/// Layout: `ENVELOPE_MAGIC`, the version byte, then for each of catalog, users,
/// publications, subscriptions and statistics a little-endian `u32` length followed by the
/// bytes, and finally a little-endian CRC32 over everything written before it.
///
/// # Panics
/// If any section is longer than `u32::MAX` bytes.
fn build_envelope(
    catalog: &[u8],
    users: &[u8],
    pubs: &[u8],
    subs: &[u8],
    stats: &[u8],
) -> Vec<u8> {
    // Each section is paired with its own panic message so an oversized one is identifiable.
    let sections: [(&[u8], &str); 5] = [
        (catalog, "≤ 4G catalog"),
        (users, "≤ 4G users"),
        (pubs, "≤ 4G publications"),
        (subs, "≤ 4G subscriptions"),
        (stats, "≤ 4G statistics"),
    ];
    let framed_len: usize = sections.iter().map(|(bytes, _)| 4 + bytes.len()).sum();
    let mut envelope = Vec::with_capacity(8 + 1 + framed_len + 4);
    envelope.extend_from_slice(ENVELOPE_MAGIC);
    envelope.push(ENVELOPE_VERSION_V5);
    for (bytes, too_big) in sections {
        let len = u32::try_from(bytes.len()).expect(too_big);
        envelope.extend_from_slice(&len.to_le_bytes());
        envelope.extend_from_slice(bytes);
    }
    let checksum = spg_crypto::crc32::crc32(&envelope);
    envelope.extend_from_slice(&checksum.to_le_bytes());
    envelope
}
/// Result of probing a snapshot buffer with `split_envelope`.
enum EnvelopeParse<'a> {
/// Not a well-formed envelope (wrong magic, unknown version, truncated or trailing bytes);
/// `Engine::restore_envelope` then treats the whole buffer as a bare serialized catalog.
Bare,
/// A well-formed envelope split into its sections. Sections absent from older versions
/// are `None`.
Pair {
catalog: &'a [u8],
users: &'a [u8],
publications: Option<&'a [u8]>,
subscriptions: Option<&'a [u8]>,
statistics: Option<&'a [u8]>,
},
/// Structurally valid envelope whose trailing CRC32 did not match its contents.
CrcMismatch {
expected: u32,
computed: u32,
},
}
/// Reads one length-prefixed envelope section starting at `*pos`.
///
/// A section is a little-endian `u32` byte count followed by that many payload bytes. On
/// success `*pos` is advanced past the payload and the payload is returned. If either the
/// prefix or the payload would run past the end of `buf`, `None` is returned and `*pos` is
/// left untouched. All offset arithmetic is checked, so a hostile length field cannot
/// overflow `usize` (a real risk on 32-bit targets) and turn into a slicing panic.
fn take_envelope_section<'a>(buf: &'a [u8], pos: &mut usize) -> Option<&'a [u8]> {
    let payload_start = pos.checked_add(4)?;
    let len_arr: [u8; 4] = buf.get(*pos..payload_start)?.try_into().ok()?;
    let len = usize::try_from(u32::from_le_bytes(len_arr)).ok()?;
    let payload_end = payload_start.checked_add(len)?;
    let payload = buf.get(payload_start..payload_end)?;
    *pos = payload_end;
    Some(payload)
}

/// Splits a snapshot buffer into its envelope sections.
///
/// Returns [`EnvelopeParse::Bare`] for anything that is not a complete, exactly-sized
/// envelope of a known version (the caller then treats `buf` as a bare catalog), and
/// [`EnvelopeParse::CrcMismatch`] when a V2+ envelope is well-formed but its checksum is
/// wrong. Never panics, whatever the section length fields claim.
fn split_envelope(buf: &[u8]) -> EnvelopeParse<'_> {
    // Magic (8) + version (1) + at least the first length prefix (4).
    if buf.len() < 8 + 1 + 4 || &buf[..8] != ENVELOPE_MAGIC {
        return EnvelopeParse::Bare;
    }
    let version = buf[8];
    if !matches!(
        version,
        ENVELOPE_VERSION_V1
            | ENVELOPE_VERSION_V2
            | ENVELOPE_VERSION_V3
            | ENVELOPE_VERSION_V4
            | ENVELOPE_VERSION_V5
    ) {
        return EnvelopeParse::Bare;
    }
    let mut p = 9usize;
    // Every version carries the catalog and users sections.
    let Some(catalog) = take_envelope_section(buf, &mut p) else {
        return EnvelopeParse::Bare;
    };
    let Some(users) = take_envelope_section(buf, &mut p) else {
        return EnvelopeParse::Bare;
    };
    // Later versions append further sections; older versions simply lack them.
    let publications = if matches!(
        version,
        ENVELOPE_VERSION_V3 | ENVELOPE_VERSION_V4 | ENVELOPE_VERSION_V5
    ) {
        let Some(section) = take_envelope_section(buf, &mut p) else {
            return EnvelopeParse::Bare;
        };
        Some(section)
    } else {
        None
    };
    let subscriptions = if matches!(version, ENVELOPE_VERSION_V4 | ENVELOPE_VERSION_V5) {
        let Some(section) = take_envelope_section(buf, &mut p) else {
            return EnvelopeParse::Bare;
        };
        Some(section)
    } else {
        None
    };
    let statistics = if version == ENVELOPE_VERSION_V5 {
        let Some(section) = take_envelope_section(buf, &mut p) else {
            return EnvelopeParse::Bare;
        };
        Some(section)
    } else {
        None
    };
    if version == ENVELOPE_VERSION_V1 {
        // V1 has no trailer: the sections must consume the buffer exactly.
        if p != buf.len() {
            return EnvelopeParse::Bare;
        }
    } else {
        // V2+ end with exactly four bytes: a little-endian CRC32 over everything before it.
        let Some(tail) = buf.get(p..) else {
            return EnvelopeParse::Bare;
        };
        let Ok(crc_arr) = <[u8; 4]>::try_from(tail) else {
            return EnvelopeParse::Bare;
        };
        let expected = u32::from_le_bytes(crc_arr);
        let computed = spg_crypto::crc32::crc32(&buf[..p]);
        if expected != computed {
            return EnvelopeParse::CrcMismatch { expected, computed };
        }
    }
    EnvelopeParse::Pair {
        catalog,
        users,
        publications,
        subscriptions,
        statistics,
    }
}
/// Identifier of an engine transaction; keys the per-transaction state in `Engine`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct TxId(pub u64);
/// Transaction id used by `Engine::execute` / `execute_with_cancel` when the caller does
/// not name one. `Engine::new` starts `alloc_tx_id` at 1, so allocated ids differ from it.
pub const IMPLICIT_TX: TxId = TxId(0);
/// 4 MiB. NOTE(review): presumably the default `target_segment_bytes` for
/// `Engine::compact_cold_segments_with_target`; its use is not visible in this chunk.
pub const COMPACTION_TARGET_DEFAULT_BYTES: u64 = 4 * 1024 * 1024;
/// Working state of an open transaction.
#[derive(Debug, Default, Clone)]
struct TxState {
// The transaction's private copy of the catalog; `active_catalog` resolves to it while
// this transaction is current.
catalog: Catalog,
// Named catalog snapshots; presumably pushed by SAVEPOINT and consumed by
// ROLLBACK TO / RELEASE SAVEPOINT (handlers not in view).
savepoints: Vec<(String, Catalog)>,
}
#[derive(Debug, Default)]
pub struct Engine {
catalog: Catalog,
tx_catalogs: BTreeMap<TxId, TxState>,
current_tx: Option<TxId>,
next_tx_id: u64,
clock: Option<ClockFn>,
salt_fn: Option<SaltFn>,
max_query_rows: Option<usize>,
users: UserStore,
publications: publications::Publications,
subscriptions: subscriptions::Subscriptions,
statistics: statistics::Statistics,
plan_cache: plan_cache::PlanCache,
query_stats: query_stats::QueryStats,
activity_provider: Option<ActivityProvider>,
audit_chain_provider: Option<AuditChainProvider>,
audit_verifier: Option<AuditVerifier>,
slow_query_threshold_us: Option<u64>,
slow_query_logger: Option<SlowQueryLogger>,
}
pub type SlowQueryLogger = fn(&str, u64);
/// Renders a `CREATE TABLE` statement for `name` from its column schemas.
///
/// Each column is rendered as `<name> <type>`, followed by ` NOT NULL` for non-nullable
/// columns and ` AUTO_INCREMENT` where set. Identifiers are emitted verbatim (unquoted).
fn render_create_table(name: &str, columns: &[ColumnSchema]) -> String {
    let column_defs: Vec<String> = columns
        .iter()
        .map(|col| {
            let mut def = String::new();
            def.push_str(&col.name);
            def.push(' ');
            def.push_str(&render_data_type(col.ty));
            if !col.nullable {
                def.push_str(" NOT NULL");
            }
            if col.auto_increment {
                def.push_str(" AUTO_INCREMENT");
            }
            def
        })
        .collect();
    alloc::format!("CREATE TABLE {name} ({})", column_defs.join(", "))
}
/// Renders a storage data type as SQL type syntax (e.g. `VARCHAR(32)`,
/// `VECTOR(128) USING SQ8`).
fn render_data_type(ty: DataType) -> String {
    let keyword = match ty {
        DataType::SmallInt => "SMALLINT",
        DataType::Int => "INT",
        DataType::BigInt => "BIGINT",
        DataType::Float => "FLOAT",
        DataType::Text => "TEXT",
        DataType::Bool => "BOOL",
        DataType::Date => "DATE",
        DataType::Timestamp => "TIMESTAMP",
        DataType::Interval => "INTERVAL",
        DataType::Json => "JSON",
        DataType::Jsonb => "JSONB",
        DataType::Timestamptz => "TIMESTAMPTZ",
        // Parameterized types need formatting, so they return directly.
        DataType::Varchar(n) => return alloc::format!("VARCHAR({n})"),
        DataType::Char(n) => return alloc::format!("CHAR({n})"),
        DataType::Numeric { precision, scale } => {
            return alloc::format!("NUMERIC({precision},{scale})");
        }
        DataType::Vector { dim, encoding } => {
            // F32 is the default encoding and renders without a USING clause.
            let suffix = match encoding {
                spg_storage::VecEncoding::F32 => "",
                spg_storage::VecEncoding::Sq8 => " USING SQ8",
                spg_storage::VecEncoding::F16 => " USING HALF",
            };
            return alloc::format!("VECTOR({dim}){suffix}");
        }
    };
    String::from(keyword)
}
/// One row of the activity view, supplied by the host through an [`ActivityProvider`].
#[derive(Debug, Clone)]
pub struct ActivityRow {
pub pid: u32,
pub user: String,
pub started_at_us: i64,
pub current_sql: String,
pub wait_event: String,
pub elapsed_us: i64,
pub in_transaction: bool,
}
/// Host callback returning the current activity rows.
pub type ActivityProvider = fn() -> Vec<ActivityRow>;
/// One entry of the audit hash chain, supplied by the host through an [`AuditChainProvider`].
#[derive(Debug, Clone)]
pub struct AuditRow {
pub seq: i64,
pub ts_ms: i64,
// Hashes are passed through as already-hex-encoded text.
pub prev_hash_hex: String,
pub entry_hash_hex: String,
pub sql: String,
}
/// Host callback returning the audit chain entries.
pub type AuditChainProvider = fn() -> Vec<AuditRow>;
/// Host callback that verifies the audit chain. NOTE(review): the meaning of the two
/// returned integers is defined by the verify view, which is outside this chunk.
pub type AuditVerifier = fn() -> (i64, i64);
impl Engine {
/// Creates an empty engine: no tables, users, publications, subscriptions or statistics,
/// and no host hooks. Transaction ids are allocated starting at 1 (`IMPLICIT_TX` is
/// `TxId(0)`).
pub fn new() -> Self {
    Self {
        // Persisted state.
        catalog: Catalog::new(),
        users: UserStore::new(),
        publications: publications::Publications::new(),
        subscriptions: subscriptions::Subscriptions::new(),
        statistics: statistics::Statistics::new(),
        // Transaction bookkeeping.
        tx_catalogs: BTreeMap::new(),
        current_tx: None,
        next_tx_id: 1,
        // In-memory caches.
        plan_cache: plan_cache::PlanCache::new(),
        query_stats: query_stats::QueryStats::new(),
        // Optional configuration and host hooks, all unset.
        clock: None,
        salt_fn: None,
        max_query_rows: None,
        activity_provider: None,
        audit_chain_provider: None,
        audit_verifier: None,
        slow_query_threshold_us: None,
        slow_query_logger: None,
    }
}
pub fn restore(catalog: Catalog) -> Self {
Self {
catalog,
tx_catalogs: BTreeMap::new(),
current_tx: None,
next_tx_id: 1,
clock: None,
salt_fn: None,
max_query_rows: None,
users: UserStore::new(),
publications: publications::Publications::new(),
subscriptions: subscriptions::Subscriptions::new(),
statistics: statistics::Statistics::new(),
plan_cache: plan_cache::PlanCache::new(),
query_stats: query_stats::QueryStats::new(),
activity_provider: None,
audit_chain_provider: None,
audit_verifier: None,
slow_query_threshold_us: None,
slow_query_logger: None,
}
}
pub fn restore_envelope(buf: &[u8]) -> Result<Self, EngineError> {
match split_envelope(buf) {
EnvelopeParse::Pair {
catalog: catalog_bytes,
users: user_bytes,
publications: pub_bytes,
subscriptions: sub_bytes,
statistics: stats_bytes,
} => {
let catalog = Catalog::deserialize(catalog_bytes).map_err(EngineError::Storage)?;
let users = users::deserialize_users(user_bytes)
.map_err(|e| EngineError::Unsupported(alloc::format!("users restore: {e}")))?;
let publications = match pub_bytes {
Some(b) => publications::Publications::deserialize(b).map_err(|e| {
EngineError::Unsupported(alloc::format!("publications restore: {e:?}"))
})?,
None => publications::Publications::new(),
};
let subscriptions = match sub_bytes {
Some(b) => subscriptions::Subscriptions::deserialize(b).map_err(|e| {
EngineError::Unsupported(alloc::format!("subscriptions restore: {e:?}"))
})?,
None => subscriptions::Subscriptions::new(),
};
let statistics = match stats_bytes {
Some(b) => statistics::Statistics::deserialize(b).map_err(|e| {
EngineError::Unsupported(alloc::format!("statistics restore: {e:?}"))
})?,
None => statistics::Statistics::new(),
};
Ok(Self {
catalog,
tx_catalogs: BTreeMap::new(),
current_tx: None,
next_tx_id: 1,
clock: None,
salt_fn: None,
max_query_rows: None,
users,
publications,
subscriptions,
statistics,
plan_cache: plan_cache::PlanCache::new(),
query_stats: query_stats::QueryStats::new(),
activity_provider: None,
audit_chain_provider: None,
audit_verifier: None,
slow_query_threshold_us: None,
slow_query_logger: None,
})
}
EnvelopeParse::CrcMismatch { expected, computed } => {
Err(EngineError::Storage(StorageError::Corrupt(alloc::format!(
"snapshot envelope CRC32 mismatch (expected={expected:#010x}, computed={computed:#010x})"
))))
}
EnvelopeParse::Bare => {
let catalog = Catalog::deserialize(buf).map_err(EngineError::Storage)?;
Ok(Self::restore(catalog))
}
}
}
/// Read access to the user store.
pub const fn users(&self) -> &UserStore {
    let store = &self.users;
    store
}
/// Creates a user and enables SCRAM authentication for it.
///
/// `salt` is passed to the user store's password record. The SCRAM salt comes from the
/// configured `salt_fn` when set. Otherwise it falls back to bytes 16..32 of
/// `spg_crypto::hash(name)`, which is deterministic per user name.
/// NOTE(review): a name-derived salt is identical across deployments; prefer configuring
/// `with_salt_fn` in production.
///
/// If enabling SCRAM fails, the just-created user is dropped again, so the call either
/// fully provisions the user or leaves the store unchanged.
///
/// # Errors
/// Any [`UserError`] from creating the user or enabling SCRAM.
pub fn create_user(
    &mut self,
    name: &str,
    password: &str,
    role: Role,
    salt: [u8; 16],
) -> Result<(), UserError> {
    self.users.create(name, password, role, salt)?;
    let scram_salt = self.salt_fn.map_or_else(
        || {
            let mut s = [0u8; users::SCRAM_SALT_LEN];
            let digest = spg_crypto::hash(name.as_bytes());
            s.copy_from_slice(&digest[16..32]);
            s
        },
        |f| f(),
    );
    let enabled = self
        .users
        .enable_scram(name, password, scram_salt, users::SCRAM_DEFAULT_ITERS);
    if enabled.is_err() {
        // Roll back so a failed SCRAM setup does not leave a half-provisioned user behind.
        // The user was created by this call, so dropping it restores the prior state.
        let _ = self.users.drop(name);
    }
    enabled?;
    Ok(())
}
/// Removes a user.
///
/// # Errors
/// Whatever the user store reports (e.g. for an unknown user).
pub fn drop_user(&mut self, name: &str) -> Result<(), UserError> {
    let store = &mut self.users;
    store.drop(name)
}
/// Checks a password, returning the user's role on success and `None` otherwise.
pub fn verify_user(&self, name: &str, password: &str) -> Option<Role> {
    let store = &self.users;
    store.verify(name, password)
}
/// Installs the time source used for clock-call rewriting and query timing.
#[must_use]
pub const fn with_clock(mut self, clock: ClockFn) -> Self {
    self.clock = Some(clock);
    self
}
/// Installs the SCRAM salt source used by `create_user`.
#[must_use]
pub const fn with_salt_fn(mut self, f: SaltFn) -> Self {
    self.salt_fn = Some(f);
    self
}
/// Caps the number of rows any single statement may return.
#[must_use]
pub const fn with_max_query_rows(mut self, n: usize) -> Self {
    self.max_query_rows = Some(n);
    self
}
/// The committed catalog (ignores any open transaction's working copy).
pub const fn catalog(&self) -> &Catalog {
    let committed = &self.catalog;
    committed
}
/// Serializes the persistent engine state.
///
/// When only the catalog holds data, emits the bare catalog bytes. Otherwise emits a
/// version-5 envelope with catalog, users, publications, subscriptions and statistics.
/// Either form is accepted by [`Engine::restore_envelope`].
pub fn snapshot(&self) -> Vec<u8> {
    let catalog_only = self.users.is_empty()
        && self.publications.is_empty()
        && self.subscriptions.is_empty()
        && self.statistics.is_empty();
    if catalog_only {
        return self.catalog.serialize();
    }
    let catalog_bytes = self.catalog.serialize();
    let user_bytes = users::serialize_users(&self.users);
    let pub_bytes = self.publications.serialize();
    let sub_bytes = self.subscriptions.serialize();
    let stats_bytes = self.statistics.serialize();
    build_envelope(&catalog_bytes, &user_bytes, &pub_bytes, &sub_bytes, &stats_bytes)
}
/// Whether any transaction currently has working state. This is not necessarily the
/// transaction `current_tx` refers to.
pub fn in_transaction(&self) -> bool {
    let none_open = self.tx_catalogs.is_empty();
    !none_open
}
/// Hands out a fresh transaction id. The counter saturates at `u64::MAX`, after which
/// that id repeats.
pub fn alloc_tx_id(&mut self) -> TxId {
    let following = self.next_tx_id.saturating_add(1);
    TxId(core::mem::replace(&mut self.next_tx_id, following))
}
/// Swaps in a new committed catalog.
pub fn replace_catalog(&mut self, catalog: Catalog) {
    let _previous = core::mem::replace(&mut self.catalog, catalog);
}
/// Moves up to `max_rows` of the oldest rows (as ordered by `index_name`) of `table_name`
/// into cold storage, in the active catalog.
///
/// The table's cached cold-row count is then marked stale.
///
/// # Errors
/// [`EngineError::Storage`] from the freeze operation.
pub fn freeze_oldest_to_cold(
    &mut self,
    table_name: &str,
    index_name: &str,
    max_rows: usize,
) -> Result<spg_storage::FreezeReport, EngineError> {
    let catalog = self.active_catalog_mut();
    let report = match catalog.freeze_oldest_to_cold(table_name, index_name, max_rows) {
        Ok(report) => report,
        Err(e) => return Err(EngineError::Storage(e)),
    };
    if let Some(table) = catalog.get_mut(table_name) {
        table.mark_cold_row_count_stale();
    }
    Ok(report)
}
/// Installs a cold segment received from elsewhere into the committed catalog.
///
/// The load is applied to a clone, which replaces the catalog only on success, so a failed
/// load leaves the catalog untouched. A storage `Corrupt` error whose message mentions
/// "already occupied" is treated as success (the segment id is already present).
/// NOTE(review): matching on error text is fragile; a dedicated error variant would be
/// sturdier.
///
/// # Errors
/// Any other [`StorageError`] from loading the segment.
pub fn receive_cold_segment(
    &mut self,
    segment_id: u32,
    bytes: Vec<u8>,
) -> Result<(), EngineError> {
    let mut staged = self.catalog.clone();
    if let Err(err) = staged.load_segment_bytes_at(segment_id, bytes) {
        return match err {
            StorageError::Corrupt(msg) if msg.contains("already occupied") => Ok(()),
            other => Err(EngineError::Storage(other)),
        };
    }
    self.replace_catalog(staged);
    Ok(())
}
/// Compacts the cold segments behind every B-tree index of every user table in the
/// active catalog, aiming for segments of about `target_segment_bytes`.
///
/// Returns `(table, index, report)` for each index where a merge actually happened. Tables
/// with a merge get their cached cold-row count marked stale.
///
/// # Errors
/// The first [`EngineError::Storage`] from a compaction; earlier merges stay applied.
pub fn compact_cold_segments_with_target(
    &mut self,
    target_segment_bytes: u64,
) -> Result<Vec<(String, String, CompactReport)>, EngineError> {
    let table_names = self.active_catalog().table_names();
    let mut reports: Vec<(String, String, CompactReport)> = Vec::new();
    for tname in table_names.into_iter().filter(|n| !is_internal_table_name(n)) {
        let btree_indexes: Vec<String> = match self.active_catalog().get(&tname) {
            Some(table) => table
                .indices()
                .iter()
                .filter(|i| matches!(i.kind, IndexKind::BTree(_)))
                .map(|i| i.name.clone())
                .collect(),
            None => continue,
        };
        for iname in btree_indexes {
            let report = self
                .active_catalog_mut()
                .compact_cold_segments(&tname, &iname, target_segment_bytes)
                .map_err(EngineError::Storage)?;
            if report.merged_segment_id.is_none() {
                continue;
            }
            if let Some(table) = self.active_catalog_mut().get_mut(&tname) {
                table.mark_cold_row_count_stale();
            }
            reports.push((tname.clone(), iname, report));
        }
    }
    Ok(reports)
}
/// The catalog statements should see: the current transaction's working copy if it has
/// one, otherwise the committed catalog.
fn active_catalog(&self) -> &Catalog {
    self.current_tx
        .and_then(|tx| self.tx_catalogs.get(&tx))
        .map_or(&self.catalog, |state| &state.catalog)
}
fn active_catalog_mut(&mut self) -> &mut Catalog {
let tx = self.current_tx;
match tx {
Some(t) => match self.tx_catalogs.get_mut(&t) {
Some(s) => &mut s.catalog,
None => &mut self.catalog,
},
None => &mut self.catalog,
}
}
/// Read-only execution without a cancellation flag; see `execute_readonly_with_cancel`.
///
/// # Errors
/// As for `execute_readonly_with_cancel`.
pub fn execute_readonly(&self, sql: &str) -> Result<QueryResult, EngineError> {
    let never_cancelled = CancelToken::none();
    self.execute_readonly_with_cancel(sql, never_cancelled)
}
/// Executes a statement that needs only `&self`.
///
/// Accepts `SELECT`, the `SHOW …` family and `EXPLAIN`. Any other statement yields
/// [`EngineError::WriteRequired`]; `WAIT FOR WAL POSITION` yields
/// [`EngineError::Unsupported`] because it belongs to the server layer. Row results are
/// subject to the `max_query_rows` cap.
///
/// # Errors
/// [`EngineError::Cancelled`] if `cancel` is set on entry (it is also passed to `SELECT`
/// and `EXPLAIN` execution), parse errors, execution errors, or
/// [`EngineError::RowLimitExceeded`].
pub fn execute_readonly_with_cancel(
    &self,
    sql: &str,
    cancel: CancelToken<'_>,
) -> Result<QueryResult, EngineError> {
    cancel.check()?;
    // Reuse the exact preparation pipeline of the write path (clock rewriting, GROUP BY
    // ALL expansion, ORDER BY position resolution, join reordering) so a query behaves the
    // same whichever entry point runs it. A hand-copied pipeline here omitted
    // `expand_group_by_all`.
    let stmt = self.prepare(sql)?;
    let result = match stmt {
        Statement::Select(s) => self.exec_select_cancel(&s, cancel),
        Statement::ShowTables => Ok(self.exec_show_tables()),
        Statement::ShowColumns(table) => self.exec_show_columns(&table),
        Statement::ShowUsers => Ok(self.exec_show_users()),
        Statement::ShowPublications => Ok(self.exec_show_publications()),
        Statement::ShowSubscriptions => Ok(self.exec_show_subscriptions()),
        Statement::WaitForWalPosition { .. } => Err(EngineError::Unsupported(
            "WAIT FOR WAL POSITION must be handled by the server layer".into(),
        )),
        Statement::Explain(e) => self.exec_explain(&e, cancel),
        _ => Err(EngineError::WriteRequired),
    };
    self.enforce_row_limit(result)
}
/// Replaces a row result that exceeds `max_query_rows` with
/// [`EngineError::RowLimitExceeded`]; every other result passes through unchanged.
fn enforce_row_limit(
    &self,
    result: Result<QueryResult, EngineError>,
) -> Result<QueryResult, EngineError> {
    let exceeded_cap = match (&result, self.max_query_rows) {
        (Ok(QueryResult::Rows { rows, .. }), Some(cap)) if rows.len() > cap => Some(cap),
        _ => None,
    };
    match exceeded_cap {
        Some(cap) => Err(EngineError::RowLimitExceeded(cap)),
        None => result,
    }
}
/// Executes `sql` under [`IMPLICIT_TX`] with no cancellation flag.
///
/// # Errors
/// As for `execute_in_with_cancel`.
pub fn execute(&mut self, sql: &str) -> Result<QueryResult, EngineError> {
    self.execute_with_cancel(sql, CancelToken::none())
}
/// Executes `sql` under [`IMPLICIT_TX`], observing `cancel`.
///
/// # Errors
/// As for `execute_in_with_cancel`.
pub fn execute_with_cancel(
    &mut self,
    sql: &str,
    cancel: CancelToken<'_>,
) -> Result<QueryResult, EngineError> {
    let implicit = IMPLICIT_TX;
    self.execute_in_with_cancel(sql, implicit, cancel)
}
/// Executes `sql` under the given transaction with no cancellation flag.
///
/// # Errors
/// As for `execute_in_with_cancel`.
pub fn execute_in(&mut self, sql: &str, tx_id: TxId) -> Result<QueryResult, EngineError> {
    let never_cancelled = CancelToken::none();
    self.execute_in_with_cancel(sql, tx_id, never_cancelled)
}
/// Executes `sql` with `tx_id` as the current transaction, restoring the previous current
/// transaction afterwards whether or not execution succeeded.
///
/// # Errors
/// Any error from parsing or executing the statement.
pub fn execute_in_with_cancel(
    &mut self,
    sql: &str,
    tx_id: TxId,
    cancel: CancelToken<'_>,
) -> Result<QueryResult, EngineError> {
    let outer_tx = core::mem::replace(&mut self.current_tx, Some(tx_id));
    let outcome = self.execute_inner_with_cancel(sql, cancel);
    self.current_tx = outer_tx;
    outcome
}
/// Parses `sql` and applies the engine's statement rewrites.
///
/// Clock calls are rewritten using the configured clock, read once after a successful
/// parse. For `SELECT`, `GROUP BY ALL` is expanded, ORDER BY positions are resolved, and
/// joins are reordered against the committed catalog and statistics.
///
/// # Errors
/// The parser's [`ParseError`].
pub fn prepare(&self, sql: &str) -> Result<Statement, ParseError> {
    let mut stmt = parser::parse_statement(sql)?;
    rewrite_clock_calls(&mut stmt, self.clock.map(|clock| clock()));
    let Statement::Select(select) = &mut stmt else {
        return Ok(stmt);
    };
    expand_group_by_all(select);
    resolve_order_by_position(select);
    reorder::reorder_joins(select, &self.catalog, &self.statistics);
    Ok(stmt)
}
pub fn prepare_cached(&mut self, sql: &str) -> Result<Statement, ParseError> {
let current_version = self.statistics.version();
if let Some(plan) = self.plan_cache.get(sql) {
if plan.statistics_version == current_version {
return Ok(plan.stmt.clone());
}
}
self.plan_cache.evict(sql);
let stmt = self.prepare(sql)?;
let source_tables = plan_cache::collect_source_tables(&stmt);
let plan = plan_cache::PreparedPlan {
stmt: stmt.clone(),
statistics_version: current_version,
source_tables,
describe_columns: alloc::vec::Vec::new(),
};
self.plan_cache.insert(String::from(sql), plan);
Ok(stmt)
}
/// Read access to the prepared-plan cache.
pub fn plan_cache(&self) -> &plan_cache::PlanCache {
    let cache = &self.plan_cache;
    cache
}
/// Mutable access to the prepared-plan cache.
pub fn plan_cache_mut(&mut self) -> &mut plan_cache::PlanCache {
    let cache = &mut self.plan_cache;
    cache
}
/// Describes a prepared statement (parameter type ids and output columns) against the
/// active catalog.
pub fn describe_prepared(
    &self,
    stmt: &Statement,
) -> (Vec<u32>, Vec<ColumnSchema>) {
    let catalog = self.active_catalog();
    describe::describe_prepared(stmt, catalog)
}
/// Binds `params` into a prepared statement's placeholders and executes it, without a
/// cancellation flag and without recording query statistics.
///
/// # Errors
/// Placeholder substitution errors, or any execution error.
pub fn execute_prepared(
    &mut self,
    mut stmt: Statement,
    params: &[Value],
) -> Result<QueryResult, EngineError> {
    let never_cancelled = CancelToken::none();
    substitute_placeholders(&mut stmt, params)?;
    self.execute_stmt_with_cancel(stmt, never_cancelled)
}
/// Prepares and executes `sql`; when a clock is configured, records timing for successful
/// statements and reports slow ones to the slow-query logger.
///
/// Timing starts after preparation, so parse time is not included.
///
/// # Errors
/// [`EngineError::Cancelled`] if `cancel` is set on entry, parse errors, or any execution
/// error.
fn execute_inner_with_cancel(
    &mut self,
    sql: &str,
    cancel: CancelToken<'_>,
) -> Result<QueryResult, EngineError> {
    cancel.check()?;
    let stmt = self.prepare(sql)?;
    let start_us = self.clock.map(|f| f());
    let result = self.execute_stmt_with_cancel(stmt, cancel);
    if let (Some(t0), Ok(_)) = (start_us, &result) {
        let now = self.clock.map_or(t0, |f| f());
        // Clamp instead of casting with `as`: a clock that steps backwards yields an
        // elapsed time of 0, and a negative (pre-epoch) reading yields 0 rather than
        // wrapping to a huge `u64` timestamp.
        let elapsed = u64::try_from(now.saturating_sub(t0)).unwrap_or(0);
        let last_seen = u64::try_from(now).unwrap_or(0);
        self.query_stats.record(sql, elapsed, last_seen);
        if let (Some(threshold), Some(logger)) =
            (self.slow_query_threshold_us, self.slow_query_logger)
            && elapsed >= threshold
        {
            logger(sql, elapsed);
        }
    }
    result
}
/// Dispatches an already-prepared statement to its handler and applies the row cap.
///
/// `cancel` is checked on entry and forwarded to the handlers that scan data
/// (UPDATE, DELETE, SELECT, EXPLAIN).
///
/// # Errors
/// [`EngineError::Cancelled`], any handler error, or [`EngineError::RowLimitExceeded`].
fn execute_stmt_with_cancel(
    &mut self,
    stmt: Statement,
    cancel: CancelToken<'_>,
) -> Result<QueryResult, EngineError> {
    cancel.check()?;
    let outcome = match stmt {
        // Data manipulation and queries.
        Statement::Insert(s) => self.exec_insert(s),
        Statement::Update(s) => self.exec_update_cancel(&s, cancel),
        Statement::Delete(s) => self.exec_delete_cancel(&s, cancel),
        Statement::Select(s) => self.exec_select_cancel(&s, cancel),
        Statement::Explain(e) => self.exec_explain(&e, cancel),
        // Schema changes.
        Statement::CreateTable(s) => self.exec_create_table(s),
        Statement::CreateIndex(s) => self.exec_create_index(s),
        Statement::AlterIndex(s) => self.exec_alter_index(s),
        Statement::AlterTable(s) => self.exec_alter_table(s),
        // Extensions are accepted as a no-op.
        Statement::CreateExtension(_) => Ok(QueryResult::CommandOk {
            affected: 0,
            modified_catalog: false,
        }),
        // Transaction control.
        Statement::Begin => self.exec_begin(),
        Statement::Commit => self.exec_commit(),
        Statement::Rollback => self.exec_rollback(),
        Statement::Savepoint(name) => self.exec_savepoint(name),
        Statement::RollbackToSavepoint(name) => self.exec_rollback_to_savepoint(&name),
        Statement::ReleaseSavepoint(name) => self.exec_release_savepoint(&name),
        // Introspection.
        Statement::ShowTables => Ok(self.exec_show_tables()),
        Statement::ShowColumns(table) => self.exec_show_columns(&table),
        Statement::ShowUsers => Ok(self.exec_show_users()),
        Statement::ShowPublications => Ok(self.exec_show_publications()),
        Statement::ShowSubscriptions => Ok(self.exec_show_subscriptions()),
        // Users.
        Statement::CreateUser(s) => self.exec_create_user(&s),
        Statement::DropUser(name) => self.exec_drop_user(&name),
        // Replication metadata.
        Statement::CreatePublication(s) => self.exec_create_publication(s),
        Statement::DropPublication(name) => self.exec_drop_publication(&name),
        Statement::CreateSubscription(s) => self.exec_create_subscription(s),
        Statement::DropSubscription(name) => self.exec_drop_subscription(&name),
        Statement::WaitForWalPosition { .. } => Err(EngineError::Unsupported(
            "WAIT FOR WAL POSITION must be handled by the server layer".into(),
        )),
        // Maintenance.
        Statement::Analyze(target) => self.exec_analyze(target.as_deref()),
        Statement::CompactColdSegments => self.exec_compact_cold_segments(),
    };
    self.enforce_row_limit(outcome)
}
/// Handles CREATE PUBLICATION.
///
/// # Errors
/// [`EngineError::Unsupported`] wrapping the publication store's error.
fn exec_create_publication(
    &mut self,
    s: CreatePublicationStatement,
) -> Result<QueryResult, EngineError> {
    match self.publications.create(s.name, s.scope) {
        Ok(_) => Ok(QueryResult::CommandOk {
            affected: 1,
            modified_catalog: true,
        }),
        Err(e) => Err(EngineError::Unsupported(alloc::format!(
            "CREATE PUBLICATION: {e:?}"
        ))),
    }
}
/// Handles DROP PUBLICATION. Dropping an unknown publication is not an error; it reports
/// 0 affected and no catalog change.
fn exec_drop_publication(&mut self, name: &str) -> Result<QueryResult, EngineError> {
    let existed = self.publications.drop(name);
    let affected: usize = existed.into();
    Ok(QueryResult::CommandOk {
        affected,
        modified_catalog: existed,
    })
}
/// Read access to the publication registry.
pub const fn publications(&self) -> &publications::Publications {
    let registry = &self.publications;
    registry
}
fn exec_create_subscription(
&mut self,
s: CreateSubscriptionStatement,
) -> Result<QueryResult, EngineError> {
let sub = subscriptions::Subscription {
conn_str: s.conn_str,
publications: s.publications,
enabled: true,
last_received_pos: 0,
};
self.subscriptions
.create(s.name, sub)
.map_err(|e| EngineError::Unsupported(alloc::format!("CREATE SUBSCRIPTION: {e:?}")))?;
Ok(QueryResult::CommandOk {
affected: 1,
modified_catalog: true,
})
}
fn exec_drop_subscription(&mut self, name: &str) -> Result<QueryResult, EngineError> {
let removed = self.subscriptions.drop(name);
Ok(QueryResult::CommandOk {
affected: usize::from(removed),
modified_catalog: removed,
})
}
pub const fn subscriptions(&self) -> &subscriptions::Subscriptions {
&self.subscriptions
}
pub fn subscription_advance(&mut self, name: &str, pos: u64) -> bool {
self.subscriptions.update_last_received_pos(name, pos)
}
/// SHOW SUBSCRIPTIONS: one row per subscription, with publications joined by ", ".
/// Positions above `i64::MAX` are clamped.
fn exec_show_subscriptions(&self) -> QueryResult {
    let columns = alloc::vec![
        ColumnSchema::new("name", DataType::Text, false),
        ColumnSchema::new("conn_str", DataType::Text, false),
        ColumnSchema::new("publications", DataType::Text, false),
        ColumnSchema::new("enabled", DataType::Bool, false),
        ColumnSchema::new("last_received_pos", DataType::BigInt, false),
    ];
    let mut rows: Vec<Row> = Vec::new();
    for (name, sub) in self.subscriptions.iter() {
        let pos = i64::try_from(sub.last_received_pos).unwrap_or(i64::MAX);
        rows.push(Row::new(alloc::vec![
            Value::Text(name.clone()),
            Value::Text(sub.conn_str.clone()),
            Value::Text(sub.publications.join(", ")),
            Value::Bool(sub.enabled),
            Value::BigInt(pos),
        ]));
    }
    QueryResult::Rows { columns, rows }
}
/// Statistics view: one row per analyzed (table, column), plus the table's cold-row count
/// from the committed catalog (0 if the table is gone). Counts are clamped to `i64::MAX`.
fn exec_spg_statistic(&self) -> QueryResult {
    let columns = alloc::vec![
        ColumnSchema::new("table_name", DataType::Text, false),
        ColumnSchema::new("column_name", DataType::Text, false),
        ColumnSchema::new("null_frac", DataType::Float, false),
        ColumnSchema::new("n_distinct", DataType::BigInt, false),
        ColumnSchema::new("histogram_bounds", DataType::Text, false),
        ColumnSchema::new("cold_row_count", DataType::BigInt, false),
    ];
    let mut rows: Vec<Row> = Vec::new();
    for ((table_name, column_name), stats) in self.statistics.iter() {
        let cold_rows = match self.catalog.get(table_name) {
            Some(table) => table.cold_row_count(),
            None => 0,
        };
        rows.push(Row::new(alloc::vec![
            Value::Text(table_name.clone()),
            Value::Text(column_name.clone()),
            Value::Float(f64::from(stats.null_frac)),
            Value::BigInt(i64::try_from(stats.n_distinct).unwrap_or(i64::MAX)),
            Value::Text(render_histogram_bounds(&stats.histogram_bounds)),
            Value::BigInt(i64::try_from(cold_rows).unwrap_or(i64::MAX)),
        ]));
    }
    QueryResult::Rows { columns, rows }
}
/// Replication status view: one row per subscription, with publications joined by ","
/// (no space). Positions above `i64::MAX` are clamped.
fn exec_spg_stat_replication(&self) -> QueryResult {
    let columns = alloc::vec![
        ColumnSchema::new("name", DataType::Text, false),
        ColumnSchema::new("conn_str", DataType::Text, false),
        ColumnSchema::new("publications", DataType::Text, false),
        ColumnSchema::new("last_received_pos", DataType::BigInt, false),
        ColumnSchema::new("enabled", DataType::Bool, false),
    ];
    let mut rows: Vec<Row> = Vec::new();
    for (name, sub) in self.subscriptions.iter() {
        let pos = i64::try_from(sub.last_received_pos).unwrap_or(i64::MAX);
        rows.push(Row::new(alloc::vec![
            Value::Text(name.clone()),
            Value::Text(sub.conn_str.clone()),
            Value::Text(sub.publications.join(",")),
            Value::BigInt(pos),
            Value::Bool(sub.enabled),
        ]));
    }
    QueryResult::Rows { columns, rows }
}
fn exec_spg_stat_segment(&self) -> QueryResult {
let columns = alloc::vec![
ColumnSchema::new("segment_id", DataType::BigInt, false),
ColumnSchema::new("table_name", DataType::Text, false),
ColumnSchema::new("num_rows", DataType::BigInt, false),
ColumnSchema::new("num_pages", DataType::BigInt, false),
ColumnSchema::new("total_bytes", DataType::BigInt, false),
];
let mut segment_owners: alloc::collections::BTreeMap<u32, String> = BTreeMap::new();
for tname in self.catalog.table_names() {
if is_internal_table_name(&tname) {
continue;
}
let Some(t) = self.catalog.get(&tname) else {
continue;
};
for idx in t.indices() {
if let spg_storage::IndexKind::BTree(map) = &idx.kind {
for (_, locs) in map.iter() {
for loc in locs {
if let spg_storage::RowLocator::Cold { segment_id, .. } = loc {
segment_owners.entry(*segment_id).or_insert_with(|| tname.clone());
}
}
}
}
}
}
let rows: Vec<Row> = self
.catalog
.cold_segment_ids_global()
.iter()
.filter_map(|&id| {
let seg = self.catalog.cold_segment(id)?;
let meta = seg.meta();
let owner = segment_owners
.get(&id)
.cloned()
.unwrap_or_default();
Some(Row::new(alloc::vec![
Value::BigInt(i64::from(id)),
Value::Text(owner),
Value::BigInt(i64::try_from(meta.num_rows).unwrap_or(i64::MAX)),
Value::BigInt(i64::from(meta.num_pages)),
Value::BigInt(i64::try_from(meta.total_bytes).unwrap_or(i64::MAX)),
]))
})
.collect();
QueryResult::Rows { columns, rows }
}
/// Query-statistics view: one row per recorded SQL text with execution count, total /
/// mean / max duration and last-seen time. Mean is 0 when nothing was counted; all values
/// are clamped to `i64::MAX`.
fn exec_spg_stat_query(&self) -> QueryResult {
    let columns = alloc::vec![
        ColumnSchema::new("sql", DataType::Text, false),
        ColumnSchema::new("exec_count", DataType::BigInt, false),
        ColumnSchema::new("total_us", DataType::BigInt, false),
        ColumnSchema::new("mean_us", DataType::BigInt, false),
        ColumnSchema::new("max_us", DataType::BigInt, false),
        ColumnSchema::new("last_seen_us", DataType::BigInt, false),
    ];
    let mut rows: Vec<Row> = Vec::new();
    for (sql, stat) in self.query_stats.snapshot() {
        let mean = stat.total_us.checked_div(stat.exec_count).unwrap_or(0);
        rows.push(Row::new(alloc::vec![
            Value::Text(sql),
            Value::BigInt(i64::try_from(stat.exec_count).unwrap_or(i64::MAX)),
            Value::BigInt(i64::try_from(stat.total_us).unwrap_or(i64::MAX)),
            Value::BigInt(i64::try_from(mean).unwrap_or(i64::MAX)),
            Value::BigInt(i64::try_from(stat.max_us).unwrap_or(i64::MAX)),
            Value::BigInt(i64::try_from(stat.last_seen_us).unwrap_or(i64::MAX)),
        ]));
    }
    QueryResult::Rows { columns, rows }
}
/// Installs the host callback that backs the activity view.
#[must_use]
pub const fn with_activity_provider(mut self, f: ActivityProvider) -> Self {
    self.activity_provider = Some(f);
    self
}
/// Installs the host callbacks that back the audit-chain and audit-verify views.
#[must_use]
pub const fn with_audit_providers(
    mut self,
    chain: AuditChainProvider,
    verify: AuditVerifier,
) -> Self {
    self.audit_chain_provider = Some(chain);
    self.audit_verifier = Some(verify);
    self
}
/// Reports successful statements taking at least `threshold_us` microseconds to `logger`
/// (only effective when a clock is configured).
#[must_use]
pub const fn with_slow_query_log(
    mut self,
    threshold_us: u64,
    logger: SlowQueryLogger,
) -> Self {
    self.slow_query_logger = Some(logger);
    self.slow_query_threshold_us = Some(threshold_us);
    self
}
/// Sets the maximum number of entries the plan cache keeps.
pub fn set_plan_cache_max(&mut self, n: usize) {
    let cache = &mut self.plan_cache;
    cache.set_max_entries(n);
}
/// Activity view, backed by the host's activity provider (empty when none is installed).
/// PIDs above `i32::MAX` are clamped.
fn exec_spg_stat_activity(&self) -> QueryResult {
    let columns = alloc::vec![
        ColumnSchema::new("pid", DataType::Int, false),
        ColumnSchema::new("user", DataType::Text, false),
        ColumnSchema::new("started_at_us", DataType::BigInt, false),
        ColumnSchema::new("current_sql", DataType::Text, false),
        ColumnSchema::new("wait_event", DataType::Text, false),
        ColumnSchema::new("elapsed_us", DataType::BigInt, false),
        ColumnSchema::new("in_transaction", DataType::Bool, false),
    ];
    let activity = match self.activity_provider {
        Some(provider) => provider(),
        None => Vec::new(),
    };
    let mut rows: Vec<Row> = Vec::new();
    for entry in activity {
        rows.push(Row::new(alloc::vec![
            Value::Int(i32::try_from(entry.pid).unwrap_or(i32::MAX)),
            Value::Text(entry.user),
            Value::BigInt(entry.started_at_us),
            Value::Text(entry.current_sql),
            Value::Text(entry.wait_event),
            Value::BigInt(entry.elapsed_us),
            Value::Bool(entry.in_transaction),
        ]));
    }
    QueryResult::Rows { columns, rows }
}
/// Table DDL view: one `CREATE TABLE` statement per user table in the committed catalog.
fn exec_spg_table_ddl(&self) -> QueryResult {
    let columns = alloc::vec![
        ColumnSchema::new("table_name", DataType::Text, false),
        ColumnSchema::new("ddl", DataType::Text, false),
    ];
    let mut rows: Vec<Row> = Vec::new();
    for name in self.catalog.table_names() {
        if is_internal_table_name(&name) {
            continue;
        }
        let Some(table) = self.catalog.get(&name) else {
            continue;
        };
        let ddl = render_create_table(&name, &table.schema().columns);
        rows.push(Row::new(alloc::vec![Value::Text(name), Value::Text(ddl)]));
    }
    QueryResult::Rows { columns, rows }
}
/// Returns one `(role_name, ddl)` row per user.
///
/// Each row holds a `CREATE USER` statement that can recreate the user. The
/// password is never exported; it is always rendered as `'<redacted>'`.
fn exec_spg_role_ddl(&self) -> QueryResult {
    let columns = alloc::vec![
        ColumnSchema::new("role_name", DataType::Text, false),
        ColumnSchema::new("ddl", DataType::Text, false),
    ];
    let mut rows: Vec<Row> = Vec::new();
    for (name, rec) in self.users.iter() {
        let role = rec.role.as_str();
        let ddl = alloc::format!(
            "CREATE USER {name} WITH PASSWORD '<redacted>' ROLE '{}'",
            role,
        );
        rows.push(Row::new(alloc::vec![
            Value::Text(String::from(name)),
            Value::Text(ddl),
        ]));
    }
    QueryResult::Rows { columns, rows }
}
/// Renders a one-row, one-column script that recreates every user and every
/// non-internal table of the committed catalog.
///
/// Users are emitted first, then tables. Each statement is terminated by
/// `;\n`. Passwords are always rendered as `'<redacted>'`.
fn exec_spg_database_ddl(&self) -> QueryResult {
    let columns = alloc::vec![ColumnSchema::new("ddl", DataType::Text, false)];
    let user_statements = self.users.iter().map(|(name, rec)| {
        alloc::format!(
            "CREATE USER {name} WITH PASSWORD '<redacted>' ROLE '{}';\n",
            rec.role.as_str(),
        )
    });
    let table_statements = self
        .catalog
        .table_names()
        .into_iter()
        .filter(|name| !is_internal_table_name(name))
        .filter_map(|name| {
            let table = self.catalog.get(&name)?;
            let mut statement = render_create_table(&name, &table.schema().columns);
            statement.push_str(";\n");
            Some(statement)
        });
    let script: String = user_statements.chain(table_statements).collect();
    QueryResult::Rows {
        columns,
        rows: alloc::vec![Row::new(alloc::vec![Value::Text(script)])],
    }
}
/// Materialises the audit-chain view from the installed
/// `audit_chain_provider`.
///
/// Each entry becomes one row with its sequence number, timestamp, hex hashes
/// and SQL text. With no provider the view is empty.
fn exec_spg_audit_chain(&self) -> QueryResult {
    let columns = alloc::vec![
        ColumnSchema::new("seq", DataType::BigInt, false),
        ColumnSchema::new("ts_ms", DataType::BigInt, false),
        ColumnSchema::new("prev_hash", DataType::Text, false),
        ColumnSchema::new("entry_hash", DataType::Text, false),
        ColumnSchema::new("sql", DataType::Text, false),
    ];
    let entries = self
        .audit_chain_provider
        .map_or_else(Default::default, |provider| provider());
    let mut rows: Vec<Row> = Vec::new();
    for entry in entries {
        rows.push(Row::new(alloc::vec![
            Value::BigInt(entry.seq),
            Value::BigInt(entry.ts_ms),
            Value::Text(entry.prev_hash_hex),
            Value::Text(entry.entry_hash_hex),
            Value::Text(entry.sql),
        ]));
    }
    QueryResult::Rows { columns, rows }
}
/// Runs the installed audit verifier and reports its result as one row.
///
/// The row is `(verified_count, broken_at_seq)`. With no verifier the row is
/// `(0, -1)`; presumably `-1` means "no break detected".
fn exec_spg_audit_verify(&self) -> QueryResult {
    let columns = alloc::vec![
        ColumnSchema::new("verified_count", DataType::BigInt, false),
        ColumnSchema::new("broken_at_seq", DataType::BigInt, false),
    ];
    let (verified, broken) = match self.audit_verifier {
        Some(verify) => verify(),
        None => (0, -1),
    };
    QueryResult::Rows {
        columns,
        rows: alloc::vec![Row::new(alloc::vec![
            Value::BigInt(verified),
            Value::BigInt(broken),
        ])],
    }
}
/// Read-only access to the engine's query-statistics collector.
///
/// It is `const` and `#[must_use]` for consistency with the `statistics()`
/// getter. Calling it has no effect other than producing the reference.
#[must_use]
pub const fn query_stats(&self) -> &query_stats::QueryStats {
    &self.query_stats
}
/// Mutable access to the engine's query-statistics collector.
///
/// For example, it lets a host reset or feed the collector.
pub fn query_stats_mut(&mut self) -> &mut query_stats::QueryStats {
    let stats = &mut self.query_stats;
    stats
}
/// Read-only access to the planner statistics.
///
/// These are filled in by `ANALYZE` via `analyze_one_table`.
#[must_use]
pub const fn statistics(&self) -> &statistics::Statistics {
    &self.statistics
}
/// Lists the non-internal tables of the committed catalog whose statistics are
/// stale.
///
/// A table qualifies when the rows modified since its last `ANALYZE` reach
/// 10% (rounded up) of `max(row_count, 100)`. The 100-row floor keeps tiny
/// tables from being re-analysed after every couple of writes.
#[must_use]
pub fn tables_needing_analyze(&self) -> Vec<String> {
    const MIN_ROWS: u64 = 100;
    self.catalog
        .table_names()
        .into_iter()
        .filter(|name| !is_internal_table_name(name))
        .filter(|name| {
            self.catalog.get(name).is_some_and(|table| {
                let floor = (table.rows().len() as u64).max(MIN_ROWS);
                let needed = floor.saturating_add(9) / 10;
                self.statistics.modified_since_last_analyze(name) >= needed
            })
        })
        .collect()
}
/// Executes `ANALYZE [table]`.
///
/// - With a target, that table must exist in the committed catalog and is the
///   only one analysed.
/// - Without a target, every non-internal table is analysed.
///
/// Once at least one table has been processed, the statistics version is
/// bumped and cached plans are invalidated. A targeted run evicts only plans
/// referencing the analysed table; a full run clears the whole cache.
fn exec_analyze(&mut self, target: Option<&str>) -> Result<QueryResult, EngineError> {
    let names: Vec<String> = match target {
        Some(name) => {
            let Some(_) = self.catalog.get(name) else {
                return Err(EngineError::Storage(StorageError::TableNotFound {
                    name: name.to_string(),
                }));
            };
            alloc::vec![name.to_string()]
        }
        None => self
            .catalog
            .table_names()
            .into_iter()
            .filter(|n| !is_internal_table_name(n))
            .collect(),
    };
    // Any failure aborts via `?`. Reaching the end means every listed table
    // was analysed.
    for table_name in &names {
        self.analyze_one_table(table_name)?;
    }
    let analysed = names.len();
    if analysed > 0 {
        self.statistics.bump_version();
        match target {
            Some(_) => {
                for t in &names {
                    self.plan_cache.evict_referencing(t);
                }
            }
            None => {
                self.plan_cache.clear();
            }
        }
    }
    Ok(QueryResult::CommandOk {
        affected: analysed,
        modified_catalog: true,
    })
}
fn exec_compact_cold_segments(&mut self) -> Result<QueryResult, EngineError> {
let target = COMPACTION_TARGET_DEFAULT_BYTES;
let reports = self.compact_cold_segments_with_target(target)?;
let columns = alloc::vec![
ColumnSchema::new("table_name", DataType::Text, false),
ColumnSchema::new("index_name", DataType::Text, false),
ColumnSchema::new("sources_merged", DataType::BigInt, false),
ColumnSchema::new("merged_segment_id", DataType::BigInt, false),
ColumnSchema::new("merged_rows", DataType::BigInt, false),
ColumnSchema::new("deleted_rows_pruned", DataType::BigInt, false),
ColumnSchema::new("bytes_reclaimed_estimate", DataType::BigInt, false),
];
let rows: Vec<Row> = reports
.into_iter()
.map(|(tname, iname, report)| {
Row::new(alloc::vec![
Value::Text(tname),
Value::Text(iname),
Value::BigInt(i64::try_from(report.sources.len()).unwrap_or(i64::MAX)),
Value::BigInt(i64::from(report.merged_segment_id.unwrap_or(0))),
Value::BigInt(i64::try_from(report.merged_rows).unwrap_or(i64::MAX)),
Value::BigInt(
i64::try_from(report.deleted_rows_pruned).unwrap_or(i64::MAX),
),
Value::BigInt(
i64::try_from(report.bytes_reclaimed_estimate).unwrap_or(i64::MAX),
),
])
})
.collect();
Ok(QueryResult::Rows { columns, rows })
}
/// Recomputes planner statistics for one table.
///
/// For every non-vector column, rows are read from the committed catalog
/// (`self.catalog`). The column's null fraction, an n-distinct estimate and
/// histogram bounds are computed from the sorted non-null values. The table's
/// modification counter is then reset.
///
/// Finally the cold-row count is refreshed on the *active* catalog. Inside a
/// transaction that is presumably the transaction's snapshot, which may no
/// longer contain the table (e.g. it was dropped in the transaction). In that
/// case the refresh is skipped instead of panicking.
///
/// # Errors
/// `StorageError::TableNotFound` if the table is absent from the committed
/// catalog.
fn analyze_one_table(&mut self, table_name: &str) -> Result<(), EngineError> {
    let table = self.catalog.get(table_name).ok_or_else(|| {
        EngineError::Storage(StorageError::TableNotFound {
            name: table_name.to_string(),
        })
    })?;
    let schema = table.schema().clone();
    let row_count = table.rows().len();
    self.statistics.clear_table(table_name);
    for (col_pos, col_schema) in schema.columns.iter().enumerate() {
        // Vector columns get no scalar statistics.
        if matches!(col_schema.ty, DataType::Vector { .. }) {
            continue;
        }
        let mut non_null_values: Vec<Value> = Vec::with_capacity(row_count);
        let mut nulls: u64 = 0;
        for row in table.rows() {
            // A row shorter than the schema counts the missing cell as NULL.
            match row.values.get(col_pos) {
                Some(Value::Null) | None => nulls += 1,
                Some(v) => non_null_values.push(v.clone()),
            }
        }
        // Histogram bounds need the values in sorted order.
        non_null_values.sort_by(|a, b| sort_values_for_histogram(a, b));
        let non_null: Vec<String> = non_null_values
            .iter()
            .map(canonical_value_repr)
            .collect();
        let null_frac = if row_count == 0 {
            0.0
        } else {
            #[allow(clippy::cast_precision_loss)] // a ratio; f32 precision is enough
            let f = nulls as f32 / row_count as f32;
            f
        };
        let n_distinct = statistics::estimate_n_distinct(&non_null);
        let histogram_bounds = statistics::build_histogram(&non_null);
        self.statistics.set(
            table_name.to_string(),
            col_schema.name.clone(),
            statistics::ColumnStats {
                null_frac,
                n_distinct,
                histogram_bounds,
            },
        );
    }
    self.statistics.reset_modified(table_name);
    // The committed and active catalogs can disagree inside a transaction, so
    // the active copy may lack the table; refreshing is best-effort.
    if let Some(active) = self.active_catalog_mut().get_mut(table_name) {
        let cold_count = active.count_cold_locators();
        active.set_cold_row_count(cold_count);
    }
    Ok(())
}
/// Lists publications as `(name, scope, table_count)`.
///
/// `table_count` is the number of tables named in the scope. It is NULL for
/// `FOR ALL TABLES` and clamped to `i32::MAX`.
fn exec_show_publications(&self) -> QueryResult {
    use spg_sql::ast::PublicationScope;
    let columns = alloc::vec![
        ColumnSchema::new("name", DataType::Text, false),
        ColumnSchema::new("scope", DataType::Text, false),
        ColumnSchema::new("table_count", DataType::Int, true),
    ];
    let mut rows: Vec<Row> = Vec::new();
    for (name, scope) in self.publications.iter() {
        let (scope_text, table_count) = match scope {
            PublicationScope::AllTables => ("FOR ALL TABLES".to_string(), Value::Null),
            PublicationScope::ForTables(tables) => (
                alloc::format!("FOR TABLE {}", tables.join(", ")),
                Value::Int(i32::try_from(tables.len()).unwrap_or(i32::MAX)),
            ),
            PublicationScope::AllTablesExcept(tables) => (
                alloc::format!("FOR ALL TABLES EXCEPT {}", tables.join(", ")),
                Value::Int(i32::try_from(tables.len()).unwrap_or(i32::MAX)),
            ),
        };
        rows.push(Row::new(alloc::vec![
            Value::Text(name.clone()),
            Value::Text(scope_text),
            table_count,
        ]));
    }
    QueryResult::Rows { columns, rows }
}
/// Lists every user with its role name, one `(name, role)` row each.
fn exec_show_users(&self) -> QueryResult {
    let columns = alloc::vec![
        ColumnSchema::new("name", DataType::Text, false),
        ColumnSchema::new("role", DataType::Text, false),
    ];
    let mut rows: Vec<Row> = Vec::new();
    for (name, rec) in self.users.iter() {
        let user_name = name.to_string();
        let role_name = rec.role.as_str().to_string();
        rows.push(Row::new(alloc::vec![
            Value::Text(user_name),
            Value::Text(role_name),
        ]));
    }
    QueryResult::Rows { columns, rows }
}
/// Executes `CREATE USER`.
///
/// The statement is rejected inside a transaction, and the role string must
/// be accepted by `users::Role::parse`. The salt comes from the injected
/// `salt_fn` when one is configured. Otherwise the first 16 bytes of
/// `spg_crypto::hash(user name)` are used.
///
/// NOTE(review): that fallback salt is a deterministic function of the user
/// name, so it is identical for the same name everywhere. Confirm that
/// deployments which care always install `salt_fn`.
///
/// # Errors
/// `EngineError::Unsupported` for in-transaction use, an invalid role, or any
/// error from the user store.
fn exec_create_user(&mut self, s: &CreateUserStatement) -> Result<QueryResult, EngineError> {
    if self.in_transaction() {
        return Err(EngineError::Unsupported(
            "CREATE USER is not allowed inside a transaction".into(),
        ));
    }
    let Some(role) = users::Role::parse(&s.role) else {
        return Err(EngineError::Unsupported(alloc::format!(
            "invalid role: {:?}",
            s.role
        )));
    };
    let salt = match self.salt_fn {
        Some(make_salt) => make_salt(),
        None => {
            let digest = spg_crypto::hash(s.name.as_bytes());
            let mut derived = [0u8; 16];
            derived.copy_from_slice(&digest[..16]);
            derived
        }
    };
    if let Err(e) = self.users.create(&s.name, &s.password, role, salt) {
        return Err(EngineError::Unsupported(alloc::format!("CREATE USER: {e}")));
    }
    Ok(QueryResult::CommandOk {
        affected: 1,
        modified_catalog: true,
    })
}
/// Executes `DROP USER name`.
///
/// The statement is rejected inside a transaction. Errors from the user
/// store are reported as `EngineError::Unsupported` with a `DROP USER:`
/// prefix.
fn exec_drop_user(&mut self, name: &str) -> Result<QueryResult, EngineError> {
    if self.in_transaction() {
        return Err(EngineError::Unsupported(
            "DROP USER is not allowed inside a transaction".into(),
        ));
    }
    if let Err(e) = self.users.drop(name) {
        return Err(EngineError::Unsupported(alloc::format!("DROP USER: {e}")));
    }
    Ok(QueryResult::CommandOk {
        affected: 1,
        modified_catalog: true,
    })
}
/// Executes `UPDATE ... [WHERE ...] [RETURNING ...]`, polling `cancel` every
/// 256 scanned rows.
///
/// Phases, in order. The `&mut` borrow of the target table is dropped and
/// re-acquired between phases because later phases need `self` again.
/// 1. If `try_pk_predicate` recognises WHERE as a key lookup on an indexed
///    column, the matching cold-tier row is promoted via `promote_cold_row`.
///    Its result is ignored. Presumably this lets the hot-row scan below
///    match that row.
/// 2. Every hot row is tested against WHERE; only `Bool(true)` matches. For
///    each match, every assignment is evaluated against the *old* row and
///    coerced to the column type. Nothing is written yet.
/// 3. If the table has outgoing foreign keys, the new values are validated
///    with `enforce_fk_inserts`. Then `plan_fk_parent_updates` and
///    `apply_fk_child_step` process tables that reference the old values.
/// 4. Planned rows are written with `update_row`. Outside a transaction,
///    `statistics.record_modifications` is called. The result is either the
///    RETURNING projection of the new rows or `CommandOk` with the update
///    count.
///
/// NOTE(review): unlike `exec_insert`, no `enforce_uniqueness_inserts`-style
/// check runs on the new values here. Confirm that uniqueness is enforced
/// elsewhere (e.g. inside `Table::update_row`).
fn exec_update_cancel(
&mut self,
stmt: &spg_sql::ast::UpdateStatement,
cancel: CancelToken<'_>,
) -> Result<QueryResult, EngineError> {
// Phase 1: key-lookup WHERE clauses may target a cold-tier row; promote it.
if let Some(w) = &stmt.where_ {
let schema_cols = self
.active_catalog()
.get(&stmt.table)
.ok_or_else(|| {
EngineError::Storage(StorageError::TableNotFound {
name: stmt.table.clone(),
})
})?
.schema()
.columns
.clone();
if let Some((col_pos, key)) = try_pk_predicate(w, &schema_cols, stmt.table.as_str())
&& let Some(idx_name) = self
.active_catalog()
.get(&stmt.table)
.and_then(|t| t.index_on(col_pos).map(|i| i.name.clone()))
{
// Best-effort: a failed promotion is ignored.
let _ = self
.active_catalog_mut()
.promote_cold_row(&stmt.table, &idx_name, &key);
}
}
let table = self
.active_catalog_mut()
.get_mut(&stmt.table)
.ok_or_else(|| {
EngineError::Storage(StorageError::TableNotFound {
name: stmt.table.clone(),
})
})?;
let schema_cols: Vec<ColumnSchema> = table.schema().columns.clone();
// Resolve every SET target to its column position up front.
let mut targets: Vec<(usize, &Expr)> = Vec::with_capacity(stmt.assignments.len());
for (col, expr) in &stmt.assignments {
let pos = schema_cols
.iter()
.position(|c| c.name == *col)
.ok_or_else(|| {
EngineError::Eval(EvalError::ColumnNotFound { name: col.clone() })
})?;
targets.push((pos, expr));
}
let ctx = EvalContext::new(&schema_cols, Some(stmt.table.as_str()));
// Phase 2: (row position, new values) for every matching row; no writes yet.
let mut planned: Vec<(usize, Vec<Value>)> = Vec::new();
for (i, row) in table.rows().iter().enumerate() {
if i.is_multiple_of(256) {
cancel.check()?;
}
if let Some(w) = &stmt.where_ {
let cond = eval::eval_expr(w, row, &ctx)?;
if !matches!(cond, Value::Bool(true)) {
continue;
}
}
let mut new_vals = row.values.clone();
for (pos, expr) in &targets {
// Expressions see the old row, not values assigned earlier in this SET list.
let v = eval::eval_expr(expr, row, &ctx)?;
new_vals[*pos] =
coerce_value(v, schema_cols[*pos].ty, &schema_cols[*pos].name, *pos)?;
}
planned.push((i, new_vals));
}
// (position, old values, new values), which FK planning needs for referencing tables.
let plan_with_old: Vec<(usize, Vec<Value>, Vec<Value>)> = planned
.iter()
.map(|(pos, new_vals)| (*pos, table.rows()[*pos].values.clone(), new_vals.clone()))
.collect();
let self_fks = table.schema().foreign_keys.clone();
let affected = planned.len();
// End of this `table` borrow; `self` is needed again below.
let _ = table;
// Phase 3: outgoing FKs of this table must hold for the new values.
if !self_fks.is_empty() {
let new_rows: Vec<Vec<Value>> = planned
.iter()
.map(|(_pos, new_vals)| new_vals.clone())
.collect();
enforce_fk_inserts(self.active_catalog(), &stmt.table, &self_fks, &new_rows)?;
}
let child_plan = plan_fk_parent_updates(self.active_catalog(), &stmt.table, &plan_with_old)?;
for step in &child_plan {
apply_fk_child_step(self.active_catalog_mut(), step)?;
}
// Phase 4: re-acquire the table and write the planned rows.
let table = self
.active_catalog_mut()
.get_mut(&stmt.table)
.ok_or_else(|| {
EngineError::Storage(StorageError::TableNotFound {
name: stmt.table.clone(),
})
})?;
let updated_for_returning: Vec<Vec<Value>> =
if stmt.returning.is_some() {
planned.iter().map(|(_pos, vals)| vals.clone()).collect()
} else {
Vec::new()
};
for (pos, vals) in planned {
table.update_row(pos, vals)?;
}
let _ = table;
// Modification counts are only recorded outside transactions.
if !self.in_transaction() && affected > 0 {
self.statistics
.record_modifications(&stmt.table, affected as u64);
}
if let Some(items) = &stmt.returning {
return self.build_returning_rows(
&stmt.table,
items,
updated_for_returning,
);
}
Ok(QueryResult::CommandOk {
affected,
modified_catalog: !self.in_transaction(),
})
}
/// Executes `DELETE ... [WHERE ...] [RETURNING ...]`, polling `cancel` every
/// 256 scanned rows.
///
/// Phases, in order:
/// 1. If `try_pk_predicate` recognises WHERE as a key lookup on an indexed
///    column, matching cold-tier rows are shadowed via `shadow_cold_row`.
///    An error counts as 0. The number shadowed is added to `affected`.
/// 2. Hot rows matching WHERE are collected, or every row when there is no
///    WHERE. Both positions and full values are kept.
/// 3. `plan_fk_parent_deletions` / `apply_fk_child_step` process tables that
///    reference the rows being deleted. Then the hot rows are deleted.
/// 4. Outside a transaction, `statistics.record_modifications` is called.
///    The result is either the RETURNING projection of the deleted hot rows
///    or `CommandOk`.
///
/// NOTE(review): phase 1 mutates storage before cancellation and FK planning
/// run. The shadowed cold rows are neither passed to
/// `plan_fk_parent_deletions` nor included in RETURNING. If a later phase
/// fails, the cold row appears to stay shadowed; confirm whether this is
/// undone elsewhere.
fn exec_delete_cancel(
&mut self,
stmt: &spg_sql::ast::DeleteStatement,
cancel: CancelToken<'_>,
) -> Result<QueryResult, EngineError> {
// Phase 1: cold-tier rows matched by a key-lookup WHERE clause.
let mut cold_shadow_count: usize = 0;
if let Some(w) = &stmt.where_ {
let schema_cols = self
.active_catalog()
.get(&stmt.table)
.ok_or_else(|| {
EngineError::Storage(StorageError::TableNotFound {
name: stmt.table.clone(),
})
})?
.schema()
.columns
.clone();
if let Some((col_pos, key)) = try_pk_predicate(w, &schema_cols, stmt.table.as_str())
&& let Some(idx_name) = self
.active_catalog()
.get(&stmt.table)
.and_then(|t| t.index_on(col_pos).map(|i| i.name.clone()))
{
cold_shadow_count = self
.active_catalog_mut()
.shadow_cold_row(&stmt.table, &idx_name, &key)
.unwrap_or(0);
}
}
let table = self
.active_catalog_mut()
.get_mut(&stmt.table)
.ok_or_else(|| {
EngineError::Storage(StorageError::TableNotFound {
name: stmt.table.clone(),
})
})?;
let schema_cols: Vec<ColumnSchema> = table.schema().columns.clone();
let ctx = EvalContext::new(&schema_cols, Some(stmt.table.as_str()));
// Phase 2: positions and values of the hot rows to delete.
let mut positions: Vec<usize> = Vec::new();
let mut to_delete_rows: Vec<Vec<Value>> = Vec::new();
for (i, row) in table.rows().iter().enumerate() {
if i.is_multiple_of(256) {
cancel.check()?;
}
// Only `Bool(true)` deletes; NULL/false keep the row.
let keep = if let Some(w) = &stmt.where_ {
let cond = eval::eval_expr(w, row, &ctx)?;
!matches!(cond, Value::Bool(true))
} else {
false
};
if !keep {
positions.push(i);
to_delete_rows.push(row.values.clone());
}
}
// End of this `table` borrow; `self` is needed again below.
let _ = table;
// Phase 3: handle referencing tables before removing the parent rows.
let cascade_plan = plan_fk_parent_deletions(
self.active_catalog(),
&stmt.table,
&positions,
&to_delete_rows,
)?;
for step in &cascade_plan {
apply_fk_child_step(self.active_catalog_mut(), step)?;
}
let table = self
.active_catalog_mut()
.get_mut(&stmt.table)
.ok_or_else(|| {
EngineError::Storage(StorageError::TableNotFound {
name: stmt.table.clone(),
})
})?;
let affected = table.delete_rows(&positions) + cold_shadow_count;
let _ = table;
// Phase 4: modification counts are only recorded outside transactions.
if !self.in_transaction() && affected > 0 {
self.statistics
.record_modifications(&stmt.table, affected as u64);
}
if let Some(items) = &stmt.returning {
return self.build_returning_rows(
&stmt.table,
items,
to_delete_rows,
);
}
Ok(QueryResult::CommandOk {
affected,
modified_catalog: !self.in_transaction(),
})
}
/// Executes `EXPLAIN` for a SELECT and returns the plan as `QUERY PLAN` rows.
///
/// The outline from `explain_select` is always emitted. After it:
/// - With `suggest`, index suggestions are appended; this takes precedence
///   over `analyze`.
/// - Otherwise, with `analyze`, the query is actually run (honouring
///   `cancel`). The plan lines are annotated with the observed row count, and
///   a `Total:` line is appended. That line includes elapsed microseconds
///   when a clock is configured.
#[allow(clippy::format_push_string)]
fn exec_explain(
    &self,
    e: &spg_sql::ast::ExplainStatement,
    cancel: CancelToken<'_>,
) -> Result<QueryResult, EngineError> {
    let mut lines = Vec::<String>::new();
    explain_select(&e.inner, self, 0, &mut lines);
    if e.suggest {
        lines.extend(build_index_suggestions(&e.inner, self));
    } else if e.analyze {
        let started = self.clock.map(|now| now());
        let executed = self.exec_select_cancel(&e.inner, cancel)?;
        let elapsed_micros = self
            .clock
            .zip(started)
            .map(|(now, start)| now().saturating_sub(start));
        let row_count = match &executed {
            QueryResult::Rows { rows, .. } => rows.len(),
            _ => 0,
        };
        annotate_explain_lines(&mut lines, row_count, self);
        let mut total = alloc::format!("Total: rows={row_count}");
        if let Some(us) = elapsed_micros {
            total.push_str(&alloc::format!(" elapsed={us}us"));
        }
        lines.push(total);
    }
    let columns = alloc::vec![ColumnSchema::new("QUERY PLAN", DataType::Text, false)];
    let mut rows: Vec<Row> = Vec::with_capacity(lines.len());
    for line in lines {
        rows.push(Row::new(alloc::vec![Value::Text(line)]));
    }
    Ok(QueryResult::Rows { columns, rows })
}
/// Lists every table name in the active catalog, one row each.
///
/// Unlike the DDL views, internal tables are not filtered out here.
fn exec_show_tables(&self) -> QueryResult {
    let columns = alloc::vec![ColumnSchema::new("name", DataType::Text, false)];
    let mut rows: Vec<Row> = Vec::new();
    for table_name in self.active_catalog().table_names() {
        rows.push(Row::new(alloc::vec![Value::Text(table_name)]));
    }
    QueryResult::Rows { columns, rows }
}
/// Describes a table's columns as `(name, type, nullable)` rows, in schema
/// order.
///
/// # Errors
/// `StorageError::TableNotFound` (converted via `?`) when the table is not
/// in the active catalog.
fn exec_show_columns(&self, table_name: &str) -> Result<QueryResult, EngineError> {
    let Some(table) = self.active_catalog().get(table_name) else {
        return Err(StorageError::TableNotFound {
            name: table_name.into(),
        }
        .into());
    };
    let columns = alloc::vec![
        ColumnSchema::new("name", DataType::Text, false),
        ColumnSchema::new("type", DataType::Text, false),
        ColumnSchema::new("nullable", DataType::Bool, false),
    ];
    let mut rows: Vec<Row> = Vec::new();
    for col in &table.schema().columns {
        rows.push(Row::new(alloc::vec![
            Value::Text(col.name.clone()),
            Value::Text(col.ty.to_string()),
            Value::Bool(col.nullable),
        ]));
    }
    Ok(QueryResult::Rows { columns, rows })
}
/// Executes `BEGIN`.
///
/// It snapshots the committed catalog into a new `TxState`, with no
/// savepoints, keyed by the session's current transaction id.
///
/// # Errors
/// - `NoActiveTransaction` when no transaction id is set.
/// - `TransactionAlreadyOpen` when that id already has a snapshot.
fn exec_begin(&mut self) -> Result<QueryResult, EngineError> {
    let Some(tx_id) = self.current_tx else {
        return Err(EngineError::NoActiveTransaction);
    };
    if self.tx_catalogs.contains_key(&tx_id) {
        return Err(EngineError::TransactionAlreadyOpen);
    }
    let state = TxState {
        catalog: self.catalog.clone(),
        savepoints: Vec::new(),
    };
    self.tx_catalogs.insert(tx_id, state);
    Ok(QueryResult::CommandOk {
        affected: 0,
        modified_catalog: false,
    })
}
/// Executes `COMMIT`.
///
/// The transaction's private catalog replaces the committed catalog
/// wholesale, and its savepoints are dropped with it.
///
/// # Errors
/// `NoActiveTransaction` when there is no transaction id or no open snapshot
/// for it.
fn exec_commit(&mut self) -> Result<QueryResult, EngineError> {
    let Some(tx_id) = self.current_tx else {
        return Err(EngineError::NoActiveTransaction);
    };
    let Some(state) = self.tx_catalogs.remove(&tx_id) else {
        return Err(EngineError::NoActiveTransaction);
    };
    self.catalog = state.catalog;
    Ok(QueryResult::CommandOk {
        affected: 0,
        modified_catalog: true,
    })
}
/// Executes `ROLLBACK`.
///
/// It discards the transaction's private catalog and leaves the committed
/// catalog untouched.
///
/// # Errors
/// `NoActiveTransaction` when there is no transaction id or no open snapshot
/// for it.
fn exec_rollback(&mut self) -> Result<QueryResult, EngineError> {
    let Some(tx_id) = self.current_tx else {
        return Err(EngineError::NoActiveTransaction);
    };
    match self.tx_catalogs.remove(&tx_id) {
        Some(_) => Ok(QueryResult::CommandOk {
            affected: 0,
            modified_catalog: false,
        }),
        None => Err(EngineError::NoActiveTransaction),
    }
}
/// Executes `SAVEPOINT name`.
///
/// It pushes a snapshot of the transaction's current catalog. An existing
/// savepoint with the same name is discarded first, so names stay unique.
fn exec_savepoint(&mut self, name: String) -> Result<QueryResult, EngineError> {
    let tx_id = self.current_tx.ok_or(EngineError::NoActiveTransaction)?;
    let Some(state) = self.tx_catalogs.get_mut(&tx_id) else {
        return Err(EngineError::NoActiveTransaction);
    };
    state.savepoints.retain(|(existing, _)| existing != &name);
    let snapshot = state.catalog.clone();
    state.savepoints.push((name, snapshot));
    Ok(QueryResult::CommandOk {
        affected: 0,
        modified_catalog: false,
    })
}
/// Executes `ROLLBACK TO SAVEPOINT name`.
///
/// It restores the transaction catalog from the most recent savepoint with
/// that name. Every later savepoint is discarded; the named one is kept so it
/// can be rolled back to again.
fn exec_rollback_to_savepoint(&mut self, name: &str) -> Result<QueryResult, EngineError> {
    let tx_id = self.current_tx.ok_or(EngineError::NoActiveTransaction)?;
    let Some(state) = self.tx_catalogs.get_mut(&tx_id) else {
        return Err(EngineError::NoActiveTransaction);
    };
    let Some(idx) = state.savepoints.iter().rposition(|(n, _)| n == name) else {
        return Err(EngineError::Unsupported(alloc::format!(
            "savepoint not found: {name}"
        )));
    };
    state.savepoints.truncate(idx + 1);
    state.catalog = state.savepoints[idx].1.clone();
    Ok(QueryResult::CommandOk {
        affected: 0,
        modified_catalog: false,
    })
}
/// Executes `RELEASE SAVEPOINT name`.
///
/// It forgets the most recent savepoint with that name and every savepoint
/// created after it. The transaction catalog itself is unchanged.
fn exec_release_savepoint(&mut self, name: &str) -> Result<QueryResult, EngineError> {
    let tx_id = self.current_tx.ok_or(EngineError::NoActiveTransaction)?;
    let Some(state) = self.tx_catalogs.get_mut(&tx_id) else {
        return Err(EngineError::NoActiveTransaction);
    };
    let Some(idx) = state.savepoints.iter().rposition(|(n, _)| n == name) else {
        return Err(EngineError::Unsupported(alloc::format!(
            "savepoint not found: {name}"
        )));
    };
    state.savepoints.truncate(idx);
    Ok(QueryResult::CommandOk {
        affected: 0,
        modified_catalog: false,
    })
}
/// Executes `ALTER TABLE` on the active catalog.
///
/// - `SetHotTierBytes(n)`: records the table's hot-tier byte budget.
/// - `AddForeignKey(fk)`: resolves the constraint against the current schema
///   and rejects a duplicate constraint name. It then checks that every
///   existing row satisfies the new FK before attaching it.
/// - `DropForeignKey(name)`: removes the FK with that name. An error is
///   returned if none matched.
///
/// The duplicate-name check runs *before* the existing rows are cloned and
/// validated, so a statement that is bound to fail does not pay for a full
/// table scan.
fn exec_alter_table(
    &mut self,
    s: spg_sql::ast::AlterTableStatement,
) -> Result<QueryResult, EngineError> {
    match s.target {
        spg_sql::ast::AlterTableTarget::SetHotTierBytes(n) => {
            let table = self
                .active_catalog_mut()
                .get_mut(&s.name)
                .ok_or_else(|| {
                    EngineError::Storage(StorageError::TableNotFound {
                        name: s.name.clone(),
                    })
                })?;
            table.schema_mut().hot_tier_bytes = Some(n);
        }
        spg_sql::ast::AlterTableTarget::AddForeignKey(fk) => {
            let cols_snapshot = self
                .active_catalog()
                .get(&s.name)
                .ok_or_else(|| {
                    EngineError::Storage(StorageError::TableNotFound {
                        name: s.name.clone(),
                    })
                })?
                .schema()
                .columns
                .clone();
            let storage_fk = resolve_foreign_key(
                &s.name,
                &cols_snapshot,
                fk,
                self.active_catalog(),
            )?;
            // Cheap metadata check first: reject a clashing constraint name
            // before scanning every row.
            if let Some(name) = &storage_fk.name
                && self
                    .active_catalog()
                    .get(&s.name)
                    .expect("checked above")
                    .schema()
                    .foreign_keys
                    .iter()
                    .any(|f| f.name.as_ref() == Some(name))
            {
                return Err(EngineError::Unsupported(alloc::format!(
                    "ALTER TABLE ADD CONSTRAINT: a constraint named {name:?} already exists"
                )));
            }
            // Existing data must satisfy the new constraint before it is attached.
            let existing_rows: Vec<Vec<Value>> = self
                .active_catalog()
                .get(&s.name)
                .expect("checked above")
                .rows()
                .iter()
                .map(|r| r.values.clone())
                .collect();
            enforce_fk_inserts(
                self.active_catalog(),
                &s.name,
                core::slice::from_ref(&storage_fk),
                &existing_rows,
            )?;
            self.active_catalog_mut()
                .get_mut(&s.name)
                .expect("checked above")
                .schema_mut()
                .foreign_keys
                .push(storage_fk);
        }
        spg_sql::ast::AlterTableTarget::DropForeignKey(name) => {
            let table = self
                .active_catalog_mut()
                .get_mut(&s.name)
                .ok_or_else(|| {
                    EngineError::Storage(StorageError::TableNotFound {
                        name: s.name.clone(),
                    })
                })?;
            let fks = &mut table.schema_mut().foreign_keys;
            let before = fks.len();
            fks.retain(|f| f.name.as_ref() != Some(&name));
            if fks.len() == before {
                return Err(EngineError::Unsupported(alloc::format!(
                    "ALTER TABLE DROP CONSTRAINT: no FK named {name:?} on {:?}",
                    s.name
                )));
            }
        }
    }
    Ok(QueryResult::CommandOk {
        affected: 0,
        modified_catalog: !self.in_transaction(),
    })
}
/// Executes `ALTER INDEX` (currently only the rebuild target).
///
/// The owning table is located by scanning the active catalog for an index
/// with the given name. The NSW index is then rebuilt, optionally switching
/// its vector encoding, and cached plans for that table are evicted.
///
/// # Errors
/// - `StorageError::IndexNotFound` if no table owns an index with that name.
/// - Any error from `rebuild_nsw_index`.
fn exec_alter_index(
    &mut self,
    stmt: spg_sql::ast::AlterIndexStatement,
) -> Result<QueryResult, EngineError> {
    let spg_sql::ast::AlterIndexStatement {
        name: idx_name,
        target,
    } = stmt;
    let spg_sql::ast::AlterIndexTarget::Rebuild { encoding } = target;
    let new_encoding = match encoding {
        Some(SqlVecEncoding::F32) => Some(VecEncoding::F32),
        Some(SqlVecEncoding::Sq8) => Some(VecEncoding::Sq8),
        Some(SqlVecEncoding::F16) => Some(VecEncoding::F16),
        None => None,
    };
    let table_name = {
        let cat = self.active_catalog();
        cat.table_names()
            .into_iter()
            .find(|tname| {
                cat.get(tname)
                    .is_some_and(|t| t.indices().iter().any(|i| i.name == idx_name))
            })
            .ok_or_else(|| {
                EngineError::Storage(StorageError::IndexNotFound {
                    name: idx_name.clone(),
                })
            })?
    };
    self.active_catalog_mut()
        .get_mut(&table_name)
        .expect("table found above")
        .rebuild_nsw_index(&idx_name, new_encoding)?;
    self.plan_cache.evict_referencing(&table_name);
    Ok(QueryResult::CommandOk {
        affected: 0,
        modified_catalog: !self.in_transaction(),
    })
}
/// Executes `CREATE INDEX` (BTree, HNSW or BRIN) on the active catalog.
///
/// `IF NOT EXISTS` with an already-present index name is a no-op.
/// INCLUDE columns, a partial `WHERE` predicate and an expression key are
/// only supported on BTree indexes. Every such restriction is validated
/// *before* the index is built, so a rejected statement never leaves a
/// half-configured index in the catalog. Previously the predicate and
/// expression checks ran after creation. The predicate and expression are
/// stored in their canonical `Display` form. Cached plans for the table are
/// evicted.
///
/// # Errors
/// - `TableNotFound` for an unknown table.
/// - `ColumnNotFound` for an unknown INCLUDE column.
/// - `Unsupported` for features the index method does not support.
/// - Any storage error from building the index.
fn exec_create_index(
    &mut self,
    stmt: CreateIndexStatement,
) -> Result<QueryResult, EngineError> {
    let table = self
        .active_catalog_mut()
        .get_mut(&stmt.table)
        .ok_or_else(|| {
            EngineError::Storage(StorageError::TableNotFound {
                name: stmt.table.clone(),
            })
        })?;
    if stmt.if_not_exists && table.indices().iter().any(|i| i.name == stmt.name) {
        return Ok(QueryResult::CommandOk {
            affected: 0,
            modified_catalog: false,
        });
    }
    // `extra_columns` is accepted by the parser but not used by this engine yet.
    let _ = &stmt.extra_columns;
    let table_name = stmt.table.clone();
    let included_positions: Vec<usize> = if stmt.included_columns.is_empty() {
        Vec::new()
    } else {
        let schema = table.schema();
        stmt.included_columns
            .iter()
            .map(|c| {
                schema.column_position(c).ok_or_else(|| {
                    EngineError::Storage(StorageError::ColumnNotFound {
                        column: c.clone(),
                    })
                })
            })
            .collect::<Result<Vec<_>, _>>()?
    };
    // Validate every method-specific restriction before touching the table.
    if !included_positions.is_empty() {
        if matches!(stmt.method, IndexMethod::Hnsw) {
            return Err(EngineError::Unsupported(
                "INCLUDE columns are not supported on HNSW indexes".into(),
            ));
        }
        if matches!(stmt.method, IndexMethod::Brin) {
            return Err(EngineError::Unsupported(
                "INCLUDE columns are not supported on BRIN indexes".into(),
            ));
        }
    }
    let hnsw_or_brin = matches!(stmt.method, IndexMethod::Hnsw | IndexMethod::Brin);
    if hnsw_or_brin && stmt.partial_predicate.is_some() {
        return Err(EngineError::Unsupported(
            "WHERE predicates are not supported on HNSW or BRIN indexes".into(),
        ));
    }
    if hnsw_or_brin && stmt.expression.is_some() {
        return Err(EngineError::Unsupported(
            "Expression keys are not supported on HNSW or BRIN indexes".into(),
        ));
    }
    match stmt.method {
        IndexMethod::BTree => table.add_index(stmt.name.clone(), &stmt.column)?,
        IndexMethod::Hnsw => {
            table.add_nsw_index(stmt.name.clone(), &stmt.column, spg_storage::NSW_DEFAULT_M)?;
        }
        IndexMethod::Brin => {
            table.add_brin_index(stmt.name.clone(), &stmt.column)?;
        }
    }
    // Attach the optional BTree-only metadata to the index just created.
    if let Some(idx) = table.indices_mut().iter_mut().find(|i| i.name == stmt.name) {
        if !included_positions.is_empty() {
            idx.included_columns = included_positions;
        }
        if let Some(pred_expr) = &stmt.partial_predicate {
            idx.partial_predicate = Some(pred_expr.to_string());
        }
        if let Some(key_expr) = &stmt.expression {
            idx.expression = Some(key_expr.to_string());
        }
    }
    self.plan_cache.evict_referencing(&table_name);
    Ok(QueryResult::CommandOk {
        affected: 0,
        modified_catalog: !self.in_transaction(),
    })
}
/// Executes `CREATE TABLE` on the active catalog.
///
/// Steps:
/// 1. Columns named in a table-level `PRIMARY KEY` are forced `NOT NULL`.
/// 2. Foreign keys are resolved, and table-level PRIMARY KEY / UNIQUE
///    constraints are recorded as position-based uniqueness constraints.
/// 3. After the table is created, a BTree index is added for each
///    inline-PK column, and for the leading column of each table constraint
///    unless a BTree index on that column already exists.
///
/// A table constraint with an empty column list is rejected up front.
/// Previously it was accepted and later panicked on `names[0]`.
///
/// # Errors
/// - `Unsupported` for empty or unknown constraint columns.
/// - Errors from column conversion, FK resolution, table creation and index
///   creation.
fn exec_create_table(
    &mut self,
    stmt: CreateTableStatement,
) -> Result<QueryResult, EngineError> {
    if stmt.if_not_exists && self.active_catalog().get(&stmt.name).is_some() {
        return Ok(QueryResult::CommandOk {
            affected: 0,
            modified_catalog: false,
        });
    }
    let table_name = stmt.name.clone();
    let inline_pk_columns: Vec<String> = stmt
        .columns
        .iter()
        .filter(|c| c.is_primary_key)
        .map(|c| c.name.clone())
        .collect();
    let mut cols = stmt
        .columns
        .into_iter()
        .map(column_def_to_schema)
        .collect::<Result<Vec<_>, _>>()?;
    for tc in &stmt.table_constraints {
        if let spg_sql::ast::TableConstraint::PrimaryKey { columns, .. } = tc {
            for col_name in columns {
                if let Some(col) = cols.iter_mut().find(|c| c.name == *col_name) {
                    col.nullable = false;
                }
            }
        }
    }
    let mut fks: Vec<spg_storage::ForeignKeyConstraint> =
        Vec::with_capacity(stmt.foreign_keys.len());
    for fk in stmt.foreign_keys {
        fks.push(resolve_foreign_key(
            &table_name,
            &cols,
            fk,
            self.active_catalog(),
        )?);
    }
    let mut schema = TableSchema::new(table_name.clone(), cols);
    schema.foreign_keys = fks;
    let mut uc_storage: Vec<spg_storage::UniquenessConstraint> =
        Vec::with_capacity(stmt.table_constraints.len());
    for tc in &stmt.table_constraints {
        let (is_pk, names) = match tc {
            spg_sql::ast::TableConstraint::PrimaryKey { columns, .. } => (true, columns),
            spg_sql::ast::TableConstraint::Unique { columns, .. } => (false, columns),
        };
        if names.is_empty() {
            return Err(EngineError::Unsupported(
                "table constraint must name at least one column".into(),
            ));
        }
        let mut positions = Vec::with_capacity(names.len());
        for n in names {
            let pos = schema
                .columns
                .iter()
                .position(|c| c.name == *n)
                .ok_or_else(|| {
                    EngineError::Unsupported(alloc::format!(
                        "table constraint references unknown column {n:?}"
                    ))
                })?;
            positions.push(pos);
        }
        uc_storage.push(spg_storage::UniquenessConstraint {
            is_primary_key: is_pk,
            columns: positions,
        });
    }
    schema.uniqueness_constraints = uc_storage;
    self.active_catalog_mut().create_table(schema)?;
    let table = self
        .active_catalog_mut()
        .get_mut(&table_name)
        .expect("just created");
    for (i, col_name) in inline_pk_columns.iter().enumerate() {
        let idx_name = if inline_pk_columns.len() == 1 {
            alloc::format!("{table_name}_pkey")
        } else {
            alloc::format!("{table_name}_pkey_{i}")
        };
        table.add_index(idx_name, col_name).map_err(EngineError::Storage)?;
    }
    for (i, tc) in stmt.table_constraints.iter().enumerate() {
        let (is_pk, names) = match tc {
            spg_sql::ast::TableConstraint::PrimaryKey { columns, .. } => (true, columns),
            spg_sql::ast::TableConstraint::Unique { columns, .. } => (false, columns),
        };
        // Empty lists were rejected above; this is only a panic-free guard.
        let Some(leading) = names.first() else {
            continue;
        };
        // Reuse an existing BTree index on the leading column if there is one.
        let already = table.indices().iter().any(|idx| {
            matches!(idx.kind, spg_storage::IndexKind::BTree(_))
                && table.schema().columns[idx.column_position].name == *leading
        });
        if already {
            continue;
        }
        let suffix = if is_pk { "pkey" } else { "key" };
        let idx_name = if names.len() == 1 {
            alloc::format!("{table_name}_{leading}_{suffix}")
        } else {
            alloc::format!("{table_name}_{leading}_{suffix}_{i}")
        };
        table.add_index(idx_name, leading).map_err(EngineError::Storage)?;
    }
    Ok(QueryResult::CommandOk {
        affected: 0,
        modified_catalog: !self.in_transaction(),
    })
}
/// Executes `INSERT ... VALUES ... [ON CONFLICT ...] [RETURNING ...]`.
///
/// Phases:
/// 1. Every tuple is materialised into a full schema-ordered row. Omitted
///    columns get their defaults, and AUTO_INCREMENT fills NULLs. Values are
///    coerced to the column types.
/// 2. Foreign keys and uniqueness constraints are checked for the whole batch.
/// 3. With `ON CONFLICT`, rows whose conflict key already exists in the table
///    (or earlier in the batch) are either skipped (`DO NOTHING`) or turned
///    into an update of the existing row (`DO UPDATE`).
/// 4. Surviving rows are inserted and pending updates are applied.
///
/// Modification statistics are recorded (outside transactions) *before* the
/// RETURNING early-exit. `INSERT ... RETURNING` is therefore counted for
/// auto-ANALYZE just like UPDATE/DELETE ... RETURNING. Previously it was
/// silently skipped.
///
/// NOTE(review): `enforce_uniqueness_inserts` runs on every candidate row
/// before ON CONFLICT filtering. Confirm that it does not reject conflicts
/// ON CONFLICT is meant to absorb.
fn exec_insert(&mut self, stmt: InsertStatement) -> Result<QueryResult, EngineError> {
    let clock = self.clock;
    let table = self
        .active_catalog_mut()
        .get_mut(&stmt.table)
        .ok_or_else(|| {
            EngineError::Storage(StorageError::TableNotFound {
                name: stmt.table.clone(),
            })
        })?;
    let column_meta: Vec<ColumnSchema> = table.schema().columns.clone();
    let schema_cols_len = column_meta.len();
    // For an explicit column list: schema position -> index within the tuple.
    let tuple_pos: Option<Vec<Option<usize>>> = match &stmt.columns {
        None => None,
        Some(cols) => {
            let mut map = alloc::vec![None; schema_cols_len];
            for (j, name) in cols.iter().enumerate() {
                let idx = column_meta
                    .iter()
                    .position(|c| c.name == *name)
                    .ok_or_else(|| {
                        EngineError::Eval(EvalError::ColumnNotFound { name: name.clone() })
                    })?;
                // A column listed twice is reported as an arity mismatch.
                if map[idx].is_some() {
                    return Err(EngineError::Storage(StorageError::ArityMismatch {
                        expected: schema_cols_len,
                        actual: cols.len(),
                    }));
                }
                map[idx] = Some(j);
            }
            // Omitted NOT NULL columns need some way to obtain a value.
            for (i, col) in column_meta.iter().enumerate() {
                if map[i].is_none()
                    && !col.nullable
                    && col.default.is_none()
                    && col.runtime_default.is_none()
                    && !col.auto_increment
                {
                    return Err(EngineError::Storage(StorageError::NullInNotNull {
                        column: col.name.clone(),
                    }));
                }
            }
            Some(map)
        }
    };
    let expected_tuple_len = stmt.columns.as_ref().map_or(schema_cols_len, Vec::len);
    let fks = table.schema().foreign_keys.clone();
    let mut affected = 0usize;
    let mut all_values: Vec<Vec<Value>> = Vec::with_capacity(stmt.rows.len());
    for tuple in stmt.rows {
        if tuple.len() != expected_tuple_len {
            return Err(EngineError::Storage(StorageError::ArityMismatch {
                expected: expected_tuple_len,
                actual: tuple.len(),
            }));
        }
        let values: Vec<Value> = if let Some(map) = &tuple_pos {
            let raw_tuple: Vec<Value> = tuple
                .into_iter()
                .map(literal_expr_to_value)
                .collect::<Result<_, _>>()?;
            let mut out = Vec::with_capacity(schema_cols_len);
            for (i, col) in column_meta.iter().enumerate() {
                let mut raw = match map[i] {
                    Some(j) => raw_tuple[j].clone(),
                    None => resolve_column_default_free(col, clock)?,
                };
                if col.auto_increment && raw.is_null() {
                    let next = table.next_auto_value(i).ok_or_else(|| {
                        EngineError::Unsupported(alloc::format!(
                            "AUTO_INCREMENT applies to integer columns only (column `{}`)",
                            col.name
                        ))
                    })?;
                    raw = Value::BigInt(next);
                }
                out.push(coerce_value(raw, col.ty, &col.name, i)?);
            }
            out
        } else {
            let mut out = Vec::with_capacity(schema_cols_len);
            for (i, (col, expr)) in column_meta.iter().zip(tuple).enumerate() {
                let mut raw = literal_expr_to_value(expr)?;
                if col.auto_increment && raw.is_null() {
                    let next = table.next_auto_value(i).ok_or_else(|| {
                        EngineError::Unsupported(alloc::format!(
                            "AUTO_INCREMENT applies to integer columns only (column `{}`)",
                            col.name
                        ))
                    })?;
                    raw = Value::BigInt(next);
                }
                out.push(coerce_value(raw, col.ty, &col.name, i)?);
            }
            out
        };
        all_values.push(values);
    }
    let uniqueness = table.schema().uniqueness_constraints.clone();
    if !fks.is_empty() {
        enforce_fk_inserts(self.active_catalog(), &stmt.table, &fks, &all_values)?;
    }
    enforce_uniqueness_inserts(
        self.active_catalog(),
        &stmt.table,
        &uniqueness,
        &all_values,
    )?;
    let mut pending_updates: Vec<(usize, Vec<Value>)> = Vec::new();
    // Rows dropped by ON CONFLICT; they are not counted in `affected`.
    let mut skipped_count = 0usize;
    if let Some(clause) = &stmt.on_conflict {
        let conflict_cols = resolve_on_conflict_columns(
            self.active_catalog(),
            &stmt.table,
            clause.target_columns.as_slice(),
        )?;
        let mut kept: Vec<Vec<Value>> = Vec::with_capacity(all_values.len());
        let mut seen_keys: Vec<Vec<Value>> = Vec::new();
        for values in all_values {
            let key_tuple: Vec<&Value> =
                conflict_cols.iter().map(|&c| &values[c]).collect();
            // A NULL anywhere in the key never conflicts.
            let has_null_key = key_tuple.iter().any(|v| matches!(v, Value::Null));
            let collides_with_table = !has_null_key
                && on_conflict_keys_exist(
                    self.active_catalog(),
                    &stmt.table,
                    &conflict_cols,
                    &key_tuple,
                );
            let key_tuple_owned: Vec<Value> =
                key_tuple.iter().map(|v| (*v).clone()).collect();
            let collides_with_batch = !has_null_key
                && seen_keys.iter().any(|k| k == &key_tuple_owned);
            let collides = collides_with_table || collides_with_batch;
            match (&clause.action, collides) {
                (_, false) => {
                    seen_keys.push(key_tuple_owned);
                    kept.push(values);
                }
                (spg_sql::ast::OnConflictAction::Nothing, true) => {
                    skipped_count += 1;
                }
                (
                    spg_sql::ast::OnConflictAction::Update {
                        assignments,
                        where_,
                    },
                    true,
                ) => {
                    // A conflict only within the batch has no stored row to update.
                    if !collides_with_table {
                        skipped_count += 1;
                        continue;
                    }
                    let target_pos = lookup_row_position_by_keys(
                        self.active_catalog(),
                        &stmt.table,
                        &conflict_cols,
                        &key_tuple,
                    )
                    .ok_or_else(|| {
                        EngineError::Unsupported(
                            "ON CONFLICT DO UPDATE: conflict detected but row \
                             position could not be resolved (cold-tier row?)"
                                .into(),
                        )
                    })?;
                    let updated = apply_on_conflict_assignments(
                        self.active_catalog(),
                        &stmt.table,
                        target_pos,
                        &values,
                        assignments,
                        where_.as_ref(),
                    )?;
                    if let Some(new_row) = updated {
                        pending_updates.push((target_pos, new_row));
                    } else {
                        skipped_count += 1;
                    }
                }
            }
        }
        all_values = kept;
    }
    let table = self
        .active_catalog_mut()
        .get_mut(&stmt.table)
        .ok_or_else(|| {
            EngineError::Storage(StorageError::TableNotFound {
                name: stmt.table.clone(),
            })
        })?;
    let mut returning_rows: Vec<Vec<Value>> = Vec::new();
    for values in all_values {
        if stmt.returning.is_some() {
            returning_rows.push(values.clone());
        }
        table.insert(Row::new(values))?;
        affected += 1;
    }
    for (pos, new_row) in pending_updates {
        if stmt.returning.is_some() {
            returning_rows.push(new_row.clone());
        }
        table.update_row(pos, new_row)?;
        affected += 1;
    }
    let _ = skipped_count;
    // Record churn before the RETURNING early-exit so every INSERT form is counted.
    if !self.in_transaction() && affected > 0 {
        self.statistics
            .record_modifications(&stmt.table, affected as u64);
    }
    if let Some(items) = &stmt.returning {
        return self.build_returning_rows(&stmt.table, items, returning_rows);
    }
    Ok(QueryResult::CommandOk {
        affected,
        modified_catalog: !self.in_transaction(),
    })
}
/// Executes `SELECT ... FROM t AS OF SEGMENT <id>` by scanning a registered
/// cold segment directly instead of the table's live rows.
///
/// Only projection, WHERE and LIMIT are supported. Joins, GROUP BY, HAVING,
/// UNION, ORDER BY, OFFSET, DISTINCT and aggregates are rejected with
/// [`EngineError::Unsupported`].
///
/// # Errors
/// - The table does not exist.
/// - The segment is not registered.
/// - A row fails to decode.
/// - WHERE or projection evaluation fails.
fn exec_select_as_of_segment(
    &self,
    stmt: &SelectStatement,
    from: &spg_sql::ast::FromClause,
    segment_id: u32,
) -> Result<QueryResult, EngineError> {
    if !from.joins.is_empty()
        || stmt.group_by.is_some()
        || stmt.having.is_some()
        || !stmt.unions.is_empty()
        || !stmt.order_by.is_empty()
        || stmt.offset.is_some()
        || stmt.distinct
        || aggregate::uses_aggregate(stmt)
    {
        return Err(EngineError::Unsupported(
            "AS OF SEGMENT supports SELECT projection + WHERE + LIMIT only \
             (joins / aggregates / ORDER BY are STABILITY § \"Out of v6.10\")"
                .into(),
        ));
    }
    let table = self
        .active_catalog()
        .get(&from.primary.name)
        .ok_or_else(|| StorageError::TableNotFound {
            name: from.primary.name.clone(),
        })?;
    // The segment rows are decoded with the table's *current* schema.
    let schema = table.schema().clone();
    let schema_cols = &schema.columns;
    let alias = from
        .primary
        .alias
        .as_deref()
        .unwrap_or(from.primary.name.as_str());
    let ctx = EvalContext::new(schema_cols, Some(alias));
    let seg = self
        .active_catalog()
        .cold_segment(segment_id)
        .ok_or_else(|| {
            EngineError::Unsupported(alloc::format!(
                "AS OF SEGMENT: cold segment {segment_id} not registered"
            ))
        })?;
    // A LIMIT that does not fit in usize is treated as "no limit".
    let limit: Option<usize> = stmt.limit.as_ref().and_then(|n| usize::try_from(*n).ok());
    let mut out_rows: Vec<Row> = Vec::new();
    for (_key, body) in seg.scan() {
        // Stop *before* touching the next row once LIMIT is satisfied. Rows
        // beyond the limit are never decoded or evaluated, so a decode or
        // WHERE error there cannot fail an already-complete result.
        if limit.is_some_and(|max| out_rows.len() >= max) {
            break;
        }
        let (row, _consumed) = spg_storage::decode_row_body_dense(&body, &schema)
            .map_err(EngineError::Storage)?;
        if let Some(where_expr) = &stmt.where_ {
            let cond = self.eval_expr_simple(where_expr, &row, &ctx)?;
            if !matches!(cond, Value::Bool(true)) {
                continue;
            }
        }
        let projected = self.project_row_simple(&row, &stmt.items, schema_cols, alias)?;
        out_rows.push(projected);
    }
    let columns = self.derive_output_columns(&stmt.items, schema_cols, alias);
    Ok(QueryResult::Rows {
        columns,
        rows: out_rows,
    })
}
/// Evaluates `expr` against `row` with no cancellation token and no
/// memoisation cache attached.
fn eval_expr_simple(
    &self,
    expr: &Expr,
    row: &Row,
    ctx: &EvalContext,
) -> Result<Value, EngineError> {
    self.eval_expr_with_correlated(expr, row, ctx, CancelToken::none(), None)
}
/// Builds the result set of a `RETURNING` clause.
///
/// Each row in `mutated_rows` is projected through `items` against the
/// schema of `table_name`. The table name also serves as the qualifier
/// alias.
///
/// # Errors
/// - `table_name` is not in the active catalog.
/// - A projection expression fails to evaluate.
fn build_returning_rows(
    &self,
    table_name: &str,
    items: &[SelectItem],
    mutated_rows: Vec<Vec<Value>>,
) -> Result<QueryResult, EngineError> {
    let Some(table) = self.active_catalog().get(table_name) else {
        return Err(EngineError::Storage(StorageError::TableNotFound {
            name: table_name.into(),
        }));
    };
    let schema_cols = table.schema().columns.clone();
    let columns = self.derive_output_columns(items, &schema_cols, table_name);
    let rows = mutated_rows
        .into_iter()
        .map(|values| {
            self.project_row_simple(&Row::new(values), items, &schema_cols, table_name)
        })
        .collect::<Result<Vec<Row>, EngineError>>()?;
    Ok(QueryResult::Rows { columns, rows })
}
fn project_row_simple(
&self,
row: &Row,
items: &[SelectItem],
schema_cols: &[ColumnSchema],
alias: &str,
) -> Result<Row, EngineError> {
let ctx = EvalContext::new(schema_cols, Some(alias));
let cancel = CancelToken::none();
let mut out_vals = Vec::new();
for item in items {
match item {
SelectItem::Wildcard => {
out_vals.extend(row.values.iter().cloned());
}
SelectItem::Expr { expr, .. } => {
let v = self.eval_expr_with_correlated(expr, row, &ctx, cancel, None)?;
out_vals.push(v);
}
}
}
Ok(Row::new(out_vals))
}
fn derive_output_columns(
&self,
items: &[SelectItem],
schema_cols: &[ColumnSchema],
_alias: &str,
) -> Vec<ColumnSchema> {
let mut out = Vec::new();
for item in items {
match item {
SelectItem::Wildcard => {
out.extend(schema_cols.iter().cloned());
}
SelectItem::Expr { alias, .. } => {
let name = alias
.clone()
.unwrap_or_else(|| "?column?".to_string());
out.push(ColumnSchema::new(name, DataType::Text, true));
}
}
}
out
}
/// Executes a (possibly compound) SELECT, polling `cancel` along the way.
///
/// Dispatch order:
/// 1. `AS OF SEGMENT` reads.
/// 2. Built-in `spg_*` system views, only for a bare `SELECT * FROM view`.
/// 3. `WITH` clauses.
/// 4. Scalar-subquery resolution.
/// 5. Either a single bare SELECT or a UNION chain.
///
/// In a UNION chain the trailing ORDER BY / OFFSET / LIMIT apply to the
/// combined result. They are stripped from the head arm and applied exactly
/// once after all arms have been merged.
///
/// # Errors
/// Propagates cancellation, evaluation and storage errors, and reports a
/// UNION arity mismatch as [`EngineError::Unsupported`].
fn exec_select_cancel(
    &self,
    stmt: &SelectStatement,
    cancel: CancelToken<'_>,
) -> Result<QueryResult, EngineError> {
    cancel.check()?;
    if let Some(from) = &stmt.from
        && let Some(seg_id) = from.primary.as_of_segment
    {
        return self.exec_select_as_of_segment(stmt, from, seg_id);
    }
    if let Some(from) = &stmt.from
        && from.joins.is_empty()
        && stmt.where_.is_none()
        && stmt.group_by.is_none()
        && stmt.having.is_none()
        && stmt.unions.is_empty()
        && stmt.order_by.is_empty()
        && stmt.limit.is_none()
        && stmt.offset.is_none()
        && !stmt.distinct
        && stmt.items.iter().all(|i| matches!(i, SelectItem::Wildcard))
    {
        let lower = from.primary.name.to_ascii_lowercase();
        match lower.as_str() {
            "spg_statistic" => return Ok(self.exec_spg_statistic()),
            "spg_stat_replication" => return Ok(self.exec_spg_stat_replication()),
            "spg_stat_segment" => return Ok(self.exec_spg_stat_segment()),
            "spg_stat_query" => return Ok(self.exec_spg_stat_query()),
            "spg_stat_activity" => return Ok(self.exec_spg_stat_activity()),
            "spg_audit_chain" => return Ok(self.exec_spg_audit_chain()),
            "spg_audit_verify" => return Ok(self.exec_spg_audit_verify()),
            "spg_table_ddl" => return Ok(self.exec_spg_table_ddl()),
            "spg_role_ddl" => return Ok(self.exec_spg_role_ddl()),
            "spg_database_ddl" => return Ok(self.exec_spg_database_ddl()),
            _ => {}
        }
    }
    if !stmt.ctes.is_empty() {
        return self.exec_with_ctes(stmt, cancel);
    }
    // Only clone the statement when subqueries actually need rewriting.
    let mut stmt_owned;
    let stmt_ref: &SelectStatement = if expr_tree_has_subquery(stmt) {
        stmt_owned = stmt.clone();
        self.resolve_select_subqueries(&mut stmt_owned, cancel)?;
        &stmt_owned
    } else {
        stmt
    };
    if stmt_ref.unions.is_empty() {
        return self.exec_bare_select_cancel(stmt_ref, cancel);
    }
    // ORDER BY / LIMIT / OFFSET belong to the whole compound query. Clear all
    // three on the head arm; leaving OFFSET in place applied it twice (once
    // to the head rows, once to the merged result).
    let mut head = stmt_ref.clone();
    head.unions = Vec::new();
    head.order_by = Vec::new();
    head.limit = None;
    head.offset = None;
    let QueryResult::Rows { columns, mut rows } =
        self.exec_bare_select_cancel(&head, cancel)?
    else {
        unreachable!("bare SELECT cannot return CommandOk")
    };
    for (kind, peer) in &stmt_ref.unions {
        let QueryResult::Rows {
            columns: peer_cols,
            rows: peer_rows,
        } = self.exec_bare_select_cancel(peer, cancel)?
        else {
            unreachable!("bare SELECT cannot return CommandOk")
        };
        if peer_cols.len() != columns.len() {
            return Err(EngineError::Unsupported(alloc::format!(
                "UNION arity mismatch: head has {} columns, peer has {}",
                columns.len(),
                peer_cols.len()
            )));
        }
        rows.extend(peer_rows);
        // UNION (DISTINCT) dedups everything accumulated so far, matching
        // left-associative `(A op B) op C` semantics.
        if matches!(kind, UnionKind::Distinct) {
            rows = dedup_rows(rows);
        }
    }
    // Use the subquery-resolved statement so ORDER BY expressions containing
    // subqueries see their resolved form.
    if !stmt_ref.order_by.is_empty() {
        let synth_ctx = EvalContext::new(&columns, None);
        let descs: Vec<bool> = stmt_ref.order_by.iter().map(|o| o.desc).collect();
        let mut tagged: Vec<(Vec<f64>, Row)> = Vec::with_capacity(rows.len());
        for r in rows {
            let keys = build_order_keys(&stmt_ref.order_by, &r, &synth_ctx)?;
            tagged.push((keys, r));
        }
        sort_by_keys(&mut tagged, &descs);
        rows = tagged.into_iter().map(|(_, r)| r).collect();
    }
    apply_offset_and_limit(&mut rows, stmt_ref.offset, stmt_ref.limit);
    Ok(QueryResult::Rows { columns, rows })
}
/// Executes a single (non-UNION, CTE-free) SELECT.
///
/// Paths, in order:
/// - window functions → `exec_select_with_window`;
/// - no FROM → one synthetic row;
/// - joins → `exec_joined_select`;
/// - NSW k-NN index;
/// - otherwise an equality index seek or full scan, followed by either
///   aggregation or projection + ORDER BY / DISTINCT / OFFSET / LIMIT.
///
/// Cancellation is polled every 256 scanned rows on both the aggregate and
/// non-aggregate scans.
///
/// # Errors
/// Propagates storage, evaluation and cancellation errors.
#[allow(clippy::too_many_lines)]
fn exec_bare_select_cancel(
    &self,
    stmt: &SelectStatement,
    cancel: CancelToken<'_>,
) -> Result<QueryResult, EngineError> {
    if select_has_window(stmt) {
        return self.exec_select_with_window(stmt, cancel);
    }
    let Some(from) = &stmt.from else {
        // FROM-less SELECT (`SELECT 1 + 1`): at most one row, evaluated
        // against an empty row. WHERE, OFFSET and LIMIT still apply, so
        // `SELECT 1 WHERE false` and `SELECT 1 LIMIT 0` return no rows.
        let empty_schema: Vec<ColumnSchema> = Vec::new();
        let ctx = EvalContext::new(&empty_schema, None);
        let projection = build_projection(&stmt.items, &empty_schema, "")?;
        let dummy_row = Row::new(Vec::new());
        let passes = match &stmt.where_ {
            Some(w) => matches!(eval::eval_expr(w, &dummy_row, &ctx)?, Value::Bool(true)),
            None => true,
        };
        let mut rows: Vec<Row> = Vec::new();
        if passes {
            let mut values = Vec::with_capacity(projection.len());
            for p in &projection {
                values.push(eval::eval_expr(&p.expr, &dummy_row, &ctx)?);
            }
            rows.push(Row::new(values));
        }
        apply_offset_and_limit(&mut rows, stmt.offset, stmt.limit);
        let columns: Vec<ColumnSchema> = projection
            .into_iter()
            .map(|p| ColumnSchema::new(p.output_name, p.ty, p.nullable))
            .collect();
        return Ok(QueryResult::Rows { columns, rows });
    };
    if !from.joins.is_empty() {
        return self.exec_joined_select(stmt, from);
    }
    let primary = &from.primary;
    let table = self.active_catalog().get(&primary.name).ok_or_else(|| {
        StorageError::TableNotFound {
            name: primary.name.clone(),
        }
    })?;
    let schema_cols = &table.schema().columns;
    let alias = primary.alias.as_deref().unwrap_or(primary.name.as_str());
    let ctx = EvalContext::new(schema_cols, Some(alias));
    if let Some(nsw_rows) = try_nsw_knn(stmt, table, schema_cols, alias) {
        return materialise_in_order(stmt, table, schema_cols, alias, &nsw_rows);
    }
    // `Some` when WHERE is a `col = literal` an index can answer. WHERE is
    // still re-evaluated on each candidate below.
    let indexed_rows: Option<Vec<Cow<'_, Row>>> = stmt
        .where_
        .as_ref()
        .and_then(|w| try_index_seek(w, schema_cols, self.active_catalog(), table, alias));
    if aggregate::uses_aggregate(stmt) {
        let mut memo = memoize::MemoizeCache::new();
        // WHERE filter for one candidate row. Polls cancellation every 256
        // rows, like the non-aggregate scan.
        let mut passes_where = |row: &Row, loop_idx: usize| -> Result<bool, EngineError> {
            if loop_idx.is_multiple_of(256) {
                cancel.check()?;
            }
            let Some(where_expr) = &stmt.where_ else {
                return Ok(true);
            };
            let cond = self.eval_expr_with_correlated(
                where_expr,
                row,
                &ctx,
                cancel,
                Some(&mut memo),
            )?;
            Ok(matches!(cond, Value::Bool(true)))
        };
        let mut filtered: Vec<&Row> = Vec::new();
        if let Some(rows) = &indexed_rows {
            for (loop_idx, cow) in rows.iter().enumerate() {
                let row = cow.as_ref();
                if passes_where(row, loop_idx)? {
                    filtered.push(row);
                }
            }
        } else {
            for i in 0..table.row_count() {
                let row = &table.rows()[i];
                if passes_where(row, i)? {
                    filtered.push(row);
                }
            }
        }
        let mut agg = aggregate::run(stmt, &filtered, schema_cols, Some(alias))?;
        apply_offset_and_limit(&mut agg.rows, stmt.offset, stmt.limit);
        return Ok(QueryResult::Rows {
            columns: agg.columns,
            rows: agg.rows,
        });
    }
    let projection = build_projection(&stmt.items, schema_cols, alias)?;
    let mut tagged: Vec<(Vec<f64>, Row)> = Vec::new();
    let mut memo = memoize::MemoizeCache::new();
    let mut process_row = |row: &Row, loop_idx: usize| -> Result<(), EngineError> {
        if loop_idx.is_multiple_of(256) {
            cancel.check()?;
        }
        if let Some(where_expr) = &stmt.where_ {
            let cond = self.eval_expr_with_correlated(
                where_expr,
                row,
                &ctx,
                cancel,
                Some(&mut memo),
            )?;
            if !matches!(cond, Value::Bool(true)) {
                return Ok(());
            }
        }
        let mut values = Vec::with_capacity(projection.len());
        for p in &projection {
            values.push(eval::eval_expr(&p.expr, row, &ctx)?);
        }
        // ORDER BY keys come from the source row, so they may reference
        // columns that are not projected.
        let order_keys = if stmt.order_by.is_empty() {
            Vec::new()
        } else {
            build_order_keys(&stmt.order_by, row, &ctx)?
        };
        tagged.push((order_keys, Row::new(values)));
        Ok(())
    };
    if let Some(rows) = &indexed_rows {
        for (loop_idx, cow) in rows.iter().enumerate() {
            process_row(cow.as_ref(), loop_idx)?;
        }
    } else {
        for i in 0..table.row_count() {
            process_row(&table.rows()[i], i)?;
        }
    }
    if !stmt.order_by.is_empty() {
        // Only the first LIMIT + OFFSET rows can survive, so a partial sort
        // suffices. DISTINCT dedups after sorting and may need more rows.
        // `saturating_add` stops a huge LIMIT from wrapping to a tiny bound.
        let keep = if stmt.distinct {
            None
        } else {
            stmt.limit.map(|l| {
                (l as usize).saturating_add(stmt.offset.map_or(0, |o| o as usize))
            })
        };
        let descs: Vec<bool> = stmt.order_by.iter().map(|o| o.desc).collect();
        partial_sort_tagged(&mut tagged, keep, &descs);
    }
    let mut output_rows: Vec<Row> = tagged.into_iter().map(|(_, r)| r).collect();
    if stmt.distinct {
        output_rows = dedup_rows(output_rows);
    }
    apply_offset_and_limit(&mut output_rows, stmt.offset, stmt.limit);
    let columns: Vec<ColumnSchema> = projection
        .into_iter()
        .map(|p| ColumnSchema::new(p.output_name, p.ty, p.nullable))
        .collect();
    Ok(QueryResult::Rows {
        columns,
        rows: output_rows,
    })
}
#[allow(clippy::too_many_lines)]
fn exec_joined_select(
&self,
stmt: &SelectStatement,
from: &FromClause,
) -> Result<QueryResult, EngineError> {
let primary_table = self
.active_catalog()
.get(&from.primary.name)
.ok_or_else(|| StorageError::TableNotFound {
name: from.primary.name.clone(),
})?;
let primary_alias = from
.primary
.alias
.as_deref()
.unwrap_or(from.primary.name.as_str())
.to_string();
let mut joined_tables: Vec<(&Table, String, JoinKind, Option<&Expr>)> = Vec::new();
for j in &from.joins {
let t = self.active_catalog().get(&j.table.name).ok_or_else(|| {
StorageError::TableNotFound {
name: j.table.name.clone(),
}
})?;
let a = j
.table
.alias
.as_deref()
.unwrap_or(j.table.name.as_str())
.to_string();
joined_tables.push((t, a, j.kind, j.on.as_ref()));
}
let mut combined_schema: Vec<ColumnSchema> = Vec::new();
for col in &primary_table.schema().columns {
combined_schema.push(ColumnSchema::new(
alloc::format!("{primary_alias}.{}", col.name),
col.ty,
col.nullable,
));
}
for (t, a, _, _) in &joined_tables {
for col in &t.schema().columns {
combined_schema.push(ColumnSchema::new(
alloc::format!("{a}.{}", col.name),
col.ty,
col.nullable,
));
}
}
let ctx = EvalContext::new(&combined_schema, None);
let mut working: Vec<Row> = primary_table.rows().iter().cloned().collect();
let mut produced_len = primary_table.schema().columns.len();
for (t, _, kind, on) in &joined_tables {
let right_arity = t.schema().columns.len();
let mut next: Vec<Row> = Vec::new();
for left in &working {
let mut left_matched = false;
for right in t.rows() {
let mut combined_vals = left.values.clone();
combined_vals.extend(right.values.iter().cloned());
let combined = Row::new(combined_vals);
let keep = if let Some(on_expr) = on {
let cond = eval::eval_expr(on_expr, &combined, &ctx)?;
matches!(cond, Value::Bool(true))
} else {
true
};
if keep {
next.push(combined);
left_matched = true;
}
}
if !left_matched && matches!(kind, JoinKind::Left) {
let mut combined_vals = left.values.clone();
for _ in 0..right_arity {
combined_vals.push(Value::Null);
}
next.push(Row::new(combined_vals));
}
}
working = next;
produced_len += right_arity;
debug_assert!(produced_len <= combined_schema.len());
}
let mut filtered: Vec<Row> = Vec::new();
for row in working {
if let Some(where_expr) = &stmt.where_ {
let cond = eval::eval_expr(where_expr, &row, &ctx)?;
if !matches!(cond, Value::Bool(true)) {
continue;
}
}
filtered.push(row);
}
if aggregate::uses_aggregate(stmt) {
let refs: Vec<&Row> = filtered.iter().collect();
let mut agg = aggregate::run(stmt, &refs, &combined_schema, None)?;
apply_offset_and_limit(&mut agg.rows, stmt.offset, stmt.limit);
return Ok(QueryResult::Rows {
columns: agg.columns,
rows: agg.rows,
});
}
let projection = build_projection(&stmt.items, &combined_schema, "")?;
let mut tagged: Vec<(Vec<f64>, Row)> = Vec::new();
for row in &filtered {
let mut values = Vec::with_capacity(projection.len());
for p in &projection {
values.push(eval::eval_expr(&p.expr, row, &ctx)?);
}
let order_keys = if stmt.order_by.is_empty() {
Vec::new()
} else {
build_order_keys(&stmt.order_by, row, &ctx)?
};
tagged.push((order_keys, Row::new(values)));
}
if !stmt.order_by.is_empty() {
let keep = if stmt.distinct {
None
} else {
stmt.limit
.map(|l| l as usize + stmt.offset.map_or(0, |o| o as usize))
};
let descs: Vec<bool> = stmt.order_by.iter().map(|o| o.desc).collect();
partial_sort_tagged(&mut tagged, keep, &descs);
}
let mut output_rows: Vec<Row> = tagged.into_iter().map(|(_, r)| r).collect();
if stmt.distinct {
output_rows = dedup_rows(output_rows);
}
apply_offset_and_limit(&mut output_rows, stmt.offset, stmt.limit);
let columns: Vec<ColumnSchema> = projection
.into_iter()
.map(|p| ColumnSchema::new(p.output_name, p.ty, p.nullable))
.collect();
Ok(QueryResult::Rows {
columns,
rows: output_rows,
})
}
}
/// One output column of a SELECT list, as produced by `build_projection`.
#[derive(Debug, Clone)]
struct ProjectedItem {
    /// Expression evaluated against each input row to produce this column.
    expr: Expr,
    /// Name reported for the column: the explicit alias, else the column
    /// name for bare column references, else the expression's display text.
    output_name: String,
    /// Reported type. Taken from the schema for bare column references,
    /// `DataType::Text` otherwise.
    ty: DataType,
    /// Whether the reported column may contain NULL.
    nullable: bool,
}
/// Removes duplicate rows, keeping the first occurrence of each and
/// preserving input order.
///
/// Uses a quadratic `PartialEq` scan rather than hashing, since rows can
/// hold float values.
fn dedup_rows(rows: Vec<Row>) -> Vec<Row> {
    let mut unique: Vec<Row> = Vec::with_capacity(rows.len());
    rows.into_iter().for_each(|candidate| {
        if !unique.contains(&candidate) {
            unique.push(candidate);
        }
    });
    unique
}
/// Maps a value to an `f64` sort key for ORDER BY.
///
/// - NULL maps to `+∞`, so it sorts after every finite key ascending.
/// - Text packs its first 8 bytes big-endian into a `u64` before the
///   (lossy) float conversion, so strings sharing a long prefix compare
///   equal.
///
/// # Errors
/// Vectors, intervals, JSON and any other unhandled variant are rejected
/// with [`EngineError::Unsupported`].
fn value_to_order_key(v: &Value) -> Result<f64, EngineError> {
    #[allow(clippy::cast_precision_loss)]
    let key = match v {
        Value::Null => f64::INFINITY,
        Value::SmallInt(n) => f64::from(*n),
        Value::Int(n) => f64::from(*n),
        Value::Date(d) => f64::from(*d),
        Value::Timestamp(t) => *t as f64,
        Value::Numeric { scaled, scale } => {
            let divisor = (0..*scale).fold(1.0_f64, |acc, _| acc * 10.0);
            (*scaled as f64) / divisor
        }
        Value::BigInt(n) => *n as f64,
        Value::Float(x) => *x,
        Value::Bool(b) => {
            if *b {
                1.0
            } else {
                0.0
            }
        }
        Value::Text(s) => {
            let packed = s
                .as_bytes()
                .iter()
                .take(8)
                .fold(0_u64, |acc, &b| (acc << 8) | u64::from(b));
            packed as f64
        }
        Value::Vector(_) | Value::Sq8Vector(_) | Value::HalfVector(_) => {
            return Err(EngineError::Unsupported(
                "ORDER BY of a raw vector column is not meaningful — use `<->`".into(),
            ));
        }
        Value::Interval { .. } => {
            return Err(EngineError::Unsupported(
                "ORDER BY of an INTERVAL is not supported in v2.11 \
                 (months vs micros has no single canonical ordering)"
                    .into(),
            ));
        }
        Value::Json(_) => {
            return Err(EngineError::Unsupported(
                "ORDER BY of a JSON value is not supported — cast the document to text first"
                    .into(),
            ));
        }
        _ => {
            return Err(EngineError::Unsupported(
                "ORDER BY of this value type is not supported".into(),
            ));
        }
    };
    Ok(key)
}
/// Tries to answer `ORDER BY col <op> <vector literal> LIMIT n [OFFSET k]`
/// from an NSW index on `col`.
///
/// Returns the row positions in distance order, or `None` when the shortcut
/// does not apply and the caller must fall back to a scan. The caller
/// (`materialise_in_order`) applies OFFSET and then LIMIT to the returned
/// positions, so `LIMIT + OFFSET` neighbours are fetched.
///
/// With a WHERE clause, an over-fetched candidate set (10× the wanted count,
/// at least `NSW_OVER_FETCH_FLOOR`) is filtered. This is approximate: fewer
/// than the wanted number of rows may come back. A WHERE evaluation error
/// yields `None`, so the full-scan path surfaces the error.
fn try_nsw_knn(
    stmt: &SelectStatement,
    table: &Table,
    schema_cols: &[ColumnSchema],
    table_alias: &str,
) -> Option<Vec<usize>> {
    if stmt.distinct {
        return None;
    }
    let limit = usize::try_from(stmt.limit?).ok()?;
    if limit == 0 {
        return None;
    }
    // OFFSET is applied downstream, so the index must produce enough
    // neighbours to cover both the skipped and the returned rows.
    let offset = match stmt.offset {
        Some(o) => usize::try_from(o).ok()?,
        None => 0,
    };
    let wanted = limit.checked_add(offset)?;
    if stmt.order_by.len() != 1 {
        return None;
    }
    let order = &stmt.order_by[0];
    if order.desc {
        return None;
    }
    let Expr::Binary { lhs, op, rhs } = &order.expr else {
        return None;
    };
    let metric = match op {
        BinOp::L2Distance => spg_storage::NswMetric::L2,
        BinOp::InnerProduct => spg_storage::NswMetric::InnerProduct,
        BinOp::CosineDistance => spg_storage::NswMetric::Cosine,
        _ => return None,
    };
    let ((Expr::Column(col), literal) | (literal, Expr::Column(col))) =
        (lhs.as_ref(), rhs.as_ref())
    else {
        return None;
    };
    if let Some(q) = &col.qualifier
        && q != table_alias
    {
        return None;
    }
    let col_pos = schema_cols.iter().position(|s| s.name == col.name)?;
    let query = literal_to_vector(literal)?;
    let idx = spg_storage::nsw_index_on(table, col_pos)?;
    if let Some(where_expr) = &stmt.where_ {
        let over_fetch = wanted.saturating_mul(10).max(NSW_OVER_FETCH_FLOOR);
        let candidates = spg_storage::nsw_query(table, &idx.name, &query, over_fetch, metric);
        let ctx = EvalContext::new(schema_cols, Some(table_alias));
        let mut kept: Vec<usize> = Vec::with_capacity(wanted);
        for i in candidates {
            let row = &table.rows()[i];
            let cond = eval::eval_expr(where_expr, row, &ctx).ok()?;
            if matches!(cond, Value::Bool(true)) {
                kept.push(i);
                if kept.len() >= wanted {
                    break;
                }
            }
        }
        Some(kept)
    } else {
        Some(spg_storage::nsw_query(
            table, &idx.name, &query, wanted, metric,
        ))
    }
}
/// Minimum number of NSW candidates fetched by `try_nsw_knn` when a WHERE
/// clause must be applied after the index lookup.
const NSW_OVER_FETCH_FLOOR: usize = 32;
/// Extracts the components of a vector literal, looking through any number
/// of enclosing casts. Returns `None` for anything else.
fn literal_to_vector(e: &Expr) -> Option<Vec<f32>> {
    if let Expr::Cast { expr, .. } = e {
        return literal_to_vector(expr);
    }
    if let Expr::Literal(Literal::Vector(components)) = e {
        Some(components.clone())
    } else {
        None
    }
}
fn materialise_in_order(
stmt: &SelectStatement,
table: &Table,
schema_cols: &[ColumnSchema],
table_alias: &str,
ordered_rows: &[usize],
) -> Result<QueryResult, EngineError> {
let ctx = EvalContext::new(schema_cols, Some(table_alias));
let projection = build_projection(&stmt.items, schema_cols, table_alias)?;
let mut output_rows: Vec<Row> = Vec::with_capacity(ordered_rows.len());
for &i in ordered_rows {
let row = &table.rows()[i];
let mut values = Vec::with_capacity(projection.len());
for p in &projection {
values.push(eval::eval_expr(&p.expr, row, &ctx)?);
}
output_rows.push(Row::new(values));
}
apply_offset_and_limit(&mut output_rows, stmt.offset, stmt.limit);
let columns: Vec<ColumnSchema> = projection
.into_iter()
.map(|p| ColumnSchema::new(p.output_name, p.ty, p.nullable))
.collect();
Ok(QueryResult::Rows {
columns,
rows: output_rows,
})
}
/// Tries to answer `WHERE <col> = <literal>` (either operand order) from an
/// index on `<col>`.
///
/// Hot locators borrow rows from the table. Cold locators are resolved
/// through the catalog into owned rows; locators that no longer resolve are
/// skipped. Returns `None` when the predicate shape, index or key type does
/// not fit.
fn try_index_seek<'a>(
    where_expr: &Expr,
    schema_cols: &[ColumnSchema],
    catalog: &'a Catalog,
    table: &'a Table,
    table_alias: &str,
) -> Option<Vec<Cow<'a, Row>>> {
    let Expr::Binary {
        lhs,
        op: BinOp::Eq,
        rhs,
    } = where_expr
    else {
        return None;
    };
    let (col_pos, value) = resolve_col_literal_pair(lhs, rhs, schema_cols, table_alias)
        .or_else(|| resolve_col_literal_pair(rhs, lhs, schema_cols, table_alias))?;
    let idx = table.index_on(col_pos)?;
    let key = IndexKey::from_value(&value)?;
    let locators = idx.lookup_eq(&key);
    let table_name = table.schema().name.as_str();
    let mut found: Vec<Cow<'a, Row>> = Vec::with_capacity(locators.len());
    found.extend(locators.into_iter().filter_map(|loc| match *loc {
        spg_storage::RowLocator::Hot(i) => table.rows().get(i).map(Cow::Borrowed),
        spg_storage::RowLocator::Cold { segment_id, .. } => catalog
            .resolve_cold_locator(table_name, segment_id, &key)
            .map(Cow::Owned),
    }));
    Some(found)
}
/// Recognises `WHERE <col> = <literal>` (either operand order).
///
/// Returns the column position and the literal as an index key, or `None`
/// when the shape does not match or the literal cannot be an index key.
fn try_pk_predicate(
    where_expr: &Expr,
    schema_cols: &[ColumnSchema],
    table_alias: &str,
) -> Option<(usize, IndexKey)> {
    let Expr::Binary {
        lhs,
        op: BinOp::Eq,
        rhs,
    } = where_expr
    else {
        return None;
    };
    let (col_pos, value) = resolve_col_literal_pair(lhs, rhs, schema_cols, table_alias)
        .or_else(|| resolve_col_literal_pair(rhs, lhs, schema_cols, table_alias))?;
    IndexKey::from_value(&value).map(|key| (col_pos, key))
}
/// Matches `col_side` as a column of this table and `lit_side` as a scalar
/// literal.
///
/// Returns the column position and the literal as a [`Value`]. Integer
/// literals that fit in `i32` become `Value::Int`, otherwise
/// `Value::BigInt`. A qualifier other than `table_alias`, an unknown column,
/// or a vector/interval literal yields `None`.
fn resolve_col_literal_pair(
    col_side: &Expr,
    lit_side: &Expr,
    schema_cols: &[ColumnSchema],
    table_alias: &str,
) -> Option<(usize, Value)> {
    let (Expr::Column(column), Expr::Literal(literal)) = (col_side, lit_side) else {
        return None;
    };
    let qualifier_ok = column
        .qualifier
        .as_deref()
        .is_none_or(|q| q == table_alias);
    if !qualifier_ok {
        return None;
    }
    let pos = schema_cols.iter().position(|s| s.name == column.name)?;
    let value = match literal {
        Literal::Integer(n) => i32::try_from(*n).map_or(Value::BigInt(*n), Value::Int),
        Literal::Float(x) => Value::Float(*x),
        Literal::String(s) => Value::Text(s.clone()),
        Literal::Bool(b) => Value::Bool(*b),
        Literal::Null => Value::Null,
        Literal::Vector(_) | Literal::Interval { .. } => return None,
    };
    Some((pos, value))
}
/// Resolves a projected column reference to its schema entry.
///
/// Qualified `q.name` lookup order:
/// 1. the composite `q.name` column;
/// 2. the bare `name`, when `q` is this table's alias.
///
/// If neither matches, an unrecognised qualifier yields `UnknownQualifier`,
/// otherwise `ColumnNotFound`.
///
/// Unqualified `name` matches an exact column first, then a unique
/// `<anything>.name` column. Several suffix matches are reported as an
/// ambiguous reference.
fn resolve_projection_column<'a>(
    c: &ColumnName,
    schema_cols: &'a [ColumnSchema],
    table_alias: &str,
) -> Result<&'a ColumnSchema, EngineError> {
    let not_found = || {
        EngineError::Eval(EvalError::ColumnNotFound {
            name: c.name.clone(),
        })
    };
    let Some(q) = &c.qualifier else {
        if let Some(exact) = schema_cols.iter().find(|s| s.name == c.name) {
            return Ok(exact);
        }
        let suffix = alloc::format!(".{name}", name = c.name);
        let mut candidates = schema_cols.iter().filter(|s| s.name.ends_with(&suffix));
        return match (candidates.next(), candidates.next()) {
            (Some(only), None) => Ok(only),
            (Some(_), Some(_)) => Err(EngineError::Eval(EvalError::TypeMismatch {
                detail: alloc::format!("ambiguous column reference: {}", c.name),
            })),
            _ => Err(not_found()),
        };
    };
    let composite = alloc::format!("{q}.{name}", name = c.name);
    if let Some(hit) = schema_cols.iter().find(|s| s.name == composite) {
        return Ok(hit);
    }
    let is_own_alias = q == table_alias;
    if is_own_alias
        && let Some(hit) = schema_cols.iter().find(|s| s.name == c.name)
    {
        return Ok(hit);
    }
    let prefix = alloc::format!("{q}.");
    if is_own_alias || schema_cols.iter().any(|s| s.name.starts_with(&prefix)) {
        Err(not_found())
    } else {
        Err(EngineError::Eval(EvalError::UnknownQualifier {
            qualifier: q.clone(),
        }))
    }
}
/// Expands a SELECT list into concrete [`ProjectedItem`]s.
///
/// - `*` expands to one unqualified column reference per schema column.
/// - A bare column reference is resolved against the schema for its type
///   and nullability.
/// - Any other expression becomes a nullable TEXT column named by its alias
///   or its display text.
///
/// # Errors
/// Propagates column-resolution errors from `resolve_projection_column`.
fn build_projection(
    items: &[SelectItem],
    schema_cols: &[ColumnSchema],
    table_alias: &str,
) -> Result<Vec<ProjectedItem>, EngineError> {
    let mut projected: Vec<ProjectedItem> = Vec::new();
    for item in items {
        let SelectItem::Expr { expr, alias } = item else {
            projected.extend(schema_cols.iter().map(|col| ProjectedItem {
                expr: Expr::Column(ColumnName {
                    qualifier: None,
                    name: col.name.clone(),
                }),
                output_name: col.name.clone(),
                ty: col.ty,
                nullable: col.nullable,
            }));
            continue;
        };
        let entry = match expr {
            Expr::Column(c) => {
                let sch = resolve_projection_column(c, schema_cols, table_alias)?;
                ProjectedItem {
                    expr: expr.clone(),
                    output_name: alias.clone().unwrap_or_else(|| c.name.clone()),
                    ty: sch.ty,
                    nullable: sch.nullable,
                }
            }
            other => ProjectedItem {
                expr: other.clone(),
                output_name: alias.clone().unwrap_or_else(|| other.to_string()),
                ty: DataType::Text,
                nullable: true,
            },
        };
        projected.push(entry);
    }
    Ok(projected)
}
/// Converts an integer into a NUMERIC with `scale` fractional digits
/// (`scaled = n * 10^scale`) and validates it against `precision`.
///
/// # Errors
/// - The scaling multiplication overflows `i128`.
/// - The result exceeds `precision` digits.
fn numeric_from_integer(
    n: i128,
    precision: u8,
    scale: u8,
    col_name: &str,
) -> Result<Value, EngineError> {
    let Some(scaled) = n.checked_mul(pow10_i128(scale)) else {
        return Err(EngineError::Unsupported(alloc::format!(
            "integer overflow scaling value for column `{col_name}` to scale {scale}"
        )));
    };
    check_precision(scaled, precision, col_name).map(|()| Value::Numeric { scaled, scale })
}
/// Converts a finite `f64` into a NUMERIC with `scale` fractional digits and
/// validates it against `precision`.
///
/// Rounds half away from zero by nudging ±0.5 before truncating.
///
/// # Errors
/// - The input is NaN or infinite.
/// - The shifted magnitude lies outside ±1e38.
/// - The result exceeds `precision` digits.
#[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
fn numeric_from_float(
    x: f64,
    precision: u8,
    scale: u8,
    col_name: &str,
) -> Result<Value, EngineError> {
    if x.is_nan() || x.is_infinite() {
        return Err(EngineError::Unsupported(alloc::format!(
            "cannot store non-finite float in NUMERIC column `{col_name}`"
        )));
    }
    // 10^scale by repeated multiplication (no `powi` in no_std core).
    let factor = (0..scale).fold(1.0_f64, |acc, _| acc * 10.0);
    let shifted = x * factor;
    let nudge = if shifted >= 0.0 { 0.5 } else { -0.5 };
    let biased = shifted + nudge;
    if !(-1e38..=1e38).contains(&biased) {
        return Err(EngineError::Unsupported(alloc::format!(
            "value {x} overflows NUMERIC range for column `{col_name}`"
        )));
    }
    let scaled = biased as i128;
    check_precision(scaled, precision, col_name)?;
    Ok(Value::Numeric { scaled, scale })
}
/// Rescales a NUMERIC from `src_scale` to `dst_scale` fractional digits and
/// validates the result against `precision`.
///
/// Upscaling multiplies by a power of ten with overflow checking.
/// Downscaling rounds half away from zero using quotient/remainder
/// arithmetic. The earlier `(scaled ± half) / drop` form overflowed `i128`
/// for magnitudes near `i128::MAX`.
///
/// # Errors
/// - Upscaling overflows.
/// - The result exceeds `precision` digits.
fn numeric_rescale(
    scaled: i128,
    src_scale: u8,
    precision: u8,
    dst_scale: u8,
    col_name: &str,
) -> Result<Value, EngineError> {
    let new_scaled = if dst_scale >= src_scale {
        let bump = pow10_i128(dst_scale - src_scale);
        scaled.checked_mul(bump).ok_or_else(|| {
            EngineError::Unsupported(alloc::format!(
                "overflow rescaling NUMERIC for column `{col_name}`"
            ))
        })?
    } else {
        let drop = pow10_i128(src_scale - dst_scale);
        let half = drop / 2;
        // Truncating division plus a ±1 correction when the discarded part
        // is at least half a unit. This gives the same result as
        // `(scaled ± half) / drop` but cannot overflow, because
        // |quotient| <= i128::MAX / 10.
        let quotient = scaled / drop;
        let remainder = scaled % drop;
        if remainder.unsigned_abs() >= half.unsigned_abs() {
            quotient + remainder.signum()
        } else {
            quotient
        }
    };
    check_precision(new_scaled, precision, col_name)?;
    Ok(Value::Numeric {
        scaled: new_scaled,
        scale: dst_scale,
    })
}
/// Drops the fractional digits of a NUMERIC (`scaled / 10^scale`),
/// truncating toward zero.
const fn numeric_truncate_to_integer(scaled: i128, scale: u8) -> i128 {
    if scale == 0 {
        scaled
    } else {
        scaled / pow10_i128_const(scale)
    }
}
/// Verifies that `scaled` has fewer than `precision` decimal digits in
/// magnitude. A `precision` of 0 skips the check.
///
/// # Errors
/// [`EngineError::Unsupported`] when the value has too many digits.
fn check_precision(scaled: i128, precision: u8, col_name: &str) -> Result<(), EngineError> {
    let exceeds =
        precision != 0 && scaled.unsigned_abs() >= pow10_i128(precision).unsigned_abs();
    if exceeds {
        Err(EngineError::Unsupported(alloc::format!(
            "NUMERIC value exceeds precision {precision} for column `{col_name}`"
        )))
    } else {
        Ok(())
    }
}
/// Computes `10^p` as an `i128` in const context. Overflows once `p >= 39`.
const fn pow10_i128_const(p: u8) -> i128 {
    if p == 0 {
        1
    } else {
        10 * pow10_i128_const(p - 1)
    }
}
/// Runtime `10^p` as an `i128`. Overflows once `p >= 39`.
fn pow10_i128(p: u8) -> i128 {
    10_i128.pow(u32::from(p))
}
impl Engine {
#[allow(
clippy::too_many_lines,
clippy::type_complexity,
clippy::needless_range_loop
)] fn exec_select_with_window(
&self,
stmt: &SelectStatement,
cancel: CancelToken<'_>,
) -> Result<QueryResult, EngineError> {
let from = stmt.from.as_ref().ok_or_else(|| {
EngineError::Unsupported("window functions require a FROM clause".into())
})?;
if !from.joins.is_empty() {
return Err(EngineError::Unsupported(
"JOIN with window functions not yet supported".into(),
));
}
let primary = &from.primary;
let table = self.active_catalog().get(&primary.name).ok_or_else(|| {
StorageError::TableNotFound {
name: primary.name.clone(),
}
})?;
let alias = primary.alias.as_deref().unwrap_or(primary.name.as_str());
let schema_cols = &table.schema().columns;
let ctx = EvalContext::new(schema_cols, Some(alias));
let mut filtered: Vec<&Row> = Vec::new();
for (i, row) in table.rows().iter().enumerate() {
if i.is_multiple_of(256) {
cancel.check()?;
}
if let Some(w) = &stmt.where_ {
let cond = eval::eval_expr(w, row, &ctx)?;
if !matches!(cond, Value::Bool(true)) {
continue;
}
}
filtered.push(row);
}
let n_rows = filtered.len();
let mut window_nodes: Vec<Expr> = Vec::new();
for item in &stmt.items {
if let SelectItem::Expr { expr, .. } = item {
collect_window_nodes(expr, &mut window_nodes);
}
}
let mut win_vals: Vec<Vec<Value>> = Vec::with_capacity(window_nodes.len());
for wnode in &window_nodes {
let Expr::WindowFunction {
name,
args,
partition_by,
order_by,
frame,
null_treatment,
} = wnode
else {
unreachable!("collect_window_nodes pushes only WindowFunction");
};
let mut indexed: Vec<(Vec<Value>, Vec<(Value, bool)>, usize)> =
Vec::with_capacity(n_rows);
for (i, row) in filtered.iter().enumerate() {
let pkey: Vec<Value> = partition_by
.iter()
.map(|p| eval::eval_expr(p, row, &ctx))
.collect::<Result<_, _>>()?;
let okey: Vec<(Value, bool)> = order_by
.iter()
.map(|(e, desc)| eval::eval_expr(e, row, &ctx).map(|v| (v, *desc)))
.collect::<Result<_, _>>()?;
indexed.push((pkey, okey, i));
}
indexed.sort_by(|a, b| {
let p_cmp = partition_key_cmp(&a.0, &b.0);
if p_cmp != core::cmp::Ordering::Equal {
return p_cmp;
}
order_key_cmp(&a.1, &b.1)
});
let mut out_vals: Vec<Value> = alloc::vec![Value::Null; n_rows];
let mut p_start = 0;
while p_start < indexed.len() {
let mut p_end = p_start + 1;
while p_end < indexed.len()
&& partition_key_cmp(&indexed[p_start].0, &indexed[p_end].0)
== core::cmp::Ordering::Equal
{
p_end += 1;
}
compute_window_partition(
name,
args,
!order_by.is_empty(),
frame.as_ref(),
*null_treatment,
&indexed[p_start..p_end],
&filtered,
&ctx,
&mut out_vals,
)?;
p_start = p_end;
}
win_vals.push(out_vals);
}
let mut ext_cols = schema_cols.clone();
for i in 0..window_nodes.len() {
ext_cols.push(ColumnSchema::new(
alloc::format!("__win_{i}"),
DataType::Text, true,
));
}
let mut ext_rows: Vec<Row> = Vec::with_capacity(n_rows);
for i in 0..n_rows {
let mut values = filtered[i].values.clone();
for w in 0..window_nodes.len() {
values.push(win_vals[w][i].clone());
}
ext_rows.push(Row::new(values));
}
let mut rewritten_items: Vec<SelectItem> = Vec::with_capacity(stmt.items.len());
for item in &stmt.items {
let new_item = match item {
SelectItem::Wildcard => SelectItem::Wildcard,
SelectItem::Expr { expr, alias } => {
let mut e = expr.clone();
rewrite_window_to_columns(&mut e, &window_nodes);
SelectItem::Expr {
expr: e,
alias: alias.clone(),
}
}
};
rewritten_items.push(new_item);
}
let ext_ctx = EvalContext::new(&ext_cols, Some(alias));
let projection = build_projection(&rewritten_items, &ext_cols, alias)?;
let mut tagged: Vec<(Vec<f64>, Row)> = Vec::with_capacity(n_rows);
for (i, row) in ext_rows.iter().enumerate() {
if i.is_multiple_of(256) {
cancel.check()?;
}
let mut values = Vec::with_capacity(projection.len());
for p in &projection {
values.push(eval::eval_expr(&p.expr, row, &ext_ctx)?);
}
let order_keys = if stmt.order_by.is_empty() {
Vec::new()
} else {
let mut keys = Vec::with_capacity(stmt.order_by.len());
for o in &stmt.order_by {
let mut e = o.expr.clone();
rewrite_window_to_columns(&mut e, &window_nodes);
let key = eval::eval_expr(&e, row, &ext_ctx)?;
keys.push(value_to_order_key(&key)?);
}
keys
};
tagged.push((order_keys, Row::new(values)));
}
if !stmt.order_by.is_empty() {
let descs: Vec<bool> = stmt.order_by.iter().map(|o| o.desc).collect();
sort_by_keys(&mut tagged, &descs);
}
let mut out_rows: Vec<Row> = tagged.into_iter().map(|(_, r)| r).collect();
apply_offset_and_limit(&mut out_rows, stmt.offset, stmt.limit);
let final_cols: Vec<ColumnSchema> = projection
.into_iter()
.map(|p| ColumnSchema::new(p.output_name, p.ty, p.nullable))
.collect();
Ok(QueryResult::Rows {
columns: final_cols,
rows: out_rows,
})
}
fn exec_with_ctes(
&self,
stmt: &SelectStatement,
cancel: CancelToken<'_>,
) -> Result<QueryResult, EngineError> {
cancel.check()?;
let mut catalog = self.active_catalog().clone();
for cte in &stmt.ctes {
if catalog.get(&cte.name).is_some() {
return Err(EngineError::Unsupported(alloc::format!(
"CTE name {:?} shadows an existing table; rename the CTE",
cte.name
)));
}
let (columns, rows) = if cte.recursive {
self.materialise_recursive_cte(cte, &catalog, cancel)?
} else {
let body_result = self.exec_select_cancel(&cte.body, cancel)?;
let QueryResult::Rows { columns, rows } = body_result else {
return Err(EngineError::Unsupported(alloc::format!(
"CTE {:?} body did not return rows",
cte.name
)));
};
(columns, rows)
};
let inferred = infer_column_types(&columns, &rows);
let mut columns = inferred;
if !cte.column_overrides.is_empty() {
if cte.column_overrides.len() != columns.len() {
return Err(EngineError::Unsupported(alloc::format!(
"CTE {:?} column list has {} names but body returns {} columns",
cte.name,
cte.column_overrides.len(),
columns.len()
)));
}
for (col, name) in columns.iter_mut().zip(cte.column_overrides.iter()) {
col.name.clone_from(name);
}
}
let schema = TableSchema::new(cte.name.clone(), columns);
catalog.create_table(schema).map_err(EngineError::Storage)?;
let table = catalog
.get_mut(&cte.name)
.expect("just-created CTE table must exist");
for row in rows {
table.insert(row).map_err(EngineError::Storage)?;
}
}
let mut body = stmt.clone();
body.ctes = Vec::new();
let mut temp = Engine::restore(catalog);
if let Some(c) = self.clock {
temp = temp.with_clock(c);
}
if let Some(f) = self.salt_fn {
temp = temp.with_salt_fn(f);
}
temp.exec_select_cancel(&body, cancel)
}
#[allow(clippy::too_many_lines)]
fn materialise_recursive_cte(
&self,
cte: &spg_sql::ast::Cte,
base_catalog: &Catalog,
cancel: CancelToken<'_>,
) -> Result<(Vec<ColumnSchema>, Vec<Row>), EngineError> {
const MAX_TOTAL_ROWS: usize = 1_000_000;
const MAX_ITERATIONS: usize = 100_000;
cancel.check()?;
if cte.body.unions.is_empty() {
return Err(EngineError::Unsupported(alloc::format!(
"WITH RECURSIVE {:?} body must be a UNION of an anchor and a recursive term",
cte.name
)));
}
let mut anchor = cte.body.clone();
let union_terms = core::mem::take(&mut anchor.unions);
anchor.ctes = Vec::new();
if select_refers_to(&anchor, &cte.name) {
return Err(EngineError::Unsupported(alloc::format!(
"WITH RECURSIVE {:?}: the anchor must not reference the CTE itself",
cte.name
)));
}
let anchor_result = self.exec_select_cancel(&anchor, cancel)?;
let QueryResult::Rows {
columns: anchor_cols,
rows: anchor_rows,
} = anchor_result
else {
return Err(EngineError::Unsupported(alloc::format!(
"WITH RECURSIVE {:?}: anchor did not return rows",
cte.name
)));
};
let mut columns = infer_column_types(&anchor_cols, &anchor_rows);
if !cte.column_overrides.is_empty() {
if cte.column_overrides.len() != columns.len() {
return Err(EngineError::Unsupported(alloc::format!(
"CTE {:?} column list has {} names but anchor returns {} columns",
cte.name,
cte.column_overrides.len(),
columns.len()
)));
}
for (col, name) in columns.iter_mut().zip(cte.column_overrides.iter()) {
col.name.clone_from(name);
}
}
let mut all_rows: Vec<Row> = anchor_rows.clone();
let mut working_set: Vec<Row> = anchor_rows;
let mut seen: alloc::collections::BTreeSet<Vec<u8>> = alloc::collections::BTreeSet::new();
let all_union_all = union_terms.iter().all(|(k, _)| matches!(k, UnionKind::All));
if !all_union_all {
for r in &all_rows {
seen.insert(encode_row_key(r));
}
}
for iter in 0..MAX_ITERATIONS {
cancel.check()?;
if working_set.is_empty() {
break;
}
let mut iter_catalog = base_catalog.clone();
let schema = TableSchema::new(cte.name.clone(), columns.clone());
iter_catalog
.create_table(schema)
.map_err(EngineError::Storage)?;
{
let table = iter_catalog.get_mut(&cte.name).expect("just-created");
for row in &working_set {
table.insert(row.clone()).map_err(EngineError::Storage)?;
}
}
let mut iter_engine = Engine::restore(iter_catalog);
if let Some(c) = self.clock {
iter_engine = iter_engine.with_clock(c);
}
if let Some(f) = self.salt_fn {
iter_engine = iter_engine.with_salt_fn(f);
}
let mut next_set: Vec<Row> = Vec::new();
for (_, term) in &union_terms {
let mut term = term.clone();
term.ctes = Vec::new();
let r = iter_engine.exec_select_cancel(&term, cancel)?;
let QueryResult::Rows {
columns: rc,
rows: rs,
} = r
else {
return Err(EngineError::Unsupported(alloc::format!(
"WITH RECURSIVE {:?}: recursive term did not return rows",
cte.name
)));
};
if rc.len() != columns.len() {
return Err(EngineError::Unsupported(alloc::format!(
"WITH RECURSIVE {:?}: column count of recursive term ({}) does not match anchor ({})",
cte.name,
rc.len(),
columns.len()
)));
}
for row in rs {
if !all_union_all {
let key = encode_row_key(&row);
if !seen.insert(key) {
continue;
}
}
next_set.push(row);
}
}
if next_set.is_empty() {
break;
}
all_rows.extend(next_set.iter().cloned());
working_set = next_set;
if all_rows.len() > MAX_TOTAL_ROWS {
return Err(EngineError::Unsupported(alloc::format!(
"WITH RECURSIVE {:?}: produced more than {MAX_TOTAL_ROWS} rows — likely runaway recursion",
cte.name
)));
}
if iter + 1 == MAX_ITERATIONS {
return Err(EngineError::Unsupported(alloc::format!(
"WITH RECURSIVE {:?}: exceeded {MAX_ITERATIONS} iterations",
cte.name
)));
}
}
Ok((columns, all_rows))
}
/// Replaces, in place, every subquery in `stmt` that can be evaluated without
/// an outer row.
///
/// Expressions are visited in this order: projection items, `WHERE`,
/// `GROUP BY` keys, `HAVING`, `ORDER BY` keys. Each UNION peer is then
/// processed recursively. Subqueries that need outer context are left in
/// place. Stops at the first error.
fn resolve_select_subqueries(
    &self,
    stmt: &mut SelectStatement,
    cancel: CancelToken<'_>,
) -> Result<(), EngineError> {
    let projected = stmt.items.iter_mut().filter_map(|item| match item {
        SelectItem::Expr { expr, .. } => Some(expr),
        SelectItem::Wildcard => None,
    });
    for expr in projected {
        self.resolve_expr_subqueries(expr, cancel)?;
    }
    if let Some(filter) = stmt.where_.as_mut() {
        self.resolve_expr_subqueries(filter, cancel)?;
    }
    for key in stmt.group_by.iter_mut().flatten() {
        self.resolve_expr_subqueries(key, cancel)?;
    }
    if let Some(having) = stmt.having.as_mut() {
        self.resolve_expr_subqueries(having, cancel)?;
    }
    for ordering in stmt.order_by.iter_mut() {
        self.resolve_expr_subqueries(&mut ordering.expr, cancel)?;
    }
    for (_, peer) in stmt.unions.iter_mut() {
        self.resolve_select_subqueries(peer, cancel)?;
    }
    Ok(())
}
/// Resolves the uncorrelated subqueries inside `e`, in place.
///
/// If `e` itself is a subquery that `subquery_replacement` can evaluate
/// without outer context, `e` is replaced by the result and its children are
/// not visited. Otherwise the walk descends into every child expression:
/// operands, function and window arguments, window `PARTITION BY` and
/// `ORDER BY` keys, the `LIKE` pattern, and the `EXTRACT` source. Subquery
/// nodes that could not be replaced are left as they are.
#[allow(clippy::only_used_in_recursion)]
fn resolve_expr_subqueries(
    &self,
    e: &mut Expr,
    cancel: CancelToken<'_>,
) -> Result<(), EngineError> {
    if let Some(resolved) = self.subquery_replacement(e, cancel)? {
        *e = resolved;
        return Ok(());
    }
    match e {
        Expr::ScalarSubquery(_)
        | Expr::Exists { .. }
        | Expr::InSubquery { .. }
        | Expr::Literal(_)
        | Expr::Placeholder(_)
        | Expr::Column(_) => Ok(()),
        Expr::Unary { expr, .. } | Expr::Cast { expr, .. } | Expr::IsNull { expr, .. } => {
            self.resolve_expr_subqueries(expr, cancel)
        }
        Expr::Extract { source, .. } => self.resolve_expr_subqueries(source, cancel),
        Expr::Binary { lhs, rhs, .. } => {
            self.resolve_expr_subqueries(lhs, cancel)?;
            self.resolve_expr_subqueries(rhs, cancel)
        }
        Expr::Like { expr, pattern, .. } => {
            self.resolve_expr_subqueries(expr, cancel)?;
            self.resolve_expr_subqueries(pattern, cancel)
        }
        Expr::FunctionCall { args, .. } => args
            .iter_mut()
            .try_for_each(|arg| self.resolve_expr_subqueries(arg, cancel)),
        Expr::WindowFunction {
            args,
            partition_by,
            order_by,
            ..
        } => {
            for arg in args.iter_mut() {
                self.resolve_expr_subqueries(arg, cancel)?;
            }
            for part in partition_by.iter_mut() {
                self.resolve_expr_subqueries(part, cancel)?;
            }
            for (key, _) in order_by.iter_mut() {
                self.resolve_expr_subqueries(key, cancel)?;
            }
            Ok(())
        }
    }
}
fn eval_expr_with_correlated(
&self,
expr: &Expr,
row: &Row,
ctx: &EvalContext<'_>,
cancel: CancelToken<'_>,
memo: Option<&mut memoize::MemoizeCache>,
) -> Result<Value, EngineError> {
if !expr_has_subquery(expr) {
return eval::eval_expr(expr, row, ctx).map_err(EngineError::Eval);
}
let mut e = expr.clone();
self.resolve_correlated_in_expr(&mut e, row, ctx, cancel, memo)?;
eval::eval_expr(&e, row, ctx).map_err(EngineError::Eval)
}
/// Resolves every subquery in `e` for one outer `row`, replacing each one
/// with a literal so the result can be evaluated by `eval::eval_expr`.
///
/// Before an inner query runs, outer-column references qualified with
/// `ctx.table_alias` are substituted with the row's values.
///
/// * Scalar subquery: zero rows gives `NULL`, one row gives its first
///   column, more than one row is an error. When `memo` is provided, results
///   are cached by (subquery text, outer row values).
/// * `[NOT] EXISTS`: becomes a boolean literal.
/// * `[NOT] IN (subquery)`: follows SQL three-valued logic.
///   - A match yields `TRUE` (`FALSE` for `NOT IN`).
///   - If the subquery returned rows and either the probe value or some
///     returned value is `NULL`, the result is `NULL`.
///   - Otherwise the result is `FALSE` (`TRUE` for `NOT IN`).
///   Equality is decided by `value_cmp`.
///
/// Window-function nodes are not descended into.
///
/// # Errors
/// `EngineError::Unsupported` in these cases:
/// * a scalar subquery returns more than one row;
/// * an IN-subquery does not project exactly one column;
/// * an inner statement does not return rows.
///
/// Evaluation and execution errors are also propagated.
fn resolve_correlated_in_expr(
    &self,
    e: &mut Expr,
    row: &Row,
    ctx: &EvalContext<'_>,
    cancel: CancelToken<'_>,
    mut memo: Option<&mut memoize::MemoizeCache>,
) -> Result<(), EngineError> {
    match e {
        Expr::ScalarSubquery(inner) => {
            let cache_key = memo.as_ref().map(|_| memoize::CacheKey {
                subquery_repr: alloc::format!("{}", **inner),
                outer_values: row.values.clone(),
            });
            if let (Some(cache), Some(k)) = (memo.as_deref_mut(), cache_key.as_ref())
                && let Some(cached) = cache.get(k)
            {
                *e = value_to_literal_expr(cached)?;
                return Ok(());
            }
            let mut s = (**inner).clone();
            substitute_outer_columns(&mut s, row, ctx);
            let r = self.exec_select_cancel(&s, cancel)?;
            let QueryResult::Rows { rows, .. } = r else {
                return Err(EngineError::Unsupported(
                    "scalar subquery: inner did not return rows".into(),
                ));
            };
            let value = match rows.as_slice() {
                [] => Value::Null,
                [r0] => r0.values.first().cloned().unwrap_or(Value::Null),
                _ => {
                    return Err(EngineError::Unsupported(alloc::format!(
                        "scalar subquery returned {} rows; expected 0 or 1",
                        rows.len()
                    )));
                }
            };
            if let (Some(cache), Some(k)) = (memo.as_deref_mut(), cache_key) {
                cache.insert(k, value.clone());
            }
            *e = value_to_literal_expr(value)?;
        }
        Expr::Exists { subquery, negated } => {
            let mut s = (**subquery).clone();
            substitute_outer_columns(&mut s, row, ctx);
            let r = self.exec_select_cancel(&s, cancel)?;
            let exists = matches!(r, QueryResult::Rows { rows, .. } if !rows.is_empty());
            let bit = if *negated { !exists } else { exists };
            *e = Expr::Literal(Literal::Bool(bit));
        }
        Expr::InSubquery {
            expr: lhs,
            subquery,
            negated,
        } => {
            let negated = *negated;
            self.resolve_correlated_in_expr(lhs, row, ctx, cancel, memo.as_deref_mut())?;
            let lhs_val = eval::eval_expr(lhs, row, ctx).map_err(EngineError::Eval)?;
            let mut s = (**subquery).clone();
            substitute_outer_columns(&mut s, row, ctx);
            let r = self.exec_select_cancel(&s, cancel)?;
            let QueryResult::Rows { columns, rows, .. } = r else {
                return Err(EngineError::Unsupported(
                    "IN-subquery: inner did not return rows".into(),
                ));
            };
            if columns.len() != 1 {
                return Err(EngineError::Unsupported(alloc::format!(
                    "IN-subquery must project exactly one column; got {}",
                    columns.len()
                )));
            }
            let subquery_empty = rows.is_empty();
            // A NULL probe can never compare equal. Against a non-empty
            // subquery it makes the predicate unknown, so the scan is skipped.
            let mut saw_null = lhs_val.is_null();
            let mut found = false;
            if !saw_null {
                for r0 in rows {
                    let v = r0.values.into_iter().next().unwrap_or(Value::Null);
                    if v.is_null() {
                        saw_null = true;
                        continue;
                    }
                    if value_cmp(&v, &lhs_val) == core::cmp::Ordering::Equal {
                        found = true;
                        break;
                    }
                }
            }
            let outcome = if found {
                Expr::Literal(Literal::Bool(!negated))
            } else if saw_null && !subquery_empty {
                // SQL three-valued logic: `x IN (…)` with no match but a NULL
                // operand is unknown, and so is its `NOT IN` counterpart.
                value_to_literal_expr(Value::Null)?
            } else {
                Expr::Literal(Literal::Bool(negated))
            };
            *e = outcome;
        }
        Expr::Binary { lhs, rhs, .. } => {
            self.resolve_correlated_in_expr(lhs, row, ctx, cancel, memo.as_deref_mut())?;
            self.resolve_correlated_in_expr(rhs, row, ctx, cancel, memo.as_deref_mut())?;
        }
        Expr::Unary { expr, .. } | Expr::Cast { expr, .. } | Expr::IsNull { expr, .. } => {
            self.resolve_correlated_in_expr(expr, row, ctx, cancel, memo.as_deref_mut())?;
        }
        Expr::Like { expr, pattern, .. } => {
            self.resolve_correlated_in_expr(expr, row, ctx, cancel, memo.as_deref_mut())?;
            self.resolve_correlated_in_expr(pattern, row, ctx, cancel, memo.as_deref_mut())?;
        }
        Expr::FunctionCall { args, .. } => {
            for a in args {
                self.resolve_correlated_in_expr(a, row, ctx, cancel, memo.as_deref_mut())?;
            }
        }
        Expr::Extract { source, .. } => {
            self.resolve_correlated_in_expr(source, row, ctx, cancel, memo.as_deref_mut())?;
        }
        Expr::WindowFunction { .. } | Expr::Literal(_) | Expr::Placeholder(_) | Expr::Column(_) => {}
    }
    Ok(())
}
/// Tries to evaluate the subquery at the root of `e` without any outer row.
///
/// The subquery's own nested uncorrelated subqueries are resolved first.
/// For a subquery node that then executes on its own, this returns
/// `Ok(Some(replacement))`:
/// * scalar subquery: a literal of the first column of its single row, or
///   `NULL` for zero rows;
/// * `[NOT] EXISTS`: a boolean literal;
/// * `expr [NOT] IN (subquery)`: the disjunction `expr = v1 OR expr = v2 …`
///   over the returned values (`FALSE` when there are none), wrapped in
///   `NOT` when negated. NULL handling is left to the evaluator's `=`, `OR`
///   and `NOT`.
///
/// Returns `Ok(None)` for any other expression. It also returns `Ok(None)`
/// for subqueries whose execution fails with a correlation error (see
/// `is_correlation_error`); those are left for per-row resolution.
///
/// The IN disjunction is built as a balanced tree, with depth of roughly
/// log2 of the row count, instead of a left-deep chain. Recursive
/// evaluation, display and drop of the expression therefore need stack depth
/// logarithmic, not linear, in the size of the subquery result. The
/// left-to-right operand order is unchanged.
///
/// # Errors
/// `EngineError::Unsupported` in these cases:
/// * a scalar subquery returns more than one row;
/// * an IN-subquery does not project exactly one column;
/// * an inner statement does not return rows.
///
/// Non-correlation execution errors are also propagated.
fn subquery_replacement(
    &self,
    e: &Expr,
    cancel: CancelToken<'_>,
) -> Result<Option<Expr>, EngineError> {
    /// Folds `terms` into a balanced `OR` tree, pairing neighbours level by
    /// level so the in-order sequence of operands is preserved. Returns
    /// `None` when `terms` is empty.
    fn balanced_or(mut terms: Vec<Expr>) -> Option<Expr> {
        while terms.len() > 1 {
            let mut paired = Vec::with_capacity(terms.len().div_ceil(2));
            let mut rest = terms.into_iter();
            while let Some(left) = rest.next() {
                paired.push(match rest.next() {
                    Some(right) => Expr::Binary {
                        lhs: Box::new(left),
                        op: BinOp::Or,
                        rhs: Box::new(right),
                    },
                    None => left,
                });
            }
            terms = paired;
        }
        terms.pop()
    }

    match e {
        Expr::ScalarSubquery(inner) => {
            let mut s = (**inner).clone();
            self.resolve_select_subqueries(&mut s, cancel)?;
            let r = match self.exec_bare_select_cancel(&s, cancel) {
                Ok(r) => r,
                Err(e) if is_correlation_error(&e) => return Ok(None),
                Err(e) => return Err(e),
            };
            let QueryResult::Rows { rows, .. } = r else {
                return Err(EngineError::Unsupported(
                    "scalar subquery: inner statement did not return rows".into(),
                ));
            };
            let value = match rows.as_slice() {
                [] => Value::Null,
                [row] => row.values.first().cloned().unwrap_or(Value::Null),
                _ => {
                    return Err(EngineError::Unsupported(alloc::format!(
                        "scalar subquery returned {} rows; expected 0 or 1",
                        rows.len()
                    )));
                }
            };
            Ok(Some(value_to_literal_expr(value)?))
        }
        Expr::Exists { subquery, negated } => {
            let mut s = (**subquery).clone();
            self.resolve_select_subqueries(&mut s, cancel)?;
            let r = match self.exec_bare_select_cancel(&s, cancel) {
                Ok(r) => r,
                Err(e) if is_correlation_error(&e) => return Ok(None),
                Err(e) => return Err(e),
            };
            let exists = match r {
                QueryResult::Rows { rows, .. } => !rows.is_empty(),
                QueryResult::CommandOk { .. } => false,
            };
            let bit = if *negated { !exists } else { exists };
            Ok(Some(Expr::Literal(Literal::Bool(bit))))
        }
        Expr::InSubquery {
            expr,
            subquery,
            negated,
        } => {
            let mut s = (**subquery).clone();
            self.resolve_select_subqueries(&mut s, cancel)?;
            let r = match self.exec_bare_select_cancel(&s, cancel) {
                Ok(r) => r,
                Err(e) if is_correlation_error(&e) => return Ok(None),
                Err(e) => return Err(e),
            };
            let QueryResult::Rows { columns, rows, .. } = r else {
                return Err(EngineError::Unsupported(
                    "IN-subquery: inner statement did not return rows".into(),
                ));
            };
            if columns.len() != 1 {
                return Err(EngineError::Unsupported(alloc::format!(
                    "IN-subquery must project exactly one column; got {}",
                    columns.len()
                )));
            }
            let mut comparisons: Vec<Expr> = Vec::with_capacity(rows.len());
            for row in rows {
                let v = row.values.into_iter().next().unwrap_or(Value::Null);
                let lit = value_to_literal_expr(v)?;
                comparisons.push(Expr::Binary {
                    lhs: expr.clone(),
                    op: BinOp::Eq,
                    rhs: Box::new(lit),
                });
            }
            let combined =
                balanced_or(comparisons).unwrap_or(Expr::Literal(Literal::Bool(false)));
            let final_expr = if *negated {
                Expr::Unary {
                    op: UnOp::Not,
                    expr: Box::new(combined),
                }
            } else {
                combined
            };
            Ok(Some(final_expr))
        }
        _ => Ok(None),
    }
}
}
/// Reports whether `stmt` mentions the relation `target`. Names are compared
/// ASCII case-insensitively.
///
/// Searched places:
/// * FROM/JOIN relation names and JOIN `ON` conditions;
/// * projection items, `WHERE`, `GROUP BY`, `HAVING` and `ORDER BY`
///   expressions, including any subqueries nested inside them;
/// * every UNION peer, recursively.
///
/// The clause coverage matches the clauses `resolve_select_subqueries` and
/// `substitute_in_select` walk, so a reference hidden in HAVING or
/// ORDER BY is no longer missed.
fn select_refers_to(stmt: &SelectStatement, target: &str) -> bool {
    let in_from = stmt.from.as_ref().is_some_and(|from| {
        from_refers_to(from, target)
            || from
                .joins
                .iter()
                .filter_map(|j| j.on.as_ref())
                .any(|on| expr_refers_to(on, target))
    });
    in_from
        || stmt
            .unions
            .iter()
            .any(|(_, peer)| select_refers_to(peer, target))
        || stmt.items.iter().any(|item| {
            matches!(item, SelectItem::Expr { expr, .. } if expr_refers_to(expr, target))
        })
        || stmt
            .where_
            .as_ref()
            .is_some_and(|w| expr_refers_to(w, target))
        || stmt
            .group_by
            .iter()
            .flatten()
            .any(|g| expr_refers_to(g, target))
        || stmt
            .having
            .as_ref()
            .is_some_and(|h| expr_refers_to(h, target))
        || stmt
            .order_by
            .iter()
            .any(|o| expr_refers_to(&o.expr, target))
}
/// Reports whether the primary relation or any joined relation of `from` is
/// named `target`, compared ASCII case-insensitively.
fn from_refers_to(from: &FromClause, target: &str) -> bool {
    let primary_matches = from.primary.name.eq_ignore_ascii_case(target);
    primary_matches
        || from
            .joins
            .iter()
            .map(|join| &join.table.name)
            .any(|name| name.eq_ignore_ascii_case(target))
}
/// Reports whether a subquery nested anywhere in `e` reads the relation
/// `target`, as decided by `select_refers_to`.
///
/// Columns, literals and placeholders never count. All other nodes are
/// searched recursively. The probe operand of `IN (subquery)` is not
/// inspected; only the subquery itself is.
fn expr_refers_to(e: &Expr, target: &str) -> bool {
    match e {
        Expr::Literal(_) | Expr::Placeholder(_) | Expr::Column(_) => false,
        Expr::ScalarSubquery(select) => select_refers_to(select, target),
        Expr::Exists { subquery, .. } | Expr::InSubquery { subquery, .. } => {
            select_refers_to(subquery, target)
        }
        Expr::Unary { expr, .. } | Expr::Cast { expr, .. } | Expr::IsNull { expr, .. } => {
            expr_refers_to(expr, target)
        }
        Expr::Extract { source, .. } => expr_refers_to(source, target),
        Expr::Binary { lhs, rhs, .. } => {
            if expr_refers_to(lhs, target) {
                return true;
            }
            expr_refers_to(rhs, target)
        }
        Expr::Like { expr, pattern, .. } => {
            if expr_refers_to(expr, target) {
                return true;
            }
            expr_refers_to(pattern, target)
        }
        Expr::FunctionCall { args, .. } => args.iter().any(|arg| expr_refers_to(arg, target)),
        Expr::WindowFunction {
            args,
            partition_by,
            order_by,
            ..
        } => {
            let in_args = args.iter().any(|arg| expr_refers_to(arg, target));
            in_args
                || partition_by.iter().any(|part| expr_refers_to(part, target))
                || order_by.iter().any(|(key, _)| expr_refers_to(key, target))
        }
    }
}
fn infer_column_types(columns: &[ColumnSchema], rows: &[Row]) -> Vec<ColumnSchema> {
let mut out = columns.to_vec();
for (col_idx, col) in out.iter_mut().enumerate() {
if col.ty != DataType::Text {
continue;
}
let mut inferred: Option<DataType> = None;
let mut all_null = true;
for row in rows {
let Some(v) = row.values.get(col_idx) else {
continue;
};
let ty = match v {
Value::Null => continue,
Value::SmallInt(_) => DataType::SmallInt,
Value::Int(_) => DataType::Int,
Value::BigInt(_) => DataType::BigInt,
Value::Float(_) => DataType::Float,
Value::Bool(_) => DataType::Bool,
Value::Vector(_) => DataType::Vector {
dim: 0,
encoding: VecEncoding::F32,
},
_ => DataType::Text,
};
all_null = false;
inferred = Some(match inferred {
None => ty,
Some(prev) if prev == ty => prev,
Some(_) => DataType::Text,
});
}
if let Some(t) = inferred {
col.ty = t;
col.nullable = true;
} else if all_null {
col.nullable = true;
}
}
out
}
/// Suggests `CREATE INDEX` statements for columns that `stmt` filters or
/// joins on but that have no plain B-tree index.
///
/// Column references are gathered from the WHERE clause and from every JOIN
/// `ON` condition.
/// * A qualified reference is attributed to the FROM relation whose alias
///   matches the qualifier, or failing that whose table name matches. The
///   comparison is ASCII case-insensitive, so `FROM users u WHERE u.id = 1`
///   suggests an index on `users`.
/// * An unqualified reference goes to the first FROM relation whose catalog
///   schema has that column.
/// * References that cannot be attributed, or whose table or column is
///   unknown to the active catalog, are skipped.
///
/// A column counts as already indexed only if a B-tree index covers exactly
/// that column position, with no expression and no partial predicate. Each
/// (table, column) pair is suggested at most once, in first-seen order.
/// Returns an empty list when `stmt` has no FROM clause.
#[allow(clippy::too_many_lines, clippy::format_push_string)]
fn build_index_suggestions(stmt: &SelectStatement, engine: &Engine) -> Vec<String> {
    use alloc::collections::BTreeSet;
    let mut seen: BTreeSet<(String, String)> = BTreeSet::new();
    let mut out: Vec<String> = Vec::new();
    let cat = engine.active_catalog();
    let Some(from) = &stmt.from else {
        return out;
    };
    // (table name, alias) for every relation in FROM, primary first.
    let mut tables: Vec<(&str, Option<&str>)> = Vec::with_capacity(from.joins.len() + 1);
    tables.push((from.primary.name.as_str(), from.primary.alias.as_deref()));
    for j in &from.joins {
        tables.push((j.table.name.as_str(), j.table.alias.as_deref()));
    }
    let mut col_refs: Vec<spg_sql::ast::ColumnName> = Vec::new();
    if let Some(w) = &stmt.where_ {
        collect_column_refs(w, &mut col_refs);
    }
    for j in &from.joins {
        if let Some(on) = &j.on {
            collect_column_refs(on, &mut col_refs);
        }
    }
    for cn in &col_refs {
        let owner: Option<&str> = match &cn.qualifier {
            // An aliased relation is referred to by its alias, so aliases are
            // tried before bare table names.
            Some(q) => tables
                .iter()
                .find(|(_, alias)| alias.is_some_and(|a| a.eq_ignore_ascii_case(q)))
                .or_else(|| tables.iter().find(|(name, _)| name.eq_ignore_ascii_case(q)))
                .map(|(name, _)| *name),
            None => tables.iter().map(|(name, _)| *name).find(|name| {
                cat.get(*name)
                    .is_some_and(|tbl| tbl.schema().column_position(&cn.name).is_some())
            }),
        };
        let Some(owner) = owner else {
            continue;
        };
        let Some(tbl) = cat.get(owner) else {
            continue;
        };
        let Some(col_pos) = tbl.schema().column_position(&cn.name) else {
            continue;
        };
        let already_indexed = tbl.indices().iter().any(|i| {
            matches!(i.kind, spg_storage::IndexKind::BTree(_))
                && i.column_position == col_pos
                && i.expression.is_none()
                && i.partial_predicate.is_none()
        });
        if already_indexed {
            continue;
        }
        if seen.insert((owner.to_string(), cn.name.clone())) {
            out.push(alloc::format!(
                "SUGGEST: CREATE INDEX ix_{}_{} ON {} ({})",
                owner,
                cn.name,
                owner,
                cn.name
            ));
        }
    }
    out
}
/// Appends to `out` every column reference in `expr`, in left-to-right
/// order.
///
/// The walk descends through binary and unary operators, `CAST`, `IS NULL`,
/// `LIKE` (value and pattern), `EXTRACT`, function arguments, and the probe
/// operand of `IN (subquery)`.
///
/// It does not enter subqueries, whose columns belong to another scope, or
/// window functions.
fn collect_column_refs(expr: &Expr, out: &mut Vec<spg_sql::ast::ColumnName>) {
    match expr {
        Expr::Column(cn) => out.push(cn.clone()),
        Expr::FunctionCall { args, .. } => {
            for a in args {
                collect_column_refs(a, out);
            }
        }
        Expr::Binary { lhs, rhs, .. } => {
            collect_column_refs(lhs, out);
            collect_column_refs(rhs, out);
        }
        Expr::Like { expr: e, pattern, .. } => {
            collect_column_refs(e, out);
            collect_column_refs(pattern, out);
        }
        Expr::Unary { expr: e, .. } | Expr::Cast { expr: e, .. } | Expr::IsNull { expr: e, .. } => {
            collect_column_refs(e, out);
        }
        // Only the outer probe operand; the subquery has its own scope.
        Expr::InSubquery { expr: e, .. } => collect_column_refs(e, out),
        Expr::Extract { source, .. } => collect_column_refs(source, out),
        Expr::ScalarSubquery(_)
        | Expr::Exists { .. }
        | Expr::WindowFunction { .. }
        | Expr::Literal(_)
        | Expr::Placeholder(_) => {}
    }
}
/// Appends row-count annotations to EXPLAIN output lines, in place.
///
/// * The first line gets ` (rows=<total_rows>)`.
/// * A line that, after leading whitespace, reads `From: <name> [full scan]`
///   or `From: <name> [index seek]` gets ` (hot_rows=N` or ` (hot_rows≤N`.
///   N is the row count of the table named by the first word of `<name>` in
///   the active catalog.
/// * If the catalog-wide list from `cold_segment_ids_global` is non-empty,
///   those annotations also get
///   `, cold_tier=present, cold_segments=[id,…]` before the closing `)`.
/// * Every other line, or a `From:` line whose table is unknown or whose
///   scan tag is something else, gets ` (rows=—)`.
fn annotate_explain_lines(lines: &mut [String], total_rows: usize, engine: &Engine) {
    const UNKNOWN_ROWS: &str = " (rows=—)";
    let catalog = engine.active_catalog();
    let cold_ids = catalog.cold_segment_ids_global();
    let cold_suffix = if cold_ids.is_empty() {
        String::new()
    } else {
        let ids: Vec<String> = cold_ids.iter().map(|id| alloc::format!("{id}")).collect();
        alloc::format!(", cold_tier=present, cold_segments=[{}]", ids.join(","))
    };
    // Annotation for the text following "From: " on a plan line.
    let from_annotation = |rest: &str| -> String {
        let (name, scan_kind) = rest.split_once(" [").map_or((rest.trim(), ""), |(n, k)| {
            (n.trim(), k.trim_end_matches(']'))
        });
        let relation = match scan_kind {
            "full scan" => "=",
            "index seek" => "≤",
            _ => return UNKNOWN_ROWS.to_string(),
        };
        let bare = name.split_whitespace().next().unwrap_or(name);
        match catalog.get(bare) {
            Some(t) => alloc::format!(" (hot_rows{relation}{}{cold_suffix})", t.rows().len()),
            None => UNKNOWN_ROWS.to_string(),
        }
    };
    if let Some(first) = lines.first_mut() {
        first.push_str(&alloc::format!(" (rows={total_rows})"));
    }
    for line in lines.iter_mut().skip(1) {
        let annot = match line.trim_start().strip_prefix("From: ") {
            Some(rest) => from_annotation(rest),
            None => UNKNOWN_ROWS.to_string(),
        };
        line.push_str(&annot);
    }
}
/// Appends an indented, human-readable plan outline for `stmt` to `out`.
///
/// Each entry is one line, prefixed by `depth` spaces. The first line names
/// the top-level node, chosen by the first rule that matches:
/// 1. a `WITH` list (`CTEScan`);
/// 2. UNION peers (`UnionScan`);
/// 3. a window function in the projection (`WindowAgg`);
/// 4. aggregation (`Aggregate`);
/// 5. `DISTINCT`;
/// 6. a FROM clause (`TableScan`);
/// 7. otherwise `Result`.
///
/// Child lines, one level deeper, then describe in order:
/// * each CTE, with its body explained two levels deeper;
/// * the primary relation and its joins;
/// * `Filter`, `GroupBy`, `Having`, each `OrderBy`, `Limit`, `Offset`,
///   `Project`;
/// * each UNION peer, explained two levels deeper.
///
/// No row counts are computed here. The `From: … [index seek]` /
/// `[full scan]` tag format is what `annotate_explain_lines` parses.
fn explain_select(stmt: &SelectStatement, engine: &Engine, depth: usize, out: &mut Vec<String>) {
let pad = " ".repeat(depth);
// Top-level node label; earlier conditions take priority over later ones.
let top = if !stmt.ctes.is_empty() {
if stmt.ctes.iter().any(|c| c.recursive) {
"CTEScan (WITH RECURSIVE)"
} else {
"CTEScan (WITH)"
}
} else if !stmt.unions.is_empty() {
"UnionScan"
} else if select_has_window(stmt) {
"WindowAgg"
} else if aggregate::uses_aggregate(stmt) {
"Aggregate"
} else if stmt.distinct {
"Distinct"
} else if stmt.from.is_some() {
"TableScan"
} else {
"Result"
};
out.push(alloc::format!("{pad}{top}"));
let child = " ".repeat(depth + 1);
// Each CTE gets a header line, then its body is explained two levels deeper.
for cte in &stmt.ctes {
let head = if cte.recursive {
alloc::format!("{child}CTE (recursive): {}", cte.name)
} else {
alloc::format!("{child}CTE: {}", cte.name)
};
out.push(head);
explain_select(&cte.body, engine, depth + 2, out);
}
if let Some(from) = &stmt.from {
let mut tag = alloc::format!("{child}From: {}", from.primary.name);
if let Some(alias) = &from.primary.alias {
tag.push_str(&alloc::format!(" AS {alias}"));
}
// Only the primary relation is probed for an index seek, using the WHERE
// clause; without a WHERE clause or a catalog entry it is a full scan.
if let Some(w) = &stmt.where_
&& let Some(table) = engine.active_catalog().get(&from.primary.name)
{
let alias = from.primary.alias.as_deref().unwrap_or(&from.primary.name);
let cols = &table.schema().columns;
if try_index_seek(w, cols, engine.active_catalog(), table, alias).is_some() {
tag.push_str(" [index seek]");
} else {
tag.push_str(" [full scan]");
}
} else {
tag.push_str(" [full scan]");
}
out.push(tag);
// Joined relations are listed without a scan tag; the ON condition is
// only flagged, not printed.
for j in &from.joins {
let kind = match j.kind {
spg_sql::ast::JoinKind::Inner => "INNER JOIN",
spg_sql::ast::JoinKind::Left => "LEFT JOIN",
spg_sql::ast::JoinKind::Cross => "CROSS JOIN",
};
let mut s = alloc::format!("{child}{kind}: {}", j.table.name);
if let Some(alias) = &j.table.alias {
s.push_str(&alloc::format!(" AS {alias}"));
}
if j.on.is_some() {
s.push_str(" (ON …)");
}
out.push(s);
}
}
if let Some(w) = &stmt.where_ {
let mut s = alloc::format!("{child}Filter: {w}");
if expr_has_subquery(w) {
s.push_str(" [subquery]");
}
out.push(s);
}
if let Some(gs) = &stmt.group_by {
let mut parts = Vec::new();
for g in gs {
parts.push(alloc::format!("{g}"));
}
out.push(alloc::format!("{child}GroupBy: {}", parts.join(", ")));
}
if let Some(h) = &stmt.having {
out.push(alloc::format!("{child}Having: {h}"));
}
for o in &stmt.order_by {
let dir = if o.desc { "DESC" } else { "ASC" };
out.push(alloc::format!("{child}OrderBy: {} {dir}", o.expr));
}
if let Some(lim) = stmt.limit {
out.push(alloc::format!("{child}Limit: {lim}"));
}
if let Some(off) = stmt.offset {
out.push(alloc::format!("{child}Offset: {off}"));
}
// A wildcard anywhere in the projection is reported as `*` instead of an item count.
if stmt
.items
.iter()
.any(|it| matches!(it, SelectItem::Wildcard))
{
out.push(alloc::format!("{child}Project: *"));
} else {
out.push(alloc::format!(
"{child}Project: {} item(s)",
stmt.items.len()
));
}
// Each UNION peer gets a label line, then its own plan two levels deeper.
for (kind, peer) in &stmt.unions {
let label = match kind {
UnionKind::All => "UNION ALL",
UnionKind::Distinct => "UNION",
};
out.push(alloc::format!("{child}{label}"));
explain_select(peer, engine, depth + 2, out);
}
}
/// Reports whether `e` is an evaluation error raised by an unresolvable
/// column or qualifier (`ColumnNotFound` / `UnknownQualifier`). When running
/// a subquery on its own, this is taken to mean it references an outer query.
fn is_correlation_error(e: &EngineError) -> bool {
    let EngineError::Eval(inner) = e else {
        return false;
    };
    matches!(
        inner,
        eval::EvalError::ColumnNotFound { .. } | eval::EvalError::UnknownQualifier { .. }
    )
}
/// Inlines outer-row values into `stmt` for every column qualified with the
/// outer table alias from `ctx`. Does nothing when `ctx` has no table alias.
fn substitute_outer_columns(stmt: &mut SelectStatement, row: &Row, ctx: &EvalContext<'_>) {
    if let Some(outer_alias) = ctx.table_alias {
        substitute_in_select(stmt, row, ctx, outer_alias);
    }
}
/// Applies `substitute_in_expr` to every expression of `stmt`, in place.
///
/// Covered clauses: projection items, JOIN `ON` conditions, `WHERE`,
/// `GROUP BY`, `HAVING`, `ORDER BY`, and every UNION peer, recursively.
///
/// JOIN `ON` conditions are included so a correlated subquery such as
/// `… FROM a JOIN b ON b.x = outer.y` receives the outer value. Previously
/// that reference was left unresolved, and the inner query failed on it.
fn substitute_in_select(
    stmt: &mut SelectStatement,
    row: &Row,
    ctx: &EvalContext<'_>,
    outer_alias: &str,
) {
    for item in &mut stmt.items {
        if let SelectItem::Expr { expr, .. } = item {
            substitute_in_expr(expr, row, ctx, outer_alias);
        }
    }
    if let Some(from) = &mut stmt.from {
        for join in &mut from.joins {
            if let Some(on) = &mut join.on {
                substitute_in_expr(on, row, ctx, outer_alias);
            }
        }
    }
    if let Some(w) = &mut stmt.where_ {
        substitute_in_expr(w, row, ctx, outer_alias);
    }
    if let Some(gs) = &mut stmt.group_by {
        for g in gs {
            substitute_in_expr(g, row, ctx, outer_alias);
        }
    }
    if let Some(h) = &mut stmt.having {
        substitute_in_expr(h, row, ctx, outer_alias);
    }
    for o in &mut stmt.order_by {
        substitute_in_expr(&mut o.expr, row, ctx, outer_alias);
    }
    for (_, peer) in &mut stmt.unions {
        substitute_in_select(peer, row, ctx, outer_alias);
    }
}
/// Replaces, in place, every column reference in `e` whose qualifier equals
/// `outer_alias` (ASCII case-insensitive) with a literal.
///
/// The literal holds the outer `row`'s value for that column, found by name
/// in `ctx.columns`; a missing row value becomes `NULL`. A reference is left
/// untouched if its column is not in `ctx.columns` or its value cannot be
/// turned into a literal. The walk covers all sub-expressions, including
/// window-function parts and nested subqueries.
fn substitute_in_expr(e: &mut Expr, row: &Row, ctx: &EvalContext<'_>, outer_alias: &str) {
    match e {
        Expr::Column(c) => {
            let qualified_by_outer = c
                .qualifier
                .as_deref()
                .is_some_and(|q| q.eq_ignore_ascii_case(outer_alias));
            if !qualified_by_outer {
                return;
            }
            let Some(idx) = ctx
                .columns
                .iter()
                .position(|sc| sc.name.eq_ignore_ascii_case(&c.name))
            else {
                return;
            };
            let value = row.values.get(idx).cloned().unwrap_or(Value::Null);
            if let Ok(lit) = value_to_literal_expr(value) {
                *e = lit;
            }
        }
        Expr::Literal(_) | Expr::Placeholder(_) => {}
        Expr::ScalarSubquery(s) => substitute_in_select(s, row, ctx, outer_alias),
        Expr::Exists { subquery, .. } | Expr::InSubquery { subquery, .. } => {
            substitute_in_select(subquery, row, ctx, outer_alias);
        }
        Expr::Unary { expr, .. } | Expr::Cast { expr, .. } | Expr::IsNull { expr, .. } => {
            substitute_in_expr(expr, row, ctx, outer_alias);
        }
        Expr::Extract { source, .. } => substitute_in_expr(source, row, ctx, outer_alias),
        Expr::Binary { lhs, rhs, .. } => {
            substitute_in_expr(lhs, row, ctx, outer_alias);
            substitute_in_expr(rhs, row, ctx, outer_alias);
        }
        Expr::Like { expr, pattern, .. } => {
            substitute_in_expr(expr, row, ctx, outer_alias);
            substitute_in_expr(pattern, row, ctx, outer_alias);
        }
        Expr::FunctionCall { args, .. } => {
            args.iter_mut()
                .for_each(|arg| substitute_in_expr(arg, row, ctx, outer_alias));
        }
        Expr::WindowFunction {
            args,
            partition_by,
            order_by,
            ..
        } => {
            for arg in args.iter_mut() {
                substitute_in_expr(arg, row, ctx, outer_alias);
            }
            for part in partition_by.iter_mut() {
                substitute_in_expr(part, row, ctx, outer_alias);
            }
            for (key, _) in order_by.iter_mut() {
                substitute_in_expr(key, row, ctx, outer_alias);
            }
        }
    }
}
/// Encodes `row` as bytes usable as a set key: the `Debug` rendering of each
/// value followed by `|`, concatenated. Rows with equal `Debug` renderings
/// get equal keys.
fn encode_row_key(row: &Row) -> Vec<u8> {
    row.values
        .iter()
        .flat_map(|value| alloc::format!("{value:?}|").into_bytes())
        .collect()
}
/// Reports whether any projection item of `stmt` contains a window-function
/// call. Other clauses are not inspected.
fn select_has_window(stmt: &SelectStatement) -> bool {
    stmt.items.iter().any(|item| match item {
        SelectItem::Expr { expr, .. } => expr_has_window(expr),
        SelectItem::Wildcard => false,
    })
}
/// Reports whether `e` contains a window-function call. Subqueries are not
/// searched.
fn expr_has_window(e: &Expr) -> bool {
    match e {
        Expr::WindowFunction { .. } => true,
        Expr::ScalarSubquery(_)
        | Expr::Exists { .. }
        | Expr::InSubquery { .. }
        | Expr::Literal(_)
        | Expr::Placeholder(_)
        | Expr::Column(_) => false,
        Expr::Unary { expr, .. } | Expr::Cast { expr, .. } | Expr::IsNull { expr, .. } => {
            expr_has_window(expr)
        }
        Expr::Extract { source, .. } => expr_has_window(source),
        Expr::Binary { lhs, rhs, .. } => {
            if expr_has_window(lhs) {
                return true;
            }
            expr_has_window(rhs)
        }
        Expr::Like { expr, pattern, .. } => {
            if expr_has_window(expr) {
                return true;
            }
            expr_has_window(pattern)
        }
        Expr::FunctionCall { args, .. } => args.iter().any(|arg| expr_has_window(arg)),
    }
}
/// Appends to `out` each distinct window-function node found in `e`, in
/// first-seen order.
///
/// A window function's own arguments are not searched further, and
/// subqueries are not entered. Nodes already present in `out` (by `==`) are
/// skipped.
fn collect_window_nodes(e: &Expr, out: &mut Vec<Expr>) {
    match e {
        Expr::WindowFunction { .. } => {
            if !out.contains(e) {
                out.push(e.clone());
            }
        }
        Expr::Binary { lhs, rhs, .. } => {
            collect_window_nodes(lhs, out);
            collect_window_nodes(rhs, out);
        }
        Expr::Unary { expr, .. } | Expr::Cast { expr, .. } | Expr::IsNull { expr, .. } => {
            collect_window_nodes(expr, out);
        }
        Expr::FunctionCall { args, .. } => {
            args.iter().for_each(|arg| collect_window_nodes(arg, out));
        }
        Expr::Like { expr, pattern, .. } => {
            collect_window_nodes(expr, out);
            collect_window_nodes(pattern, out);
        }
        Expr::Extract { source, .. } => collect_window_nodes(source, out),
        Expr::ScalarSubquery(_)
        | Expr::Exists { .. }
        | Expr::InSubquery { .. }
        | Expr::Literal(_)
        | Expr::Placeholder(_)
        | Expr::Column(_) => {}
    }
}
/// Replaces, in place, each window-function node in `e` that equals an entry
/// of `window_nodes` with an unqualified column reference `__win_<index>`.
///
/// Window functions not listed in `window_nodes` are left intact and are not
/// searched further. Subqueries are not entered.
fn rewrite_window_to_columns(e: &mut Expr, window_nodes: &[Expr]) {
    match e {
        Expr::WindowFunction { .. } => {
            if let Some(idx) = window_nodes.iter().position(|w| *w == *e) {
                *e = Expr::Column(spg_sql::ast::ColumnName {
                    qualifier: None,
                    name: alloc::format!("__win_{idx}"),
                });
            }
        }
        Expr::Binary { lhs, rhs, .. } => {
            rewrite_window_to_columns(lhs, window_nodes);
            rewrite_window_to_columns(rhs, window_nodes);
        }
        Expr::Unary { expr, .. } | Expr::Cast { expr, .. } | Expr::IsNull { expr, .. } => {
            rewrite_window_to_columns(expr, window_nodes);
        }
        Expr::FunctionCall { args, .. } => {
            args.iter_mut()
                .for_each(|arg| rewrite_window_to_columns(arg, window_nodes));
        }
        Expr::Like { expr, pattern, .. } => {
            rewrite_window_to_columns(expr, window_nodes);
            rewrite_window_to_columns(pattern, window_nodes);
        }
        Expr::Extract { source, .. } => rewrite_window_to_columns(source, window_nodes),
        Expr::ScalarSubquery(_)
        | Expr::Exists { .. }
        | Expr::InSubquery { .. }
        | Expr::Literal(_)
        | Expr::Placeholder(_)
        | Expr::Column(_) => {}
    }
}
/// Lexicographic comparison of two partition keys using `value_cmp`. When
/// one key is a prefix of the other, the shorter key sorts first.
fn partition_key_cmp(a: &[Value], b: &[Value]) -> core::cmp::Ordering {
    a.iter()
        .zip(b)
        .map(|(left, right)| value_cmp(left, right))
        .find(|ordering| ordering.is_ne())
        .unwrap_or_else(|| a.len().cmp(&b.len()))
}
/// Lexicographic comparison of two ORDER BY keys.
///
/// Each component is compared with `value_cmp` and reversed when the `bool`
/// on `a`'s side is set (descending). When one key is a prefix of the other,
/// the shorter key sorts first.
fn order_key_cmp(a: &[(Value, bool)], b: &[(Value, bool)]) -> core::cmp::Ordering {
    a.iter()
        .zip(b)
        .map(|((left, descending), (right, _))| {
            let ordering = value_cmp(left, right);
            if *descending {
                ordering.reverse()
            } else {
                ordering
            }
        })
        .find(|ordering| ordering.is_ne())
        .unwrap_or_else(|| a.len().cmp(&b.len()))
}
/// Total ordering over values, used for window partitioning and ordering and
/// for correlated `IN` equality.
///
/// * `NULL` equals `NULL` and sorts before every non-`NULL` value.
/// * `SmallInt`, `Int` and `BigInt` compare numerically with each other
///   (widened to `i64`), and with `Float` through an `f64` conversion. Until
///   this change they fell back to comparing `Debug` text, so `Int(5)` and
///   `BigInt(5)` were unequal and every `BigInt` sorted before every `Int`.
/// * `Float`s use IEEE order, with `NaN` equal to itself and greater than
///   every other float, so the order is total and safe for sorting.
///   `-0.0 == 0.0`.
/// * Same-variant `Text`, `Bool`, `Date` and `Timestamp` use their natural
///   order.
/// * Any other pairing compares the `Debug` renderings. This is
///   deterministic but carries no semantic meaning.
#[allow(clippy::match_same_arms, clippy::cast_precision_loss)]
fn value_cmp(a: &Value, b: &Value) -> core::cmp::Ordering {
    use core::cmp::Ordering;

    /// Integer variants widened to `i64`; `None` for everything else.
    fn as_i64(v: &Value) -> Option<i64> {
        match v {
            Value::SmallInt(n) => Some(i64::from(*n)),
            Value::Int(n) => Some(i64::from(*n)),
            Value::BigInt(n) => Some(*n),
            _ => None,
        }
    }

    /// IEEE comparison, made total by ranking NaN above all other floats.
    fn float_cmp(x: f64, y: f64) -> Ordering {
        x.partial_cmp(&y)
            .unwrap_or_else(|| x.is_nan().cmp(&y.is_nan()))
    }

    match (a, b) {
        (Value::Null, Value::Null) => Ordering::Equal,
        (Value::Null, _) => Ordering::Less,
        (_, Value::Null) => Ordering::Greater,
        (Value::Text(x), Value::Text(y)) => x.cmp(y),
        (Value::Bool(x), Value::Bool(y)) => x.cmp(y),
        (Value::Float(x), Value::Float(y)) => float_cmp(*x, *y),
        (Value::Date(x), Value::Date(y)) => x.cmp(y),
        (Value::Timestamp(x), Value::Timestamp(y)) => x.cmp(y),
        _ => match (as_i64(a), as_i64(b), a, b) {
            (Some(x), Some(y), _, _) => x.cmp(&y),
            (Some(x), None, _, Value::Float(y)) => float_cmp(x as f64, *y),
            (None, Some(y), Value::Float(x), _) => float_cmp(*x, y as f64),
            _ => alloc::format!("{a:?}").cmp(&alloc::format!("{b:?}")),
        },
    }
}
#[allow(
clippy::too_many_arguments,
clippy::cast_possible_truncation,
clippy::cast_possible_wrap,
clippy::cast_precision_loss,
clippy::cast_sign_loss,
clippy::doc_markdown,
clippy::too_many_lines,
clippy::type_complexity,
clippy::match_same_arms
)]
fn compute_window_partition(
name: &str,
args: &[Expr],
ordered: bool,
frame: Option<&WindowFrame>,
null_treatment: spg_sql::ast::NullTreatment,
slice: &[(Vec<Value>, Vec<(Value, bool)>, usize)],
filtered_rows: &[&Row],
ctx: &EvalContext<'_>,
out_vals: &mut [Value],
) -> Result<(), EngineError> {
let ignore_nulls = matches!(null_treatment, spg_sql::ast::NullTreatment::Ignore);
let lower = name.to_ascii_lowercase();
match lower.as_str() {
"row_number" => {
for (rank, (_, _, idx)) in slice.iter().enumerate() {
out_vals[*idx] = Value::BigInt((rank + 1) as i64);
}
Ok(())
}
"rank" => {
let mut prev_key: Option<&[(Value, bool)]> = None;
let mut current_rank: i64 = 1;
for (i, (_, okey, idx)) in slice.iter().enumerate() {
if let Some(p) = prev_key
&& order_key_cmp(p, okey) != core::cmp::Ordering::Equal
{
current_rank = (i + 1) as i64;
}
if prev_key.is_none() {
current_rank = 1;
}
out_vals[*idx] = Value::BigInt(current_rank);
prev_key = Some(okey.as_slice());
}
Ok(())
}
"dense_rank" => {
let mut prev_key: Option<&[(Value, bool)]> = None;
let mut current_rank: i64 = 0;
for (_, okey, idx) in slice {
if prev_key.is_none_or(|p| order_key_cmp(p, okey) != core::cmp::Ordering::Equal) {
current_rank += 1;
}
out_vals[*idx] = Value::BigInt(current_rank);
prev_key = Some(okey.as_slice());
}
Ok(())
}
"sum" | "avg" | "min" | "max" | "count" | "count_star" => {
let arg_values: Vec<Value> = if lower == "count_star" || args.is_empty() {
slice.iter().map(|_| Value::Null).collect()
} else {
slice
.iter()
.map(|(_, _, idx)| eval::eval_expr(&args[0], filtered_rows[*idx], ctx))
.collect::<Result<_, _>>()
.map_err(EngineError::Eval)?
};
let eff = effective_frame(frame, ordered)?;
#[allow(clippy::needless_range_loop)]
for i in 0..slice.len() {
let (lo, hi) = frame_bounds_for_row(&eff, i, slice);
let mut sum: f64 = 0.0;
let mut count: i64 = 0;
let mut min_v: Option<f64> = None;
let mut max_v: Option<f64> = None;
let mut row_count: i64 = 0;
if lo <= hi {
for j in lo..=hi {
let v = &arg_values[j];
match lower.as_str() {
"count_star" => row_count += 1,
"count" => {
if !v.is_null() {
count += 1;
}
}
_ => {
if let Some(x) = value_to_f64(v) {
sum += x;
count += 1;
min_v = Some(min_v.map_or(x, |m| m.min(x)));
max_v = Some(max_v.map_or(x, |m| m.max(x)));
}
}
}
}
}
let value = match lower.as_str() {
"count_star" => Value::BigInt(row_count),
"count" => Value::BigInt(count),
"sum" => Value::Float(sum),
"avg" => {
if count == 0 {
Value::Null
} else {
Value::Float(sum / count as f64)
}
}
"min" => min_v.map_or(Value::Null, Value::Float),
"max" => max_v.map_or(Value::Null, Value::Float),
_ => unreachable!(),
};
let (_, _, idx) = &slice[i];
out_vals[*idx] = value;
}
Ok(())
}
"lag" | "lead" => {
if args.is_empty() {
return Err(EngineError::Unsupported(alloc::format!(
"{lower}() requires at least one argument"
)));
}
let offset: i64 = if args.len() >= 2 {
let v = eval::eval_expr(&args[1], filtered_rows[slice[0].2], ctx)
.map_err(EngineError::Eval)?;
match v {
Value::SmallInt(n) => i64::from(n),
Value::Int(n) => i64::from(n),
Value::BigInt(n) => n,
_ => {
return Err(EngineError::Unsupported(alloc::format!(
"{lower}() offset must be integer"
)));
}
}
} else {
1
};
let default: Value = if args.len() >= 3 {
eval::eval_expr(&args[2], filtered_rows[slice[0].2], ctx)
.map_err(EngineError::Eval)?
} else {
Value::Null
};
let values: Vec<Value> = slice
.iter()
.map(|(_, _, idx)| eval::eval_expr(&args[0], filtered_rows[*idx], ctx))
.collect::<Result<_, _>>()
.map_err(EngineError::Eval)?;
let n = slice.len();
for (i, (_, _, idx)) in slice.iter().enumerate() {
let signed_offset = if lower == "lag" { -offset } else { offset };
let v = if ignore_nulls {
let step: i64 = if signed_offset >= 0 { 1 } else { -1 };
let needed: i64 = signed_offset.abs();
if needed == 0 {
values[i].clone()
} else {
let mut j: i64 = i as i64;
let mut hits: i64 = 0;
let mut found: Option<Value> = None;
loop {
j += step;
if j < 0 || j >= n as i64 {
break;
}
#[allow(clippy::cast_sign_loss)]
let v = &values[j as usize];
if !v.is_null() {
hits += 1;
if hits == needed {
found = Some(v.clone());
break;
}
}
}
found.unwrap_or_else(|| default.clone())
}
} else {
let target_signed = i64::try_from(i).unwrap_or(i64::MAX) + signed_offset;
if target_signed < 0
|| target_signed >= i64::try_from(n).unwrap_or(i64::MAX)
{
default.clone()
} else {
#[allow(clippy::cast_sign_loss)]
{
values[target_signed as usize].clone()
}
}
};
out_vals[*idx] = v;
}
Ok(())
}
"first_value" | "last_value" | "nth_value" => {
if args.is_empty() {
return Err(EngineError::Unsupported(alloc::format!(
"{lower}() requires at least one argument"
)));
}
let values: Vec<Value> = slice
.iter()
.map(|(_, _, idx)| eval::eval_expr(&args[0], filtered_rows[*idx], ctx))
.collect::<Result<_, _>>()
.map_err(EngineError::Eval)?;
let nth: usize = if lower == "nth_value" {
if args.len() < 2 {
return Err(EngineError::Unsupported(
"nth_value() requires (expr, n)".into(),
));
}
let v = eval::eval_expr(&args[1], filtered_rows[slice[0].2], ctx)
.map_err(EngineError::Eval)?;
let raw = match v {
Value::SmallInt(n) => i64::from(n),
Value::Int(n) => i64::from(n),
Value::BigInt(n) => n,
_ => {
return Err(EngineError::Unsupported(
"nth_value() n must be integer".into(),
));
}
};
if raw < 1 {
return Err(EngineError::Unsupported(
"nth_value() n must be >= 1".into(),
));
}
#[allow(clippy::cast_sign_loss)]
{
raw as usize
}
} else {
0
};
let eff = effective_frame(frame, ordered)?;
for i in 0..slice.len() {
let (lo, hi) = frame_bounds_for_row(&eff, i, slice);
let (_, _, idx) = &slice[i];
let v = if lo > hi {
Value::Null
} else if ignore_nulls && matches!(lower.as_str(), "first_value" | "last_value") {
if lower == "first_value" {
(lo..=hi)
.find_map(|j| {
let v = &values[j];
(!v.is_null()).then(|| v.clone())
})
.unwrap_or(Value::Null)
} else {
(lo..=hi)
.rev()
.find_map(|j| {
let v = &values[j];
(!v.is_null()).then(|| v.clone())
})
.unwrap_or(Value::Null)
}
} else {
match lower.as_str() {
"first_value" => values[lo].clone(),
"last_value" => values[hi].clone(),
"nth_value" => {
let pos = lo + nth - 1;
if pos > hi {
Value::Null
} else {
values[pos].clone()
}
}
_ => unreachable!(),
}
};
out_vals[*idx] = v;
}
Ok(())
}
"ntile" => {
if args.is_empty() {
return Err(EngineError::Unsupported(
"ntile(n) requires an integer argument".into(),
));
}
let v = eval::eval_expr(&args[0], filtered_rows[slice[0].2], ctx)
.map_err(EngineError::Eval)?;
let bucket_count: i64 = match v {
Value::SmallInt(n) => i64::from(n),
Value::Int(n) => i64::from(n),
Value::BigInt(n) => n,
_ => {
return Err(EngineError::Unsupported(
"ntile() argument must be integer".into(),
));
}
};
if bucket_count < 1 {
return Err(EngineError::Unsupported(
"ntile() argument must be >= 1".into(),
));
}
#[allow(clippy::cast_sign_loss)]
let buckets = bucket_count as usize;
let n = slice.len();
let base = n / buckets;
let extras = n % buckets;
let mut bucket: usize = 1;
let mut remaining_in_bucket = if extras > 0 { base + 1 } else { base };
let mut buckets_with_extra_remaining = extras;
for (_, _, idx) in slice {
if remaining_in_bucket == 0 {
bucket += 1;
buckets_with_extra_remaining = buckets_with_extra_remaining.saturating_sub(1);
remaining_in_bucket = if buckets_with_extra_remaining > 0 {
base + 1
} else {
base
};
if remaining_in_bucket == 0 {
remaining_in_bucket = 1;
}
}
out_vals[*idx] = Value::BigInt(i64::try_from(bucket).unwrap_or(i64::MAX));
remaining_in_bucket -= 1;
}
Ok(())
}
"percent_rank" => {
let n = slice.len();
let mut prev_key: Option<&[(Value, bool)]> = None;
let mut current_rank: i64 = 1;
for (i, (_, okey, idx)) in slice.iter().enumerate() {
if let Some(p) = prev_key
&& order_key_cmp(p, okey) != core::cmp::Ordering::Equal
{
current_rank = i64::try_from(i + 1).unwrap_or(i64::MAX);
}
if prev_key.is_none() {
current_rank = 1;
}
#[allow(clippy::cast_precision_loss)]
let pr = if n <= 1 {
0.0
} else {
(current_rank - 1) as f64 / (n - 1) as f64
};
out_vals[*idx] = Value::Float(pr);
prev_key = Some(okey.as_slice());
}
Ok(())
}
"cume_dist" => {
let n = slice.len();
for i in 0..slice.len() {
let peer_end = peer_group_end(slice, i);
#[allow(clippy::cast_precision_loss)]
let cd = (peer_end + 1) as f64 / n as f64;
let (_, _, idx) = &slice[i];
out_vals[*idx] = Value::Float(cd);
}
Ok(())
}
other => Err(EngineError::Unsupported(alloc::format!(
"window function {other:?} not supported (v4.21: row_number/rank/dense_rank/sum/avg/count/min/max/lag/lead/first_value/last_value/nth_value/ntile/percent_rank/cume_dist)"
))),
}
}
/// Resolves the frame a window aggregate actually runs over.
///
/// Without an explicit frame clause the SQL default applies:
/// * with `ORDER BY` (`ordered == true`), `RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW`;
/// * without it, the whole partition (`ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING`).
///
/// An explicit frame with no end bound ends at `CURRENT ROW`.
///
/// # Errors
///
/// Returns [`EngineError::Unsupported`] when:
/// * the frame starts at `UNBOUNDED FOLLOWING`;
/// * the frame ends at `UNBOUNDED PRECEDING`;
/// * a `RANGE` frame uses an `n PRECEDING` / `n FOLLOWING` bound.
fn effective_frame(
    frame: Option<&WindowFrame>,
    ordered: bool,
) -> Result<(FrameKind, FrameBound, FrameBound), EngineError> {
    let Some(fr) = frame else {
        let fallback = if ordered {
            (
                FrameKind::Range,
                FrameBound::UnboundedPreceding,
                FrameBound::CurrentRow,
            )
        } else {
            (
                FrameKind::Rows,
                FrameBound::UnboundedPreceding,
                FrameBound::UnboundedFollowing,
            )
        };
        return Ok(fallback);
    };
    let start = fr.start.clone();
    let end = fr.end.clone().unwrap_or(FrameBound::CurrentRow);
    let starts_past_everything = matches!(start, FrameBound::UnboundedFollowing);
    let ends_before_everything = matches!(end, FrameBound::UnboundedPreceding);
    if starts_past_everything || ends_before_everything {
        return Err(EngineError::Unsupported(alloc::format!(
            "invalid frame: start={:?} end={:?}",
            start,
            end
        )));
    }
    let is_offset = |bound: &FrameBound| {
        matches!(
            bound,
            FrameBound::OffsetPreceding(_) | FrameBound::OffsetFollowing(_)
        )
    };
    // RANGE frames are only evaluated with peer-group bounds (see
    // `frame_bounds_for_row`), so numeric offsets are rejected up front.
    if fr.kind == FrameKind::Range && (is_offset(&start) || is_offset(&end)) {
        return Err(EngineError::Unsupported(
            "RANGE with explicit offset bounds is not supported (v4.20: only UNBOUNDED / CURRENT ROW for RANGE)".into(),
        ));
    }
    Ok((fr.kind, start, end))
}
/// Computes the inclusive `(lo, hi)` index range of the window frame for row `i`.
///
/// `eff` is the resolved frame from `effective_frame`. `slice` is one
/// partition in window order: each entry holds partition values, ORDER BY
/// key, and output index.
///
/// ROWS bounds are computed relative to `i`. A RANGE `CURRENT ROW` bound
/// extends to the edge of `i`'s peer group.
///
/// The frame is clipped to the partition. A frame that lies entirely before
/// or after the partition, or whose start comes after its end, is **empty**
/// and is returned as `lo > hi` (canonically `(1, 0)`). Callers must check
/// `lo <= hi` before indexing.
///
/// Examples of empty frames:
/// * `ROWS BETWEEN 2 PRECEDING AND 1 PRECEDING` on the first row;
/// * `ROWS BETWEEN 1 FOLLOWING AND 2 FOLLOWING` on the last row.
#[allow(clippy::type_complexity)]
fn frame_bounds_for_row(
    eff: &(FrameKind, FrameBound, FrameBound),
    i: usize,
    slice: &[(Vec<Value>, Vec<(Value, bool)>, usize)],
) -> (usize, usize) {
    /// Canonical empty-frame marker (`lo > hi`).
    const EMPTY: (usize, usize) = (1, 0);
    /// Widens an index/offset so `i - k` and `i + k` cannot wrap; lossless
    /// because `usize` is at most 64 bits on supported targets.
    fn wide(x: usize) -> i128 {
        i128::try_from(x).unwrap_or(i128::MAX)
    }
    let (kind, start, end) = eff;
    let Some(last) = slice.len().checked_sub(1) else {
        return EMPTY;
    };
    let row = wide(i);
    // Unclipped signed positions: values < 0 lie before the partition and
    // values > `last` lie after it.
    let (lo, hi) = match kind {
        FrameKind::Rows => {
            let position = |bound: &FrameBound| -> i128 {
                match bound {
                    FrameBound::UnboundedPreceding => 0,
                    FrameBound::OffsetPreceding(k) => {
                        row.saturating_sub(wide(usize::try_from(*k).unwrap_or(usize::MAX)))
                    }
                    FrameBound::CurrentRow => row,
                    FrameBound::OffsetFollowing(k) => {
                        row.saturating_add(wide(usize::try_from(*k).unwrap_or(usize::MAX)))
                    }
                    FrameBound::UnboundedFollowing => wide(last),
                }
            };
            (position(start), position(end))
        }
        FrameKind::Range => {
            let lo = match start {
                FrameBound::UnboundedPreceding => 0,
                FrameBound::CurrentRow => wide(peer_group_start(slice, i)),
                FrameBound::UnboundedFollowing => wide(last),
                _ => unreachable!("offset bounds rejected for RANGE"),
            };
            let hi = match end {
                FrameBound::UnboundedPreceding => 0,
                FrameBound::CurrentRow => wide(peer_group_end(slice, i)),
                FrameBound::UnboundedFollowing => wide(last),
                _ => unreachable!("offset bounds rejected for RANGE"),
            };
            (lo, hi)
        }
    };
    // Clip to the partition. Clamping each end independently (as opposed to
    // saturating before comparing) is what lets out-of-partition frames come
    // out empty instead of collapsing onto row 0 or the last row.
    let lo = lo.max(0);
    let hi = hi.min(wide(last));
    if lo > hi {
        return EMPTY;
    }
    match (usize::try_from(lo), usize::try_from(hi)) {
        (Ok(lo), Ok(hi)) => (lo, hi),
        _ => EMPTY,
    }
}
/// Returns the index where row `i`'s peer group begins.
///
/// Walks backwards from `i` over the contiguous run of rows whose ORDER BY
/// key compares `Equal` to row `i`'s key under `order_key_cmp`. Only
/// contiguous rows count, so the result is meaningful when `slice` is
/// already ordered by that key. Panics if `i` is out of bounds.
#[allow(clippy::type_complexity)]
fn peer_group_start(slice: &[(Vec<Value>, Vec<(Value, bool)>, usize)], i: usize) -> usize {
    let anchor = &slice[i].1;
    let peers_before = slice[..i]
        .iter()
        .rev()
        .take_while(|(_, key, _)| order_key_cmp(key, anchor) == core::cmp::Ordering::Equal)
        .count();
    i - peers_before
}
/// Returns the index of the last row in row `i`'s peer group.
///
/// Walks forwards from `i` over the contiguous run of rows whose ORDER BY key
/// compares `Equal` to row `i`'s key under `order_key_cmp`. Panics if `i` is
/// out of bounds.
#[allow(clippy::type_complexity)]
fn peer_group_end(slice: &[(Vec<Value>, Vec<(Value, bool)>, usize)], i: usize) -> usize {
    let anchor = &slice[i].1;
    let peers_after = slice[i + 1..]
        .iter()
        .take_while(|(_, key, _)| order_key_cmp(key, anchor) == core::cmp::Ordering::Equal)
        .count();
    i + peers_after
}
/// Widens a numeric value (`SMALLINT`, `INT`, `BIGINT`, `FLOAT`) to `f64` for
/// window aggregates.
///
/// `BIGINT` values beyond 2^53 lose precision. Every other variant, including
/// `NULL`, yields `None`.
fn value_to_f64(v: &Value) -> Option<f64> {
    let widened = match *v {
        Value::SmallInt(n) => f64::from(n),
        Value::Int(n) => f64::from(n),
        #[allow(clippy::cast_precision_loss)]
        Value::BigInt(n) => n as f64,
        Value::Float(x) => x,
        _ => return None,
    };
    Some(widened)
}
/// Reports whether any expression in `stmt` contains a subquery.
///
/// Inspects, in turn:
/// * projection expressions;
/// * `WHERE`;
/// * `HAVING`;
/// * `ORDER BY`;
/// * every `UNION` peer, recursively.
///
/// NOTE(review): `GROUP BY` expressions are not inspected here, unlike in
/// `substitute_select` — confirm that is intentional.
fn expr_tree_has_subquery(stmt: &SelectStatement) -> bool {
    let in_projection = stmt.items.iter().any(|item| {
        matches!(item, SelectItem::Expr { expr, .. } if expr_has_subquery(expr))
    });
    in_projection
        || matches!(&stmt.where_, Some(w) if expr_has_subquery(w))
        || matches!(&stmt.having, Some(h) if expr_has_subquery(h))
        || stmt.order_by.iter().any(|o| expr_has_subquery(&o.expr))
        || stmt
            .unions
            .iter()
            .any(|(_, peer)| expr_tree_has_subquery(peer))
}
/// Returns `true` when `e` contains a scalar subquery, `EXISTS`, or
/// `IN (SELECT …)` anywhere in its tree.
///
/// Window-function arguments, `PARTITION BY` and `ORDER BY` keys are
/// searched too. Leaves (literals, placeholders, columns) never match.
fn expr_has_subquery(e: &Expr) -> bool {
    match e {
        Expr::Literal(_) | Expr::Placeholder(_) | Expr::Column(_) => false,
        Expr::ScalarSubquery(_) | Expr::Exists { .. } | Expr::InSubquery { .. } => true,
        Expr::Unary { expr, .. } | Expr::Cast { expr, .. } | Expr::IsNull { expr, .. } => {
            expr_has_subquery(expr)
        }
        Expr::Extract { source, .. } => expr_has_subquery(source),
        Expr::Binary { lhs, rhs, .. } => expr_has_subquery(lhs) || expr_has_subquery(rhs),
        Expr::Like { expr, pattern, .. } => expr_has_subquery(expr) || expr_has_subquery(pattern),
        Expr::FunctionCall { args, .. } => args.iter().any(expr_has_subquery),
        Expr::WindowFunction {
            args,
            partition_by,
            order_by,
            ..
        } => {
            args.iter().chain(partition_by.iter()).any(expr_has_subquery)
                || order_by.iter().any(|(key, _)| expr_has_subquery(key))
        }
    }
}
/// Materialises a subquery result value as a literal expression.
///
/// Only `NULL`, integer, float, bool, text and JSON values are supported. Text
/// and JSON both become string literals.
///
/// # Errors
///
/// Returns [`EngineError::Unsupported`] for any other value type.
fn value_to_literal_expr(v: Value) -> Result<Expr, EngineError> {
    let literal = match v {
        Value::Null => Literal::Null,
        Value::Bool(b) => Literal::Bool(b),
        Value::SmallInt(n) => Literal::Integer(n.into()),
        Value::Int(n) => Literal::Integer(n.into()),
        Value::BigInt(n) => Literal::Integer(n),
        Value::Float(x) => Literal::Float(x),
        Value::Text(s) | Value::Json(s) => Literal::String(s),
        unsupported => {
            return Err(EngineError::Unsupported(alloc::format!(
                "subquery result type {:?} not yet materialisable; cast to text or integer in the inner SELECT",
                unsupported.data_type()
            )));
        }
    };
    Ok(Expr::Literal(literal))
}
/// Binds `$n` placeholders in `stmt` to the 1-based `params`, in place.
///
/// Covers:
/// * SELECT (via `substitute_select`);
/// * INSERT value rows;
/// * UPDATE assignments and WHERE;
/// * DELETE WHERE;
/// * the SELECT inside EXPLAIN.
///
/// Other statement kinds are left untouched.
///
/// # Errors
///
/// Propagates the first error from `substitute_expr` (e.g. an out-of-range
/// placeholder).
fn substitute_placeholders(stmt: &mut Statement, params: &[Value]) -> Result<(), EngineError> {
    match stmt {
        Statement::Select(select) => substitute_select(select, params),
        Statement::Explain(explain) => substitute_select(&mut explain.inner, params),
        Statement::Insert(insert) => insert
            .rows
            .iter_mut()
            .flatten()
            .try_for_each(|e| substitute_expr(e, params)),
        Statement::Update(update) => {
            update
                .assignments
                .iter_mut()
                .try_for_each(|(_, e)| substitute_expr(e, params))?;
            match &mut update.where_ {
                Some(w) => substitute_expr(w, params),
                None => Ok(()),
            }
        }
        Statement::Delete(delete) => match &mut delete.where_ {
            Some(w) => substitute_expr(w, params),
            None => Ok(()),
        },
        _ => Ok(()),
    }
}
/// Binds `$n` placeholders throughout a SELECT, in place.
///
/// Visits, in order:
/// * projection expressions;
/// * WHERE;
/// * GROUP BY;
/// * HAVING;
/// * ORDER BY;
/// * each UNION peer (recursively).
///
/// # Errors
///
/// Propagates the first error from `substitute_expr`.
fn substitute_select(
    s: &mut SelectStatement,
    params: &[Value],
) -> Result<(), EngineError> {
    s.items.iter_mut().try_for_each(|item| match item {
        SelectItem::Expr { expr, .. } => substitute_expr(expr, params),
        _ => Ok(()),
    })?;
    if let Some(where_clause) = s.where_.as_mut() {
        substitute_expr(where_clause, params)?;
    }
    if let Some(groups) = s.group_by.as_mut() {
        groups
            .iter_mut()
            .try_for_each(|g| substitute_expr(g, params))?;
    }
    if let Some(having) = s.having.as_mut() {
        substitute_expr(having, params)?;
    }
    s.order_by
        .iter_mut()
        .try_for_each(|o| substitute_expr(&mut o.expr, params))?;
    s.unions
        .iter_mut()
        .try_for_each(|(_, peer)| substitute_select(peer, params))
}
/// Replaces every `$n` placeholder in `e` with a literal built from
/// `params[n - 1]`, recursing into sub-expressions and subqueries.
///
/// # Errors
///
/// Returns `EvalError::PlaceholderOutOfRange` (wrapped in
/// [`EngineError::Eval`]) when `n` is `0` or greater than `params.len()`.
fn substitute_expr(e: &mut Expr, params: &[Value]) -> Result<(), EngineError> {
    if let Expr::Placeholder(n) = e {
        let n = *n;
        // `$n` is 1-based: `$0` names no parameter and must be rejected
        // rather than aliasing `$1`.
        let v = usize::from(n)
            .checked_sub(1)
            .and_then(|idx| params.get(idx))
            .ok_or_else(|| {
                EngineError::Eval(EvalError::PlaceholderOutOfRange {
                    n,
                    bound: u16::try_from(params.len()).unwrap_or(u16::MAX),
                })
            })?;
        *e = Expr::Literal(value_to_literal(v.clone()));
        return Ok(());
    }
    match e {
        Expr::Binary { lhs, rhs, .. } => {
            substitute_expr(lhs, params)?;
            substitute_expr(rhs, params)?;
        }
        Expr::Unary { expr, .. } | Expr::Cast { expr, .. } | Expr::IsNull { expr, .. } => {
            substitute_expr(expr, params)?;
        }
        Expr::FunctionCall { args, .. } => {
            for a in args {
                substitute_expr(a, params)?;
            }
        }
        Expr::Like { expr, pattern, .. } => {
            substitute_expr(expr, params)?;
            substitute_expr(pattern, params)?;
        }
        Expr::Extract { source, .. } => substitute_expr(source, params)?,
        Expr::ScalarSubquery(s) => substitute_select(s, params)?,
        Expr::Exists { subquery, .. } => substitute_select(subquery, params)?,
        Expr::InSubquery { expr, subquery, .. } => {
            substitute_expr(expr, params)?;
            substitute_select(subquery, params)?;
        }
        Expr::WindowFunction {
            args,
            partition_by,
            order_by,
            ..
        } => {
            for a in args {
                substitute_expr(a, params)?;
            }
            for p in partition_by {
                substitute_expr(p, params)?;
            }
            for (e, _) in order_by {
                substitute_expr(e, params)?;
            }
        }
        Expr::Literal(_) | Expr::Column(_) => {}
        Expr::Placeholder(_) => unreachable!("Placeholder handled at top of fn"),
    }
    Ok(())
}
/// Orders two values for histogram construction.
///
/// Integer widths compare numerically with each other, and with floats after
/// widening to `f64`. Same-typed text/JSON, bool, date and timestamp values
/// use their natural order. Any other pairing falls back to comparing
/// `canonical_value_repr` strings.
///
/// Floats compare numerically, so `-0.0 == 0.0`. When a NaN is involved the
/// comparison falls back to `f64::total_cmp`, which places positive NaN after
/// every number. This keeps the comparator a total order; `slice::sort_by` may
/// panic on comparators that are not.
fn sort_values_for_histogram(a: &Value, b: &Value) -> core::cmp::Ordering {
    use core::cmp::Ordering;
    /// Numeric comparison with a NaN-only `total_cmp` fallback.
    fn float_cmp(x: f64, y: f64) -> core::cmp::Ordering {
        x.partial_cmp(&y).unwrap_or_else(|| x.total_cmp(&y))
    }
    match (a, b) {
        (Value::SmallInt(a), Value::SmallInt(b)) => a.cmp(b),
        (Value::Int(a), Value::Int(b)) => a.cmp(b),
        (Value::BigInt(a), Value::BigInt(b)) => a.cmp(b),
        (Value::SmallInt(a), Value::Int(b)) => i32::from(*a).cmp(b),
        (Value::Int(a), Value::SmallInt(b)) => a.cmp(&i32::from(*b)),
        (Value::Int(a), Value::BigInt(b)) => i64::from(*a).cmp(b),
        (Value::BigInt(a), Value::Int(b)) => a.cmp(&i64::from(*b)),
        (Value::SmallInt(a), Value::BigInt(b)) => i64::from(*a).cmp(b),
        (Value::BigInt(a), Value::SmallInt(b)) => a.cmp(&i64::from(*b)),
        (Value::Float(a), Value::Float(b)) => float_cmp(*a, *b),
        (Value::Text(a), Value::Text(b)) | (Value::Json(a), Value::Json(b)) => a.cmp(b),
        (Value::Bool(a), Value::Bool(b)) => a.cmp(b),
        (Value::Date(a), Value::Date(b)) => a.cmp(b),
        (Value::Timestamp(a), Value::Timestamp(b)) => a.cmp(b),
        (Value::SmallInt(n), Value::Float(x)) => float_cmp(f64::from(*n), *x),
        (Value::Float(x), Value::SmallInt(n)) => float_cmp(*x, f64::from(*n)),
        (Value::Int(n), Value::Float(x)) => float_cmp(f64::from(*n), *x),
        (Value::Float(x), Value::Int(n)) => float_cmp(*x, f64::from(*n)),
        #[allow(clippy::cast_precision_loss)]
        (Value::BigInt(n), Value::Float(x)) => float_cmp(*n as f64, *x),
        #[allow(clippy::cast_precision_loss)]
        (Value::Float(x), Value::BigInt(n)) => float_cmp(*x, *n as f64),
        _ => {
            let ordering: Ordering = canonical_value_repr(a).cmp(&canonical_value_repr(b));
            ordering
        }
    }
}
/// Renders histogram bounds as a bracketed, comma-separated list, e.g.
/// `[1, 5, "a,b"]`.
///
/// A bound is double-quoted when it would otherwise be ambiguous. That means
/// it is empty, or it contains `,`, `[`, `]`, `"` or `\`, or it has leading or
/// trailing whitespace (which a reader splitting on `", "` could trim away).
/// Inside quotes, `"` and `\` are backslash-escaped. Backslash therefore
/// always forces quoting, so an unquoted bound never contains an escape
/// character.
fn render_histogram_bounds(bounds: &[alloc::string::String]) -> alloc::string::String {
    let mut out = alloc::string::String::with_capacity(bounds.len() * 8 + 2);
    out.push('[');
    for (i, b) in bounds.iter().enumerate() {
        if i > 0 {
            out.push_str(", ");
        }
        let needs_quote = b.is_empty()
            || b.contains([',', '[', ']', '"', '\\'])
            || b.starts_with(char::is_whitespace)
            || b.ends_with(char::is_whitespace);
        if needs_quote {
            out.push('"');
            for ch in b.chars() {
                if ch == '"' || ch == '\\' {
                    out.push('\\');
                }
                out.push(ch);
            }
            out.push('"');
        } else {
            out.push_str(b);
        }
    }
    out.push(']');
    out
}
/// Produces a stable textual form of a value, used for statistics keys and
/// as the cross-type fallback ordering in `sort_values_for_histogram`.
///
/// Conventions:
/// * `NULL` renders as `NULL` and booleans as `t`/`f`;
/// * floats use their `Debug` form;
/// * text/JSON are returned verbatim;
/// * dates, timestamps, intervals and numerics use the `eval` formatters;
/// * every other variant (e.g. vectors) uses its `Debug` form.
pub(crate) fn canonical_value_repr(v: &Value) -> alloc::string::String {
    match v {
        Value::Null => "NULL".to_string(),
        Value::Bool(true) => "t".to_string(),
        Value::Bool(false) => "f".to_string(),
        Value::SmallInt(n) => n.to_string(),
        Value::Int(n) => n.to_string(),
        Value::BigInt(n) => n.to_string(),
        Value::Float(x) => alloc::format!("{x:?}"),
        Value::Text(s) | Value::Json(s) => s.clone(),
        Value::Date(d) => eval::format_date(*d),
        Value::Timestamp(t) => eval::format_timestamp(*t),
        Value::Interval { months, micros } => eval::format_interval(*months, *micros),
        Value::Numeric { scaled, scale } => eval::format_numeric(*scaled, *scale),
        // Vector, Sq8Vector, HalfVector and anything else: Debug rendering.
        _ => alloc::format!("{v:?}"),
    }
}
/// Reports whether a table name is reserved for engine-internal use.
///
/// No names are currently reserved, so this always returns `false`.
const fn is_internal_table_name(_: &str) -> bool {
    false
}
/// Converts a runtime value into an AST literal, e.g. when binding `$n`
/// parameters.
///
/// Conversions:
/// * integers widen to `Literal::Integer`;
/// * text and JSON become string literals;
/// * numerics, dates and timestamps become their formatted string form;
/// * intervals keep their components plus formatted text;
/// * every vector encoding becomes an `f32` vector literal (quantised and
///   half-precision vectors are expanded first);
/// * any remaining variant falls back to its `Debug` text as a string literal.
fn value_to_literal(v: Value) -> Literal {
    match v {
        Value::Null => Literal::Null,
        Value::Bool(b) => Literal::Bool(b),
        Value::SmallInt(n) => Literal::Integer(n.into()),
        Value::Int(n) => Literal::Integer(n.into()),
        Value::BigInt(n) => Literal::Integer(n),
        Value::Float(x) => Literal::Float(x),
        Value::Text(s) | Value::Json(s) => Literal::String(s),
        Value::Numeric { scaled, scale } => Literal::String(eval::format_numeric(scaled, scale)),
        Value::Date(d) => Literal::String(eval::format_date(d)),
        Value::Timestamp(t) => Literal::String(eval::format_timestamp(t)),
        Value::Interval { months, micros } => {
            let text = eval::format_interval(months, micros);
            Literal::Interval {
                months,
                micros,
                text,
            }
        }
        Value::Vector(components) => Literal::Vector(components),
        Value::HalfVector(h) => Literal::Vector(h.to_f32_vec()),
        Value::Sq8Vector(q) => Literal::Vector(spg_storage::quantize::dequantize(&q)),
        other => Literal::String(alloc::format!("{other:?}")),
    }
}
/// Pins clock keywords (`now()`, `current_date`, `current_timestamp`) in
/// `stmt` to the instant `now_micros`, replacing them with literal casts via
/// `rewrite_expr_clock`.
///
/// Does nothing when `now_micros` is `None`. Covers:
/// * SELECT (including UNION peers and subqueries);
/// * INSERT value rows;
/// * UPDATE assignments and WHERE;
/// * DELETE WHERE;
/// * the SELECT inside EXPLAIN.
///
/// This is the same set of statement kinds that `substitute_placeholders`
/// visits, so e.g. `UPDATE t SET ts = now()` also sees a pinned clock.
fn rewrite_clock_calls(stmt: &mut Statement, now_micros: Option<i64>) {
    let Some(now) = now_micros else {
        return;
    };
    match stmt {
        Statement::Select(s) => rewrite_select_clock(s, now),
        Statement::Insert(ins) => {
            for row in &mut ins.rows {
                for e in row {
                    rewrite_expr_clock(e, now);
                }
            }
        }
        Statement::Update(u) => {
            for (_, e) in &mut u.assignments {
                rewrite_expr_clock(e, now);
            }
            if let Some(w) = &mut u.where_ {
                rewrite_expr_clock(w, now);
            }
        }
        Statement::Delete(d) => {
            if let Some(w) = &mut d.where_ {
                rewrite_expr_clock(w, now);
            }
        }
        Statement::Explain(e) => rewrite_select_clock(&mut e.inner, now),
        _ => {}
    }
}
/// Pins clock keywords throughout a SELECT to `now` (microseconds).
///
/// Visits projection expressions, WHERE, GROUP BY, HAVING, ORDER BY, and each
/// UNION peer recursively.
fn rewrite_select_clock(s: &mut SelectStatement, now: i64) {
    for item in s.items.iter_mut() {
        if let SelectItem::Expr { expr, .. } = item {
            rewrite_expr_clock(expr, now);
        }
    }
    if let Some(where_clause) = s.where_.as_mut() {
        rewrite_expr_clock(where_clause, now);
    }
    for group in s.group_by.iter_mut().flatten() {
        rewrite_expr_clock(group, now);
    }
    if let Some(having) = s.having.as_mut() {
        rewrite_expr_clock(having, now);
    }
    for order in s.order_by.iter_mut() {
        rewrite_expr_clock(&mut order.expr, now);
    }
    for (_, peer) in s.unions.iter_mut() {
        rewrite_select_clock(peer, now);
    }
}
/// Replaces clock keywords inside `e` with literal casts pinned to `now`.
///
/// If `e` itself is a clock keyword (per `clock_replacement_for`) it is
/// replaced wholesale. Otherwise every child expression is visited, including
/// subqueries and window-function arguments, `PARTITION BY` and `ORDER BY` keys.
fn rewrite_expr_clock(e: &mut Expr, now: i64) {
    /// Recurses into the direct children of a non-clock expression.
    fn visit_children(e: &mut Expr, now: i64) {
        match e {
            Expr::Literal(_) | Expr::Placeholder(_) | Expr::Column(_) => {}
            Expr::Binary { lhs, rhs, .. } => {
                for side in [lhs, rhs] {
                    rewrite_expr_clock(side, now);
                }
            }
            Expr::Unary { expr, .. } | Expr::Cast { expr, .. } | Expr::IsNull { expr, .. } => {
                rewrite_expr_clock(expr, now);
            }
            Expr::FunctionCall { args, .. } => {
                for arg in args.iter_mut() {
                    rewrite_expr_clock(arg, now);
                }
            }
            Expr::Like { expr, pattern, .. } => {
                rewrite_expr_clock(expr, now);
                rewrite_expr_clock(pattern, now);
            }
            Expr::Extract { source, .. } => rewrite_expr_clock(source, now),
            Expr::ScalarSubquery(sub) => rewrite_select_clock(sub, now),
            Expr::Exists { subquery, .. } => rewrite_select_clock(subquery, now),
            Expr::InSubquery { expr, subquery, .. } => {
                rewrite_expr_clock(expr, now);
                rewrite_select_clock(subquery, now);
            }
            Expr::WindowFunction {
                args,
                partition_by,
                order_by,
                ..
            } => {
                for child in args.iter_mut().chain(partition_by.iter_mut()) {
                    rewrite_expr_clock(child, now);
                }
                for (key, _) in order_by.iter_mut() {
                    rewrite_expr_clock(key, now);
                }
            }
        }
    }
    match clock_replacement_for(e, now) {
        Some(pinned) => *e = pinned,
        None => visit_children(e, now),
    }
}
/// Returns the literal that should replace `e` when it is a clock keyword.
///
/// Recognised spellings (ASCII case-insensitive):
/// * `now()` — only as a zero-argument function call;
/// * `current_timestamp` — as a zero-argument call or an unqualified identifier;
/// * `current_date` — as a zero-argument call or an unqualified identifier.
///
/// Timestamps become `CAST(<now> AS TIMESTAMP)`. Dates become
/// `CAST(<now div_euclid micros-per-day> AS DATE)`; floor division sends
/// negative instants to the preceding day. Anything else yields `None`.
fn clock_replacement_for(e: &Expr, now: i64) -> Option<Expr> {
    let (site, ident) = match e {
        Expr::FunctionCall { name, args } if args.is_empty() => (ClockSite::Fn, name.as_str()),
        Expr::Column(c) if c.qualifier.is_none() => (ClockSite::BareIdent, c.name.as_str()),
        _ => return None,
    };
    let wants_timestamp = ident.eq_ignore_ascii_case("current_timestamp")
        || (site == ClockSite::Fn && ident.eq_ignore_ascii_case("now"));
    let (payload, target) = if wants_timestamp {
        (now, spg_sql::ast::CastTarget::Timestamp)
    } else if ident.eq_ignore_ascii_case("current_date") {
        (now.div_euclid(86_400_000_000), spg_sql::ast::CastTarget::Date)
    } else {
        return None;
    };
    Some(Expr::Cast {
        expr: alloc::boxed::Box::new(Expr::Literal(spg_sql::ast::Literal::Integer(payload))),
        target,
    })
}
/// Syntactic position in which a clock keyword appeared. `clock_replacement_for`
/// uses it to decide which spellings are accepted (`now` is only honoured as a
/// function call).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ClockSite {
    /// A zero-argument function call, e.g. `now()` or `current_date()`.
    Fn,
    /// An unqualified identifier parsed as a column, e.g. `current_timestamp`.
    BareIdent,
}
/// Expands `GROUP BY ALL` into an explicit GROUP BY list, recursively for
/// every UNION peer.
///
/// The list contains a clone of every projection expression that contains no
/// aggregate, in projection order; non-expression items are skipped. After
/// expansion `group_by_all` is cleared. Statements without `GROUP BY ALL`
/// keep their existing `group_by`.
fn expand_group_by_all(s: &mut SelectStatement) {
    if s.group_by_all {
        let groups: Vec<Expr> = s
            .items
            .iter()
            .filter_map(|item| match item {
                SelectItem::Expr { expr, .. } if !aggregate::contains_aggregate(expr) => {
                    Some(expr.clone())
                }
                _ => None,
            })
            .collect();
        s.group_by = Some(groups);
        s.group_by_all = false;
    }
    for (_, peer) in &mut s.unions {
        expand_group_by_all(peer);
    }
}
/// Resolves ORDER BY entries that refer to the SELECT list, recursively for
/// every UNION peer.
///
/// * A positive integer literal `n` is replaced by the `n`-th (1-based)
///   projection expression, if that item exists and is an expression.
/// * An unqualified column name equal to a projection alias is replaced by the
///   aliased expression; the first matching alias wins.
///
/// Entries matching neither rule, including out-of-range positions, are left
/// unchanged.
fn resolve_order_by_position(s: &mut SelectStatement) {
    for order in &mut s.order_by {
        let resolved = match &order.expr {
            Expr::Literal(Literal::Integer(n)) if *n >= 1 => usize::try_from(*n)
                .ok()
                .and_then(|position| s.items.get(position - 1))
                .and_then(|item| match item {
                    SelectItem::Expr { expr, .. } => Some(expr.clone()),
                    _ => None,
                }),
            Expr::Column(c) if c.qualifier.is_none() => {
                s.items.iter().find_map(|item| match item {
                    SelectItem::Expr {
                        expr,
                        alias: Some(alias),
                    } if *alias == c.name => Some(expr.clone()),
                    _ => None,
                })
            }
            _ => None,
        };
        if let Some(expr) = resolved {
            order.expr = expr;
        }
    }
    for (_, peer) in &mut s.unions {
        resolve_order_by_position(peer);
    }
}
/// Sorts key-tagged rows by their ORDER BY keys (see `cmp_multi_key`),
/// optionally keeping only the first `keep` rows.
///
/// When `keep` is `Some(k)` with `0 < k < tagged.len()`:
/// * a selection pass moves the `k` smallest rows to the front;
/// * only those `k` rows are kept and sorted.
///
/// Which tied rows at the cut survive is unspecified. Otherwise the whole
/// vector is sorted (stable) and nothing is dropped.
fn partial_sort_tagged(
    tagged: &mut Vec<(Vec<f64>, Row)>,
    keep: Option<usize>,
    descs: &[bool],
) {
    let by_keys =
        |lhs: &(Vec<f64>, Row), rhs: &(Vec<f64>, Row)| cmp_multi_key(&lhs.0, &rhs.0, descs);
    let top_k = keep.filter(|&k| k > 0 && k < tagged.len());
    if let Some(k) = top_k {
        tagged.select_nth_unstable_by(k - 1, by_keys);
        tagged.truncate(k);
        tagged.sort_by(by_keys);
    } else {
        tagged.sort_by(by_keys);
    }
}
/// Stable-sorts key-tagged rows in place by their ORDER BY key vectors,
/// honouring the per-key descending flags in `descs`.
fn sort_by_keys(tagged: &mut [(Vec<f64>, Row)], descs: &[bool]) {
    tagged.sort_by(|lhs, rhs| cmp_multi_key(&lhs.0, &rhs.0, descs));
}
/// Lexicographically compares two ORDER BY key vectors.
///
/// Key `i` is compared numerically and the result is reversed when
/// `descs[i]` is `true`; missing flags mean ascending. The first non-equal
/// key decides. Keys beyond the shorter vector are ignored.
///
/// Non-NaN keys compare numerically, so `-0.0 == 0.0`. When a NaN is
/// involved the comparison falls back to `f64::total_cmp`, so positive NaN
/// sorts after every number. This makes the comparator a total order. The
/// standard `sort_by` / `select_nth_unstable_by` may panic on comparators that
/// are not total orders (which the former "NaN equals everything" rule was
/// not).
fn cmp_multi_key(a: &[f64], b: &[f64], descs: &[bool]) -> core::cmp::Ordering {
    use core::cmp::Ordering;
    for (i, (ka, kb)) in a.iter().zip(b.iter()).enumerate() {
        let ord = ka.partial_cmp(kb).unwrap_or_else(|| ka.total_cmp(kb));
        let ord = if descs.get(i).copied().unwrap_or(false) {
            ord.reverse()
        } else {
            ord
        };
        if ord != Ordering::Equal {
            return ord;
        }
    }
    Ordering::Equal
}
/// Evaluates every ORDER BY expression against `row` and converts each
/// result into its numeric sort key via `value_to_order_key`.
///
/// # Errors
///
/// Returns the first evaluation or key-conversion error, converted into
/// [`EngineError`].
fn build_order_keys(
    order_by: &[OrderBy],
    row: &Row,
    ctx: &EvalContext,
) -> Result<Vec<f64>, EngineError> {
    order_by
        .iter()
        .map(|ob| -> Result<f64, EngineError> {
            let evaluated = eval::eval_expr(&ob.expr, row, ctx)?;
            Ok(value_to_order_key(&evaluated)?)
        })
        .collect()
}
/// Applies SQL `OFFSET` then `LIMIT` to `rows` in place.
///
/// An offset at or beyond the row count empties the vector. `LIMIT 0` also
/// empties it.
fn apply_offset_and_limit(rows: &mut Vec<Row>, offset: Option<u32>, limit: Option<u32>) {
    let skip = offset.map_or(0, |off| (off as usize).min(rows.len()));
    rows.drain(..skip);
    if let Some(n) = limit {
        rows.truncate(n as usize);
    }
}
/// Resolves a parsed `FOREIGN KEY` clause declared on `local_table_name` into
/// the storage-layer constraint, translating column names into positions.
///
/// `local_cols` is the schema of the table that owns the constraint. When the
/// constraint references that same table (a self-reference), parent columns
/// are resolved against `local_cols` instead of a catalog entry.
///
/// # Errors
///
/// * [`EngineError::Storage`] (`TableNotFound`) when a non-self-referencing
///   parent table is missing from `catalog`.
/// * [`EngineError::Unsupported`] when:
///   - a local or parent column name is unknown;
///   - a composite key omits its parent column list;
///   - no default parent column can be picked;
///   - local and parent arities differ;
///   - a non-self-referencing parent's first key column lacks an
///     unconditional BTree index.
fn resolve_foreign_key(
    local_table_name: &str,
    local_cols: &[ColumnSchema],
    fk: spg_sql::ast::ForeignKeyConstraint,
    catalog: &Catalog,
) -> Result<spg_storage::ForeignKeyConstraint, EngineError> {
    // Map each local (child) column name to its position in `local_cols`.
    let mut local_columns = Vec::with_capacity(fk.columns.len());
    for name in &fk.columns {
        let pos = local_cols
            .iter()
            .position(|c| c.name == *name)
            .ok_or_else(|| {
                EngineError::Unsupported(alloc::format!(
                    "FOREIGN KEY references unknown local column {name:?}"
                ))
            })?;
        local_columns.push(pos);
    }
    // Pick the schema that parent column names resolve against: the table
    // being defined for a self-reference, otherwise the catalog's parent.
    let is_self_ref = fk.parent_table == local_table_name;
    let (parent_cols_for_lookup, parent_table_str): (&[ColumnSchema], &str) = if is_self_ref {
        (local_cols, local_table_name)
    } else {
        let parent_table = catalog.get(&fk.parent_table).ok_or_else(|| {
            EngineError::Storage(StorageError::TableNotFound {
                name: fk.parent_table.clone(),
            })
        })?;
        (parent_table.schema().columns.as_slice(), fk.parent_table.as_str())
    };
    // Without an explicit parent column list, only a single-column key is
    // accepted and its parent column is chosen by `pick_pk_index_column`.
    let parent_columns: Vec<usize> = if fk.parent_columns.is_empty() {
        if fk.columns.len() != 1 {
            return Err(EngineError::Unsupported(
                "composite FOREIGN KEY without explicit parent column list is not supported \
                 — list the parent columns explicitly"
                    .into(),
            ));
        }
        let pos = pick_pk_index_column(catalog, parent_table_str, is_self_ref, local_cols)
            .ok_or_else(|| {
                EngineError::Unsupported(alloc::format!(
                    "parent table {parent_table_str:?} has no PRIMARY-key / UNIQUE BTree index \
                     to default the FOREIGN KEY against"
                ))
            })?;
        alloc::vec![pos]
    } else {
        // Explicit list: resolve each name against the parent schema.
        let mut out = Vec::with_capacity(fk.parent_columns.len());
        for name in &fk.parent_columns {
            let pos = parent_cols_for_lookup
                .iter()
                .position(|c| c.name == *name)
                .ok_or_else(|| {
                    EngineError::Unsupported(alloc::format!(
                        "FOREIGN KEY references unknown parent column \
                         {name:?} on table {parent_table_str:?}"
                    ))
                })?;
            out.push(pos);
        }
        out
    };
    if parent_columns.len() != local_columns.len() {
        return Err(EngineError::Unsupported(alloc::format!(
            "FOREIGN KEY arity mismatch: {} local columns vs {} parent columns",
            local_columns.len(),
            parent_columns.len()
        )));
    }
    // Cross-table keys require an unconditional (non-partial) BTree index on
    // the first parent column. `enforce_fk_inserts` probes parent presence
    // through such an index for single-column keys.
    // NOTE(review): for composite keys only `parent_columns[0]` is checked,
    // and self-references skip this check entirely.
    if !is_self_ref {
        // Cannot fail: the same lookup succeeded above and `catalog` is only
        // borrowed immutably in between.
        let parent_table = catalog
            .get(&fk.parent_table)
            .expect("checked above");
        let primary_parent_col = parent_columns[0];
        // The `.get(..).is_some()` guard appears always true here, since the
        // position came from this parent's schema or `pick_pk_index_column`.
        let has_btree = parent_table.schema().columns.get(primary_parent_col).is_some()
            && parent_table
                .indices()
                .iter()
                .any(|idx| {
                    matches!(idx.kind, spg_storage::IndexKind::BTree(_))
                        && idx.column_position == primary_parent_col
                        && idx.partial_predicate.is_none()
                });
        if !has_btree {
            return Err(EngineError::Unsupported(alloc::format!(
                "FOREIGN KEY parent column on {:?} is not covered by an unconditional BTree \
                 index — create one with `CREATE INDEX ... ON {} ({})` first",
                parent_table_str,
                parent_table_str,
                parent_table.schema().columns[primary_parent_col].name,
            )));
        }
    }
    let on_delete = fk_action_sql_to_storage(fk.on_delete);
    let on_update = fk_action_sql_to_storage(fk.on_update);
    Ok(spg_storage::ForeignKeyConstraint {
        name: fk.name,
        local_columns,
        parent_table: fk.parent_table,
        parent_columns,
        on_delete,
        on_update,
    })
}
/// Picks the parent column a single-column `FOREIGN KEY` defaults to when no
/// parent column list is given.
///
/// Behaviour:
/// * For a self-reference it returns column `0`.
/// * Otherwise it returns the column of the first BTree index on `parent_name`
///   that is unconditional, has no INCLUDE columns and is not an expression
///   index.
/// * It returns `None` when the parent table is missing or has no such index.
///
/// NOTE(review): the chosen index is not checked for uniqueness here, and
/// `local_cols` is currently unused.
fn pick_pk_index_column(
    catalog: &Catalog,
    parent_name: &str,
    is_self_ref: bool,
    local_cols: &[ColumnSchema],
) -> Option<usize> {
    if is_self_ref {
        let _ = local_cols;
        return Some(0);
    }
    catalog
        .get(parent_name)?
        .indices()
        .iter()
        .find(|idx| {
            matches!(idx.kind, spg_storage::IndexKind::BTree(_))
                && idx.partial_predicate.is_none()
                && idx.included_columns.is_empty()
                && idx.expression.is_none()
        })
        .map(|idx| idx.column_position)
}
/// Resolves an `ON CONFLICT (...)` target into column positions of
/// `table_name`.
///
/// With an empty target, the column of the first plain BTree index is used:
/// unconditional, no INCLUDE columns, not an expression index.
///
/// # Errors
///
/// * [`EngineError::Storage`] (`TableNotFound`) when the table is missing.
/// * [`EngineError::Unsupported`] when no default index exists or a named
///   target column is unknown.
fn resolve_on_conflict_columns(
    catalog: &Catalog,
    table_name: &str,
    target: &[String],
) -> Result<Vec<usize>, EngineError> {
    let Some(table) = catalog.get(table_name) else {
        return Err(EngineError::Storage(StorageError::TableNotFound {
            name: table_name.into(),
        }));
    };
    if target.is_empty() {
        let default_col = table
            .indices()
            .iter()
            .find(|idx| {
                matches!(idx.kind, spg_storage::IndexKind::BTree(_))
                    && idx.partial_predicate.is_none()
                    && idx.included_columns.is_empty()
                    && idx.expression.is_none()
            })
            .map(|idx| idx.column_position);
        return match default_col {
            Some(pos) => Ok(alloc::vec![pos]),
            None => Err(EngineError::Unsupported(alloc::format!(
                "ON CONFLICT without target requires a UNIQUE BTree index on {table_name:?}"
            ))),
        };
    }
    let columns = &table.schema().columns;
    target
        .iter()
        .map(|name| {
            columns
                .iter()
                .position(|c| c.name == *name)
                .ok_or_else(|| {
                    EngineError::Unsupported(alloc::format!(
                        "ON CONFLICT target column {name:?} not found on {table_name:?}"
                    ))
                })
        })
        .collect()
}
/// Reports whether `table_name` has a row whose column `column_pos` equals
/// `key`, probing every unconditional BTree index on that column.
///
/// Returns `false` in three cases:
/// * the table is missing;
/// * `key` cannot be turned into an index key;
/// * no such index exists (the rows are not scanned).
fn on_conflict_key_exists(
    catalog: &Catalog,
    table_name: &str,
    column_pos: usize,
    key: &Value,
) -> bool {
    match (catalog.get(table_name), spg_storage::IndexKey::from_value(key)) {
        (Some(table), Some(probe)) => table.indices().iter().any(|idx| {
            matches!(idx.kind, spg_storage::IndexKind::BTree(_))
                && idx.column_position == column_pos
                && idx.partial_predicate.is_none()
                && !idx.lookup_eq(&probe).is_empty()
        }),
        _ => false,
    }
}
/// Returns the position of the first row in `table_name` whose values at
/// `column_positions` equal `key` component-wise, by full scan.
///
/// Returns `None` when the table is missing or nothing matches. `key` must be
/// at least as long as `column_positions`.
fn lookup_row_position_by_keys(
    catalog: &Catalog,
    table_name: &str,
    column_positions: &[usize],
    key: &[&Value],
) -> Option<usize> {
    let rows = catalog.get(table_name)?.rows();
    rows.iter().position(|row| {
        column_positions
            .iter()
            .enumerate()
            .all(|(slot, &col)| row.values.get(col) == Some(key[slot]))
    })
}
fn on_conflict_keys_exist(
catalog: &Catalog,
table_name: &str,
column_positions: &[usize],
key: &[&Value],
) -> bool {
if column_positions.len() == 1 {
return on_conflict_key_exists(
catalog,
table_name,
column_positions[0],
key[0],
);
}
let Some(table) = catalog.get(table_name) else {
return false;
};
table.rows().iter().any(|r| {
column_positions
.iter()
.enumerate()
.all(|(i, &pos)| r.values.get(pos) == Some(key[i]))
})
}
fn apply_on_conflict_assignments(
catalog: &Catalog,
table_name: &str,
target_pos: usize,
incoming: &[Value],
assignments: &[(String, Expr)],
where_: Option<&Expr>,
) -> Result<Option<Vec<Value>>, EngineError> {
let table = catalog.get(table_name).ok_or_else(|| {
EngineError::Storage(StorageError::TableNotFound {
name: table_name.into(),
})
})?;
let schema_cols = table.schema().columns.clone();
let existing = table
.rows()
.get(target_pos)
.ok_or_else(|| {
EngineError::Unsupported(alloc::format!(
"ON CONFLICT DO UPDATE: row position {target_pos} out of bounds on {table_name:?}"
))
})?
.clone();
let ctx = eval::EvalContext::new(&schema_cols, Some(table_name));
if let Some(w) = where_ {
let pred = w.clone();
let pred = substitute_excluded_refs(pred, &schema_cols, incoming);
let v = eval::eval_expr(&pred, &existing, &ctx)?;
if !matches!(v, Value::Bool(true)) {
return Ok(None);
}
}
let mut new_values = existing.values.clone();
for (col_name, expr) in assignments {
let target_idx = schema_cols
.iter()
.position(|c| c.name == *col_name)
.ok_or_else(|| {
EngineError::Eval(EvalError::ColumnNotFound {
name: col_name.clone(),
})
})?;
let sub = substitute_excluded_refs(expr.clone(), &schema_cols, incoming);
let v = eval::eval_expr(&sub, &existing, &ctx)?;
new_values[target_idx] =
coerce_value(v, schema_cols[target_idx].ty, col_name, target_idx)?;
}
Ok(Some(new_values))
}
fn substitute_excluded_refs(
expr: Expr,
schema_cols: &[ColumnSchema],
incoming: &[Value],
) -> Expr {
use spg_sql::ast::ColumnName;
match expr {
Expr::Column(ColumnName { qualifier, name })
if qualifier
.as_deref()
.is_some_and(|q| q.eq_ignore_ascii_case("excluded")) =>
{
let pos = schema_cols.iter().position(|c| c.name == name);
match pos {
Some(p) => {
let v = incoming.get(p).cloned().unwrap_or(Value::Null);
value_to_literal_expr(v).unwrap_or_else(|_| {
Expr::Literal(spg_sql::ast::Literal::Null)
})
}
None => Expr::Column(ColumnName { qualifier, name }),
}
}
Expr::Binary { op, lhs, rhs } => Expr::Binary {
op,
lhs: Box::new(substitute_excluded_refs(*lhs, schema_cols, incoming)),
rhs: Box::new(substitute_excluded_refs(*rhs, schema_cols, incoming)),
},
Expr::Unary { op, expr } => Expr::Unary {
op,
expr: Box::new(substitute_excluded_refs(*expr, schema_cols, incoming)),
},
Expr::FunctionCall { name, args } => Expr::FunctionCall {
name,
args: args
.into_iter()
.map(|a| substitute_excluded_refs(a, schema_cols, incoming))
.collect(),
},
other => other,
}
}
/// Rejects an INSERT batch that would violate a PRIMARY KEY or UNIQUE
/// constraint on `child_table`.
///
/// For each constraint, each incoming row's key is compared against every
/// committed row (full scan) and against the earlier rows of the same batch.
/// Keys containing `NULL` are skipped, since `NULL` never collides.
///
/// # Errors
///
/// * [`EngineError::Storage`] (`TableNotFound`) when the table is missing.
/// * [`EngineError::Unsupported`] describing the first violation found.
fn enforce_uniqueness_inserts(
    catalog: &Catalog,
    child_table: &str,
    constraints: &[spg_storage::UniquenessConstraint],
    rows: &[Vec<Value>],
) -> Result<(), EngineError> {
    if constraints.is_empty() {
        return Ok(());
    }
    let Some(table) = catalog.get(child_table) else {
        return Err(EngineError::Storage(StorageError::TableNotFound {
            name: child_table.into(),
        }));
    };
    for uc in constraints {
        for (batch_idx, row_values) in rows.iter().enumerate() {
            let key: Vec<&Value> = uc.columns.iter().map(|&i| &row_values[i]).collect();
            if key.iter().any(|v| matches!(v, Value::Null)) {
                continue;
            }
            let same_key = |candidate: &[Value]| {
                uc.columns
                    .iter()
                    .zip(&key)
                    .all(|(&pos, &want)| candidate.get(pos) == Some(want))
            };
            let collides = table
                .rows()
                .iter()
                .any(|prow| same_key(prow.values.as_slice()))
                || rows[..batch_idx]
                    .iter()
                    .any(|earlier| same_key(earlier.as_slice()));
            if !collides {
                continue;
            }
            let kind = if uc.is_primary_key { "PRIMARY KEY" } else { "UNIQUE" };
            let col_names: Vec<String> = uc
                .columns
                .iter()
                .map(|&i| table.schema().columns[i].name.clone())
                .collect();
            return Err(EngineError::Unsupported(alloc::format!(
                "{kind} violation on {child_table:?} columns {col_names:?}: \
                 row #{batch_idx} duplicates an existing key"
            )));
        }
    }
    Ok(())
}
/// Checks every FOREIGN KEY in `fks` against the rows about to be inserted
/// into `child_table`.
///
/// A row satisfies a constraint when:
/// - any of its referencing columns is NULL (SQL `MATCH SIMPLE`, applied
///   uniformly to single-column and composite keys);
/// - a row already stored in the parent table carries the same key; or
/// - for a self-referencing FK, an earlier row of this batch carries it.
///
/// Single-column keys are probed through a non-partial B-tree index on the
/// referenced column when one exists. Otherwise the parent rows are scanned,
/// so a missing index cannot produce a spurious violation.
///
/// # Errors
/// - `EngineError::Storage(StorageError::TableNotFound)` if the parent table
///   is not in `catalog`.
/// - `EngineError::Unsupported` for the first violating row, or when a
///   single-column value must be probed through an index but is not
///   index-eligible.
///
/// # Panics
/// If an FK column index is out of range for an incoming row.
fn enforce_fk_inserts(
    catalog: &Catalog,
    child_table: &str,
    fks: &[spg_storage::ForeignKeyConstraint],
    rows: &[Vec<Value>],
) -> Result<(), EngineError> {
    for fk in fks {
        let parent_is_self = fk.parent_table == child_table;
        let parent_name: &str = if parent_is_self {
            child_table
        } else {
            &fk.parent_table
        };
        let parent = catalog.get(parent_name).ok_or_else(|| {
            EngineError::Storage(StorageError::TableNotFound {
                name: parent_name.into(),
            })
        })?;
        for (batch_idx, row_values) in rows.iter().enumerate() {
            // Earlier rows of the batch can satisfy a self-referencing FK.
            let earlier_rows = &rows[..batch_idx];
            if fk.local_columns.len() == 1 {
                let v = &row_values[fk.local_columns[0]];
                if matches!(v, Value::Null) {
                    continue;
                }
                let parent_col = fk.parent_columns[0];
                let mut usable_indices = parent
                    .indices()
                    .iter()
                    .filter(|idx| {
                        matches!(idx.kind, spg_storage::IndexKind::BTree(_))
                            && idx.column_position == parent_col
                            && idx.partial_predicate.is_none()
                    })
                    .peekable();
                let present_committed = if usable_indices.peek().is_some() {
                    let key = spg_storage::IndexKey::from_value(v).ok_or_else(|| {
                        EngineError::Unsupported(alloc::format!(
                            "FOREIGN KEY column value of type {:?} is not index-eligible",
                            v.data_type()
                        ))
                    })?;
                    usable_indices.any(|idx| !idx.lookup_eq(&key).is_empty())
                } else {
                    // No usable index on the referenced column: scan the parent rows
                    // instead of reporting a violation for a row that may exist.
                    parent
                        .rows()
                        .iter()
                        .any(|prow| prow.values.get(parent_col) == Some(v))
                };
                let present_in_batch = parent_is_self
                    && earlier_rows
                        .iter()
                        .any(|earlier| earlier.get(parent_col) == Some(v));
                if !(present_committed || present_in_batch) {
                    return Err(EngineError::Unsupported(alloc::format!(
                        "FOREIGN KEY violation: no parent row in {:?} where {} = {:?}",
                        fk.parent_table,
                        parent
                            .schema()
                            .columns
                            .get(parent_col)
                            .map_or("?", |c| c.name.as_str()),
                        v,
                    )));
                }
            } else {
                // MATCH SIMPLE: a NULL in *any* referencing column exempts the row,
                // exactly like the single-column path above.
                if fk
                    .local_columns
                    .iter()
                    .any(|&i| matches!(row_values.get(i), Some(Value::Null)))
                {
                    continue;
                }
                let local: Vec<&Value> =
                    fk.local_columns.iter().map(|&i| &row_values[i]).collect();
                let parent_match_committed = parent.rows().iter().any(|prow| {
                    fk.parent_columns
                        .iter()
                        .enumerate()
                        .all(|(i, &pi)| prow.values.get(pi) == Some(local[i]))
                });
                let parent_match_in_batch = parent_is_self
                    && earlier_rows.iter().any(|earlier| {
                        fk.parent_columns
                            .iter()
                            .enumerate()
                            .all(|(i, &pi)| earlier.get(pi) == Some(local[i]))
                    });
                if !(parent_match_committed || parent_match_in_batch) {
                    return Err(EngineError::Unsupported(alloc::format!(
                        "FOREIGN KEY violation: no parent row in {:?} matching composite key",
                        fk.parent_table,
                    )));
                }
            }
        }
    }
    Ok(())
}
/// One child-table mutation produced by FK planning (`plan_fk_parent_deletions`
/// / `plan_fk_parent_updates`) and executed by `apply_fk_child_step`.
#[derive(Debug, Clone)]
struct FkChildStep {
    /// Name of the table whose rows `action` mutates.
    child_table: String,
    /// The mutation to perform on `child_table`.
    action: FkChildAction,
}
/// The mutation an `FkChildStep` performs. Row positions are indices into the
/// child table's `rows()` as enumerated when the step was planned.
#[derive(Debug, Clone)]
enum FkChildAction {
    /// Delete the rows at `positions` (emitted for ON DELETE CASCADE).
    Delete { positions: Vec<usize> },
    /// For every `i`, set cell (`positions[i]`, `columns[i]`) to NULL.
    SetNull {
        positions: Vec<usize>,
        columns: Vec<usize>,
    },
    /// For every `i`, set cell (`positions[i]`, `columns[i]`) to `defaults[i]`.
    ///
    /// Despite the name, `plan_fk_parent_updates` also uses this variant to
    /// carry ON UPDATE CASCADE writes, with the new parent key values stored
    /// in `defaults`.
    SetDefault {
        positions: Vec<usize>,
        columns: Vec<usize>,
        defaults: Vec<Value>,
    },
}
/// Plans the child-table side effects of deleting rows from
/// `parent_table_name`, following every FOREIGN KEY whose parent table is
/// touched by the delete.
///
/// * `to_delete_positions` — row positions being deleted from the parent
///   table; they seed the `visited` set.
/// * `to_delete_rows` — the values of the rows being deleted; they seed the
///   work list of parent keys to match. NOTE(review): the code never pairs
///   these two slices, so they are presumably the same rows — confirm at the
///   call site.
///
/// `ON DELETE CASCADE` is followed transitively: each cascaded child row is
/// pushed back onto the work list and treated as a deleted parent row.
/// Parent keys that contain a NULL are never matched against children.
///
/// Returns all SET NULL steps, then all SET DEFAULT steps, then all DELETE
/// steps. Within each group, steps are ordered by child table name. Row
/// positions are indices into `child.rows()` as enumerated here.
/// NOTE(review): if the caller deletes rows from a table before applying a
/// step that targets the same table, these positions may be stale — confirm
/// the apply order at the call site.
///
/// # Errors
/// `EngineError::Unsupported` in any of these cases:
/// - a matching child row is protected by `RESTRICT` / `NO ACTION`;
/// - `SET NULL` targets a NOT NULL column;
/// - `SET DEFAULT` targets a column with no stored `default` value (a
///   runtime-expression DEFAULT is not consulted here);
/// - an FK names a column index that is missing from the child schema.
///
/// # Panics
/// - If an FK parent column index is out of range for a parent row.
/// - If `catalog.table_names()` yields a name that `catalog.get` cannot
///   resolve.
fn plan_fk_parent_deletions(
    catalog: &Catalog,
    parent_table_name: &str,
    to_delete_positions: &[usize],
    to_delete_rows: &[Vec<Value>],
) -> Result<Vec<FkChildStep>, EngineError> {
    use alloc::collections::{BTreeMap, BTreeSet};
    if to_delete_rows.is_empty() {
        return Ok(Vec::new());
    }
    // Per child table: row positions to delete (CASCADE).
    let mut delete_plan: BTreeMap<String, BTreeSet<usize>> = BTreeMap::new();
    // Per child table: (row position, column) cells to NULL out.
    let mut setnull_plan: BTreeMap<String, BTreeSet<(usize, usize)>> = BTreeMap::new();
    // Per child table: (row position, column) cells to overwrite with the column DEFAULT.
    let mut setdefault_plan: BTreeMap<String, BTreeMap<(usize, usize), Value>> =
        BTreeMap::new();
    // (table, row position) pairs already scheduled for deletion.
    let mut visited: BTreeSet<(String, usize)> = BTreeSet::new();
    for &p in to_delete_positions {
        visited.insert((parent_table_name.to_string(), p));
    }
    // Work list of (table, row values) whose deletion must be propagated.
    let mut work: Vec<(String, Vec<Value>)> = to_delete_rows
        .iter()
        .map(|r| (parent_table_name.to_string(), r.clone()))
        .collect();
    while let Some((cur_parent, parent_row)) = work.pop() {
        for child_name in catalog.table_names() {
            let child = catalog
                .get(&child_name)
                .expect("table_names → catalog.get round-trip is total");
            for fk in &child.schema().foreign_keys {
                if fk.parent_table != cur_parent {
                    continue;
                }
                let parent_key: Vec<&Value> = fk
                    .parent_columns
                    .iter()
                    .map(|&pi| &parent_row[pi])
                    .collect();
                // A parent key containing NULL is not matched against children.
                if parent_key.iter().any(|v| matches!(v, Value::Null)) {
                    continue;
                }
                for (child_row_idx, child_row) in child.rows().iter().enumerate() {
                    // Self-referencing FK: ignore rows that are themselves being deleted.
                    // NOTE(review): visited rows of *other* tables are not skipped, so a
                    // cross-table cycle (A→B CASCADE, B→A RESTRICT) can reject a row that
                    // is already scheduled for deletion.
                    if child_name == cur_parent
                        && visited.contains(&(child_name.clone(), child_row_idx))
                    {
                        continue;
                    }
                    let matches_key = fk
                        .local_columns
                        .iter()
                        .enumerate()
                        .all(|(i, &li)| child_row.values.get(li) == Some(parent_key[i]));
                    if !matches_key {
                        continue;
                    }
                    match fk.on_delete {
                        spg_storage::FkAction::Restrict
                        | spg_storage::FkAction::NoAction => {
                            return Err(EngineError::Unsupported(alloc::format!(
                                "FOREIGN KEY violation: DELETE on {cur_parent:?} is \
                                 restricted by FK from {child_name:?}.{:?}",
                                fk.local_columns,
                            )));
                        }
                        spg_storage::FkAction::Cascade => {
                            // `insert` returns false for rows already scheduled, which keeps
                            // cycles from re-queuing the same row forever.
                            if visited.insert((child_name.clone(), child_row_idx)) {
                                delete_plan
                                    .entry(child_name.clone())
                                    .or_default()
                                    .insert(child_row_idx);
                                work.push((child_name.clone(), child_row.values.clone()));
                            }
                        }
                        spg_storage::FkAction::SetNull => {
                            // Validate every referencing column before recording any cell.
                            for &li in &fk.local_columns {
                                let col = child.schema().columns.get(li).ok_or_else(|| {
                                    EngineError::Unsupported(alloc::format!(
                                        "FK local column {li} missing in {child_name:?}"
                                    ))
                                })?;
                                if !col.nullable {
                                    return Err(EngineError::Unsupported(alloc::format!(
                                        "FOREIGN KEY ON DELETE SET NULL: column \
                                         {child_name:?}.{:?} is NOT NULL — cannot SET NULL",
                                        col.name,
                                    )));
                                }
                            }
                            let entry = setnull_plan.entry(child_name.clone()).or_default();
                            for &li in &fk.local_columns {
                                entry.insert((child_row_idx, li));
                            }
                        }
                        spg_storage::FkAction::SetDefault => {
                            let entry =
                                setdefault_plan.entry(child_name.clone()).or_default();
                            for &li in &fk.local_columns {
                                let col = child.schema().columns.get(li).ok_or_else(|| {
                                    EngineError::Unsupported(alloc::format!(
                                        "FK local column {li} missing in {child_name:?}"
                                    ))
                                })?;
                                // Only the stored `default` value is used here.
                                let default = col.default.clone().ok_or_else(|| {
                                    EngineError::Unsupported(alloc::format!(
                                        "FOREIGN KEY ON DELETE SET DEFAULT: column \
                                         {child_name:?}.{:?} has no DEFAULT declared",
                                        col.name,
                                    ))
                                })?;
                                entry.insert((child_row_idx, li), default);
                            }
                        }
                    }
                }
            }
        }
    }
    // Emit steps: SET NULL, then SET DEFAULT, then DELETE (each by table name).
    let mut steps: Vec<FkChildStep> = Vec::new();
    for (child_table, entries) in setnull_plan {
        let (positions, columns): (Vec<usize>, Vec<usize>) = entries.into_iter().unzip();
        steps.push(FkChildStep {
            child_table,
            action: FkChildAction::SetNull { positions, columns },
        });
    }
    for (child_table, entries) in setdefault_plan {
        let mut positions = Vec::with_capacity(entries.len());
        let mut columns = Vec::with_capacity(entries.len());
        let mut defaults = Vec::with_capacity(entries.len());
        for ((p, c), v) in entries {
            positions.push(p);
            columns.push(c);
            defaults.push(v);
        }
        steps.push(FkChildStep {
            child_table,
            action: FkChildAction::SetDefault {
                positions,
                columns,
                defaults,
            },
        });
    }
    for (child_table, positions) in delete_plan {
        steps.push(FkChildStep {
            child_table,
            action: FkChildAction::Delete {
                positions: positions.into_iter().collect(),
            },
        });
    }
    Ok(steps)
}
/// Plans the child-table side effects of an UPDATE on `parent_table_name`.
///
/// `plan_with_old` lists `(position, old_row, new_row)` for every parent row
/// the UPDATE rewrites. Consider each FOREIGN KEY that references
/// `parent_table_name` and whose referenced columns change in some row.
/// Every child row still holding the old key is resolved by the FK's
/// `ON UPDATE` action:
/// - `RESTRICT` / `NO ACTION`: the whole UPDATE is rejected;
/// - `CASCADE`: the referencing columns receive the new key values;
/// - `SET NULL`: the referencing columns are nulled (they must be nullable);
/// - `SET DEFAULT`: the referencing columns receive their stored DEFAULT.
///
/// Old keys containing a NULL are skipped. For a self-referencing FK, rows
/// rewritten by this same UPDATE are not treated as children.
///
/// Steps are returned in three groups, each ordered by child table name:
/// 1. all CASCADE writes, encoded as `FkChildAction::SetDefault` with the new
///    key values;
/// 2. SET NULL;
/// 3. SET DEFAULT.
///
/// NOTE(review): CASCADE is one level deep. The rewritten child keys are not
/// propagated to grandchildren, and grandchildren's FKs are not re-checked.
///
/// # Errors
/// `EngineError::Unsupported` in any of these cases:
/// - a RESTRICT / NO ACTION FK matches a child row;
/// - SET NULL targets a NOT NULL column;
/// - SET DEFAULT targets a column with no stored DEFAULT;
/// - an FK names a column index missing from the child schema.
///
/// # Panics
/// If an FK parent column index is out of range for `old_row` / `new_row`.
fn plan_fk_parent_updates(
    catalog: &Catalog,
    parent_table_name: &str,
    plan_with_old: &[(usize, Vec<Value>, Vec<Value>)],
) -> Result<Vec<FkChildStep>, EngineError> {
    use alloc::collections::{BTreeMap, BTreeSet};

    // Flattens per-cell writes into one `SetDefault` step; BTreeMap iteration
    // keeps the cells ordered by (row position, column).
    fn cell_writes_step(
        child_table: String,
        entries: BTreeMap<(usize, usize), Value>,
    ) -> FkChildStep {
        let mut positions = Vec::with_capacity(entries.len());
        let mut columns = Vec::with_capacity(entries.len());
        let mut defaults = Vec::with_capacity(entries.len());
        for ((p, c), v) in entries {
            positions.push(p);
            columns.push(c);
            defaults.push(v);
        }
        FkChildStep {
            child_table,
            action: FkChildAction::SetDefault {
                positions,
                columns,
                defaults,
            },
        }
    }

    if plan_with_old.is_empty() {
        return Ok(Vec::new());
    }
    // Parent positions rewritten by this UPDATE. They are collected once so
    // the self-reference check below is a set lookup, not a slice scan per
    // child row.
    let updated_positions: BTreeSet<usize> =
        plan_with_old.iter().map(|(p, _, _)| *p).collect();
    let mut setnull_plan: BTreeMap<String, BTreeSet<(usize, usize)>> = BTreeMap::new();
    let mut setdefault_plan: BTreeMap<String, BTreeMap<(usize, usize), Value>> =
        BTreeMap::new();
    let mut cascade_plan: BTreeMap<String, BTreeMap<(usize, usize), Value>> = BTreeMap::new();
    for child_name in catalog.table_names() {
        let child = catalog
            .get(&child_name)
            .expect("table_names → catalog.get total");
        let is_self_reference = child_name == parent_table_name;
        for fk in &child.schema().foreign_keys {
            if fk.parent_table != parent_table_name {
                continue;
            }
            for (_pos, old_row, new_row) in plan_with_old {
                let key_changed = fk
                    .parent_columns
                    .iter()
                    .any(|&pi| old_row.get(pi) != new_row.get(pi));
                if !key_changed {
                    continue;
                }
                let old_key: Vec<&Value> = fk
                    .parent_columns
                    .iter()
                    .map(|&pi| &old_row[pi])
                    .collect();
                if old_key.iter().any(|v| matches!(v, Value::Null)) {
                    continue;
                }
                let new_key: Vec<&Value> = fk
                    .parent_columns
                    .iter()
                    .map(|&pi| &new_row[pi])
                    .collect();
                for (child_row_idx, child_row) in child.rows().iter().enumerate() {
                    if is_self_reference && updated_positions.contains(&child_row_idx) {
                        continue;
                    }
                    let matches_key = fk
                        .local_columns
                        .iter()
                        .enumerate()
                        .all(|(i, &li)| child_row.values.get(li) == Some(old_key[i]));
                    if !matches_key {
                        continue;
                    }
                    match fk.on_update {
                        spg_storage::FkAction::Restrict
                        | spg_storage::FkAction::NoAction => {
                            return Err(EngineError::Unsupported(alloc::format!(
                                "FOREIGN KEY violation: UPDATE on {parent_table_name:?} PK is \
                                 restricted by FK from {child_name:?}.{:?}",
                                fk.local_columns,
                            )));
                        }
                        spg_storage::FkAction::Cascade => {
                            let entry = cascade_plan.entry(child_name.clone()).or_default();
                            for (i, &li) in fk.local_columns.iter().enumerate() {
                                entry.insert((child_row_idx, li), new_key[i].clone());
                            }
                        }
                        spg_storage::FkAction::SetNull => {
                            // Validate every referencing column before recording any cell.
                            for &li in &fk.local_columns {
                                let col = child.schema().columns.get(li).ok_or_else(|| {
                                    EngineError::Unsupported(alloc::format!(
                                        "FK local column {li} missing in {child_name:?}"
                                    ))
                                })?;
                                if !col.nullable {
                                    return Err(EngineError::Unsupported(alloc::format!(
                                        "FOREIGN KEY ON UPDATE SET NULL: column \
                                         {child_name:?}.{:?} is NOT NULL",
                                        col.name,
                                    )));
                                }
                            }
                            let entry = setnull_plan.entry(child_name.clone()).or_default();
                            for &li in &fk.local_columns {
                                entry.insert((child_row_idx, li));
                            }
                        }
                        spg_storage::FkAction::SetDefault => {
                            let entry =
                                setdefault_plan.entry(child_name.clone()).or_default();
                            for &li in &fk.local_columns {
                                let col = child.schema().columns.get(li).ok_or_else(|| {
                                    EngineError::Unsupported(alloc::format!(
                                        "FK local column {li} missing in {child_name:?}"
                                    ))
                                })?;
                                let default = col.default.clone().ok_or_else(|| {
                                    EngineError::Unsupported(alloc::format!(
                                        "FOREIGN KEY ON UPDATE SET DEFAULT: column \
                                         {child_name:?}.{:?} has no DEFAULT",
                                        col.name,
                                    ))
                                })?;
                                entry.insert((child_row_idx, li), default);
                            }
                        }
                    }
                }
            }
        }
    }
    let mut steps: Vec<FkChildStep> =
        Vec::with_capacity(cascade_plan.len() + setnull_plan.len() + setdefault_plan.len());
    // CASCADE writes reuse the per-cell SetDefault action with the new key values.
    steps.extend(
        cascade_plan
            .into_iter()
            .map(|(child_table, entries)| cell_writes_step(child_table, entries)),
    );
    for (child_table, entries) in setnull_plan {
        let (positions, columns): (Vec<usize>, Vec<usize>) = entries.into_iter().unzip();
        steps.push(FkChildStep {
            child_table,
            action: FkChildAction::SetNull { positions, columns },
        });
    }
    steps.extend(
        setdefault_plan
            .into_iter()
            .map(|(child_table, entries)| cell_writes_step(child_table, entries)),
    );
    Ok(steps)
}
/// Executes one planned FK side effect against `catalog`.
///
/// A `Delete` action removes the listed row positions from the child table.
/// `SetNull` and `SetDefault` overwrite individual cells through
/// `apply_per_cell_writes`.
///
/// # Errors
/// - `EngineError::Storage(StorageError::TableNotFound)` if the step's child
///   table is not in `catalog`.
/// - Any error returned by `apply_per_cell_writes`.
fn apply_fk_child_step(
    catalog: &mut Catalog,
    step: &FkChildStep,
) -> Result<(), EngineError> {
    let Some(table) = catalog.get_mut(&step.child_table) else {
        return Err(EngineError::Storage(StorageError::TableNotFound {
            name: step.child_table.clone(),
        }));
    };
    match &step.action {
        FkChildAction::SetNull { positions, columns } => {
            apply_per_cell_writes(table, positions, columns, |_| Value::Null)
        }
        FkChildAction::SetDefault {
            positions,
            columns,
            defaults,
        } => apply_per_cell_writes(table, positions, columns, |i| defaults[i].clone()),
        FkChildAction::Delete { positions } => {
            // NOTE(review): the return value of `delete_rows` is discarded; if it is
            // a `Result`, a storage failure here is silently ignored — confirm.
            let _ = table.delete_rows(positions);
            Ok(())
        }
    }
}
/// Overwrites individual cells of `child`.
///
/// For every `i`, cell (`positions[i]`, `columns[i]`) receives
/// `value_for(i)`. `value_for` is called once per cell, in increasing `i`.
/// Writes are grouped by row, so each affected row is rewritten with exactly
/// one `update_row` call. Column indices outside the row are ignored.
///
/// # Errors
/// - `EngineError::Unsupported` if `positions` and `columns` differ in length,
///   or if a position is outside `child`'s current rows (for example a stale
///   plan). Previously both cases panicked.
/// - `EngineError::Storage` from `update_row`.
fn apply_per_cell_writes(
    child: &mut spg_storage::Table,
    positions: &[usize],
    columns: &[usize],
    mut value_for: impl FnMut(usize) -> Value,
) -> Result<(), EngineError> {
    use alloc::collections::BTreeMap;
    if positions.len() != columns.len() {
        return Err(EngineError::Unsupported(alloc::format!(
            "malformed FK cell-write plan: {} row positions but {} column indices",
            positions.len(),
            columns.len()
        )));
    }
    let mut by_row: BTreeMap<usize, Vec<(usize, Value)>> = BTreeMap::new();
    for (i, (&pos, &col)) in positions.iter().zip(columns).enumerate() {
        by_row.entry(pos).or_default().push((col, value_for(i)));
    }
    for (pos, mutations) in by_row {
        let Some(row) = child.rows().get(pos) else {
            return Err(EngineError::Unsupported(alloc::format!(
                "FK action targets row position {pos}, but the table has only {} rows",
                child.rows().len()
            )));
        };
        let mut new_values = row.values.clone();
        for (col, v) in mutations {
            if let Some(slot) = new_values.get_mut(col) {
                *slot = v;
            }
        }
        child
            .update_row(pos, new_values)
            .map_err(EngineError::Storage)?;
    }
    Ok(())
}
fn fk_action_sql_to_storage(a: spg_sql::ast::FkAction) -> spg_storage::FkAction {
match a {
spg_sql::ast::FkAction::Restrict => spg_storage::FkAction::Restrict,
spg_sql::ast::FkAction::Cascade => spg_storage::FkAction::Cascade,
spg_sql::ast::FkAction::SetNull => spg_storage::FkAction::SetNull,
spg_sql::ast::FkAction::SetDefault => spg_storage::FkAction::SetDefault,
spg_sql::ast::FkAction::NoAction => spg_storage::FkAction::NoAction,
}
}
/// Resolves the value a column takes when an INSERT omits it.
///
/// A runtime DEFAULT expression, if present, takes precedence and is
/// evaluated through `eval_runtime_default_free` using `clock_fn`. Otherwise
/// the stored literal default is returned, or NULL when there is none.
///
/// # Errors
/// Propagates errors from `eval_runtime_default_free`.
fn resolve_column_default_free(
    col: &ColumnSchema,
    clock_fn: Option<ClockFn>,
) -> Result<Value, EngineError> {
    match &col.runtime_default {
        Some(expr_text) => eval_runtime_default_free(expr_text, col.ty, clock_fn),
        None => Ok(col.default.as_ref().cloned().unwrap_or(Value::Null)),
    }
}
/// Evaluates a whitelisted runtime DEFAULT expression (e.g. `now()`) and
/// coerces the result to the column type `ty`.
///
/// Matching is case-insensitive and ignores surrounding whitespace and a
/// trailing `()`. The current instant comes from `clock_fn` in microseconds
/// (Unix epoch based, as implied by the day arithmetic below); without a
/// clock it is `0`, i.e. the epoch.
///
/// - `now`, `current_timestamp`, `localtimestamp`: the current timestamp.
/// - `current_date`: the current day. Floor division keeps pre-epoch instants
///   on the correct (earlier) day, matching the Timestamp → Date coercion.
/// - `current_time`, `localtime`: currently also the full timestamp.
///   NOTE(review): not a time-of-day value — confirm intended.
///
/// # Errors
/// - `EngineError::Unsupported` for any expression outside the whitelist.
/// - Errors from coercing the value to `ty`.
fn eval_runtime_default_free(
    rt: &str,
    ty: DataType,
    clock_fn: Option<ClockFn>,
) -> Result<Value, EngineError> {
    const MICROS_PER_DAY: i64 = 86_400_000_000;
    let s = rt.trim().to_ascii_lowercase();
    let canonical = s.trim_end_matches("()");
    let now_us = match clock_fn {
        Some(f) => f(),
        None => 0,
    };
    let v = match canonical {
        "now" | "current_timestamp" | "localtimestamp" => Value::Timestamp(now_us),
        "current_date" => {
            // Truncating `/` would round pre-epoch instants toward zero, i.e.
            // onto the following day; `div_euclid` floors instead.
            let days = now_us.div_euclid(MICROS_PER_DAY);
            Value::Date(
                i32::try_from(days).expect("any i64 / 86_400_000_000 fits in an i32"),
            )
        }
        "current_time" | "localtime" => Value::Timestamp(now_us),
        other => {
            return Err(EngineError::Unsupported(alloc::format!(
                "runtime DEFAULT expression {other:?} not supported \
                 (v7.9.21 whitelist: now() / current_timestamp / \
                 current_date / current_time / localtimestamp / \
                 localtime)"
            )));
        }
    };
    coerce_value(v, ty, "DEFAULT", 0)
}
/// Reports whether a column DEFAULT must be evaluated at insert time rather
/// than folded to a constant at CREATE TABLE time.
///
/// That is the case for a function call, optionally wrapped in any number of
/// unary operators (e.g. `-f()`). Every other expression is a static default.
fn is_runtime_default_expr(expr: &Expr) -> bool {
    let mut current = expr;
    loop {
        match current {
            Expr::FunctionCall { .. } => return true,
            Expr::Unary { expr: operand, .. } => current = &**operand,
            _ => return false,
        }
    }
}
/// Converts a parsed column definition into the storage-level column schema.
///
/// A DEFAULT that is a (possibly unary-wrapped) function call is kept as its
/// display text for evaluation at insert time. Any other DEFAULT is folded to
/// a literal value and coerced to the column type now. `AUTO_INCREMENT` is
/// only accepted on SMALLINT / INT / BIGINT columns.
///
/// # Errors
/// - Errors from folding or coercing a static DEFAULT (these are checked
///   first).
/// - `EngineError::Unsupported` for `AUTO_INCREMENT` on a non-integer column.
fn column_def_to_schema(c: ColumnDef) -> Result<ColumnSchema, EngineError> {
    let ty = column_type_to_data_type(c.ty);
    let base = ColumnSchema::new(c.name.clone(), ty, c.nullable);
    let schema = match c.default {
        None => base,
        Some(expr) if is_runtime_default_expr(&expr) => {
            base.with_runtime_default(alloc::format!("{expr}"))
        }
        Some(expr) => {
            let folded = coerce_value(literal_expr_to_value(expr)?, ty, &c.name, 0)?;
            base.with_default(folded)
        }
    };
    if !c.auto_increment {
        return Ok(schema);
    }
    let integral = matches!(ty, DataType::SmallInt | DataType::Int | DataType::BigInt);
    if !integral {
        return Err(EngineError::Unsupported(alloc::format!(
            "AUTO_INCREMENT requires an integer column type, got {ty:?}"
        )));
    }
    Ok(schema.with_auto_increment())
}
const fn column_type_to_data_type(t: ColumnTypeName) -> DataType {
match t {
ColumnTypeName::SmallInt => DataType::SmallInt,
ColumnTypeName::Int => DataType::Int,
ColumnTypeName::BigInt => DataType::BigInt,
ColumnTypeName::Float => DataType::Float,
ColumnTypeName::Text => DataType::Text,
ColumnTypeName::Varchar(n) => DataType::Varchar(n),
ColumnTypeName::Char(n) => DataType::Char(n),
ColumnTypeName::Bool => DataType::Bool,
ColumnTypeName::Vector { dim, encoding } => DataType::Vector {
dim,
encoding: match encoding {
SqlVecEncoding::F32 => VecEncoding::F32,
SqlVecEncoding::Sq8 => VecEncoding::Sq8,
SqlVecEncoding::F16 => VecEncoding::F16,
},
},
ColumnTypeName::Numeric(precision, scale) => DataType::Numeric { precision, scale },
ColumnTypeName::Date => DataType::Date,
ColumnTypeName::Timestamp => DataType::Timestamp,
ColumnTypeName::Timestamptz => DataType::Timestamptz,
ColumnTypeName::Json => DataType::Json,
ColumnTypeName::Jsonb => DataType::Jsonb,
}
}
fn literal_expr_to_value(expr: Expr) -> Result<Value, EngineError> {
match expr {
Expr::Literal(l) => Ok(literal_to_value(l)),
Expr::Cast { expr, target } => {
let inner_value = literal_expr_to_value(*expr)?;
crate::eval::cast_value(inner_value, target).map_err(EngineError::Eval)
}
Expr::Unary {
op: UnOp::Neg,
expr,
} => match *expr {
Expr::Literal(Literal::Integer(n)) => {
let neg = n.checked_neg().ok_or_else(|| {
EngineError::Unsupported("integer literal overflow on negation".into())
})?;
Ok(int_value_for(neg))
}
Expr::Literal(Literal::Float(x)) => Ok(Value::Float(-x)),
other => Err(EngineError::Unsupported(alloc::format!(
"unary minus over non-literal expression: {other:?}"
))),
},
other => Err(EngineError::Unsupported(alloc::format!(
"non-literal INSERT value expression: {other:?}"
))),
}
}
/// Converts a parsed SQL literal into its runtime `Value`.
///
/// Integer literals take the narrowest fitting integer type (see
/// `int_value_for`). String literals become TEXT.
fn literal_to_value(l: Literal) -> Value {
    match l {
        Literal::Null => Value::Null,
        Literal::Bool(flag) => Value::Bool(flag),
        Literal::Integer(i) => int_value_for(i),
        Literal::Float(f) => Value::Float(f),
        Literal::String(text) => Value::Text(text),
        Literal::Vector(elems) => Value::Vector(elems),
        Literal::Interval { months, micros, .. } => Value::Interval { months, micros },
    }
}
/// Wraps an integer in the narrowest integer `Value` that holds it:
/// `Int` when it fits in 32 bits, `BigInt` otherwise.
fn int_value_for(n: i64) -> Value {
    i32::try_from(n).map_or_else(|_| Value::BigInt(n), Value::Int)
}
#[allow(clippy::too_many_lines)]
fn coerce_value(
v: Value,
expected: DataType,
col_name: &str,
position: usize,
) -> Result<Value, EngineError> {
if v.is_null() {
return Ok(Value::Null);
}
let actual = v.data_type().expect("non-null");
if actual == expected {
return Ok(v);
}
let coerced =
match (v, expected) {
(Value::Int(n), DataType::BigInt) => Some(Value::BigInt(i64::from(n))),
(Value::Int(n), DataType::Float) => Some(Value::Float(f64::from(n))),
(Value::Int(n), DataType::SmallInt) => i16::try_from(n).ok().map(Value::SmallInt),
(Value::Int(n), DataType::Numeric { precision, scale }) => Some(numeric_from_integer(
i128::from(n),
precision,
scale,
col_name,
)?),
(Value::SmallInt(n), DataType::Int) => Some(Value::Int(i32::from(n))),
(Value::SmallInt(n), DataType::BigInt) => Some(Value::BigInt(i64::from(n))),
(Value::SmallInt(n), DataType::Float) => Some(Value::Float(f64::from(n))),
(Value::SmallInt(n), DataType::Numeric { precision, scale }) => Some(
numeric_from_integer(i128::from(n), precision, scale, col_name)?,
),
(Value::BigInt(n), DataType::Int) => i32::try_from(n).ok().map(Value::Int),
(Value::BigInt(n), DataType::SmallInt) => i16::try_from(n).ok().map(Value::SmallInt),
#[allow(clippy::cast_precision_loss)]
(Value::BigInt(n), DataType::Float) => Some(Value::Float(n as f64)),
(Value::BigInt(n), DataType::Numeric { precision, scale }) => Some(
numeric_from_integer(i128::from(n), precision, scale, col_name)?,
),
(Value::Float(x), DataType::Numeric { precision, scale }) => {
Some(numeric_from_float(x, precision, scale, col_name)?)
}
(Value::Text(s), DataType::Date) => {
let d = eval::parse_date_literal(&s).ok_or_else(|| {
EngineError::Eval(EvalError::TypeMismatch {
detail: alloc::format!(
"cannot parse {s:?} as DATE for column `{col_name}`"
),
})
})?;
Some(Value::Date(d))
}
(Value::Text(s), DataType::Json | DataType::Jsonb) => Some(Value::Json(s)),
(Value::Json(s), DataType::Text) => Some(Value::Text(s)),
(Value::Text(s), DataType::Timestamp | DataType::Timestamptz) => {
let t = eval::parse_timestamp_literal(&s).ok_or_else(|| {
EngineError::Eval(EvalError::TypeMismatch {
detail: alloc::format!(
"cannot parse {s:?} as TIMESTAMP for column `{col_name}`"
),
})
})?;
Some(Value::Timestamp(t))
}
(Value::Date(d), DataType::Timestamp | DataType::Timestamptz) => {
Some(Value::Timestamp(i64::from(d) * 86_400_000_000))
}
(Value::Timestamp(t), DataType::Timestamptz) => Some(Value::Timestamp(t)),
(Value::Timestamp(t), DataType::Date) => {
let days = t.div_euclid(86_400_000_000);
i32::try_from(days).ok().map(Value::Date)
}
(
Value::Numeric {
scaled,
scale: src_scale,
},
DataType::Numeric { precision, scale },
) => Some(numeric_rescale(
scaled, src_scale, precision, scale, col_name,
)?),
#[allow(clippy::cast_precision_loss)]
(Value::Numeric { scaled, scale }, DataType::Float) => {
let mut div = 1.0_f64;
for _ in 0..scale {
div *= 10.0;
}
Some(Value::Float((scaled as f64) / div))
}
(Value::Numeric { scaled, scale }, DataType::Int) => {
let truncated = numeric_truncate_to_integer(scaled, scale);
i32::try_from(truncated).ok().map(Value::Int)
}
(Value::Numeric { scaled, scale }, DataType::BigInt) => {
let truncated = numeric_truncate_to_integer(scaled, scale);
i64::try_from(truncated).ok().map(Value::BigInt)
}
(Value::Numeric { scaled, scale }, DataType::SmallInt) => {
let truncated = numeric_truncate_to_integer(scaled, scale);
i16::try_from(truncated).ok().map(Value::SmallInt)
}
(Value::Text(s), DataType::Varchar(max)) => {
if u32::try_from(s.chars().count()).unwrap_or(u32::MAX) <= max {
Some(Value::Text(s))
} else {
return Err(EngineError::Unsupported(alloc::format!(
"value for VARCHAR({max}) column `{col_name}` exceeds length: \
{} chars",
s.chars().count()
)));
}
}
(
Value::Vector(v),
DataType::Vector {
dim,
encoding: VecEncoding::Sq8,
},
) if v.len() == dim as usize => {
Some(Value::Sq8Vector(spg_storage::quantize::quantize(&v)))
}
(
Value::Vector(v),
DataType::Vector {
dim,
encoding: VecEncoding::F16,
},
) if v.len() == dim as usize => Some(Value::HalfVector(
spg_storage::halfvec::HalfVector::from_f32_slice(&v),
)),
(Value::Text(s), DataType::Char(size)) => {
let len = u32::try_from(s.chars().count()).unwrap_or(u32::MAX);
if len > size {
return Err(EngineError::Unsupported(alloc::format!(
"value for CHAR({size}) column `{col_name}` exceeds length: \
{len} chars"
)));
}
let need = (size - len) as usize;
let mut padded = s;
padded.reserve(need);
for _ in 0..need {
padded.push(' ');
}
Some(Value::Text(padded))
}
_ => None,
};
coerced.ok_or(EngineError::Storage(StorageError::TypeMismatch {
column: col_name.into(),
expected,
actual,
position,
}))
}
#[cfg(test)]
mod tests {
use super::*;
use alloc::vec;
fn unwrap_command_ok(r: &QueryResult) -> usize {
match r {
QueryResult::CommandOk { affected, .. } => *affected,
QueryResult::Rows { .. } => panic!("expected CommandOk, got Rows"),
}
}
#[test]
fn create_table_registers_schema() {
let mut e = Engine::new();
e.execute("CREATE TABLE foo (a INT NOT NULL, b TEXT)")
.unwrap();
assert_eq!(e.catalog().table_count(), 1);
let t = e.catalog().get("foo").unwrap();
assert_eq!(t.schema().columns.len(), 2);
assert_eq!(t.schema().columns[0].ty, DataType::Int);
assert!(!t.schema().columns[0].nullable);
assert_eq!(t.schema().columns[1].ty, DataType::Text);
}
#[test]
fn create_table_vector_default_is_f32_encoded() {
let mut e = Engine::new();
e.execute("CREATE TABLE t (v VECTOR(8))").unwrap();
let t = e.catalog().get("t").unwrap();
assert_eq!(
t.schema().columns[0].ty,
DataType::Vector {
dim: 8,
encoding: VecEncoding::F32,
},
);
}
#[test]
fn create_table_vector_using_sq8_succeeds() {
let mut e = Engine::new();
e.execute("CREATE TABLE t (v VECTOR(8) USING SQ8)").unwrap();
let t = e.catalog().get("t").unwrap();
assert_eq!(
t.schema().columns[0].ty,
DataType::Vector {
dim: 8,
encoding: VecEncoding::Sq8,
},
);
}
#[test]
fn insert_into_sq8_column_quantises_f32_payload() {
let mut e = Engine::new();
e.execute("CREATE TABLE t (v VECTOR(4) USING SQ8)").unwrap();
e.execute("INSERT INTO t VALUES ([0.0, 0.25, 0.5, 1.0])")
.unwrap();
let t = e.catalog().get("t").unwrap();
assert_eq!(t.rows().len(), 1);
match &t.rows()[0].values[0] {
Value::Sq8Vector(q) => {
assert_eq!(q.bytes.len(), 4);
assert!((q.min - 0.0).abs() < 1e-6);
assert!((q.max - 1.0).abs() < 1e-6);
}
other => panic!("expected Sq8Vector cell, got {other:?}"),
}
}
#[test]
fn create_table_vector_using_half_succeeds_and_insert_converts_to_f16() {
let mut e = Engine::new();
e.execute("CREATE TABLE t (v VECTOR(4) USING HALF)")
.unwrap();
e.execute("INSERT INTO t VALUES ([0.0, 0.25, 0.5, 1.0])")
.unwrap();
let t = e.catalog().get("t").unwrap();
assert_eq!(t.rows().len(), 1);
match &t.rows()[0].values[0] {
Value::HalfVector(h) => {
assert_eq!(h.dim(), 4);
let back = h.to_f32_vec();
let expected = alloc::vec![0.0_f32, 0.25, 0.5, 1.0];
for (g, e) in back.iter().zip(expected.iter()) {
assert!(
(g - e).abs() < 1e-6,
"{g} vs {e} should be exact on f16 grid"
);
}
}
other => panic!("expected HalfVector cell, got {other:?}"),
}
}
#[test]
fn alter_index_rebuild_in_place_succeeds() {
let mut e = Engine::new();
e.execute("CREATE TABLE t (id INT NOT NULL, v VECTOR(3) NOT NULL)")
.unwrap();
for i in 0..8_i32 {
#[allow(clippy::cast_precision_loss)]
let base = (i as f32) * 0.1;
e.execute(&alloc::format!(
"INSERT INTO t VALUES ({i}, [{base}, {b1}, {b2}])",
b1 = base + 0.01,
b2 = base + 0.02,
))
.unwrap();
}
e.execute("CREATE INDEX t_idx ON t USING hnsw (v)").unwrap();
e.execute("ALTER INDEX t_idx REBUILD").unwrap();
assert_eq!(
e.catalog().get("t").unwrap().schema().columns[1].ty,
DataType::Vector {
dim: 3,
encoding: VecEncoding::F32,
},
);
}
#[test]
fn alter_index_rebuild_with_encoding_switches_cell_type() {
let mut e = Engine::new();
e.execute("CREATE TABLE t (id INT NOT NULL, v VECTOR(4) NOT NULL)")
.unwrap();
e.execute("INSERT INTO t VALUES (1, [0.0, 0.25, 0.5, 1.0])")
.unwrap();
e.execute("CREATE INDEX t_idx ON t USING hnsw (v)").unwrap();
e.execute("ALTER INDEX t_idx REBUILD WITH (encoding = SQ8)")
.unwrap();
let t = e.catalog().get("t").unwrap();
assert_eq!(
t.schema().columns[1].ty,
DataType::Vector {
dim: 4,
encoding: VecEncoding::Sq8,
},
);
assert!(matches!(t.rows()[0].values[1], Value::Sq8Vector(_)));
}
#[test]
fn alter_index_rebuild_unknown_index_errors() {
let mut e = Engine::new();
let err = e.execute("ALTER INDEX nope REBUILD").unwrap_err();
assert!(
matches!(
&err,
EngineError::Storage(StorageError::IndexNotFound { name }) if name == "nope"
),
"got: {err}"
);
}
#[test]
fn alter_index_rebuild_on_btree_index_errors() {
let mut e = Engine::new();
e.execute("CREATE TABLE t (id INT NOT NULL)").unwrap();
e.execute("INSERT INTO t VALUES (1)").unwrap();
e.execute("CREATE INDEX t_idx ON t (id)").unwrap();
let err = e.execute("ALTER INDEX t_idx REBUILD").unwrap_err();
assert!(
matches!(&err, EngineError::Storage(StorageError::Unsupported(_))),
"got: {err}"
);
}
#[test]
fn prepared_insert_substitutes_placeholders() {
let mut e = Engine::new();
e.execute("CREATE TABLE t (id INT NOT NULL, name TEXT NOT NULL)")
.unwrap();
let stmt = e.prepare("INSERT INTO t VALUES ($1, $2)").unwrap();
for (id, name) in [(1, "alice"), (2, "bob"), (3, "carol")] {
e.execute_prepared(
stmt.clone(),
&[Value::Int(id), Value::Text(name.into())],
)
.unwrap();
}
let rows_result = e.execute("SELECT id, name FROM t").unwrap();
let QueryResult::Rows { rows, .. } = rows_result else {
panic!("expected Rows")
};
assert_eq!(rows.len(), 3);
}
#[test]
fn prepared_select_with_placeholder_filters_rows() {
let mut e = Engine::new();
e.execute("CREATE TABLE t (id INT NOT NULL, v INT NOT NULL)")
.unwrap();
for i in 0..10_i32 {
e.execute(&alloc::format!("INSERT INTO t VALUES ({i}, {})", i * 7))
.unwrap();
}
let stmt = e
.prepare("SELECT id FROM t WHERE v = $1")
.unwrap();
let QueryResult::Rows { rows, .. } = e
.execute_prepared(stmt, &[Value::Int(35)])
.unwrap()
else {
panic!("expected Rows")
};
assert_eq!(rows.len(), 1);
assert_eq!(rows[0].values[0], Value::Int(5));
}
#[test]
fn prepared_too_few_params_errors() {
let mut e = Engine::new();
e.execute("CREATE TABLE t (id INT NOT NULL)").unwrap();
let stmt = e.prepare("INSERT INTO t VALUES ($1)").unwrap();
let err = e.execute_prepared(stmt, &[]).unwrap_err();
assert!(
matches!(
&err,
EngineError::Eval(EvalError::PlaceholderOutOfRange { n: 1, bound: 0 })
),
"got: {err}"
);
}
#[test]
fn insert_into_half_column_dim_mismatch_errors() {
let mut e = Engine::new();
e.execute("CREATE TABLE t (v VECTOR(4) USING HALF)")
.unwrap();
let err = e.execute("INSERT INTO t VALUES ([1.0, 2.0])").unwrap_err();
assert!(matches!(
&err,
EngineError::Storage(StorageError::TypeMismatch { .. })
));
}
#[test]
fn insert_into_sq8_column_dim_mismatch_errors() {
let mut e = Engine::new();
e.execute("CREATE TABLE t (v VECTOR(4) USING SQ8)").unwrap();
let err = e.execute("INSERT INTO t VALUES ([1.0, 2.0])").unwrap_err();
assert!(
matches!(
&err,
EngineError::Storage(StorageError::TypeMismatch { .. })
),
"got: {err}",
);
}
#[test]
fn create_table_duplicate_errors() {
let mut e = Engine::new();
e.execute("CREATE TABLE foo (a INT)").unwrap();
let err = e.execute("CREATE TABLE foo (a INT)").unwrap_err();
assert!(matches!(
err,
EngineError::Storage(StorageError::DuplicateTable { ref name }) if name == "foo"
));
}
#[test]
fn insert_into_unknown_table_errors() {
let mut e = Engine::new();
let err = e.execute("INSERT INTO ghost VALUES (1)").unwrap_err();
assert!(matches!(
err,
EngineError::Storage(StorageError::TableNotFound { ref name }) if name == "ghost"
));
}
#[test]
fn insert_happy_path_reports_one_affected() {
let mut e = Engine::new();
e.execute("CREATE TABLE foo (a INT NOT NULL)").unwrap();
let r = e.execute("INSERT INTO foo VALUES (42)").unwrap();
assert_eq!(unwrap_command_ok(&r), 1);
assert_eq!(e.catalog().get("foo").unwrap().row_count(), 1);
}
#[test]
fn insert_arity_mismatch_propagates() {
let mut e = Engine::new();
e.execute("CREATE TABLE foo (a INT, b TEXT)").unwrap();
let err = e.execute("INSERT INTO foo VALUES (1)").unwrap_err();
assert!(matches!(
err,
EngineError::Storage(StorageError::ArityMismatch { .. })
));
}
#[test]
fn insert_negative_integer_via_unary_minus() {
let mut e = Engine::new();
e.execute("CREATE TABLE foo (a INT NOT NULL)").unwrap();
e.execute("INSERT INTO foo VALUES (-7)").unwrap();
let rows = e.catalog().get("foo").unwrap().rows();
assert_eq!(rows[0].values[0], Value::Int(-7));
}
#[test]
fn insert_non_literal_expr_unsupported() {
let mut e = Engine::new();
e.execute("CREATE TABLE foo (a INT NOT NULL)").unwrap();
let err = e.execute("INSERT INTO foo VALUES (1 + 2)").unwrap_err();
assert!(matches!(err, EngineError::Unsupported(_)));
}
#[test]
fn select_star_returns_all_rows_in_insertion_order() {
let mut e = Engine::new();
e.execute("CREATE TABLE foo (a INT NOT NULL, b TEXT NOT NULL)")
.unwrap();
e.execute("INSERT INTO foo VALUES (1, 'one')").unwrap();
e.execute("INSERT INTO foo VALUES (2, 'two')").unwrap();
e.execute("INSERT INTO foo VALUES (3, 'three')").unwrap();
let r = e.execute("SELECT * FROM foo").unwrap();
let QueryResult::Rows { columns, rows } = r else {
panic!("expected Rows")
};
assert_eq!(columns.len(), 2);
assert_eq!(columns[0].name, "a");
assert_eq!(rows.len(), 3);
assert_eq!(
rows[1].values,
vec![Value::Int(2), Value::Text("two".into())]
);
}
#[test]
fn select_star_on_empty_table_returns_zero_rows() {
let mut e = Engine::new();
e.execute("CREATE TABLE foo (a INT)").unwrap();
let r = e.execute("SELECT * FROM foo").unwrap();
match r {
QueryResult::Rows { rows, .. } => assert!(rows.is_empty()),
QueryResult::CommandOk { .. } => panic!("expected Rows"),
}
}
fn make_three_row_users(e: &mut Engine) {
e.execute("CREATE TABLE users (id INT NOT NULL, name TEXT NOT NULL, score INT)")
.unwrap();
e.execute("INSERT INTO users VALUES (1, 'alice', 90)")
.unwrap();
e.execute("INSERT INTO users VALUES (2, 'bob', NULL)")
.unwrap();
e.execute("INSERT INTO users VALUES (3, 'cara', 70)")
.unwrap();
}
fn unwrap_rows(r: QueryResult) -> (Vec<ColumnSchema>, Vec<Row>) {
match r {
QueryResult::Rows { columns, rows } => (columns, rows),
QueryResult::CommandOk { .. } => panic!("expected Rows"),
}
}
#[test]
fn where_filter_passes_only_true_rows() {
let mut e = Engine::new();
make_three_row_users(&mut e);
let r = e.execute("SELECT * FROM users WHERE id > 1").unwrap();
let (_, rows) = unwrap_rows(r);
assert_eq!(rows.len(), 2);
assert_eq!(rows[0].values[0], Value::Int(2));
assert_eq!(rows[1].values[0], Value::Int(3));
}
#[test]
fn where_with_null_result_filters_out_row() {
let mut e = Engine::new();
make_three_row_users(&mut e);
let r = e.execute("SELECT * FROM users WHERE score > 80").unwrap();
let (_, rows) = unwrap_rows(r);
assert_eq!(rows.len(), 1);
assert_eq!(rows[0].values[1], Value::Text("alice".into()));
}
#[test]
fn projection_named_columns() {
    let mut engine = Engine::new();
    make_three_row_users(&mut engine);
    let result = engine.execute("SELECT name, score FROM users").unwrap();
    let (schema, rows) = unwrap_rows(result);
    // Output columns follow the projection list, named after the source columns.
    assert_eq!(schema.len(), 2);
    assert_eq!(schema[0].name, "name");
    assert_eq!(schema[1].name, "score");
    // Every row is projected; check the first one.
    assert_eq!(rows.len(), 3);
    let expected_first = vec![Value::Text("alice".into()), Value::Int(90)];
    assert_eq!(rows[0].values, expected_first);
}
#[test]
fn projection_with_column_alias() {
    // `AS who` renames the output column without touching the value.
    let mut engine = Engine::new();
    make_three_row_users(&mut engine);
    let result = engine
        .execute("SELECT name AS who FROM users WHERE id = 1")
        .unwrap();
    let (schema, rows) = unwrap_rows(result);
    assert_eq!(schema[0].name, "who");
    assert_eq!(rows.len(), 1);
    assert_eq!(rows[0].values[0], Value::Text("alice".into()));
}
#[test]
fn qualified_column_with_table_alias_resolves() {
    let mut engine = Engine::new();
    make_three_row_users(&mut engine);
    // Every column reference goes through the `u` alias.
    let sql = "SELECT u.id, u.name FROM users AS u WHERE u.id < 3";
    let (schema, rows) = unwrap_rows(engine.execute(sql).unwrap());
    assert_eq!(schema.len(), 2);
    // ids 1 and 2 satisfy `u.id < 3`.
    assert_eq!(rows.len(), 2);
}
#[test]
fn qualified_column_with_wrong_alias_errors() {
    let mut engine = Engine::new();
    make_three_row_users(&mut engine);
    // Only `u` is bound in FROM, so `x.id` must fail on the qualifier itself.
    let err = engine.execute("SELECT x.id FROM users AS u").unwrap_err();
    let is_unknown_x = matches!(
        err,
        EngineError::Eval(EvalError::UnknownQualifier { ref qualifier }) if qualifier == "x"
    );
    assert!(is_unknown_x);
}
#[test]
fn select_unknown_column_errors_in_projection() {
    let mut engine = Engine::new();
    make_three_row_users(&mut engine);
    let err = engine.execute("SELECT ghost FROM users").unwrap_err();
    // The error must name the offending column.
    let names_ghost = matches!(
        err,
        EngineError::Eval(EvalError::ColumnNotFound { ref name }) if name == "ghost"
    );
    assert!(names_ghost);
}
#[test]
fn where_unknown_column_errors() {
    // An unknown column in the predicate (not the projection) must also be rejected.
    let mut engine = Engine::new();
    make_three_row_users(&mut engine);
    let outcome = engine.execute("SELECT * FROM users WHERE ghost = 1");
    let err = outcome.unwrap_err();
    assert!(matches!(err, EngineError::Eval(EvalError::ColumnNotFound { .. })));
}
#[test]
fn expression_projection_evaluates_and_renders() {
    let mut engine = Engine::new();
    engine.execute("CREATE TABLE t (a INT NOT NULL)").unwrap();
    engine.execute("INSERT INTO t VALUES (3)").unwrap();
    // The table holds one row, so the constant projection yields exactly one `3`.
    let result = engine.execute("SELECT 1 + 2 FROM t").unwrap();
    let (_schema, rows) = unwrap_rows(result);
    assert_eq!(rows.len(), 1);
    assert_eq!(rows[0].values[0], Value::Int(3));
}
#[test]
fn select_unknown_table_errors() {
    let mut engine = Engine::new();
    // Nothing was created, so the lookup fails in the storage layer.
    let outcome = engine.execute("SELECT * FROM ghost");
    assert!(matches!(
        outcome.unwrap_err(),
        EngineError::Storage(StorageError::TableNotFound { .. })
    ));
}
#[test]
fn invalid_sql_returns_parse_error() {
    // Input that is not SQL must surface as a parse error.
    let mut engine = Engine::new();
    let err = engine
        .execute("THIS_IS_NOT_A_KEYWORD foo bar baz")
        .unwrap_err();
    assert!(matches!(err, EngineError::Parse(_)));
}
#[test]
fn create_index_registers_on_table() {
    let mut engine = Engine::new();
    make_three_row_users(&mut engine);
    engine.execute("CREATE INDEX by_name ON users (name)").unwrap();
    // The new index is attached to the table it was declared on.
    let table = engine.catalog().get("users").unwrap();
    let indices = table.indices();
    assert_eq!(indices.len(), 1);
    assert_eq!(indices[0].name, "by_name");
}
#[test]
fn create_index_on_unknown_table_errors() {
    let mut engine = Engine::new();
    let outcome = engine.execute("CREATE INDEX i ON ghost (a)");
    // Indexing a table that does not exist is a storage-level TableNotFound.
    assert!(matches!(
        outcome.unwrap_err(),
        EngineError::Storage(StorageError::TableNotFound { .. })
    ));
}
#[test]
fn create_index_on_unknown_column_errors() {
    let mut engine = Engine::new();
    make_three_row_users(&mut engine);
    // `users` exists but has no `ghost` column.
    let outcome = engine.execute("CREATE INDEX i ON users (ghost)");
    assert!(matches!(
        outcome.unwrap_err(),
        EngineError::Storage(StorageError::ColumnNotFound { .. })
    ));
}
#[test]
fn select_eq_uses_index_returns_same_rows_as_scan() {
    const QUERY: &str = "SELECT * FROM users WHERE id = 2";
    // Two identical databases; only one of them carries an index on `id`.
    let mut plain = Engine::new();
    make_three_row_users(&mut plain);
    let mut indexed = Engine::new();
    make_three_row_users(&mut indexed);
    indexed.execute("CREATE INDEX by_id ON users (id)").unwrap();
    let (_, scanned) = unwrap_rows(plain.execute(QUERY).unwrap());
    let (_, probed) = unwrap_rows(indexed.execute(QUERY).unwrap());
    // The index must not change the answer.
    assert_eq!(scanned, probed);
    assert_eq!(probed.len(), 1);
}
#[test]
fn select_eq_with_no_matching_index_value_returns_empty() {
    let mut engine = Engine::new();
    make_three_row_users(&mut engine);
    engine.execute("CREATE INDEX by_id ON users (id)").unwrap();
    // No row has id 999.
    let result = engine.execute("SELECT * FROM users WHERE id = 999").unwrap();
    let (_, rows) = unwrap_rows(result);
    assert_eq!(rows.len(), 0);
}
#[test]
fn begin_sets_in_transaction_flag() {
    let mut engine = Engine::new();
    // Outside a transaction before BEGIN, inside one after.
    let before = engine.in_transaction();
    assert!(!before);
    engine.execute("BEGIN").unwrap();
    let after = engine.in_transaction();
    assert!(after);
}
#[test]
fn double_begin_errors() {
    // Transactions do not nest: a second BEGIN is refused.
    let mut engine = Engine::new();
    engine.execute("BEGIN").unwrap();
    let second = engine.execute("BEGIN");
    assert_eq!(second.unwrap_err(), EngineError::TransactionAlreadyOpen);
}
#[test]
fn commit_without_begin_errors() {
    // COMMIT with nothing open is an error, not a silent no-op.
    let mut engine = Engine::new();
    let outcome = engine.execute("COMMIT");
    assert_eq!(outcome.unwrap_err(), EngineError::NoActiveTransaction);
}
#[test]
fn rollback_without_begin_errors() {
    // ROLLBACK with nothing open is an error, just like COMMIT.
    let mut engine = Engine::new();
    let outcome = engine.execute("ROLLBACK");
    assert_eq!(outcome.unwrap_err(), EngineError::NoActiveTransaction);
}
#[test]
fn commit_applies_shadow_to_committed_catalog() {
    let mut engine = Engine::new();
    engine.execute("CREATE TABLE t (v INT NOT NULL)").unwrap();
    // Two inserts inside a transaction, then COMMIT.
    for &sql in &[
        "BEGIN",
        "INSERT INTO t VALUES (1)",
        "INSERT INTO t VALUES (2)",
        "COMMIT",
    ] {
        engine.execute(sql).unwrap();
    }
    // The transaction is closed and both rows are visible in the catalog.
    assert!(!engine.in_transaction());
    let committed = engine.catalog().get("t").unwrap();
    assert_eq!(committed.row_count(), 2);
}
#[test]
fn rollback_discards_shadow() {
    let mut engine = Engine::new();
    engine.execute("CREATE TABLE t (v INT NOT NULL)").unwrap();
    // Two inserts inside a transaction, then ROLLBACK.
    for &sql in &[
        "BEGIN",
        "INSERT INTO t VALUES (1)",
        "INSERT INTO t VALUES (2)",
        "ROLLBACK",
    ] {
        engine.execute(sql).unwrap();
    }
    // Nothing from the aborted transaction may survive.
    assert!(!engine.in_transaction());
    let table = engine.catalog().get("t").unwrap();
    assert_eq!(table.row_count(), 0);
}
#[test]
fn select_during_tx_sees_uncommitted_writes_own_session() {
    let mut engine = Engine::new();
    engine.execute("CREATE TABLE t (v INT NOT NULL)").unwrap();
    engine.execute("BEGIN").unwrap();
    engine.execute("INSERT INTO t VALUES (42)").unwrap();
    // Still inside the transaction: the session must read its own write.
    let (_, visible) = unwrap_rows(engine.execute("SELECT * FROM t").unwrap());
    assert_eq!(visible.len(), 1);
    assert_eq!(visible[0].values[0], Value::Int(42));
}
#[test]
fn snapshot_with_no_users_is_bare_catalog_format() {
    let mut engine = Engine::new();
    engine.execute("CREATE TABLE t (id INT NOT NULL)").unwrap();
    let image = engine.snapshot();
    // With no users defined, the snapshot starts with the bare catalog magic.
    let magic = &image[..8];
    assert_eq!(magic, b"SPGDB001", "must be the bare v3.x catalog magic");
    // `restore_envelope` must still accept the bare format.
    let restored = Engine::restore_envelope(&image).unwrap();
    assert!(restored.users().is_empty());
    assert_eq!(restored.catalog().table_count(), 1);
}
#[test]
fn snapshot_with_users_round_trips_both_via_envelope() {
    let mut engine = Engine::new();
    engine.execute("CREATE TABLE t (id INT NOT NULL)").unwrap();
    engine.create_user("alice", "pw1", Role::Admin, [9; 16]).unwrap();
    engine.create_user("bob", "pw2", Role::ReadOnly, [5; 16]).unwrap();
    // Once users exist, the snapshot switches to the envelope container.
    let image = engine.snapshot();
    assert_eq!(&image[..8], b"SPGENV01", "must be the v4.1 envelope magic");
    let restored = Engine::restore_envelope(&image).unwrap();
    // Users and their credentials survive; wrong passwords still fail.
    assert_eq!(restored.users().len(), 2);
    assert_eq!(restored.verify_user("alice", "pw1"), Some(Role::Admin));
    assert_eq!(restored.verify_user("bob", "pw2"), Some(Role::ReadOnly));
    assert_eq!(restored.verify_user("alice", "wrong"), None);
    // The catalog rides along in the same envelope.
    assert_eq!(restored.catalog().table_count(), 1);
}
#[test]
fn ddl_inside_tx_also_rolled_back() {
    let mut engine = Engine::new();
    engine.execute("BEGIN").unwrap();
    engine.execute("CREATE TABLE t (v INT)").unwrap();
    // The table is queryable inside the transaction...
    engine.execute("SELECT * FROM t").unwrap();
    engine.execute("ROLLBACK").unwrap();
    // ...but ROLLBACK must undo the CREATE TABLE as well.
    let outcome = engine.execute("SELECT * FROM t");
    assert!(matches!(
        outcome.unwrap_err(),
        EngineError::Storage(StorageError::TableNotFound { .. })
    ));
}
#[test]
fn create_publication_lands_in_catalog() {
    let mut engine = Engine::new();
    // A fresh engine has no publications; one CREATE adds exactly one.
    assert!(engine.publications().is_empty());
    engine.execute("CREATE PUBLICATION pub_a").unwrap();
    let pubs = engine.publications();
    assert_eq!(pubs.len(), 1);
    assert!(pubs.contains("pub_a"));
}
#[test]
fn create_publication_duplicate_errors() {
    let mut engine = Engine::new();
    engine.execute("CREATE PUBLICATION pub_a").unwrap();
    // Re-creating the same name must fail; the Debug text should name the cause.
    let err = engine.execute("CREATE PUBLICATION pub_a").unwrap_err();
    let debug_text = alloc::format!("{err:?}");
    assert!(debug_text.contains("DuplicateName"), "got {err:?}");
}
#[test]
fn drop_publication_silent_when_absent() {
    let mut engine = Engine::new();
    // Dropping a publication that never existed succeeds and affects nothing.
    let affected = match engine.execute("DROP PUBLICATION nope").unwrap() {
        QueryResult::CommandOk { affected, .. } => affected,
        other => panic!("expected CommandOk, got {other:?}"),
    };
    assert_eq!(affected, 0);
}
#[test]
fn drop_publication_present_reports_one_affected() {
    let mut engine = Engine::new();
    engine.execute("CREATE PUBLICATION pub_a").unwrap();
    let (affected, modified_catalog) = match engine.execute("DROP PUBLICATION pub_a").unwrap() {
        QueryResult::CommandOk {
            affected,
            modified_catalog,
        } => (affected, modified_catalog),
        other => panic!("expected CommandOk, got {other:?}"),
    };
    // One publication removed, and the catalog is flagged as modified.
    assert_eq!(affected, 1);
    assert!(modified_catalog);
    assert!(engine.publications().is_empty());
}
#[test]
fn publications_persist_across_snapshot_restore() {
    let mut engine = Engine::new();
    for &ddl in &[
        "CREATE PUBLICATION pub_a",
        "CREATE PUBLICATION pub_b FOR ALL TABLES",
    ] {
        engine.execute(ddl).unwrap();
    }
    // Round-trip through a snapshot and check both publications come back.
    let image = engine.snapshot();
    let restored = Engine::restore_envelope(&image).unwrap();
    let pubs = restored.publications();
    assert_eq!(pubs.len(), 2);
    for &name in &["pub_a", "pub_b"] {
        assert!(pubs.contains(name));
    }
}
#[test]
fn create_publication_allowed_inside_transaction() {
    let mut engine = Engine::new();
    // CREATE PUBLICATION is accepted between BEGIN and COMMIT.
    for &sql in &["BEGIN", "CREATE PUBLICATION pub_a", "COMMIT"] {
        engine.execute(sql).unwrap();
    }
    assert!(engine.publications().contains("pub_a"));
}
#[test]
fn create_publication_for_table_list_lands_with_scope() {
    let mut engine = Engine::new();
    for &sql in &[
        "CREATE TABLE t1 (id INT NOT NULL)",
        "CREATE TABLE t2 (id INT NOT NULL)",
        "CREATE PUBLICATION pub_a FOR TABLE t1, t2",
    ] {
        engine.execute(sql).unwrap();
    }
    // The explicit table list is stored verbatim, in declaration order.
    let scope = engine.publications().get("pub_a").cloned();
    let Some(spg_sql::ast::PublicationScope::ForTables(tables)) = scope else {
        panic!("expected ForTables scope, got {scope:?}")
    };
    let expected = alloc::vec!["t1".to_string(), "t2".to_string()];
    assert_eq!(tables, expected);
}
#[test]
fn create_publication_all_tables_except_lands_with_scope() {
    let mut engine = Engine::new();
    engine
        .execute("CREATE PUBLICATION pub_a FOR ALL TABLES EXCEPT t3")
        .unwrap();
    // The exclusion list is stored as its own scope variant.
    let scope = engine.publications().get("pub_a").cloned();
    let Some(spg_sql::ast::PublicationScope::AllTablesExcept(excluded)) = scope else {
        panic!("expected AllTablesExcept scope, got {scope:?}")
    };
    let expected = alloc::vec!["t3".to_string()];
    assert_eq!(excluded, expected);
}
#[test]
fn show_publications_empty_returns_zero_rows() {
    let engine = Engine::new();
    let result = engine.execute_readonly("SHOW PUBLICATIONS").unwrap();
    let QueryResult::Rows { rows, columns } = result else {
        panic!()
    };
    // No rows, but the three-column schema is still reported.
    assert!(rows.is_empty());
    let expected_names = ["name", "scope", "table_count"];
    assert_eq!(columns.len(), expected_names.len());
    for (column, &want) in columns.iter().zip(expected_names.iter()) {
        assert_eq!(column.name, want);
    }
}
#[test]
fn show_publications_returns_one_row_per_publication_ordered_by_name() {
    let mut engine = Engine::new();
    // Created out of alphabetical order on purpose.
    for &ddl in &[
        "CREATE PUBLICATION z_pub",
        "CREATE PUBLICATION a_pub FOR TABLE t1, t2",
        "CREATE PUBLICATION m_pub FOR ALL TABLES EXCEPT bad",
    ] {
        engine.execute(ddl).unwrap();
    }
    let result = engine.execute_readonly("SHOW PUBLICATIONS").unwrap();
    let QueryResult::Rows { rows, .. } = result else {
        panic!()
    };
    assert_eq!(rows.len(), 3);
    let names: Vec<&str> = rows
        .iter()
        .map(|row| match &row.values[0] {
            Value::Text(name) => name.as_str(),
            _ => panic!(),
        })
        .collect();
    assert_eq!(names, alloc::vec!["a_pub", "m_pub", "z_pub"]);
    // Expected (scope text, table_count) per row, in name order; an
    // unrestricted FOR ALL TABLES has no meaningful count and reports NULL.
    let expected: [(&str, Value); 3] = [
        ("FOR TABLE t1, t2", Value::Int(2)),
        ("FOR ALL TABLES EXCEPT bad", Value::Int(1)),
        ("FOR ALL TABLES", Value::Null),
    ];
    for (row, (scope_text, table_count)) in rows.iter().zip(expected.iter()) {
        match &row.values[1] {
            Value::Text(s) => assert_eq!(s, scope_text),
            other => panic!("expected Text, got {other:?}"),
        }
        assert_eq!(row.values[2], *table_count);
    }
}
#[test]
fn for_list_scopes_persist_across_snapshot() {
    let mut engine = Engine::new();
    engine.execute("CREATE PUBLICATION p1 FOR TABLE t1, t2").unwrap();
    engine
        .execute("CREATE PUBLICATION p2 FOR ALL TABLES EXCEPT bad, worse")
        .unwrap();
    let image = engine.snapshot();
    let restored = Engine::restore_envelope(&image).unwrap();
    assert_eq!(restored.publications().len(), 2);
    // Both list-carrying scope variants must survive serialization intact.
    let p1 = restored.publications().get("p1").cloned();
    let Some(spg_sql::ast::PublicationScope::ForTables(included)) = p1 else {
        panic!("p1 scope lost: {p1:?}")
    };
    assert_eq!(included, alloc::vec!["t1".to_string(), "t2".to_string()]);
    let p2 = restored.publications().get("p2").cloned();
    let Some(spg_sql::ast::PublicationScope::AllTablesExcept(excluded)) = p2 else {
        panic!("p2 scope lost: {p2:?}")
    };
    assert_eq!(excluded, alloc::vec!["bad".to_string(), "worse".to_string()]);
}
#[test]
fn create_subscription_lands_in_catalog_with_defaults() {
    let mut engine = Engine::new();
    let ddl =
        "CREATE SUBSCRIPTION sub_a CONNECTION 'host=127.0.0.1 port=20002' PUBLICATION pub_a";
    engine.execute(ddl).unwrap();
    let sub = engine
        .subscriptions()
        .get("sub_a")
        .cloned()
        .expect("present");
    // Connection string and publication list come straight from the DDL...
    assert_eq!(sub.conn_str, "host=127.0.0.1 port=20002");
    assert_eq!(sub.publications, alloc::vec!["pub_a".to_string()]);
    // ...while a new subscription starts enabled at position 0.
    assert!(sub.enabled);
    assert_eq!(sub.last_received_pos, 0);
}
#[test]
fn create_subscription_duplicate_name_errors() {
    let mut engine = Engine::new();
    engine
        .execute("CREATE SUBSCRIPTION s CONNECTION 'host=x' PUBLICATION p")
        .unwrap();
    // Same name, different connection: still a duplicate.
    let err = engine
        .execute("CREATE SUBSCRIPTION s CONNECTION 'host=y' PUBLICATION p")
        .unwrap_err();
    let debug_text = alloc::format!("{err:?}");
    assert!(debug_text.contains("DuplicateName"), "got {err:?}");
}
#[test]
fn drop_subscription_silent_when_absent() {
    let mut engine = Engine::new();
    // Dropping an unknown subscription succeeds and affects nothing.
    let affected = match engine.execute("DROP SUBSCRIPTION never").unwrap() {
        QueryResult::CommandOk { affected, .. } => affected,
        other => panic!("expected CommandOk, got {other:?}"),
    };
    assert_eq!(affected, 0);
}
#[test]
fn subscription_advance_updates_last_pos_monotone() {
    let mut e = Engine::new();
    e.execute("CREATE SUBSCRIPTION s CONNECTION 'h=x' PUBLICATION p")
        .unwrap();
    // Reads the stored position of subscription `s`.
    let last_pos = |e: &Engine| e.subscriptions().get("s").unwrap().last_received_pos;
    assert!(e.subscription_advance("s", 100));
    assert_eq!(last_pos(&e), 100);
    // A lower position is still accepted (`true`, because the subscription
    // exists) but must not move the stored position backwards.
    assert!(e.subscription_advance("s", 50));
    assert_eq!(last_pos(&e), 100);
    assert!(e.subscription_advance("s", 200));
    assert_eq!(last_pos(&e), 200);
    // Unknown subscription names report `false`.
    assert!(!e.subscription_advance("missing", 1));
}
#[test]
fn show_subscriptions_returns_rows_ordered_by_name() {
    let mut engine = Engine::new();
    // Created out of alphabetical order on purpose.
    for &ddl in &[
        "CREATE SUBSCRIPTION z_sub CONNECTION 'h=x' PUBLICATION p1, p2",
        "CREATE SUBSCRIPTION a_sub CONNECTION 'h=y' PUBLICATION p3",
    ] {
        engine.execute(ddl).unwrap();
    }
    let result = engine.execute_readonly("SHOW SUBSCRIPTIONS").unwrap();
    let QueryResult::Rows { rows, columns } = result else {
        panic!()
    };
    assert_eq!(rows.len(), 2);
    assert_eq!(columns.len(), 5);
    assert_eq!(columns[0].name, "name");
    assert_eq!(columns[4].name, "last_received_pos");
    let names: Vec<&str> = rows
        .iter()
        .map(|row| match &row.values[0] {
            Value::Text(name) => name.as_str(),
            _ => panic!(),
        })
        .collect();
    assert_eq!(names, alloc::vec!["a_sub", "z_sub"]);
    // a_sub: connection, publication list, enabled flag, starting position.
    let a_sub = &rows[0].values;
    assert_eq!(a_sub[1], Value::Text("h=y".to_string()));
    assert_eq!(a_sub[2], Value::Text("p3".to_string()));
    assert_eq!(a_sub[3], Value::Bool(true));
    assert_eq!(a_sub[4], Value::BigInt(0));
    // Multiple publications are rendered comma-separated.
    assert_eq!(rows[1].values[2], Value::Text("p1, p2".to_string()));
}
#[test]
fn subscriptions_persist_across_snapshot_envelope_v4() {
    let mut e = Engine::new();
    e.execute("CREATE SUBSCRIPTION s1 CONNECTION 'h=A' PUBLICATION p1, p2")
        .unwrap();
    e.execute("CREATE SUBSCRIPTION s2 CONNECTION 'h=B' PUBLICATION p3")
        .unwrap();
    // Check the advance actually took effect rather than ignoring its result;
    // otherwise a failed advance would only surface as a confusing 0 != 42 below.
    assert!(e.subscription_advance("s2", 42));
    let snap = e.snapshot();
    let e2 = Engine::restore_envelope(&snap).unwrap();
    assert_eq!(e2.subscriptions().len(), 2);
    // Connection string, publication list and position all survive the round trip.
    let s1 = e2.subscriptions().get("s1").unwrap();
    assert_eq!(s1.conn_str, "h=A");
    assert_eq!(s1.publications, alloc::vec!["p1".to_string(), "p2".to_string()]);
    assert_eq!(s1.last_received_pos, 0);
    let s2 = e2.subscriptions().get("s2").unwrap();
    assert_eq!(s2.last_received_pos, 42);
}
#[test]
fn v3_envelope_loads_with_empty_subscriptions() {
    // Hand-build a version-3 envelope (catalog, users, publications; no
    // subscriptions section) and check that the current reader still accepts it.
    let mut e = Engine::new();
    e.execute("CREATE PUBLICATION pub_legacy").unwrap();
    let catalog = e.catalog.serialize();
    let users = crate::users::serialize_users(&e.users);
    let pubs = e.publications.serialize();
    let mut buf = Vec::new();
    buf.extend_from_slice(b"SPGENV01");
    // Envelope format version byte.
    buf.push(3u8);
    // Each section is a little-endian u32 length prefix followed by its bytes.
    for section in &[&catalog[..], &users[..], &pubs[..]] {
        buf.extend_from_slice(&u32::try_from(section.len()).unwrap().to_le_bytes());
        buf.extend_from_slice(section);
    }
    // Trailing CRC-32 over everything written so far.
    let crc = spg_crypto::crc32::crc32(&buf);
    buf.extend_from_slice(&crc.to_le_bytes());
    let e2 = Engine::restore_envelope(&buf).expect("v3 envelope restores under v4 reader");
    assert!(e2.subscriptions().is_empty());
    assert!(e2.publications().contains("pub_legacy"));
}
#[test]
fn create_subscription_allowed_inside_transaction() {
    let mut engine = Engine::new();
    // CREATE SUBSCRIPTION is accepted between BEGIN and COMMIT.
    for &sql in &[
        "BEGIN",
        "CREATE SUBSCRIPTION s CONNECTION 'h=x' PUBLICATION p",
        "COMMIT",
    ] {
        engine.execute(sql).unwrap();
    }
    assert!(engine.subscriptions().contains("s"));
}
#[test]
fn analyze_populates_histogram_bounds() {
    // Fifty distinct, non-NULL ids 0..=49 give known histogram endpoints,
    // null fraction and distinct count.
    let mut e = Engine::new();
    e.execute("CREATE TABLE t (id INT NOT NULL, name TEXT)").unwrap();
    for i in 0..50 {
        e.execute(&alloc::format!("INSERT INTO t VALUES ({i}, 'name{i}')"))
            .unwrap();
    }
    e.execute("ANALYZE t").unwrap();
    let stats = e.statistics();
    let id_stats = stats.get("t", "id").unwrap();
    assert!(id_stats.histogram_bounds.len() >= 2);
    assert_eq!(id_stats.histogram_bounds.first().unwrap(), "0");
    assert_eq!(id_stats.histogram_bounds.last().unwrap(), "49");
    // `id` is NOT NULL, so the null fraction must be (approximately) zero.
    assert!(id_stats.null_frac.abs() < 1e-6);
    assert_eq!(id_stats.n_distinct, 50);
}
#[test]
fn reanalyze_overwrites_prior_stats() {
    let mut engine = Engine::new();
    engine.execute("CREATE TABLE t (id INT NOT NULL)").unwrap();
    // Reads the current distinct-count estimate for `t.id`.
    let distinct_ids = |engine: &Engine| engine.statistics().get("t", "id").unwrap().n_distinct;
    for id in 0..10 {
        engine.execute(&alloc::format!("INSERT INTO t VALUES ({id})")).unwrap();
    }
    engine.execute("ANALYZE t").unwrap();
    assert_eq!(distinct_ids(&engine), 10);
    // After more inserts, a second ANALYZE must replace, not keep, the old numbers.
    for id in 10..30 {
        engine.execute(&alloc::format!("INSERT INTO t VALUES ({id})")).unwrap();
    }
    engine.execute("ANALYZE t").unwrap();
    assert_eq!(distinct_ids(&engine), 30);
}
#[test]
fn analyze_unknown_table_errors() {
    // ANALYZE on a table that does not exist is a storage-level TableNotFound.
    let mut engine = Engine::new();
    let outcome = engine.execute("ANALYZE nonexistent");
    assert!(matches!(
        outcome.unwrap_err(),
        EngineError::Storage(StorageError::TableNotFound { .. })
    ));
}
#[test]
fn bare_analyze_covers_all_user_tables() {
    let mut engine = Engine::new();
    for &sql in &[
        "CREATE TABLE t1 (id INT NOT NULL)",
        "CREATE TABLE t2 (name TEXT NOT NULL)",
        "INSERT INTO t1 VALUES (1)",
        "INSERT INTO t2 VALUES ('alice')",
    ] {
        engine.execute(sql).unwrap();
    }
    // ANALYZE without a table name reports one affected entry per user table.
    let (affected, modified_catalog) = match engine.execute("ANALYZE").unwrap() {
        QueryResult::CommandOk {
            affected,
            modified_catalog,
        } => (affected, modified_catalog),
        other => panic!("expected CommandOk, got {other:?}"),
    };
    assert_eq!(affected, 2);
    assert!(modified_catalog);
    let stats = engine.statistics();
    assert!(stats.get("t1", "id").is_some());
    assert!(stats.get("t2", "name").is_some());
}
#[test]
fn select_from_spg_statistic_returns_rows_per_column() {
    let mut engine = Engine::new();
    for &sql in &[
        "CREATE TABLE t (id INT NOT NULL, label TEXT)",
        "INSERT INTO t VALUES (1, 'a')",
        "INSERT INTO t VALUES (2, 'b')",
        "ANALYZE t",
    ] {
        engine.execute(sql).unwrap();
    }
    let result = engine.execute_readonly("SELECT * FROM spg_statistic").unwrap();
    let QueryResult::Rows { rows, columns } = result else {
        panic!()
    };
    // Spot-check the six-column schema of the statistics view.
    assert_eq!(columns.len(), 6);
    assert_eq!(columns[0].name, "table_name");
    assert_eq!(columns[4].name, "histogram_bounds");
    assert_eq!(columns[5].name, "cold_row_count");
    assert_eq!(rows.len(), 2, "one row per column of t");
    // The first row describes `t.id`.
    let (Value::Text(table), Value::Text(column)) = (&rows[0].values[0], &rows[0].values[1]) else {
        panic!()
    };
    assert_eq!(table, "t");
    assert_eq!(column, "id");
}
#[test]
fn analyze_skips_vector_columns() {
    let mut engine = Engine::new();
    engine
        .execute("CREATE TABLE t (id INT NOT NULL, v VECTOR(3) NOT NULL)")
        .unwrap();
    engine.execute("INSERT INTO t VALUES (1, [1, 2, 3])").unwrap();
    engine.execute("ANALYZE t").unwrap();
    // Scalar columns get statistics; the VECTOR column gets none.
    let stats = engine.statistics();
    assert!(stats.get("t", "id").is_some());
    assert!(stats.get("t", "v").is_none());
}
#[test]
fn statistics_persist_across_envelope_v5_round_trip() {
    let mut engine = Engine::new();
    engine.execute("CREATE TABLE t (id INT NOT NULL)").unwrap();
    for id in 0..20 {
        engine.execute(&alloc::format!("INSERT INTO t VALUES ({id})")).unwrap();
    }
    engine.execute("ANALYZE").unwrap();
    // Statistics must survive a snapshot/restore cycle.
    let image = engine.snapshot();
    let restored = Engine::restore_envelope(&image).unwrap();
    let id_stats = restored.statistics().get("t", "id").unwrap();
    assert_eq!(id_stats.n_distinct, 20);
}
#[test]
fn auto_analyze_threshold_fires_after_10pct_of_min_rows_on_small_table() {
    let mut engine = Engine::new();
    engine.execute("CREATE TABLE t (id INT NOT NULL)").unwrap();
    // Nine modifications stay below the small-table threshold...
    for id in 0..9 {
        engine.execute(&alloc::format!("INSERT INTO t VALUES ({id})")).unwrap();
    }
    assert!(engine.tables_needing_analyze().is_empty(), "9 < threshold");
    // ...and the tenth crosses it.
    engine.execute("INSERT INTO t VALUES (9)").unwrap();
    let pending = engine.tables_needing_analyze();
    assert_eq!(pending, alloc::vec!["t".to_string()]);
}
#[test]
fn auto_analyze_threshold_uses_10pct_of_row_count_for_large_tables() {
    // Inserts one row per id in `ids` into `t`.
    fn insert_ids(engine: &mut Engine, ids: core::ops::Range<i32>) {
        for id in ids {
            engine.execute(&alloc::format!("INSERT INTO t VALUES ({id})")).unwrap();
        }
    }
    let mut engine = Engine::new();
    engine.execute("CREATE TABLE t (id INT NOT NULL)").unwrap();
    insert_ids(&mut engine, 0..1000);
    engine.execute("ANALYZE t").unwrap();
    assert!(engine.tables_needing_analyze().is_empty(), "fresh ANALYZE");
    insert_ids(&mut engine, 1000..1050);
    assert!(
        engine.tables_needing_analyze().is_empty(),
        "50 inserts < threshold of ~105"
    );
    insert_ids(&mut engine, 1050..1200);
    assert_eq!(
        engine.tables_needing_analyze(),
        alloc::vec!["t".to_string()],
        "200 inserts > 0.1 × 1200 threshold"
    );
}
#[test]
fn auto_analyze_threshold_resets_after_analyze() {
    let mut engine = Engine::new();
    engine.execute("CREATE TABLE t (id INT NOT NULL)").unwrap();
    // Enough churn to put `t` on the pending list.
    for id in 0..200 {
        engine.execute(&alloc::format!("INSERT INTO t VALUES ({id})")).unwrap();
    }
    let pending_before = engine.tables_needing_analyze();
    assert!(!pending_before.is_empty());
    engine.execute("ANALYZE").unwrap();
    assert!(
        engine.tables_needing_analyze().is_empty(),
        "ANALYZE must reset the counter"
    );
}
#[test]
fn auto_analyze_threshold_tracks_updates_and_deletes() {
    let mut engine = Engine::new();
    engine
        .execute("CREATE TABLE t (id INT NOT NULL, label TEXT)")
        .unwrap();
    for id in 0..50 {
        engine
            .execute(&alloc::format!("INSERT INTO t VALUES ({id}, 'x')"))
            .unwrap();
    }
    engine.execute("ANALYZE t").unwrap();
    // No new inserts: only UPDATE (20 rows) and DELETE (5 rows) churn after
    // ANALYZE, which must still count toward the threshold.
    for &dml in &[
        "UPDATE t SET label = 'y' WHERE id < 20",
        "DELETE FROM t WHERE id >= 45",
    ] {
        engine.execute(dml).unwrap();
    }
    assert_eq!(engine.tables_needing_analyze(), alloc::vec!["t".to_string()]);
}
#[test]
fn v4_envelope_loads_with_empty_statistics() {
    // Hand-build a version-4 envelope (catalog, users, publications,
    // subscriptions; no statistics section) and restore it.
    let mut engine = Engine::new();
    engine
        .create_user("alice", "secret", crate::users::Role::ReadOnly, [0u8; 16])
        .unwrap();
    let catalog = engine.catalog.serialize();
    let users = crate::users::serialize_users(&engine.users);
    let pubs = engine.publications.serialize();
    let subs = engine.subscriptions.serialize();
    let mut envelope = Vec::new();
    envelope.extend_from_slice(b"SPGENV01");
    envelope.push(4u8);
    // Sections: little-endian u32 length prefix, then the bytes.
    for section in &[&catalog[..], &users[..], &pubs[..], &subs[..]] {
        let len = u32::try_from(section.len()).unwrap();
        envelope.extend_from_slice(&len.to_le_bytes());
        envelope.extend_from_slice(section);
    }
    // Trailing CRC-32 over everything written so far.
    let checksum = spg_crypto::crc32::crc32(&envelope);
    envelope.extend_from_slice(&checksum.to_le_bytes());
    let restored = Engine::restore_envelope(&envelope).expect("v4 envelope restores");
    assert!(restored.statistics().is_empty());
}
#[test]
fn v1_v2_envelope_loads_with_empty_publications() {
    // Hand-build a version-2 envelope (catalog and users only; no
    // publications section) and check that it restores with no publications.
    let mut e = Engine::new();
    e.create_user("alice", "secret", crate::users::Role::ReadOnly, [0u8; 16])
        .unwrap();
    let catalog = e.catalog.serialize();
    let users = crate::users::serialize_users(&e.users);
    let mut buf = Vec::new();
    buf.extend_from_slice(b"SPGENV01");
    // Envelope format version byte.
    buf.push(2u8);
    // Each section is a little-endian u32 length prefix followed by its bytes.
    for section in &[&catalog[..], &users[..]] {
        buf.extend_from_slice(&u32::try_from(section.len()).unwrap().to_le_bytes());
        buf.extend_from_slice(section);
    }
    // Trailing CRC-32 over everything written so far.
    let crc = spg_crypto::crc32::crc32(&buf);
    buf.extend_from_slice(&crc.to_le_bytes());
    let e2 = Engine::restore_envelope(&buf).expect("v2 envelope restores");
    assert!(e2.publications().is_empty());
}
}