#![no_std]
extern crate alloc;
pub mod aggregate;
mod bytebudget;
mod cancel;
mod clock;
mod constraints;
mod conversions;
pub mod copy;
mod ddl;
pub mod describe;
mod dml;
mod envelope;
pub mod eval;
mod execute;
mod explain;
mod expr_analysis;
pub mod fts;
mod index_access;
mod join;
pub mod json;
mod maintenance;
pub mod memoize;
mod numeric;
mod orderby;
pub mod plan_cache;
mod plpgsql;
pub mod publications;
pub mod query_stats;
mod readonly;
pub mod reorder;
mod select;
pub mod selectivity;
mod sequence;
mod session;
mod show;
mod spg_admin;
pub mod statistics;
mod subquery;
pub mod subscriptions;
mod substitute;
mod system_catalog;
mod table_access;
mod transaction;
pub mod triggers;
pub mod users;
mod window;
pub use crate::users::{Role, ScramSecrets, UserError, UserStore};
pub use cancel::{CancelToken, MonotonicNowFn};
use bytebudget::*;
pub(crate) use clock::{rewrite_clock_calls, value_to_literal};
use constraints::*;
use conversions::*;
pub use conversions::{
format_bigint_2d_text_pub, format_hstore_text, format_int_2d_text_pub, format_range_text,
format_text_2d_text_pub,
};
pub(crate) use ddl::{
canonicalize_set_value, enforce_enum_label, eval_runtime_default_free,
resolve_column_default_free,
};
pub(crate) use envelope::{EnvelopeParse, build_envelope, split_envelope};
use expr_analysis::*;
use index_access::*;
pub(crate) use orderby::{
apply_offset_and_limit, apply_offset_and_limit_tagged, build_order_keys, canonical_value_repr,
expand_group_by_all, order_by_value_cmp, partial_sort_tagged, render_histogram_bounds,
resolve_order_by_position, sort_by_keys, sort_values_for_histogram, value_cmp, value_to_f64,
};
pub(crate) use select::{build_projection, infer_column_types, value_to_order_key};
pub(crate) use show::render_create_table;
pub(crate) use subquery::{
build_in_list_set, collect_scalar_subqueries, expr_has_subquery, expr_tree_has_subquery,
};
pub use substitute::substitute_placeholders;
use substitute::*;
use system_catalog::*;
use window::*;
use alloc::collections::BTreeMap;
use alloc::string::String;
use alloc::vec::Vec;
use core::fmt;
pub use spg_sql::ast::Statement as ParsedStatement;
use spg_sql::parser::ParseError;
use spg_storage::{Catalog, ColumnSchema, Row, StorageError};
use crate::eval::EvalError;
/// Outcome of executing one statement through the engine.
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
pub enum QueryResult {
    /// Completion of a statement that returns no result set
    /// (presumably DDL/DML/utility statements — confirm at the call sites).
    CommandOk {
        /// Number of rows the statement reports as affected.
        affected: usize,
        /// Whether the statement changed the catalog.
        modified_catalog: bool,
    },
    /// A materialised result set. `Engine::enforce_row_limit` checks `rows`
    /// against the configured row and byte caps.
    Rows {
        /// Schema of each output column.
        columns: Vec<ColumnSchema>,
        /// The output rows.
        rows: Vec<Row>,
    },
}
/// Every error the engine can report to its callers.
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
pub enum EngineError {
    /// The SQL text failed to parse.
    Parse(ParseError),
    /// Error surfaced from the storage layer (`spg_storage`), including
    /// catalog deserialisation failures and snapshot CRC mismatches.
    Storage(StorageError),
    /// Error raised while evaluating an expression.
    Eval(EvalError),
    /// Unsupported operation; the message says what. Also used when a
    /// non-catalog snapshot section (users, publications, ...) fails to restore.
    Unsupported(String),
    /// A transaction was requested while one is already open.
    TransactionAlreadyOpen,
    /// A transaction-scoped operation ran with no open transaction.
    NoActiveTransaction,
    /// The statement needs the write path (`execute`), not `execute_readonly`.
    WriteRequired,
    /// The result set exceeded `max_query_rows`; carries the configured cap.
    RowLimitExceeded(usize),
    /// The approximate result size exceeded `max_query_bytes`; carries the
    /// configured cap.
    QueryBytesExceeded(usize),
    /// The query was cancelled (timeout or client request).
    Cancelled,
}
impl fmt::Display for EngineError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Parse(e) => write!(f, "parse: {e}"),
Self::Storage(e) => write!(f, "storage: {e}"),
Self::Eval(e) => write!(f, "eval: {e}"),
Self::Unsupported(s) => write!(f, "unsupported: {s}"),
Self::TransactionAlreadyOpen => f.write_str("a transaction is already open"),
Self::NoActiveTransaction => f.write_str("no active transaction"),
Self::WriteRequired => {
f.write_str("statement requires a write lock (use execute, not execute_readonly)")
}
Self::RowLimitExceeded(n) => {
write!(f, "query exceeded max_query_rows={n}")
}
Self::QueryBytesExceeded(n) => {
write!(
f,
"query materialisation exceeded max_query_bytes={n} (set SPG_MAX_QUERY_BYTES to raise, 0 to disable)"
)
}
Self::Cancelled => f.write_str("query cancelled (timeout or client request)"),
}
}
}
impl From<ParseError> for EngineError {
fn from(e: ParseError) -> Self {
Self::Parse(e)
}
}
impl From<StorageError> for EngineError {
fn from(e: StorageError) -> Self {
Self::Storage(e)
}
}
impl From<EvalError> for EngineError {
fn from(e: EvalError) -> Self {
Self::Eval(e)
}
}
/// Clock callback installed with `Engine::with_clock`; returns the current
/// time as an `i64`.
/// NOTE(review): unit/epoch is not visible in this file — confirm at the
/// call sites.
pub type ClockFn = fn() -> i64;
/// Source of 16-byte salts, installed with `Engine::with_salt_fn`.
pub type SaltFn = fn() -> [u8; 16];
/// Transaction identifier; explicit ids are handed out by `Engine::alloc_tx_id`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct TxId(pub u64);
/// Transaction id 0. `Engine::new` starts allocation at 1, so ids from
/// `alloc_tx_id` on such an engine never equal this value.
pub const IMPLICIT_TX: TxId = TxId(0);
/// 4 MiB. Not referenced in this file — presumably the default compaction
/// target size; confirm at its use sites.
pub const COMPACTION_TARGET_DEFAULT_BYTES: u64 = 4 * 1024 * 1024;
/// Per-transaction working state, stored in `Engine::tx_catalogs` keyed by `TxId`.
#[derive(Debug, Default, Clone)]
struct TxState {
    /// The transaction's working catalog; `Engine::active_catalog` returns it
    /// while this transaction is the current one.
    catalog: Catalog,
    /// Named savepoints, each paired with a catalog — presumably the catalog
    /// as of the savepoint (verify in the transaction module).
    savepoints: Vec<(String, Catalog)>,
}
/// Detached copy of engine state produced by `Engine::clone_snapshot`.
///
/// Holds the active catalog (the transaction's working copy when one is
/// current), the planner statistics, the clock and the row cap.
#[derive(Debug, Clone)]
pub struct CatalogSnapshot {
    catalog: Catalog,
    statistics: statistics::Statistics,
    clock: Option<ClockFn>,
    max_query_rows: Option<usize>,
}
#[derive(Debug, Default)]
pub struct Engine {
catalog: Catalog,
tx_catalogs: BTreeMap<TxId, TxState>,
current_tx: Option<TxId>,
next_tx_id: u64,
backslash_escapes: bool,
clock: Option<ClockFn>,
salt_fn: Option<SaltFn>,
max_query_rows: Option<usize>,
pub(crate) max_query_bytes: Option<usize>,
pub(crate) users: UserStore,
publications: publications::Publications,
subscriptions: subscriptions::Subscriptions,
statistics: statistics::Statistics,
plan_cache: plan_cache::PlanCache,
query_stats: query_stats::QueryStats,
activity_provider: Option<ActivityProvider>,
audit_chain_provider: Option<AuditChainProvider>,
audit_verifier: Option<AuditVerifier>,
slow_query_threshold_us: Option<u64>,
slow_query_logger: Option<SlowQueryLogger>,
pub(crate) session_params: BTreeMap<String, String>,
trigger_recursion_depth: u32,
foreign_key_checks: bool,
meta_views_materialised: bool,
pending_foreign_keys: Vec<(alloc::string::String, spg_sql::ast::ForeignKeyConstraint)>,
}
/// Limit for nested trigger execution; presumably checked against
/// `Engine::trigger_recursion_depth` where triggers fire (not in this file).
const MAX_TRIGGER_RECURSION: u32 = 16;
/// Slow-query logging callback.
/// NOTE(review): arguments are presumably (SQL text, elapsed microseconds) —
/// confirm with the caller.
pub type SlowQueryLogger = fn(&str, u64);
/// One session-activity row as reported by an [`ActivityProvider`].
#[derive(Debug, Clone)]
pub struct ActivityRow {
    pub pid: u32,
    pub user: String,
    /// Start time; `_us` suffix suggests microseconds — confirm epoch with the provider.
    pub started_at_us: i64,
    pub current_sql: String,
    pub wait_event: String,
    /// Elapsed time; `_us` suffix suggests microseconds.
    pub elapsed_us: i64,
    pub in_transaction: bool,
    pub application_name: String,
}
/// Host-supplied callback returning the current activity rows.
pub type ActivityProvider = fn() -> Vec<ActivityRow>;
/// One audit-chain entry as reported by an [`AuditChainProvider`].
#[derive(Debug, Clone)]
pub struct AuditRow {
    pub seq: i64,
    /// Timestamp; `_ms` suffix suggests milliseconds — confirm with the provider.
    pub ts_ms: i64,
    /// Hash of the previous entry (hex text, per the field name).
    pub prev_hash_hex: String,
    /// Hash of this entry (hex text, per the field name).
    pub entry_hash_hex: String,
    pub sql: String,
}
/// Host-supplied callback returning the audit chain entries.
pub type AuditChainProvider = fn() -> Vec<AuditRow>;
/// Host-supplied audit verification callback.
/// NOTE(review): the meaning of the two `i64` values is not visible here — confirm.
pub type AuditVerifier = fn() -> (i64, i64);
impl Engine {
pub fn new() -> Self {
Self {
catalog: Catalog::new(),
tx_catalogs: BTreeMap::new(),
current_tx: None,
backslash_escapes: false,
next_tx_id: 1,
clock: None,
salt_fn: None,
max_query_rows: None,
max_query_bytes: None,
users: UserStore::new(),
publications: publications::Publications::new(),
subscriptions: subscriptions::Subscriptions::new(),
statistics: statistics::Statistics::new(),
plan_cache: plan_cache::PlanCache::new(),
query_stats: query_stats::QueryStats::new(),
activity_provider: None,
audit_chain_provider: None,
audit_verifier: None,
slow_query_threshold_us: None,
slow_query_logger: None,
session_params: BTreeMap::new(),
trigger_recursion_depth: 0,
foreign_key_checks: true,
meta_views_materialised: false,
pending_foreign_keys: Vec::new(),
}
}
#[must_use]
pub fn clone_snapshot(&self) -> CatalogSnapshot {
CatalogSnapshot {
catalog: self.active_catalog().clone(),
statistics: self.statistics.clone(),
clock: self.clock,
max_query_rows: self.max_query_rows,
}
}
pub fn restore(catalog: Catalog) -> Self {
Self {
catalog,
tx_catalogs: BTreeMap::new(),
current_tx: None,
backslash_escapes: false,
next_tx_id: 1,
clock: None,
salt_fn: None,
max_query_rows: None,
max_query_bytes: None,
users: UserStore::new(),
publications: publications::Publications::new(),
subscriptions: subscriptions::Subscriptions::new(),
statistics: statistics::Statistics::new(),
plan_cache: plan_cache::PlanCache::new(),
query_stats: query_stats::QueryStats::new(),
activity_provider: None,
audit_chain_provider: None,
audit_verifier: None,
slow_query_threshold_us: None,
slow_query_logger: None,
session_params: BTreeMap::new(),
trigger_recursion_depth: 0,
foreign_key_checks: true,
meta_views_materialised: false,
pending_foreign_keys: Vec::new(),
}
}
pub fn restore_envelope(buf: &[u8]) -> Result<Self, EngineError> {
match split_envelope(buf) {
EnvelopeParse::Pair {
catalog: catalog_bytes,
users: user_bytes,
publications: pub_bytes,
subscriptions: sub_bytes,
statistics: stats_bytes,
} => {
let catalog = Catalog::deserialize(catalog_bytes).map_err(EngineError::Storage)?;
let users = users::deserialize_users(user_bytes)
.map_err(|e| EngineError::Unsupported(alloc::format!("users restore: {e}")))?;
let publications = match pub_bytes {
Some(b) => publications::Publications::deserialize(b).map_err(|e| {
EngineError::Unsupported(alloc::format!("publications restore: {e:?}"))
})?,
None => publications::Publications::new(),
};
let subscriptions = match sub_bytes {
Some(b) => subscriptions::Subscriptions::deserialize(b).map_err(|e| {
EngineError::Unsupported(alloc::format!("subscriptions restore: {e:?}"))
})?,
None => subscriptions::Subscriptions::new(),
};
let statistics = match stats_bytes {
Some(b) => statistics::Statistics::deserialize(b).map_err(|e| {
EngineError::Unsupported(alloc::format!("statistics restore: {e:?}"))
})?,
None => statistics::Statistics::new(),
};
Ok(Self {
catalog,
tx_catalogs: BTreeMap::new(),
current_tx: None,
backslash_escapes: false,
next_tx_id: 1,
clock: None,
salt_fn: None,
max_query_rows: None,
max_query_bytes: None,
users,
publications,
subscriptions,
statistics,
plan_cache: plan_cache::PlanCache::new(),
query_stats: query_stats::QueryStats::new(),
activity_provider: None,
audit_chain_provider: None,
audit_verifier: None,
slow_query_threshold_us: None,
slow_query_logger: None,
session_params: BTreeMap::new(),
trigger_recursion_depth: 0,
foreign_key_checks: true,
meta_views_materialised: false,
pending_foreign_keys: Vec::new(),
})
}
EnvelopeParse::CrcMismatch { expected, computed } => {
Err(EngineError::Storage(StorageError::Corrupt(alloc::format!(
"snapshot envelope CRC32 mismatch (expected={expected:#010x}, computed={computed:#010x})"
))))
}
EnvelopeParse::Bare => {
let catalog = Catalog::deserialize(buf).map_err(EngineError::Storage)?;
Ok(Self::restore(catalog))
}
}
}
pub const fn users(&self) -> &UserStore {
&self.users
}
#[must_use]
pub const fn with_clock(mut self, clock: ClockFn) -> Self {
self.clock = Some(clock);
self
}
#[must_use]
pub const fn with_salt_fn(mut self, f: SaltFn) -> Self {
self.salt_fn = Some(f);
self
}
#[must_use]
pub const fn with_max_query_rows(mut self, n: usize) -> Self {
self.max_query_rows = Some(n);
self
}
#[must_use]
pub const fn with_max_query_bytes(mut self, n: usize) -> Self {
self.max_query_bytes = Some(n);
self
}
pub const fn catalog(&self) -> &Catalog {
&self.catalog
}
pub fn snapshot(&self) -> Vec<u8> {
if self.users.is_empty()
&& self.publications.is_empty()
&& self.subscriptions.is_empty()
&& self.statistics.is_empty()
{
self.catalog.serialize()
} else {
build_envelope(
&self.catalog.serialize(),
&users::serialize_users(&self.users),
&self.publications.serialize(),
&self.subscriptions.serialize(),
&self.statistics.serialize(),
)
}
}
pub fn in_transaction(&self) -> bool {
!self.tx_catalogs.is_empty()
}
pub fn alloc_tx_id(&mut self) -> TxId {
let id = TxId(self.next_tx_id);
self.next_tx_id = self.next_tx_id.saturating_add(1);
id
}
pub fn replace_catalog(&mut self, catalog: Catalog) {
self.catalog = catalog;
}
pub fn freeze_oldest_to_cold(
&mut self,
table_name: &str,
index_name: &str,
max_rows: usize,
) -> Result<spg_storage::FreezeReport, EngineError> {
let report = self
.active_catalog_mut()
.freeze_oldest_to_cold(table_name, index_name, max_rows)
.map_err(EngineError::Storage)?;
if let Some(t) = self.active_catalog_mut().get_mut(table_name) {
t.mark_cold_row_count_stale();
}
Ok(report)
}
pub fn receive_cold_segment(
&mut self,
segment_id: u32,
bytes: Vec<u8>,
) -> Result<(), EngineError> {
let mut new_cat = self.catalog.clone();
match new_cat.load_segment_bytes_at(segment_id, bytes) {
Ok(()) => {
self.replace_catalog(new_cat);
Ok(())
}
Err(StorageError::Corrupt(msg)) if msg.contains("already occupied") => Ok(()),
Err(e) => Err(EngineError::Storage(e)),
}
}
pub(crate) fn active_catalog(&self) -> &Catalog {
match self.current_tx {
Some(t) => self
.tx_catalogs
.get(&t)
.map_or(&self.catalog, |s| &s.catalog),
None => &self.catalog,
}
}
fn active_catalog_mut(&mut self) -> &mut Catalog {
let tx = self.current_tx;
match tx {
Some(t) => match self.tx_catalogs.get_mut(&t) {
Some(s) => &mut s.catalog,
None => &mut self.catalog,
},
None => &mut self.catalog,
}
}
fn enforce_row_limit(
&self,
result: Result<QueryResult, EngineError>,
) -> Result<QueryResult, EngineError> {
if let Ok(QueryResult::Rows { rows, .. }) = &result {
if let Some(cap) = self.max_query_rows
&& rows.len() > cap
{
return Err(EngineError::RowLimitExceeded(cap));
}
if let Some(byte_cap) = self.max_query_bytes
&& approx_rows_bytes(rows) > byte_cap
{
return Err(EngineError::QueryBytesExceeded(byte_cap));
}
}
result
}
}
/// Memory accounting for a single table.
#[derive(Debug, Clone)]
pub struct TableMemoryStats {
    pub name: String,
    /// Row count held in hot (in-memory) storage, per the field name.
    pub hot_rows: u64,
    /// Row count that has been moved to cold storage, per the field name.
    pub cold_rows: u64,
    pub hot_encoded_bytes: u64,
    /// Approximate figure (per the `approx_` prefix) — computed outside this file.
    pub approx_resident_bytes: u64,
    pub index_count: u64,
    /// Approximate figure (per the `approx_` prefix) — computed outside this file.
    pub approx_index_bytes: u64,
}
/// Engine-wide memory accounting: per-table rows plus totals.
#[derive(Debug, Clone)]
pub struct MemoryStats {
    pub tables: Vec<TableMemoryStats>,
    pub total_hot_encoded_bytes: u64,
    pub total_approx_resident_bytes: u64,
    pub total_approx_index_bytes: u64,
    /// The configured `max_query_bytes` cap, if any.
    pub max_query_bytes: Option<usize>,
    /// WAL size when known. NOTE(review): source of this figure is outside this file.
    pub wal_bytes: Option<u64>,
}
/// Reports whether a table name is engine-internal.
///
/// This is currently a stub: no name is treated as internal, and the argument
/// is ignored.
const fn is_internal_table_name(_name: &str) -> bool {
    false
}
#[cfg(test)]
mod tests;