use crate::function::{Deterministic, Func};
use crate::incremental::view::IncrementalView;
use crate::incremental::{compiler::DBSP_CIRCUIT_VERSION, operator::create_dbsp_state_index};
use crate::index_method::{IndexMethodAttachment, IndexMethodConfiguration};
use crate::return_if_io;
use crate::stats::AnalyzeStats;
use crate::sync::RwLock;
use crate::translate::emitter::Resolver;
use crate::translate::expr::{
bind_and_rewrite_expr, walk_expr, walk_expr_mut, BindingBehavior, WalkControl,
};
use crate::translate::index::{resolve_index_method_parameters, resolve_sorted_columns};
use crate::translate::planner::ROWID_STRS;
use crate::types::IOResult;
use crate::util::{exprs_are_equivalent, normalize_ident};
use crate::vdbe::affinity::Affinity;
use crate::vdbe::CursorID;
use crate::{turso_assert, turso_debug_assert};
use smallvec::SmallVec;
use turso_macros::AtomicEnum;
/// Processing state of a [`View`].
///
/// `View::process` moves a view from `Ready` to `InProgress` and reports a
/// circular definition when the view is already `InProgress`; `View::done`
/// moves it back to `Ready`.
#[derive(Debug, Clone, AtomicEnum)]
pub enum ViewState {
// The view is not currently being processed.
Ready,
// `View::process` has been called and `View::done` has not reset the state yet.
InProgress,
}
/// A view definition together with its processing state.
///
/// `Clone` is implemented by hand for this type: a clone copies the
/// definition but always starts out in `ViewState::Ready`.
#[derive(Debug)]
pub struct View {
/// Name of the view.
pub name: String,
/// SQL text associated with the view (presumably the defining statement — confirm against callers).
pub sql: String,
/// Parsed `SELECT` statement of the view.
pub select_stmt: ast::Select,
/// Columns of the view.
pub columns: Vec<Column>,
/// Processing state toggled by `View::process` and `View::done`.
pub state: AtomicViewState,
}
impl View {
    /// Creates a view whose processing state starts as `ViewState::Ready`.
    fn new(name: String, sql: String, select_stmt: ast::Select, columns: Vec<Column>) -> Self {
        let state = AtomicViewState::new(ViewState::Ready);
        Self {
            name,
            sql,
            select_stmt,
            columns,
            state,
        }
    }

    /// Marks the view as being processed.
    ///
    /// # Errors
    /// Returns a parse error when the view is already `InProgress`, which is
    /// reported as a circular definition.
    pub fn process(&self) -> Result<()> {
        if matches!(self.state.get(), ViewState::InProgress) {
            bail_parse_error!("view {} is circularly defined", self.name);
        }
        self.state.set(ViewState::InProgress);
        Ok(())
    }

    /// Returns an `InProgress` view to `Ready`; a view that is already `Ready` is left alone.
    pub fn done(&self) {
        if matches!(self.state.get(), ViewState::InProgress) {
            self.state.set(ViewState::Ready);
        }
    }
}
impl Clone for View {
    /// Copies the definition; the clone always starts in `ViewState::Ready`,
    /// regardless of the state of `self`.
    fn clone(&self) -> Self {
        let Self {
            name,
            sql,
            select_stmt,
            columns,
            state: _,
        } = self;
        Self {
            name: name.clone(),
            sql: sql.clone(),
            select_stmt: select_stmt.clone(),
            columns: columns.clone(),
            state: AtomicViewState::new(ViewState::Ready),
        }
    }
}
/// Map from view name to its shared [`View`] definition.
pub type ViewsMap = HashMap<String, Arc<View>>;
/// A trigger definition attached to a table.
#[derive(Debug, Clone)]
pub struct Trigger {
/// Name of the trigger.
pub name: String,
/// SQL text associated with the trigger (presumably the defining statement — confirm against callers).
pub sql: String,
/// Name of the table the trigger belongs to.
pub table_name: String,
/// When the trigger fires; `Trigger::new` uses `Before` if no time is given.
pub time: turso_parser::ast::TriggerTime,
/// Event the trigger is declared for.
pub event: turso_parser::ast::TriggerEvent,
/// Whether the trigger was declared `FOR EACH ROW` (inferred from the name — TODO confirm).
pub for_each_row: bool,
/// Optional `WHEN` expression.
pub when_clause: Option<turso_parser::ast::Expr>,
/// Commands making up the trigger body.
pub commands: Vec<turso_parser::ast::TriggerCmd>,
/// Whether the trigger is temporary (inferred from the name — TODO confirm).
pub temporary: bool,
/// Database id the trigger targets, if any; `Schema::remove_triggers_for_table_with_db`
/// compares it against the database being cleaned up.
pub target_database_id: Option<usize>,
}
impl Trigger {
    /// Builds a trigger from its parts.
    ///
    /// A missing `time` is stored as `TriggerTime::Before`; every other
    /// argument is stored unchanged.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        name: String,
        sql: String,
        table_name: String,
        time: Option<turso_parser::ast::TriggerTime>,
        event: turso_parser::ast::TriggerEvent,
        for_each_row: bool,
        when_clause: Option<turso_parser::ast::Expr>,
        commands: Vec<turso_parser::ast::TriggerCmd>,
        temporary: bool,
        target_database_id: Option<usize>,
    ) -> Self {
        let time = match time {
            Some(explicit) => explicit,
            None => turso_parser::ast::TriggerTime::Before,
        };
        Self {
            name,
            sql,
            table_name,
            time,
            event,
            for_each_row,
            when_clause,
            commands,
            temporary,
            target_database_id,
        }
    }
}
use crate::storage::btree::{BTreeCursor, CursorTrait};
use crate::sync::Arc;
use crate::sync::Mutex;
use crate::translate::collate::CollationSeq;
use crate::translate::plan::{BitSet, ColumnMask, Plan, TableReferences};
use crate::util::{
module_args_from_sql, module_name_from_sql, type_from_name, UnparsedFromSqlIndex,
};
use crate::Result;
use crate::{
bail_parse_error, contains_ignore_ascii_case, eq_ignore_ascii_case, match_ignore_ascii_case,
LimboError, MvCursor, Pager, SymbolTable, ValueRef, VirtualTable,
};
use bitflags::bitflags;
use core::fmt;
use rustc_hash::{FxBuildHasher, FxHashMap as HashMap, FxHashSet as HashSet};
use std::collections::VecDeque;
use std::ops::Deref;
use std::sync::OnceLock;
use tracing::trace;
use turso_parser::ast::{
self, ColumnDefinition, Expr, InitDeferredPred, Literal, Name, RefAct, ResolveType, SortOrder,
TableInternalId, TypeOperator,
};
use turso_parser::{
ast::{Cmd, CreateTableBody, ResultColumn, Stmt},
parser::Parser,
};
// Canonical name of the schema table; `Schema::with_options` registers the
// schema table under this key.
pub const SCHEMA_TABLE_NAME: &str = "sqlite_schema";
// Alternative spellings that `Schema::normalize_table_lookup_name` maps to
// `SCHEMA_TABLE_NAME` during table lookups.
pub const SCHEMA_TABLE_NAME_ALT: &str = "sqlite_master";
pub const TEMP_SCHEMA_TABLE_NAME: &str = "sqlite_temp_schema";
pub const TEMP_SCHEMA_TABLE_NAME_ALT: &str = "sqlite_temp_master";
pub const SQLITE_SEQUENCE_TABLE_NAME: &str = "sqlite_sequence";
pub const TURSO_TYPES_TABLE_NAME: &str = "__turso_internal_types";
// Prefix of DBSP state tables; the full name is built as
// `{DBSP_TABLE_PREFIX}{DBSP_CIRCUIT_VERSION}_{view_name}`.
pub const DBSP_TABLE_PREFIX: &str = "__turso_internal_dbsp_state_v";
// Prefix checked by `allow_user_dml` to refuse user DML on internal tables.
pub const TURSO_INTERNAL_PREFIX: &str = "__turso_internal_";
use crate::util::quote_identifier as quote_ident;
/// Returns a boxed copy of `expr` in which every identifier spelled `value`
/// (ASCII case-insensitive) is replaced by an exact-name identifier for
/// `col_name`. The input expression is not modified.
pub fn rewrite_value_to_column(expr: &ast::Expr, col_name: &str) -> Box<ast::Expr> {
    let mut rewritten = expr.clone();
    // The walk result is intentionally ignored: the visitor itself never fails.
    let _ = walk_expr_mut(&mut rewritten, &mut |node| {
        let refers_to_value = matches!(
            &*node,
            ast::Expr::Id(ident) if ident.as_str().eq_ignore_ascii_case("value")
        );
        if refers_to_value {
            *node = ast::Expr::Id(ast::Name::exact(col_name.to_string()));
        }
        Ok(WalkControl::Continue)
    });
    Box::new(rewritten)
}
/// One field of a struct type definition.
#[derive(Debug, Clone)]
pub struct StructFieldDef {
/// Field name; `TypeDef::find_struct_field` matches it ASCII case-insensitively.
pub name: String,
/// Affinity computed from the field's declared type name.
pub base_affinity: Affinity,
/// Declared type name of the field.
pub type_name: String,
}
/// Definition of a struct type: its fields in declaration order.
#[derive(Debug, Clone)]
pub struct StructDef {
/// Fields of the struct.
pub fields: Vec<StructFieldDef>,
}
/// One variant of a union type definition.
#[derive(Debug, Clone)]
pub struct UnionVariantDef {
/// Variant tag name; `TypeDef::find_union_variant` matches it ASCII case-insensitively.
pub tag_name: String,
/// Position of the variant in the declaration, stored as `u8`.
pub tag_index: u8,
/// Affinity computed from the variant's declared type name.
pub base_affinity: Affinity,
/// Declared type name of the variant.
pub type_name: String,
}
/// Definition of a union type.
#[derive(Debug, Clone)]
pub struct UnionDef {
/// Variants in declaration order.
pub variants: Vec<UnionVariantDef>,
/// Tag names of `variants`, in the same order, as a shared slice.
pub tag_names: Arc<[String]>,
}
/// The shape of a type definition.
#[derive(Debug, Clone)]
pub enum TypeDefKind {
/// A custom type (also used for domains, see `TypeDef::from_domain`).
Custom {
/// Declared type parameters.
params: Vec<ast::TypeParam>,
/// Name of the base type this type is built on.
base: String,
/// Optional `ENCODE` expression.
encode: Option<Box<ast::Expr>>,
/// Optional `DECODE` expression.
decode: Option<Box<ast::Expr>>,
/// Declared operators.
operators: Vec<TypeOperator>,
/// Optional `DEFAULT` expression.
default: Option<Box<ast::Expr>>,
},
/// A struct type; `TypeDef::base` reports `"blob"` for it.
Struct(StructDef),
/// A union type; `TypeDef::base` reports `"blob"` for it.
Union(UnionDef),
}
/// Result of following a type's base types through the type registry.
#[derive(Debug, Clone)]
pub struct ResolvedType {
/// Lowercased name at which resolution stopped, i.e. the first base name
/// that is not itself a registered type.
pub primitive: String,
/// Type definitions visited during resolution, starting with the requested type.
pub chain: Vec<Arc<TypeDef>>,
}
impl ResolvedType {
    /// The first definition in the chain (the type that was asked for).
    ///
    /// # Panics
    /// Panics if `chain` is empty.
    pub fn leaf(&self) -> &TypeDef {
        let first = &self.chain[0];
        first
    }

    /// Whether the first definition in the chain is a domain.
    ///
    /// # Panics
    /// Panics if `chain` is empty.
    pub fn is_domain(&self) -> bool {
        let first = &self.chain[0];
        first.is_domain
    }

    /// The first default expression found while walking the chain from the
    /// requested type towards its base types.
    pub fn default_expr(&self) -> Option<&ast::Expr> {
        for type_def in &self.chain {
            if let Some(expr) = type_def.default_expr() {
                return Some(expr);
            }
        }
        None
    }
}
/// A registered type definition (custom type, struct, union or domain).
#[derive(Debug, Clone)]
pub struct TypeDef {
/// Name of the type as declared.
pub name: String,
/// `true` for types registered by the built-in bootstrap.
pub is_builtin: bool,
/// `NOT NULL` flag; only `TypeDef::from_domain` can set it to `true`.
pub not_null: bool,
/// `true` when the definition was created by `TypeDef::from_domain`.
pub is_domain: bool,
/// SQL text the definition was created from.
pub sql: String,
/// Domain constraints; empty for definitions created by `from_create_type`.
pub domain_checks: Vec<ast::DomainConstraint>,
/// Shape of the definition.
pub kind: TypeDefKind,
}
impl TypeDef {
    /// Whether this definition is a struct type.
    pub fn is_struct(&self) -> bool {
        self.struct_def().is_some()
    }

    /// Whether this definition is a union type.
    pub fn is_union(&self) -> bool {
        self.union_def().is_some()
    }

    /// The struct definition, if this is a struct type.
    pub fn struct_def(&self) -> Option<&StructDef> {
        if let TypeDefKind::Struct(def) = &self.kind {
            Some(def)
        } else {
            None
        }
    }

    /// The union definition, if this is a union type.
    pub fn union_def(&self) -> Option<&UnionDef> {
        if let TypeDefKind::Union(def) = &self.kind {
            Some(def)
        } else {
            None
        }
    }

    /// The `ENCODE` expression of a custom type, if any.
    pub fn encode(&self) -> Option<&ast::Expr> {
        let TypeDefKind::Custom { encode, .. } = &self.kind else {
            return None;
        };
        encode.as_deref()
    }

    /// The `DECODE` expression of a custom type, if any.
    pub fn decode(&self) -> Option<&ast::Expr> {
        let TypeDefKind::Custom { decode, .. } = &self.kind else {
            return None;
        };
        decode.as_deref()
    }

    /// Name of the base type; struct and union types report `"blob"`.
    pub fn base(&self) -> &str {
        if let TypeDefKind::Custom { base, .. } = &self.kind {
            base.as_str()
        } else {
            "blob"
        }
    }

    /// Declared parameters of a custom type; empty for struct and union types.
    pub fn params(&self) -> &[ast::TypeParam] {
        if let TypeDefKind::Custom { params, .. } = &self.kind {
            params.as_slice()
        } else {
            &[]
        }
    }

    /// Declared operators of a custom type; empty for struct and union types.
    pub fn operators(&self) -> &[TypeOperator] {
        if let TypeDefKind::Custom { operators, .. } = &self.kind {
            operators.as_slice()
        } else {
            &[]
        }
    }

    /// The `DEFAULT` expression of a custom type, if any.
    pub fn default_expr(&self) -> Option<&ast::Expr> {
        let TypeDefKind::Custom { default, .. } = &self.kind else {
            return None;
        };
        default.as_deref()
    }

    /// Looks up a struct field by name (ASCII case-insensitive) and returns
    /// its position together with its definition.
    pub fn find_struct_field(&self, name: &str) -> Option<(usize, &StructFieldDef)> {
        let fields = &self.struct_def()?.fields;
        fields
            .iter()
            .enumerate()
            .find(|(_, field)| field.name.eq_ignore_ascii_case(name))
    }

    /// Returns the tag index of the union variant named `tag_name`, if any.
    pub fn resolve_union_tag_index(&self, tag_name: &str) -> Option<u8> {
        let (tag_index, _) = self.find_union_variant(tag_name)?;
        Some(tag_index)
    }

    /// Looks up a union variant by tag name (ASCII case-insensitive) and
    /// returns its tag index together with its definition.
    pub fn find_union_variant(&self, name: &str) -> Option<(u8, &UnionVariantDef)> {
        let union_def = self.union_def()?;
        for variant in &union_def.variants {
            if variant.tag_name.eq_ignore_ascii_case(name) {
                return Some((variant.tag_index, variant));
            }
        }
        None
    }

    /// Builds a non-domain definition from a parsed `CREATE TYPE` body.
    ///
    /// # Errors
    /// Returns a parse error when a union body declares more than 256
    /// variants, because variant indices are stored as `u8`.
    pub fn from_create_type(
        type_name: &str,
        body: &ast::CreateTypeBody,
        is_builtin: bool,
        sql: String,
    ) -> crate::Result<Self> {
        let kind = match body {
            ast::CreateTypeBody::CustomType {
                params,
                base,
                encode,
                decode,
                operators,
                default,
            } => TypeDefKind::Custom {
                params: params.clone(),
                base: base.clone(),
                encode: encode.clone(),
                decode: decode.clone(),
                operators: operators.clone(),
                default: default.clone(),
            },
            ast::CreateTypeBody::Struct(fields) => {
                let mut struct_fields: Vec<StructFieldDef> = Vec::new();
                for field in fields.iter() {
                    struct_fields.push(StructFieldDef {
                        name: field.name.to_string(),
                        base_affinity: Affinity::affinity(&field.field_type.name),
                        type_name: field.field_type.name.clone(),
                    });
                }
                TypeDefKind::Struct(StructDef {
                    fields: struct_fields,
                })
            }
            ast::CreateTypeBody::Union(fields) => {
                if fields.len() > 256 {
                    return Err(crate::LimboError::ParseError(format!(
                        "UNION type cannot have more than 256 variants (got {})",
                        fields.len()
                    )));
                }
                let mut variants: Vec<UnionVariantDef> = Vec::new();
                let mut names: Vec<String> = Vec::new();
                for (position, field) in fields.iter().enumerate() {
                    let tag_name = field.name.to_string();
                    names.push(tag_name.clone());
                    variants.push(UnionVariantDef {
                        tag_name,
                        // Cannot truncate: at most 256 variants were accepted above.
                        tag_index: position as u8,
                        base_affinity: Affinity::affinity(&field.field_type.name),
                        type_name: field.field_type.name.clone(),
                    });
                }
                TypeDefKind::Union(UnionDef {
                    tag_names: names.into(),
                    variants,
                })
            }
        };
        Ok(Self {
            name: type_name.to_string(),
            is_builtin,
            not_null: false,
            is_domain: false,
            sql,
            domain_checks: Vec::new(),
            kind,
        })
    }

    /// Builds a domain definition: a non-builtin custom type over `base_type`
    /// with no parameters, encode/decode expressions or operators, carrying
    /// the given constraints, `NOT NULL` flag and default.
    pub fn from_domain(
        domain_name: &str,
        base_type: &str,
        not_null: bool,
        constraints: &[ast::DomainConstraint],
        default: Option<Box<ast::Expr>>,
        sql: String,
    ) -> Self {
        let kind = TypeDefKind::Custom {
            params: Vec::new(),
            base: base_type.to_string(),
            encode: None,
            decode: None,
            operators: Vec::new(),
            default,
        };
        Self {
            name: domain_name.to_string(),
            is_builtin: false,
            not_null,
            is_domain: true,
            sql,
            domain_checks: constraints.to_vec(),
            kind,
        }
    }

    /// Type name of the `value` parameter, falling back to the base type when
    /// there is no such parameter or it carries no type.
    pub fn value_input_type(&self) -> &str {
        let value_param = self
            .params()
            .iter()
            .find(|p| p.name.eq_ignore_ascii_case("value"));
        match value_param {
            Some(p) => p.ty.as_deref().unwrap_or_else(|| self.base()),
            None => self.base(),
        }
    }

    /// Iterates over the declared parameters other than `value`.
    pub fn user_params(&self) -> impl Iterator<Item = &turso_parser::ast::TypeParam> {
        let all_params = self.params();
        all_params
            .iter()
            .filter(|p| !p.name.eq_ignore_ascii_case("value"))
    }

    /// The SQL text this definition was created from.
    pub fn to_sql(&self) -> &str {
        self.sql.as_str()
    }
}
/// Data collected row by row while scanning the schema table in
/// `Schema::make_from_btree`; it is consumed by `populate_indices` and
/// `populate_materialized_views` once the scan reaches the end.
struct MakeFromBtreeAccumulators {
/// Index definitions whose SQL is parsed after all tables are known.
from_sql_indexes: Vec<UnparsedFromSqlIndex>,
/// Automatic indexes per table as `(name, root page)` pairs — pair meaning inferred, TODO confirm.
automatic_indices: HashMap<String, Vec<(String, i64)>>,
/// Presumably DBSP state table root pages keyed by view name — TODO confirm.
dbsp_state_roots: HashMap<String, i64>,
/// Presumably DBSP state index root pages keyed by view name — TODO confirm.
dbsp_state_index_roots: HashMap<String, i64>,
/// Presumably `(sql, root page)` per materialized view — TODO confirm.
materialized_view_info: HashMap<String, (String, i64)>,
}
/// Phases of the resumable schema scan driven by `Schema::make_from_btree`.
#[derive(Default, Debug)]
pub enum MakeFromBtreePhase {
/// Nothing has been set up yet; the cursor, read transaction and accumulators are created here.
#[default]
Init,
/// The cursor is being rewound to the first row.
Rewinding,
/// The current row is read and handled, or the scan is finished when there is no row.
FetchingRecord,
/// The cursor is moved to the next row.
Advancing,
/// The scan has completed.
Done,
}
/// State kept across calls to `Schema::make_from_btree`, so the scan can be
/// resumed from the phase it stopped in.
pub struct MakeFromBtreeState {
/// Phase to resume from.
phase: MakeFromBtreePhase,
/// Cursor over the schema table; created in the `Init` phase.
cursor: Option<BTreeCursor>,
/// Per-scan accumulators; created in the `Init` phase and taken when the scan ends.
accumulators: Option<MakeFromBtreeAccumulators>,
/// `true` while the read transaction opened in `Init` has not been ended.
read_tx_active: bool,
}
impl Default for MakeFromBtreeState {
fn default() -> Self {
Self::new()
}
}
impl MakeFromBtreeState {
    /// Creates a state positioned at the `Init` phase with no cursor, no
    /// accumulators and no active read transaction.
    pub fn new() -> Self {
        let phase = MakeFromBtreePhase::Init;
        Self {
            phase,
            cursor: None,
            accumulators: None,
            read_tx_active: false,
        }
    }

    /// Ends the read transaction if this state still holds one, then drops
    /// the cursor and the accumulators. The phase is left untouched.
    pub fn cleanup(&mut self, pager: &Pager) {
        let holds_read_tx = self.read_tx_active;
        if holds_read_tx {
            pager.end_read_tx();
            self.read_tx_active = false;
        }
        self.cursor = None;
        self.accumulators = None;
    }
}
// Sentinel index values, both equal to `usize::MAX`.
// NOTE(review): presumably used where a column position is expected but the
// entry refers to the rowid / an expression instead — confirm against users.
pub const ROWID_SENTINEL: usize = usize::MAX;
pub const EXPR_INDEX_SENTINEL: usize = usize::MAX;
/// Lowercase name prefixes that mark a table as a system table.
pub const RESERVED_TABLE_PREFIXES: [&str; 2] = ["sqlite_", "__turso_internal_"];

/// Returns `true` when the lowercased `table_name` starts with one of
/// [`RESERVED_TABLE_PREFIXES`].
pub fn is_system_table(table_name: &str) -> bool {
    // Lowercase once up front instead of re-allocating for every prefix.
    let lowered = table_name.to_lowercase();
    RESERVED_TABLE_PREFIXES
        .iter()
        .any(|prefix| lowered.starts_with(prefix))
}
/// Returns `false` for tables users must not modify directly: the schema
/// table (under either of its two main names) and every table whose name
/// starts with `TURSO_INTERNAL_PREFIX`.
///
/// Both checks are ASCII case-insensitive, so a differently-cased spelling of
/// an internal table name is rejected as well.
pub fn allow_user_dml(table_name: &str) -> bool {
    const NAMES: [&str; 2] = [SCHEMA_TABLE_NAME, SCHEMA_TABLE_NAME_ALT];
    let is_schema_table = NAMES.iter().any(|n| n.eq_ignore_ascii_case(table_name));
    // `get` yields `None` when the name is shorter than the prefix or the cut
    // would split a multi-byte character; neither case can match the prefix.
    let has_internal_prefix = table_name
        .get(..TURSO_INTERNAL_PREFIX.len())
        .is_some_and(|head| head.eq_ignore_ascii_case(TURSO_INTERNAL_PREFIX));
    !(is_schema_table || has_internal_prefix)
}
/// Kind of object stored in a schema.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SchemaObjectType {
/// A table.
Table,
/// A view.
View,
/// An index.
Index,
}
/// In-memory catalog of tables, views, triggers, indexes and type definitions.
#[derive(Debug)]
pub struct Schema {
/// Tables keyed by normalized name; also holds the schema table itself and
/// the tables backing materialized views.
pub tables: HashMap<String, Arc<Table>>,
/// Normalized names of the materialized views.
pub materialized_view_names: HashSet<String>,
/// SQL text of each materialized view, keyed by normalized view name.
pub materialized_view_sql: HashMap<String, String>,
/// Incremental view state of each materialized view, keyed by normalized view name.
pub incremental_views: HashMap<String, Arc<Mutex<IncrementalView>>>,
/// Regular views keyed by normalized name.
pub views: ViewsMap,
/// Triggers keyed by normalized table name; `add_trigger` inserts at the front.
pub triggers: HashMap<String, VecDeque<Arc<Trigger>>>,
/// Indexes keyed by normalized table name; `add_index` keeps indexes whose
/// conflict resolution is `REPLACE` as a contiguous suffix.
pub indexes: HashMap<String, VecDeque<Arc<Index>>>,
/// Names of tables flagged through `table_set_has_index`.
pub has_indexes: HashSet<String>,
/// Schema version; a new schema starts at 0.
pub schema_version: u32,
/// Statistics; entries are removed together with their table or index.
pub analyze_stats: AnalyzeStats,
/// For each normalized table name, the materialized views that depend on it.
pub table_to_materialized_views: HashMap<String, Vec<String>>,
/// View names consulted by `with_incompatible_dependent_views`.
pub incompatible_views: HashSet<String>,
/// NOTE(review): presumably root pages of dropped objects — not used in the visible code, confirm.
pub dropped_root_pages: HashSet<i64>,
/// Type definitions keyed by lowercased type name.
pub type_registry: HashMap<String, Arc<TypeDef>>,
/// A new schema starts with this set to `false`.
pub generated_columns_enabled: bool,
}
impl Default for Schema {
fn default() -> Self {
Self::new()
}
}
/// Registers the built-in type definitions in `registry`.
///
/// Each built-in type is described by a `CREATE TYPE` statement that is parsed
/// here and stored under its lowercased name with `is_builtin = true`. A few
/// aliases are then pointed at the same definitions.
///
/// # Errors
/// Returns `InternalError` when one of the embedded statements does not parse
/// as `CREATE TYPE`, and propagates any error from `TypeDef::from_create_type`.
fn bootstrap_builtin_types(registry: &mut HashMap<String, Arc<TypeDef>>) -> crate::Result<()> {
use turso_parser::ast::{Cmd, Stmt};
use turso_parser::parser::Parser;
// Entries guarded by `#[cfg(feature = ...)]` are only compiled in when that
// feature is enabled.
let type_sqls: &[&str] = &[
#[cfg(feature = "uuid")]
"CREATE TYPE uuid(value text) BASE blob ENCODE uuid_blob(value) DECODE uuid_str(value) DEFAULT uuid4_str() OPERATOR '<'",
"CREATE TYPE boolean(value any) BASE integer ENCODE boolean_to_int(value) DECODE CASE WHEN value THEN 1 ELSE 0 END OPERATOR '<'",
#[cfg(feature = "json")]
"CREATE TYPE json(value text) BASE text ENCODE json(value) DECODE value",
#[cfg(feature = "json")]
"CREATE TYPE jsonb(value text) BASE blob ENCODE jsonb(value) DECODE json(value)",
"CREATE TYPE varchar(value text, maxlen integer) BASE text ENCODE CASE WHEN length(value) <= maxlen THEN value ELSE RAISE(ABORT, 'value too long for varchar') END DECODE value OPERATOR '<'",
"CREATE TYPE date(value text) BASE text ENCODE CASE WHEN value IS NULL THEN NULL WHEN date(value) IS NULL THEN RAISE(ABORT, 'invalid date value') ELSE date(value) END DECODE value OPERATOR '<'",
"CREATE TYPE time(value text) BASE text ENCODE CASE WHEN value IS NULL THEN NULL WHEN time(value) IS NULL THEN RAISE(ABORT, 'invalid time value') ELSE time(value) END DECODE value OPERATOR '<'",
"CREATE TYPE timestamp(value text) BASE text ENCODE CASE WHEN value IS NULL THEN NULL WHEN datetime(value) IS NULL THEN RAISE(ABORT, 'invalid timestamp value') ELSE datetime(value) END DECODE value OPERATOR '<'",
"CREATE TYPE smallint(value integer) BASE integer ENCODE CASE WHEN value BETWEEN -32768 AND 32767 THEN value ELSE RAISE(ABORT, 'integer out of range for smallint') END DECODE value OPERATOR '<'",
"CREATE TYPE bigint(value integer) BASE integer",
"CREATE TYPE inet(value text) BASE text ENCODE validate_ipaddr(value) DECODE value",
"CREATE TYPE bytea(value blob) BASE blob OPERATOR '<'",
"CREATE TYPE numeric(value any, precision integer, scale integer) BASE blob ENCODE numeric_encode(value, precision, scale) DECODE numeric_decode(value) OPERATOR '+' numeric_add OPERATOR '-' numeric_sub OPERATOR '*' numeric_mul OPERATOR '/' numeric_div OPERATOR '<' numeric_lt OPERATOR '=' numeric_eq",
];
for sql in type_sqls {
let mut parser = Parser::new(sql.as_bytes());
// Anything other than a successfully parsed `CREATE TYPE` means the
// embedded SQL above is broken.
let Ok(Some(Cmd::Stmt(Stmt::CreateType {
type_name, body, ..
}))) = parser.next_cmd()
else {
return Err(crate::LimboError::InternalError(format!(
"failed to parse built-in type SQL: {sql}"
)));
};
let type_def = TypeDef::from_create_type(&type_name, &body, true, sql.to_string())?;
registry.insert(type_name.to_lowercase(), Arc::new(type_def));
}
// Aliases share the target's definition; an alias is skipped when its target
// was not registered above.
let aliases: &[(&str, &str)] = &[
("bool", "boolean"),
("int2", "smallint"),
("int8", "bigint"),
];
for (alias, target) in aliases {
if let Some(type_def) = registry.get(*target).cloned() {
registry.insert(alias.to_string(), type_def);
}
}
Ok(())
}
impl Schema {
/// Normalizes `name` for a table lookup and maps the alternative schema
/// table names onto `SCHEMA_TABLE_NAME`.
fn normalize_table_lookup_name(&self, name: &str) -> String {
    let normalized = normalize_ident(name);
    let aliases = [
        SCHEMA_TABLE_NAME_ALT,
        TEMP_SCHEMA_TABLE_NAME,
        TEMP_SCHEMA_TABLE_NAME_ALT,
    ];
    let is_schema_alias = aliases.iter().any(|alias| normalized.eq(*alias));
    if is_schema_alias {
        return SCHEMA_TABLE_NAME.to_string();
    }
    normalized
}
/// Creates a schema with custom types enabled.
///
/// # Panics
/// Panics if the built-in type definitions cannot be loaded.
pub fn new() -> Self {
    let schema = Self::with_options(true);
    schema.expect("built-in type definitions are malformed")
}
pub fn with_options(enable_custom_types: bool) -> crate::Result<Self> {
let mut tables: HashMap<String, Arc<Table>> = HashMap::default();
let has_indexes = HashSet::default();
let indexes: HashMap<String, VecDeque<Arc<Index>>> = HashMap::default();
#[allow(clippy::arc_with_non_send_sync)]
tables.insert(
SCHEMA_TABLE_NAME.to_string(),
Arc::new(Table::BTree(sqlite_schema_table().into())),
);
for function in VirtualTable::builtin_functions(enable_custom_types) {
tables.insert(
function.name.to_owned(),
Arc::new(Table::Virtual(Arc::new((*function).clone()))),
);
}
let materialized_view_names = HashSet::default();
let materialized_view_sql = HashMap::default();
let incremental_views = HashMap::default();
let views: ViewsMap = HashMap::default();
let triggers = HashMap::default();
let table_to_materialized_views: HashMap<String, Vec<String>> = HashMap::default();
let incompatible_views = HashSet::default();
let mut type_registry = HashMap::default();
if enable_custom_types {
bootstrap_builtin_types(&mut type_registry)?;
}
Ok(Self {
tables,
materialized_view_names,
materialized_view_sql,
incremental_views,
views,
triggers,
indexes,
has_indexes,
schema_version: 0,
analyze_stats: AnalyzeStats::default(),
table_to_materialized_views,
incompatible_views,
dropped_root_pages: HashSet::default(),
type_registry,
generated_columns_enabled: false,
})
}
/// Looks up a type definition by name (lowercased), but only when
/// `is_strict` is set; otherwise always returns `None`.
pub fn get_type_def(&self, type_name: &str, is_strict: bool) -> Option<&Arc<TypeDef>> {
    if is_strict {
        let key = type_name.to_lowercase();
        self.type_registry.get(&key)
    } else {
        None
    }
}
/// Looks up a type definition by name (lowercased).
pub fn get_type_def_unchecked(&self, type_name: &str) -> Option<&Arc<TypeDef>> {
    let key = type_name.to_lowercase();
    self.type_registry.get(&key)
}
/// Resolves `type_name` through the type registry, but only when
/// `is_strict` is set; otherwise returns `Ok(None)` without a lookup.
pub fn resolve_type(
    &self,
    type_name: &str,
    is_strict: bool,
) -> crate::Result<Option<ResolvedType>> {
    if is_strict {
        self.resolve_type_unchecked(type_name)
    } else {
        Ok(None)
    }
}
/// Resolves `type_name` to its primitive base and chain of definitions.
/// Returns `Ok(None)` when the name is not a registered type.
///
/// # Errors
/// Propagates the error from `resolve_base_type_chain`.
pub fn resolve_type_unchecked(&self, type_name: &str) -> crate::Result<Option<ResolvedType>> {
    let registered = self
        .type_registry
        .contains_key(&type_name.to_lowercase());
    if !registered {
        return Ok(None);
    }
    self.resolve_base_type_chain(type_name)
        .map(|(primitive, chain)| Some(ResolvedType { primitive, chain }))
}
/// Removes the type registered under the lowercased `type_name`, if any.
pub fn remove_type(&mut self, type_name: &str) {
    let key = type_name.to_lowercase();
    let _ = self.type_registry.remove(&key);
}
/// Follows the base types of `type_name` through the type registry.
///
/// Returns the lowercased name at which the walk stopped (the first name
/// that is not a registered type) together with every definition visited,
/// starting with the one for `type_name`. If `type_name` itself is not
/// registered the chain is empty.
///
/// # Errors
/// Returns a parse error when the same type name is reached twice, i.e. the
/// definitions form a cycle.
pub fn resolve_base_type_chain(
    &self,
    type_name: &str,
) -> crate::Result<(String, Vec<Arc<TypeDef>>)> {
    let mut chain = Vec::new();
    let mut visited = std::collections::HashSet::new();
    let mut current = type_name.to_lowercase();
    loop {
        // `insert` returns false when the name was seen before: a cycle.
        if !visited.insert(current.clone()) {
            return Err(crate::LimboError::ParseError(format!(
                "circular type dependency detected: {current}"
            )));
        }
        match self.type_registry.get(&current) {
            Some(td) => {
                chain.push(Arc::clone(td));
                current = td.base().to_lowercase();
            }
            None => {
                return Ok((current, chain));
            }
        }
    }
}
/// Parses `sql` as a `CREATE TYPE` or `CREATE DOMAIN` statement and stores
/// the resulting non-builtin definition under its lowercased name,
/// replacing any previous entry.
///
/// # Errors
/// Returns a parse error when `sql` is neither statement, and propagates
/// errors from `TypeDef::from_create_type`.
pub fn add_type_from_sql(&mut self, sql: &str) -> crate::Result<()> {
    use turso_parser::ast::{Cmd, Stmt};
    use turso_parser::parser::Parser;

    let mut parser = Parser::new(sql.as_bytes());
    let (key, type_def) = match parser.next_cmd() {
        Ok(Some(Cmd::Stmt(Stmt::CreateType {
            type_name, body, ..
        }))) => {
            let def = TypeDef::from_create_type(&type_name, &body, false, sql.to_string())?;
            (type_name.to_lowercase(), def)
        }
        Ok(Some(Cmd::Stmt(Stmt::CreateDomain {
            domain_name,
            base_type,
            default,
            not_null,
            constraints,
            ..
        }))) => {
            let def = TypeDef::from_domain(
                &domain_name,
                &base_type,
                not_null,
                &constraints,
                default,
                sql.to_string(),
            );
            (domain_name.to_lowercase(), def)
        }
        _ => {
            return Err(crate::LimboError::ParseError(format!(
                "invalid type sql: {sql}"
            )));
        }
    };
    self.type_registry.insert(key, Arc::new(type_def));
    Ok(())
}
/// Registers every statement in `type_sqls`, stopping at the first error,
/// and then re-resolves custom type affinities for the affected tables.
pub fn load_type_definitions(&mut self, type_sqls: &[String]) -> crate::Result<()> {
    type_sqls
        .iter()
        .try_for_each(|sql| self.add_type_from_sql(sql))?;
    self.resolve_all_custom_type_affinities();
    Ok(())
}
/// Rebuilds every strict b-tree table that has at least one column whose
/// type name is a registered type: the table is cloned, its custom type
/// affinities and domain constraints are re-resolved against this schema,
/// and the clone replaces the original entry.
pub fn resolve_all_custom_type_affinities(&mut self) {
    // First pass only reads `self`; replacements are applied afterwards.
    let mut rebuilt: SmallVec<[(String, Arc<Table>); 8]> = SmallVec::with_capacity(8);
    for (name, table) in self.tables.iter() {
        if !table.is_strict() {
            continue;
        }
        let Some(bt) = table.btree() else {
            continue;
        };
        let uses_registered_type = bt
            .columns
            .iter()
            .any(|c| self.get_type_def_unchecked(&c.ty_str).is_some());
        if !uses_registered_type {
            continue;
        }
        let mut modified = (*bt).clone();
        modified.resolve_custom_type_affinities(self);
        modified.propagate_domain_constraints(self);
        rebuilt.push((name.clone(), Arc::new(Table::BTree(Arc::new(modified)))));
    }
    for (name, table) in rebuilt {
        self.tables.insert(name, table);
    }
}
/// Returns `true` when no index of any table is named exactly `name`.
pub fn is_unique_idx_name(&self, name: &str) -> bool {
    self.indexes
        .values()
        .all(|bucket| bucket.iter().all(|existing| existing.name != name))
}
/// Registers a materialized view under its normalized name: its backing
/// table, its name, its SQL text and its incremental view state.
pub fn add_materialized_view(&mut self, view: IncrementalView, table: Arc<Table>, sql: String) {
    let key = normalize_ident(view.name());
    let shared_view = Arc::new(Mutex::new(view));
    self.tables.insert(key.clone(), table);
    self.materialized_view_names.insert(key.clone());
    self.materialized_view_sql.insert(key.clone(), sql);
    self.incremental_views.insert(key, shared_view);
}
/// Returns the incremental view registered under the normalized `name`.
pub fn get_materialized_view(&self, name: &str) -> Option<Arc<Mutex<IncrementalView>>> {
    let key = normalize_ident(name);
    match self.incremental_views.get(&key) {
        Some(view) => Some(view.clone()),
        None => None,
    }
}
/// Whether a DBSP state table for `view_name` exists under the name built
/// from the current `DBSP_CIRCUIT_VERSION`.
pub fn has_compatible_dbsp_state_table(&self, view_name: &str) -> bool {
    let view_name = normalize_ident(view_name);
    let state_table = format!("{DBSP_TABLE_PREFIX}{DBSP_CIRCUIT_VERSION}_{view_name}");
    self.tables.get(&state_table).is_some()
}
/// Whether the normalized `name` is a registered materialized view.
pub fn is_materialized_view(&self, name: &str) -> bool {
    let key = normalize_ident(name);
    self.materialized_view_names.contains(&key)
}
/// Calls `f` with the materialized views that depend on `table_name` and
/// are listed in `incompatible_views`, and returns whatever `f` returns.
/// `f` receives an empty slice when there are none.
pub fn with_incompatible_dependent_views<F, T>(&self, table_name: &str, f: F) -> T
where
    F: FnOnce(&[&String]) -> T,
{
    let key = normalize_ident(table_name);
    let mut incompatible: SmallVec<[&String; 8]> = SmallVec::with_capacity(8);
    if let Some(dependents) = self.table_to_materialized_views.get(&key) {
        for view_name in dependents {
            if self.incompatible_views.contains(view_name) {
                incompatible.push(view_name);
            }
        }
    }
    f(&incompatible)
}
/// Removes the regular or materialized view called `name`.
///
/// For a materialized view this also removes its backing table, its DBSP
/// state table and that table's indexes, its SQL text and incremental state,
/// and every dependency entry that points at it.
///
/// # Errors
/// Returns a parse error when no view of either kind has that name.
pub fn remove_view(&mut self, name: &str) -> Result<()> {
    let name = normalize_ident(name);
    if self.views.remove(&name).is_some() {
        return Ok(());
    }
    if !self.materialized_view_names.contains(&name) {
        return Err(crate::LimboError::ParseError(format!(
            "no such view: {name}"
        )));
    }

    self.tables.remove(&name);
    let dbsp_table_name = format!("{DBSP_TABLE_PREFIX}{DBSP_CIRCUIT_VERSION}_{name}");
    self.tables.remove(&dbsp_table_name);
    self.remove_indices_for_table(&dbsp_table_name);
    self.materialized_view_names.remove(&name);
    self.materialized_view_sql.remove(&name);
    self.incremental_views.remove(&name);
    for dependents in self.table_to_materialized_views.values_mut() {
        dependents.retain(|dependent| dependent != &name);
    }
    Ok(())
}
/// Records that the materialized view `view_name` depends on `table_name`
/// (both names are normalized). Existing entries are not de-duplicated.
pub fn add_materialized_view_dependency(&mut self, table_name: &str, view_name: &str) {
    let table_key = normalize_ident(table_name);
    let view_key = normalize_ident(view_name);
    let dependents = self.table_to_materialized_views.entry(table_key).or_default();
    dependents.push(view_key);
}
/// Returns a copy of the names of the materialized views that depend on
/// `table_name`; empty when there are none.
pub fn get_dependent_materialized_views(&self, table_name: &str) -> Vec<String> {
    // Skip the name normalization entirely when nothing is registered.
    if self.table_to_materialized_views.is_empty() {
        return Vec::new();
    }
    let key = normalize_ident(table_name);
    match self.table_to_materialized_views.get(&key) {
        Some(dependents) => dependents.clone(),
        None => Vec::new(),
    }
}
/// Registers `view` under its normalized name.
///
/// # Errors
/// Propagates the error from the object-name conflict check.
pub fn add_view(&mut self, view: View) -> Result<()> {
    self.check_object_name_conflict(&view.name)?;
    let key = normalize_ident(&view.name);
    let shared = Arc::new(view);
    self.views.insert(key, shared);
    Ok(())
}
/// Returns the view registered under the normalized `name`.
pub fn get_view(&self, name: &str) -> Option<Arc<View>> {
    let key = normalize_ident(name);
    match self.views.get(&key) {
        Some(view) => Some(view.clone()),
        None => None,
    }
}
/// Adds `trigger` at the front of the trigger list of the normalized
/// `table_name`. Always returns `Ok(())`.
pub fn add_trigger(&mut self, trigger: Trigger, table_name: &str) -> Result<()> {
    let key = normalize_ident(table_name);
    let bucket = self.triggers.entry(key).or_default();
    bucket.push_front(Arc::new(trigger));
    Ok(())
}
/// Removes the first trigger whose normalized name equals the normalized
/// `name`, searching the trigger lists of all tables.
///
/// # Errors
/// Returns a parse error when no such trigger exists.
pub fn remove_trigger(&mut self, name: &str) -> Result<()> {
    let name = normalize_ident(name);
    for triggers_list in self.triggers.values_mut() {
        let found = triggers_list
            .iter()
            .position(|trigger| normalize_ident(&trigger.name) == name);
        if let Some(index) = found {
            triggers_list.remove(index);
            return Ok(());
        }
    }
    Err(crate::LimboError::ParseError(format!(
        "no such trigger: {name}"
    )))
}
/// Drops every trigger registered for the normalized `table_name`.
pub fn remove_triggers_for_table(&mut self, table_name: &str) {
    let key = normalize_ident(table_name);
    let _ = self.triggers.remove(&key);
}
/// Removes the triggers of `table_name` that belong to database `target_db`.
///
/// A trigger with an explicit target database is kept only when that
/// database differs from `target_db`. A trigger without one is kept only
/// when this schema itself has a table of that name. The table's entry is
/// removed once its trigger list becomes empty.
pub fn remove_triggers_for_table_with_db(&mut self, table_name: &str, target_db: usize) {
    let table_name = normalize_ident(table_name);
    let has_shadow_table = self.tables.contains_key(&table_name);
    let Some(bucket) = self.triggers.get_mut(&table_name) else {
        return;
    };
    bucket.retain(|trigger| {
        if let Some(db) = trigger.target_database_id {
            db != target_db
        } else {
            has_shadow_table
        }
    });
    let now_empty = bucket.is_empty();
    if now_empty {
        self.triggers.remove(&table_name);
    }
}
/// Returns the trigger called `name` that is registered for `table_name`.
///
/// Both the requested name and the stored trigger names are normalized
/// before comparison, matching how `remove_trigger` identifies triggers.
pub fn get_trigger_for_table(&self, table_name: &str, name: &str) -> Option<Arc<Trigger>> {
    let table_name = normalize_ident(table_name);
    let name = normalize_ident(name);
    self.triggers.get(&table_name).and_then(|triggers| {
        triggers
            .iter()
            .find(|t| normalize_ident(&t.name) == name)
            .cloned()
    })
}
/// Iterates over the triggers registered for the normalized `table_name`,
/// front to back; the iterator is empty when the table has none.
pub fn get_triggers_for_table(
    &self,
    table_name: &str,
) -> impl Iterator<Item = &Arc<Trigger>> + Clone {
    let key = normalize_ident(table_name);
    match self.triggers.get(&key) {
        Some(bucket) => bucket.iter(),
        None => Default::default(),
    }
}
/// Returns the first trigger called `name`, searching all tables.
///
/// Both the requested name and the stored trigger names are normalized
/// before comparison, matching how `remove_trigger` identifies triggers.
pub fn get_trigger(&self, name: &str) -> Option<Arc<Trigger>> {
    let name = normalize_ident(name);
    self.triggers
        .values()
        .flatten()
        .find(|t| normalize_ident(&t.name) == name)
        .cloned()
}
pub fn add_btree_table(&mut self, table: Arc<BTreeTable>) -> Result<()> {
self.check_object_name_conflict(&table.name)?;
let name = normalize_ident(&table.name);
self.tables.insert(name, Table::BTree(table).into());
Ok(())
}
pub fn add_virtual_table(&mut self, table: Arc<VirtualTable>) -> Result<()> {
self.check_object_name_conflict(&table.name)?;
let name = normalize_ident(&table.name);
self.tables.insert(name, Table::Virtual(table).into());
Ok(())
}
/// Returns the table registered under `name`; alternative schema table
/// names resolve to the schema table.
pub fn get_table(&self, name: &str) -> Option<Arc<Table>> {
    let key = self.normalize_table_lookup_name(name);
    match self.tables.get(&key) {
        Some(table) => Some(table.clone()),
        None => None,
    }
}
/// Removes the table registered under the normalized `table_name` together
/// with its statistics. If the name was a materialized view, its incremental
/// state and SQL text are removed too.
pub fn remove_table(&mut self, table_name: &str) {
    let key = normalize_ident(table_name);
    self.tables.remove(&key);
    self.analyze_stats.remove_table(&key);
    let was_materialized_view = self.materialized_view_names.remove(&key);
    if was_materialized_view {
        self.incremental_views.remove(&key);
        self.materialized_view_sql.remove(&key);
    }
}
/// Returns the b-tree table registered under `name`, or `None` when there is
/// no such table or it is not a b-tree table.
pub fn get_btree_table(&self, name: &str) -> Option<Arc<BTreeTable>> {
    let key = self.normalize_table_lookup_name(name);
    self.tables.get(&key).and_then(|table| table.btree())
}
/// Adds `index` to the index list of its (normalized) table.
///
/// An index whose conflict resolution is `REPLACE` is inserted just before
/// the first existing `REPLACE` index, or at the back when there is none;
/// any other index goes to the front. `REPLACE` indexes therefore always
/// form the tail of the list.
///
/// # Errors
/// Propagates the error from the object-name conflict check.
pub fn add_index(&mut self, index: Arc<Index>) -> Result<()> {
    self.check_object_name_conflict(&index.name)?;
    let indexes_for_table = self
        .indexes
        .entry(normalize_ident(&index.table_name))
        .or_default();
    match index.on_conflict {
        Some(ResolveType::Replace) => {
            let insert_at = indexes_for_table
                .iter()
                .position(|existing| existing.on_conflict == Some(ResolveType::Replace))
                .unwrap_or(indexes_for_table.len());
            indexes_for_table.insert(insert_at, index);
        }
        _ => indexes_for_table.push_front(index),
    }
    turso_debug_assert!(
        indexes_for_table
            .iter()
            .position(|idx| idx.on_conflict == Some(ResolveType::Replace))
            .is_none_or(|first_replace| {
                indexes_for_table
                    .iter()
                    .skip(first_replace)
                    .all(|idx| idx.on_conflict == Some(ResolveType::Replace))
            }),
        "REPLACE indexes must form a contiguous suffix"
    );
    Ok(())
}
/// Iterates over the indexes of the normalized `table_name`, skipping
/// those that report themselves as backing b-tree indexes.
pub fn get_indices(&self, table_name: &str) -> impl Iterator<Item = &Arc<Index>> {
    let key = normalize_ident(table_name);
    let bucket = self.indexes.get(&key);
    bucket
        .map(|indexes| indexes.iter())
        .unwrap_or_default()
        .filter(|candidate| !candidate.is_backing_btree_index())
}
/// Whether `table_name` has an index whose index method is the FTS method.
#[cfg(all(feature = "fts", not(target_family = "wasm")))]
pub fn has_fts_index(&self, table_name: &str) -> bool {
    for idx in self.get_indices(table_name) {
        let Some(method) = idx.index_method.as_ref() else {
            continue;
        };
        if method.definition().method_name == crate::index_method::fts::FTS_INDEX_METHOD_NAME {
            return true;
        }
    }
    false
}
/// Returns the index of the normalized `table_name` whose name equals
/// `index_name` exactly.
pub fn get_index(&self, table_name: &str, index_name: &str) -> Option<&Arc<Index>> {
    let key = normalize_ident(table_name);
    let bucket = self.indexes.get(&key)?;
    bucket.iter().find(|candidate| candidate.name == index_name)
}
/// Drops every index of the normalized `table_name` and the statistics
/// recorded for that table.
pub fn remove_indices_for_table(&mut self, table_name: &str) {
    let key = normalize_ident(table_name);
    let _ = self.indexes.remove(&key);
    self.analyze_stats.remove_table(&key);
}
/// Removes every index named like `idx` from its table's index list and
/// drops the statistics recorded for it.
///
/// # Panics
/// Panics when the table of `idx` has no index list at all.
pub fn remove_index(&mut self, idx: &Index) {
    let table_key = normalize_ident(&idx.table_name);
    let bucket = self
        .indexes
        .get_mut(&table_key)
        .expect("Must have the index");
    bucket.retain(|candidate| candidate.name != idx.name);
    self.analyze_stats.remove_index(&table_key, &idx.name);
}
/// Whether the normalized `table_name` has been flagged as having indexes.
pub fn table_has_indexes(&self, table_name: &str) -> bool {
    let key = normalize_ident(table_name);
    self.has_indexes.contains(&key)
}
pub fn table_set_has_index(&mut self, table_name: &str) {
self.has_indexes.insert(table_name.to_string());
}
/// Drives the resumable schema scan and takes care of cleanup on failure.
///
/// On error the state is cleaned up (read transaction ended, cursor and
/// accumulators dropped). On completion it asserts that the read transaction
/// has already been ended. Any other result is passed through untouched.
pub fn make_from_btree(
    &mut self,
    state: &mut MakeFromBtreeState,
    mv_cursor: Option<Arc<RwLock<MvCursor>>>,
    pager: &Arc<Pager>,
    syms: &SymbolTable,
) -> Result<IOResult<()>> {
    let result = self.make_from_btree_internal(state, mv_cursor, pager, syms);
    match &result {
        Err(_) => state.cleanup(pager),
        Ok(IOResult::Done(..)) => {
            turso_assert!(
                !state.read_tx_active,
                "make_from_btree must properly cleanup internal state in case of success"
            );
        }
        Ok(_) => {}
    }
    result
}
/// Resumable state machine that rebuilds the schema by scanning the schema
/// table row by row.
///
/// The phase stored in `state` decides where each call continues. The
/// function returns `IOResult::Done` once every row has been handled and the
/// collected indexes and materialized views have been populated. Errors are
/// returned as-is; the caller (`make_from_btree`) performs the cleanup.
fn make_from_btree_internal(
&mut self,
state: &mut MakeFromBtreeState,
mv_cursor: Option<Arc<RwLock<MvCursor>>>,
pager: &Arc<Pager>,
syms: &SymbolTable,
) -> Result<IOResult<()>> {
loop {
tracing::debug!("make_from_btree: state.phase={:?}", state.phase);
match &state.phase {
MakeFromBtreePhase::Init => {
// Schema recovery through this path refuses to run with an MVCC cursor.
if mv_cursor.is_some() {
return Err(crate::LimboError::ParseError(
"MVCC is not supported for make_from_btree schema recovery".to_string(),
));
}
// NOTE(review): the arguments 1 and 10 are presumably the root page and
// the column count of the schema table — confirm against `BTreeCursor::new_table`.
state.cursor = Some(BTreeCursor::new_table(Arc::clone(pager), 1, 10));
pager.begin_read_tx()?;
// Remember the open read transaction so cleanup can end it on error.
state.read_tx_active = true;
state.accumulators = Some(MakeFromBtreeAccumulators {
from_sql_indexes: Vec::with_capacity(10),
automatic_indices: HashMap::with_capacity_and_hasher(10, FxBuildHasher),
dbsp_state_roots: HashMap::default(),
dbsp_state_index_roots: HashMap::default(),
materialized_view_info: HashMap::default(),
});
state.phase = MakeFromBtreePhase::Rewinding;
}
MakeFromBtreePhase::Rewinding => {
let cursor = state
.cursor
.as_mut()
.expect("cursor must be initialized in Init phase");
// `return_if_io!` may return early (presumably while I/O is pending); the
// phase is unchanged in that case, so the next call retries this step.
return_if_io!(cursor.rewind());
state.phase = MakeFromBtreePhase::FetchingRecord;
}
MakeFromBtreePhase::FetchingRecord => {
let cursor = state
.cursor
.as_mut()
.expect("cursor must be initialized in Init phase");
let row = return_if_io!(cursor.record());
// No row left: finish the scan and build everything that was deferred.
let Some(row) = row else {
// End the read transaction before populating, and clear the flag so
// that a later cleanup does not end it a second time.
pager.end_read_tx();
state.read_tx_active = false;
let acc = state
.accumulators
.take()
.expect("accumulators must be initialized in Init phase");
self.populate_indices(
syms,
acc.from_sql_indexes,
acc.automatic_indices,
mv_cursor.is_some(),
)?;
self.populate_materialized_views(
acc.materialized_view_info,
acc.dbsp_state_roots,
acc.dbsp_state_index_roots,
)?;
state.cursor = None;
state.phase = MakeFromBtreePhase::Done;
return Ok(IOResult::Done(()));
};
// Columns 0-4 are read as: type, name, table name, root page, sql.
let ty_value = row.get_value(0)?;
let ValueRef::Text(ty) = ty_value else {
return Err(LimboError::ConversionError("Expected text value".into()));
};
let ValueRef::Text(name) = row.get_value(1)? else {
return Err(LimboError::ConversionError("Expected text value".into()));
};
let table_name_value = row.get_value(2)?;
let ValueRef::Text(table_name) = table_name_value else {
return Err(LimboError::ConversionError("Expected text value".into()));
};
let root_page_value = row.get_value(3)?;
let ValueRef::Numeric(crate::numeric::Numeric::Integer(root_page)) =
root_page_value
else {
return Err(LimboError::ConversionError("Expected integer value".into()));
};
// The sql column is optional: any non-text value is treated as absent.
let sql_value = row.get_value(4)?;
let sql_textref = match sql_value {
ValueRef::Text(sql) => Some(sql),
_ => None,
};
let sql = sql_textref.map(|s| s.as_str());
let acc = state
.accumulators
.as_mut()
.expect("accumulators must be initialized in Init phase");
self.handle_schema_row(
&ty,
&name,
&table_name,
root_page,
sql,
syms,
&mut acc.from_sql_indexes,
&mut acc.automatic_indices,
&mut acc.dbsp_state_roots,
&mut acc.dbsp_state_index_roots,
&mut acc.materialized_view_info,
&|_| None,
)?;
state.phase = MakeFromBtreePhase::Advancing;
}
MakeFromBtreePhase::Advancing => {
let cursor = state
.cursor
.as_mut()
.expect("cursor must be initialized in Init phase");
return_if_io!(cursor.next());
state.phase = MakeFromBtreePhase::FetchingRecord;
}
MakeFromBtreePhase::Done => {
// Calling again after completion is a no-op.
return Ok(IOResult::Done(()));
}
}
}
}
/// Registers the indexes collected while scanning `sqlite_schema`.
///
/// * `from_sql_indexes` — index rows that carried SQL text; each is parsed with
///   `Index::from_sql` against its owning table and added via `add_index`.
/// * `automatic_indices` — table name -> `(index name, root page)` pairs for index rows
///   that had no SQL text; they are matched, in order, against the table's `unique_sets`.
/// * `mvcc_enabled` — rejects custom index methods and tolerates missing or leftover
///   automatic index entries.
///
/// # Errors
/// * `LimboError::Corrupt` when an index refers to a table that is not in the schema.
/// * The error raised by `bail_parse_error!` when `mvcc_enabled` is set and a parsed index
///   has an index method.
/// * `LimboError::ParseError` when a unique set names a column the table does not have.
/// * `LimboError::InternalError` when a unique set has no automatic index entry left and
///   `mvcc_enabled` is false.
///
/// # Panics
/// Panics when a primary-key unique set and `primary_key_columns` differ in length, when a
/// table yields a second primary-key unique set, or (with `mvcc_enabled` false) when
/// automatic index entries remain after every unique set was processed.
pub fn populate_indices(
&mut self,
syms: &SymbolTable,
from_sql_indexes: Vec<UnparsedFromSqlIndex>,
automatic_indices: HashMap<String, Vec<(String, i64)>>,
mvcc_enabled: bool,
) -> Result<()> {
// Pass 1: indexes that carry their own SQL definition.
for unparsed_sql_from_index in from_sql_indexes {
let table = self
.get_btree_table(&unparsed_sql_from_index.table_name)
.ok_or_else(|| {
LimboError::Corrupt(format!(
"sqlite_schema contains index for missing table '{}': rootpage={} sql={}",
unparsed_sql_from_index.table_name,
unparsed_sql_from_index.root_page,
unparsed_sql_from_index.sql
))
})?;
let index = Index::from_sql(
syms,
&unparsed_sql_from_index.sql,
unparsed_sql_from_index.root_page,
table.as_ref(),
)?;
if mvcc_enabled && index.index_method.is_some() {
crate::bail_parse_error!("Custom index modules are not supported with MVCC");
}
self.add_index(Arc::new(index))?;
}
// Pass 2: automatic indexes, paired with the owning table's unique sets.
for automatic_index in automatic_indices {
let table = self.get_btree_table(&automatic_index.0).ok_or_else(|| {
LimboError::Corrupt(format!(
"sqlite_schema contains automatic index for missing table '{}': indexes={:?}",
automatic_index.0, automatic_index.1
))
})?;
let mut automatic_indexes = automatic_index.1;
// Reversed so that `pop()` below hands the entries out in their original order.
automatic_indexes.reverse();
let mut pk_index_added = false;
for unique_set in &table.unique_sets {
if unique_set.is_primary_key {
assert!(table.primary_key_columns.len() == unique_set.columns.len(), "trying to add a {}-column primary key index for table {}, but the table has {} primary key columns", unique_set.columns.len(), table.name, table.primary_key_columns.len());
assert!(
!pk_index_added,
"trying to add a second primary key index for table {}",
table.name
);
pk_index_added = true;
if unique_set.columns.len() == 1 {
let col_name = &unique_set.columns.first().unwrap().0;
let Some((_, column)) = table.get_column(col_name) else {
return Err(LimboError::ParseError(format!(
"Column {col_name} not found in table {}",
table.name
)));
};
// A single-column primary key that is a rowid alias consumes no
// automatic index entry.
if column.is_rowid_alias() {
continue;
}
}
if let Some(index_entry) = automatic_indexes.pop() {
self.add_index(Arc::new(Index::automatic_from_primary_key(
table.as_ref(),
index_entry,
unique_set.columns.len(),
unique_set.conflict_clause,
)?))?;
} else if mvcc_enabled {
// With MVCC enabled a missing entry is tolerated.
continue;
} else {
return Err(LimboError::InternalError(format!(
"Missing automatic index entry for primary key on table {}",
table.name
)));
}
} else {
// UNIQUE constraint: translate column names into table positions.
let mut column_indices_and_sort_orders =
Vec::with_capacity(unique_set.columns.len());
for (col_name, sort_order) in unique_set.columns.iter() {
let Some((pos_in_table, _)) = table.get_column(col_name) else {
return Err(crate::LimboError::ParseError(format!(
"Column {} not found in table {}",
col_name, table.name
)));
};
column_indices_and_sort_orders.push((pos_in_table, *sort_order));
}
if let Some(index_entry) = automatic_indexes.pop() {
self.add_index(Arc::new(Index::automatic_from_unique(
table.as_ref(),
index_entry,
column_indices_and_sort_orders,
unique_set.conflict_clause,
)?))?;
} else if mvcc_enabled {
// With MVCC enabled a missing entry is tolerated.
continue;
} else {
return Err(LimboError::InternalError(format!(
"Missing automatic index entry for UNIQUE constraint on table {}",
table.name
)));
}
}
}
// Outside MVCC every automatic index row must have been matched to a unique set.
if !mvcc_enabled {
assert!(automatic_indexes.is_empty(), "all automatic indexes parsed from sqlite_schema should have been consumed, but {} remain", automatic_indexes.len());
}
}
Ok(())
}
/// Builds the incremental view and backing table for every materialized view found in
/// `sqlite_schema`.
///
/// * `materialized_view_info` — view name -> `(CREATE statement, root page of the view)`.
/// * `dbsp_state_roots` — view name -> root page of its DBSP state table. A view with no
///   entry is logged, recorded in `incompatible_views`, and given state root `0`.
/// * `dbsp_state_index_roots` — view name -> root page of the DBSP state table's index;
///   `0` is used when absent.
///
/// Dependencies on the referenced tables are registered even for incompatible views; only
/// compatible views are added through `add_materialized_view`.
///
/// # Errors
/// Propagates failures from `add_index` (except those whose message contains
/// "already exists") and from `IncrementalView::from_sql`.
pub fn populate_materialized_views(
    &mut self,
    materialized_view_info: HashMap<String, (String, i64)>,
    dbsp_state_roots: HashMap<String, i64>,
    dbsp_state_index_roots: HashMap<String, i64>,
) -> Result<()> {
    for (view_name, (sql, main_root)) in materialized_view_info {
        let dbsp_state_root = match dbsp_state_roots.get(&view_name) {
            Some(root) => *root,
            None => {
                tracing::warn!(
                    "Materialized view '{}' has incompatible version or missing DBSP state table",
                    view_name
                );
                self.incompatible_views.insert(view_name.clone());
                0
            }
        };
        let dbsp_state_index_root = match dbsp_state_index_roots.get(&view_name) {
            Some(root) => *root,
            None => 0,
        };

        // Register the state table's index only when both root pages are known.
        if dbsp_state_root > 0 && dbsp_state_index_root > 0 {
            let state_table_name =
                format!("{DBSP_TABLE_PREFIX}{DBSP_CIRCUIT_VERSION}_{view_name}");
            let mut state_index = create_dbsp_state_index(dbsp_state_index_root);
            state_index.name = format!("sqlite_autoindex_{state_table_name}_1");
            state_index.table_name = state_table_name;
            match self.add_index(std::sync::Arc::new(state_index)) {
                // An index that is already registered is not an error here.
                Err(err) if !err.to_string().contains("already exists") => return Err(err),
                _ => {}
            }
        }

        let incremental_view = IncrementalView::from_sql(
            &sql,
            self,
            main_root,
            dbsp_state_root,
            dbsp_state_index_root,
        )?;
        let referenced_tables = incremental_view.get_referenced_table_names();
        let cols = incremental_view.column_schema.flat_columns();
        let logical_to_physical_map =
            BTreeTable::build_logical_to_physical_map(&cols, &[], true);

        // Plain rowid table that exposes the view's columns.
        let backing_table = BTreeTable {
            name: view_name.clone(),
            root_page: main_root,
            columns: cols,
            primary_key_columns: Vec::new(),
            has_rowid: true,
            is_strict: false,
            has_autoincrement: false,
            foreign_keys: vec![],
            check_constraints: vec![],
            rowid_alias_conflict_clause: None,
            unique_sets: vec![],
            has_virtual_columns: false,
            logical_to_physical_map,
            column_dependencies: Default::default(),
        };
        let table = Arc::new(Table::BTree(Arc::new(backing_table)));

        let is_compatible = !self.incompatible_views.contains(&view_name);
        if is_compatible {
            self.add_materialized_view(incremental_view, table, sql);
        }
        for table_name in referenced_tables {
            self.add_materialized_view_dependency(&table_name, &view_name);
        }
    }
    Ok(())
}
#[allow(clippy::too_many_arguments)]
pub fn handle_schema_row(
&mut self,
ty: &str,
name: &str,
table_name: &str,
root_page: i64,
maybe_sql: Option<&str>,
syms: &SymbolTable,
from_sql_indexes: &mut Vec<UnparsedFromSqlIndex>,
automatic_indices: &mut HashMap<String, Vec<(String, i64)>>,
dbsp_state_roots: &mut HashMap<String, i64>,
dbsp_state_index_roots: &mut HashMap<String, i64>,
materialized_view_info: &mut HashMap<String, (String, i64)>,
resolve_attached_db: &dyn Fn(&str) -> Option<usize>,
) -> Result<()> {
match ty {
"table" => {
let sql = maybe_sql.expect("sql should be present for table");
let sql_bytes = sql.as_bytes();
if root_page == 0 && contains_ignore_ascii_case!(sql_bytes, b"create virtual") {
let vtab = if let Some(vtab) = syms.vtabs.get(name) {
vtab.clone()
} else {
let mod_name = module_name_from_sql(sql)?;
crate::VirtualTable::table(
Some(name),
mod_name,
module_args_from_sql(sql)?,
syms,
)?
};
self.add_virtual_table(vtab)?;
} else {
let table = BTreeTable::from_sql(sql, root_page)?;
if table.has_virtual_columns && !self.generated_columns_enabled {
return Err(LimboError::ParseError(format!(
"table '{}' uses generated columns but the generated_columns feature is not enabled",
table.name
)));
}
if table.name.starts_with(DBSP_TABLE_PREFIX) {
let suffix = table.name.strip_prefix(DBSP_TABLE_PREFIX).unwrap();
if let Some(underscore_pos) = suffix.find('_') {
let version_str = &suffix[..underscore_pos];
let view_name = &suffix[underscore_pos + 1..];
if let Ok(stored_version) = version_str.parse::<u32>() {
if stored_version == DBSP_CIRCUIT_VERSION {
dbsp_state_roots.insert(view_name.to_string(), root_page);
} else {
tracing::warn!(
"Skipping materialized view '{}' - has version {} but current version is {}. DROP and recreate the view to use it.",
view_name, stored_version, DBSP_CIRCUIT_VERSION
);
}
}
}
}
let mut table = table;
table.resolve_custom_type_affinities(self);
table.propagate_domain_constraints(self);
self.add_btree_table(Arc::new(table))?;
}
}
"index" => {
match maybe_sql {
Some(sql) => {
from_sql_indexes.push(UnparsedFromSqlIndex {
table_name: table_name.to_string(),
root_page,
sql: sql.to_string(),
});
}
None => {
let index_name = name.to_string();
let table_name = table_name.to_string();
if table_name.starts_with(DBSP_TABLE_PREFIX) {
let suffix = table_name.strip_prefix(DBSP_TABLE_PREFIX).unwrap();
if let Some(underscore_pos) = suffix.find('_') {
let version_str = &suffix[..underscore_pos];
let view_name = &suffix[underscore_pos + 1..];
if let Ok(stored_version) = version_str.parse::<u32>() {
if stored_version == DBSP_CIRCUIT_VERSION {
dbsp_state_index_roots
.insert(view_name.to_string(), root_page);
}
}
}
} else {
match automatic_indices.entry(table_name) {
std::collections::hash_map::Entry::Vacant(e) => {
e.insert(vec![(index_name, root_page)]);
}
std::collections::hash_map::Entry::Occupied(mut e) => {
e.get_mut().push((index_name, root_page));
}
}
}
}
}
}
"view" => {
use crate::schema::View;
use turso_parser::ast::{Cmd, Stmt};
use turso_parser::parser::Parser;
let sql = maybe_sql.expect("sql should be present for view");
let view_name = name.to_string();
let mut parser = Parser::new(sql.as_bytes());
if let Ok(Some(Cmd::Stmt(stmt))) = parser.next_cmd() {
match stmt {
Stmt::CreateMaterializedView { .. } => {
materialized_view_info
.insert(view_name.clone(), (sql.to_string(), root_page));
if self.incremental_views.contains_key(&view_name) {
}
}
Stmt::CreateView {
view_name: _,
columns: column_names,
select,
..
} => {
crate::util::validate_select_for_unsupported_features(&select)?;
let view_column_schema =
crate::util::extract_view_columns(&select, self)?;
let mut final_columns = view_column_schema.flat_columns();
for (i, indexed_col) in column_names.iter().enumerate() {
if let Some(col) = final_columns.get_mut(i) {
col.name = Some(indexed_col.col_name.to_string());
}
}
let view =
View::new(name.to_string(), sql.to_string(), select, final_columns);
self.add_view(view)?;
}
_ => {}
}
}
}
"trigger" => {
use turso_parser::ast::{Cmd, Stmt};
use turso_parser::parser::Parser;
let sql = maybe_sql.expect("sql should be present for trigger");
let trigger_name = name.to_string();
let mut parser = Parser::new(sql.as_bytes());
let Ok(Some(Cmd::Stmt(Stmt::CreateTrigger {
temporary,
if_not_exists: _,
trigger_name: _,
time,
event,
tbl_name,
for_each_row,
when_clause,
commands,
}))) = parser.next_cmd()
else {
return Err(crate::LimboError::ParseError(format!(
"invalid trigger sql: {sql}"
)));
};
let target_database_id = tbl_name.db_name.as_ref().map(|db_name| {
let db = db_name.as_str();
if db.eq_ignore_ascii_case("main") {
crate::MAIN_DB_ID
} else if db.eq_ignore_ascii_case("temp") {
crate::TEMP_DB_ID
} else {
resolve_attached_db(db).unwrap_or(crate::INVALID_DB_ID)
}
});
self.add_trigger(
Trigger::new(
trigger_name,
sql.to_string(),
tbl_name.name.to_string(),
time,
event,
for_each_row,
when_clause.map(|e| *e),
commands,
temporary,
target_database_id,
),
tbl_name.name.as_str(),
)?;
}
_ => {}
};
Ok(())
}
/// Resolves every foreign key, declared on any btree table, whose parent is `table_name`.
///
/// The name is normalized first and parent names are compared ASCII case-insensitively.
///
/// # Errors
/// Returns the error built by `fk_mismatch_err` when `table_name` is not a btree table,
/// and propagates any failure from `resolve_fk`.
pub fn resolved_fks_referencing(&self, table_name: &str) -> Result<Vec<ResolvedFkRef>> {
    let parent_name = normalize_ident(table_name);
    let Some(parent) = self.get_btree_table(&parent_name) else {
        return Err(fk_mismatch_err("<unknown>", &parent_name));
    };

    let mut resolved = Vec::with_capacity(4);
    // Non-btree tables carry no foreign keys and are skipped.
    for child in self.tables.values().filter_map(|t| t.btree()) {
        let incoming = child
            .foreign_keys
            .iter()
            .filter(|fk| fk.parent_table.eq_ignore_ascii_case(&parent_name));
        for fk in incoming {
            resolved.push(self.resolve_fk(fk, &child, &parent, false)?);
        }
    }
    Ok(resolved)
}
/// Resolves every foreign key declared on `child_table`, requiring each parent key to be
/// covered by the rowid or a unique index.
///
/// # Errors
/// Returns the error built by `fk_mismatch_err` when the child or one of its parents is
/// not a btree table, and propagates the first failure from `resolve_fk`.
pub fn resolved_fks_for_child(&self, child_table: &str) -> crate::Result<Vec<ResolvedFkRef>> {
    let child_name = normalize_ident(child_table);
    let Some(child) = self.get_btree_table(&child_name) else {
        return Err(fk_mismatch_err(&child_name, "<unknown>"));
    };

    // Collecting into `Result` stops at the first foreign key that fails to resolve.
    child
        .foreign_keys
        .iter()
        .map(|fk| -> crate::Result<ResolvedFkRef> {
            let parent_name = normalize_ident(&fk.parent_table);
            let parent = self
                .get_btree_table(&parent_name)
                .ok_or_else(|| fk_mismatch_err(&child.name, &parent_name))?;
            self.resolve_fk(fk, &child, &parent, true)
        })
        .collect()
}
/// Resolves one foreign key declared on `child` against `parent_tbl`.
///
/// Produces the column positions on both sides, whether the parent key is the rowid, and
/// the parent's matching unique index when there is one.
///
/// * `require_unique` — when true, a parent key that is neither the rowid nor covered by a
///   unique index is an error; when false the index is simply left as `None`.
///
/// # Errors
/// Returns the error built by `fk_mismatch_err` when the foreign key has no child columns,
/// a child or parent column cannot be found, the parent columns are omitted but the parent
/// has no primary key, the two column lists differ in length, or `require_unique` is set
/// and no unique index matches. Also propagates any error from `fk.validate()`.
fn resolve_fk(
&self,
fk: &Arc<ForeignKey>,
child: &Arc<BTreeTable>,
parent_tbl: &Arc<BTreeTable>,
require_unique: bool,
) -> Result<ResolvedFkRef> {
if fk.child_columns.is_empty() {
return Err(fk_mismatch_err(&child.name, &parent_tbl.name));
}
// Positions of the referencing columns inside the child table.
let mut child_pos: Vec<usize> = Vec::with_capacity(fk.child_columns.len());
for cname in fk.child_columns.iter() {
let (i, _) = child
.get_column(cname)
.ok_or_else(|| fk_mismatch_err(&child.name, &parent_tbl.name))?;
child_pos.push(i);
}
// When the declaration lists no parent columns, the parent's primary key is used.
let parent_cols: Box<[String]> = if fk.parent_columns.is_empty() {
if parent_tbl.primary_key_columns.is_empty() {
return Err(fk_mismatch_err(&child.name, &parent_tbl.name));
}
parent_tbl
.primary_key_columns
.iter()
.map(|(col, _)| col.clone())
.collect()
} else {
fk.parent_columns.clone()
};
if parent_cols.len() != fk.child_columns.len() {
return Err(fk_mismatch_err(&child.name, &parent_tbl.name));
}
// Positions of the referenced columns; a name from `ROWID_STRS` that is not a real
// column falls back to position 0.
let mut parent_pos: Vec<usize> = Vec::with_capacity(parent_cols.len());
for pc in parent_cols.iter() {
let pos = parent_tbl.get_column(pc).map(|(i, _)| i).or_else(|| {
ROWID_STRS
.iter()
.any(|r| pc.eq_ignore_ascii_case(r))
.then_some(0)
});
let Some(p) = pos else {
return Err(fk_mismatch_err(&child.name, &parent_tbl.name));
};
parent_pos.push(p);
}
// The parent key is the rowid when its single column is a rowid name or a rowid alias.
let parent_uses_rowid = parent_cols.len() == 1 && {
let pc = parent_cols[0].as_str();
ROWID_STRS.iter().any(|r| pc.eq_ignore_ascii_case(r))
|| parent_tbl.columns.iter().any(|col| {
col.is_rowid_alias()
&& col
.name
.as_deref()
.is_some_and(|n| n.eq_ignore_ascii_case(pc))
})
};
let parent_unique_index = if parent_uses_rowid {
None
} else {
// Look for a non-partial unique index whose columns match the parent key by name,
// pairwise and in the same order.
let found = self
.get_indices(&parent_tbl.name)
.find(|idx| {
idx.unique
&& idx.where_clause.is_none()
&& idx.columns.len() == parent_cols.len()
&& idx
.columns
.iter()
.zip(parent_cols.iter())
.all(|(ic, pc)| ic.name.eq_ignore_ascii_case(pc))
})
.cloned();
if require_unique && found.is_none() {
return Err(fk_mismatch_err(&child.name, &parent_tbl.name));
}
found
};
fk.validate()?;
Ok(ResolvedFkRef {
child_table: Arc::clone(child),
fk: Arc::clone(fk),
parent_cols,
child_pos: child_pos.into_boxed_slice(),
parent_pos: parent_pos.into_boxed_slice(),
parent_uses_rowid,
parent_unique_index,
})
}
pub fn any_resolved_fks_referencing(&self, table_name: &str) -> bool {
self.tables.values().any(|t| {
let Some(bt) = t.btree() else {
return false;
};
bt.foreign_keys
.iter()
.any(|fk| fk.parent_table == table_name)
})
}
/// Returns `true` when `table_name` is a btree table that declares at least one foreign
/// key; unknown tables and non-btree tables yield `false`.
pub fn has_child_fks(&self, table_name: &str) -> bool {
    match self.get_table(table_name).and_then(|t| t.btree()) {
        Some(btree) => !btree.foreign_keys.is_empty(),
        None => false,
    }
}
/// Fails when `name` is already taken by a table, view or index.
///
/// # Errors
/// Returns `LimboError::ParseError` naming the kind of the existing object.
fn check_object_name_conflict(&self, name: &str) -> Result<()> {
    let Some(existing) = self.get_object_type(name) else {
        return Ok(());
    };
    let type_str = match existing {
        SchemaObjectType::Table => "table",
        SchemaObjectType::View => "view",
        SchemaObjectType::Index => "index",
    };
    Err(crate::LimboError::ParseError(format!(
        "{type_str} \"{name}\" already exists"
    )))
}
/// Reports which kind of schema object, if any, currently owns `name`.
///
/// Tables are checked first, then views, then indexes. Tables and views are looked up by
/// the normalized name; index names are compared ASCII case-insensitively against `name`
/// as given.
pub fn get_object_type(&self, name: &str) -> Option<SchemaObjectType> {
    let key = self.normalize_table_lookup_name(name);
    if self.tables.contains_key(&key) {
        Some(SchemaObjectType::Table)
    } else if self.views.contains_key(&key) {
        Some(SchemaObjectType::View)
    } else if self
        .indexes
        .values()
        .flat_map(|index_list| index_list.iter())
        .any(|index| index.name.eq_ignore_ascii_case(name))
    {
        Some(SchemaObjectType::Index)
    } else {
        None
    }
}
}
impl Clone for Schema {
    /// Copies the schema so that tables, indexes, views and triggers live in freshly
    /// allocated `Arc`s instead of sharing the originals. Incremental views are cloned
    /// entry by entry and the remaining fields are cloned or copied directly.
    fn clone(&self) -> Self {
        let tables = self
            .tables
            .iter()
            .map(|(name, table)| {
                // Re-wrap the inner value so the copy does not share the inner `Arc`.
                let copy = match &**table {
                    Table::BTree(inner) => Table::BTree(Arc::new((**inner).clone())),
                    Table::Virtual(inner) => Table::Virtual(Arc::new((**inner).clone())),
                    Table::FromClauseSubquery(inner) => {
                        Table::FromClauseSubquery(Arc::new((**inner).clone()))
                    }
                };
                (name.clone(), Arc::new(copy))
            })
            .collect();

        let indexes = self
            .indexes
            .iter()
            .map(|(table_name, list)| {
                let list = list
                    .iter()
                    .map(|index| Arc::new((**index).clone()))
                    .collect();
                (table_name.clone(), list)
            })
            .collect();

        let incremental_views = self
            .incremental_views
            .iter()
            .map(|(name, view)| (name.clone(), view.clone()))
            .collect();

        let views = self
            .views
            .iter()
            .map(|(name, view)| (name.clone(), Arc::new((**view).clone())))
            .collect();

        let triggers = self
            .triggers
            .iter()
            .map(|(table_name, list)| {
                let list = list
                    .iter()
                    .map(|trigger| Arc::new((**trigger).clone()))
                    .collect();
                (table_name.clone(), list)
            })
            .collect();

        Self {
            tables,
            materialized_view_names: self.materialized_view_names.clone(),
            materialized_view_sql: self.materialized_view_sql.clone(),
            incremental_views,
            views,
            triggers,
            indexes,
            has_indexes: self.has_indexes.clone(),
            schema_version: self.schema_version,
            analyze_stats: self.analyze_stats.clone(),
            table_to_materialized_views: self.table_to_materialized_views.clone(),
            incompatible_views: self.incompatible_views.clone(),
            dropped_root_pages: self.dropped_root_pages.clone(),
            type_registry: self.type_registry.clone(),
            generated_columns_enabled: self.generated_columns_enabled,
        }
    }
}
/// Maps a column's index in the table definition to its register offset.
#[derive(Debug, Clone)]
pub enum ColumnLayout {
/// Every column's register offset equals its index.
Identity {
/// Total number of columns.
column_count: usize,
},
/// Register offsets are looked up in `offsets`.
Mapped {
/// `offsets[column index]` is that column's register offset.
offsets: Vec<usize>,
/// Number of columns that are not virtual generated columns.
non_virtual_col_count: usize,
},
}
impl ColumnLayout {
    /// Builds the layout for any table kind. Virtual tables and FROM-clause subqueries
    /// always use the identity layout.
    pub fn from_table(table: &Table) -> Self {
        let column_count = match table {
            Table::BTree(btree) => return Self::from_btree(btree),
            Table::Virtual(vtable) => vtable.as_ref().columns.len(),
            Table::FromClauseSubquery(subquery) => subquery.columns.len(),
        };
        Self::Identity { column_count }
    }

    /// Builds the layout from a btree table's `logical_to_physical_map`, collapsing to
    /// `Identity` when there are no virtual columns and the map is `0..column count`.
    pub fn from_btree(btree: &BTreeTable) -> Self {
        let column_count = btree.columns.len();
        let non_virtual_col_count = btree
            .columns
            .iter()
            .filter(|c| !c.is_virtual_generated())
            .count();
        let offsets = btree.logical_to_physical_map.clone();
        if non_virtual_col_count == column_count && offsets.iter().copied().eq(0..column_count) {
            return Self::Identity { column_count };
        }
        Self::Mapped {
            offsets,
            non_virtual_col_count,
        }
    }

    /// Builds the layout from a bare column list: non-virtual columns take offsets
    /// `0..k` in declaration order and virtual generated columns follow from `k` on,
    /// where `k` is the number of non-virtual columns.
    pub fn from_columns(columns: &[Column]) -> Self {
        let non_virtual_col_count = columns.iter().filter(|c| !c.is_virtual_generated()).count();
        if non_virtual_col_count == columns.len() {
            return Self::Identity {
                column_count: columns.len(),
            };
        }
        let mut next_non_virtual = 0usize;
        let mut next_virtual = non_virtual_col_count;
        let offsets: Vec<usize> = columns
            .iter()
            .map(|col| {
                // Each column takes the next free slot of its own group.
                let slot = if col.is_virtual_generated() {
                    &mut next_virtual
                } else {
                    &mut next_non_virtual
                };
                let assigned = *slot;
                *slot += 1;
                assigned
            })
            .collect();
        Self::Mapped {
            offsets,
            non_virtual_col_count,
        }
    }

    /// Register offset of the column at index `col_idx`.
    ///
    /// # Panics
    /// Panics for a `Mapped` layout when `col_idx` is out of range.
    #[inline(always)]
    pub fn to_reg_offset(&self, col_idx: usize) -> usize {
        if let Self::Mapped { offsets, .. } = self {
            offsets[col_idx]
        } else {
            col_idx
        }
    }

    /// Register of the column at index `schema_idx`, relative to `base`.
    #[inline(always)]
    pub fn to_register(&self, base: usize, schema_idx: usize) -> usize {
        let offset = self.to_reg_offset(schema_idx);
        base + offset
    }

    /// Number of columns that are not virtual generated columns.
    #[inline(always)]
    pub fn num_non_virtual_cols(&self) -> usize {
        match self {
            Self::Identity { column_count } => *column_count,
            Self::Mapped {
                non_virtual_col_count,
                ..
            } => *non_virtual_col_count,
        }
    }

    /// Total number of columns covered by the layout.
    #[inline(always)]
    pub fn column_count(&self) -> usize {
        match self {
            Self::Identity { column_count } => *column_count,
            Self::Mapped { offsets, .. } => offsets.len(),
        }
    }

    /// Inverse of [`Self::to_reg_offset`]: the column index stored at `offset`, or `None`
    /// when no column maps there.
    pub fn column_idx_for_offset(&self, offset: usize) -> Option<usize> {
        match self {
            Self::Identity { column_count } => (offset < *column_count).then_some(offset),
            Self::Mapped { offsets, .. } => offsets.iter().position(|&mapped| mapped == offset),
        }
    }
}
/// A table-like object known to the schema or planner.
#[derive(Clone, Debug)]
pub enum Table {
/// A table stored in a b-tree; the only variant that has a root page.
BTree(Arc<BTreeTable>),
/// A virtual table.
Virtual(Arc<VirtualTable>),
/// A subquery that appears in a FROM clause.
FromClauseSubquery(Arc<FromClauseSubquery>),
}
impl Table {
pub fn get_root_page(&self) -> crate::Result<i64> {
match self {
Table::BTree(table) => Ok(table.root_page),
Table::Virtual(_) => Err(crate::LimboError::InternalError(
"Virtual tables do not have a root page".to_string(),
)),
Table::FromClauseSubquery(_) => Err(crate::LimboError::InternalError(
"FROM clause subqueries do not have a root page".to_string(),
)),
}
}
pub fn get_name(&self) -> &str {
match self {
Self::BTree(table) => &table.name,
Self::Virtual(table) => &table.name,
Self::FromClauseSubquery(from_clause_subquery) => &from_clause_subquery.name,
}
}
pub fn get_column_at(&self, index: usize) -> Option<&Column> {
match self {
Self::BTree(table) => table.columns.get(index),
Self::Virtual(table) => table.columns.get(index),
Self::FromClauseSubquery(from_clause_subquery) => {
from_clause_subquery.columns.get(index)
}
}
}
pub fn get_column_by_name(&self, name: &str) -> Option<(usize, &Column)> {
match self {
Self::BTree(table) => table.get_column(name),
Self::Virtual(table) => table.columns.iter().enumerate().find(|(_, col)| {
col.name
.as_ref()
.is_some_and(|n| n.eq_ignore_ascii_case(name))
}),
Self::FromClauseSubquery(from_clause_subquery) => from_clause_subquery
.columns
.iter()
.enumerate()
.find(|(_, col)| {
col.name
.as_ref()
.is_some_and(|n| n.eq_ignore_ascii_case(name))
}),
}
}
pub fn columns(&self) -> &Vec<Column> {
match self {
Self::BTree(table) => &table.columns,
Self::Virtual(table) => &table.columns,
Self::FromClauseSubquery(from_clause_subquery) => &from_clause_subquery.columns,
}
}
pub fn is_strict(&self) -> bool {
match self {
Self::BTree(table) => table.is_strict,
Self::Virtual(_) => false,
Self::FromClauseSubquery(_) => false,
}
}
pub fn btree(&self) -> Option<Arc<BTreeTable>> {
match self {
Self::BTree(table) => Some(table.clone()),
Self::Virtual(_) => None,
Self::FromClauseSubquery(_) => None,
}
}
pub fn require_btree(&self) -> crate::Result<Arc<BTreeTable>> {
self.btree().ok_or_else(|| {
crate::LimboError::InternalError(
"operation requires a btree table, not a virtual table".into(),
)
})
}
pub fn btree_mut(&mut self) -> Option<&mut Arc<BTreeTable>> {
match self {
Self::BTree(table) => Some(table),
Self::Virtual(_) => None,
Self::FromClauseSubquery(_) => None,
}
}
pub fn virtual_table(&self) -> Option<Arc<VirtualTable>> {
match self {
Self::Virtual(table) => Some(table.clone()),
_ => None,
}
}
}
impl PartialEq for Table {
    /// Two tables are equal when they are the same variant and share the same allocation
    /// (`Arc::ptr_eq`); contents are never compared.
    ///
    /// FROM-clause subqueries follow the same identity rule as the other variants, so a
    /// handle compares equal to itself and to its clones.
    fn eq(&self, other: &Self) -> bool {
        match (self, other) {
            (Self::BTree(a), Self::BTree(b)) => Arc::ptr_eq(a, b),
            (Self::Virtual(a), Self::Virtual(b)) => Arc::ptr_eq(a, b),
            (Self::FromClauseSubquery(a), Self::FromClauseSubquery(b)) => Arc::ptr_eq(a, b),
            // Different variants are never equal. The left side is spelled out so that a
            // newly added variant fails to compile until it is given an arm above.
            (Self::BTree(_), _) | (Self::Virtual(_), _) | (Self::FromClauseSubquery(_), _) => {
                false
            }
        }
    }
}
/// A set of columns that must be unique together: either the primary key or a UNIQUE
/// constraint.
#[derive(Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct UniqueSet {
/// Column names with their sort order.
pub columns: Vec<(String, SortOrder)>,
/// `true` when this set is the table's primary key.
pub is_primary_key: bool,
/// Conflict resolution attached to the constraint, if any.
pub conflict_clause: Option<ResolveType>,
}
/// A CHECK constraint attached to a table.
#[derive(Clone, Debug)]
pub struct CheckConstraint {
/// Constraint name, if one was given.
pub name: Option<String>,
/// The expression being checked.
pub expr: ast::Expr,
/// Name of the column the constraint is attached to, if any.
pub column: Option<String>,
}
impl CheckConstraint {
    /// Creates a constraint by copying the borrowed name, expression and column name.
    pub fn new(name: Option<&ast::Name>, expr: &ast::Expr, column: Option<&str>) -> Self {
        let name = name.map(|n| n.as_str().to_owned());
        let column = column.map(str::to_owned);
        Self {
            name,
            expr: expr.clone(),
            column,
        }
    }

    /// Renders the constraint as `CHECK(<expr>)`, without any `CONSTRAINT <name>` prefix.
    pub fn sql(&self) -> String {
        let expr = &self.expr;
        format!("CHECK({expr})")
    }
}
/// Wrapper whose clone starts over from `T::default()` instead of copying the wrapped
/// value.
#[derive(Debug, Default)]
pub struct ResetOnClone<T: Default>(T);

impl<T: Default> Clone for ResetOnClone<T> {
    /// Returns a wrapper around a fresh default value; the original is left untouched.
    fn clone(&self) -> Self {
        Self::default()
    }
}
bitflags! {
/// Boolean properties of a btree table, passed to `BTreeTable::new`.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct BTreeCharacteristics: u8 {
/// Sets `BTreeTable::has_rowid`.
const HAS_ROWID = 0b0000_0001;
/// Sets `BTreeTable::is_strict`.
const STRICT = 0b0000_0010;
/// Sets `BTreeTable::has_autoincrement`.
const HAS_AUTOINCREMENT = 0b0000_0100;
}
}
/// Dependency graph between a table's columns induced by virtual generated columns.
/// All vectors are indexed by column position.
#[derive(Debug)]
pub(crate) struct GeneratedColGraph {
/// For each column, every column it depends on, directly or transitively.
dependencies: Vec<ColumnMask>,
/// For each column, every column that depends on it, directly or transitively.
dependents: Vec<ColumnMask>,
/// Column positions ordered so that each column comes after the columns it depends on.
topological_sort: Vec<usize>,
}
impl GeneratedColGraph {
/// Builds the dependency graph for `columns`.
///
/// Only virtual generated columns contribute edges; every other column has no
/// dependencies.
///
/// # Errors
/// Returns the error raised by `bail_parse_error!` when a generated column references
/// itself or when the generated columns form a dependency cycle.
fn build(columns: &[Column]) -> Result<Self> {
let n = columns.len();
let mut direct_deps = vec![ColumnMask::default(); n];
let mut direct_dependents = vec![ColumnMask::default(); n];
// Number of direct dependencies of each column that are not yet ordered.
let mut in_degree: Vec<u32> = vec![0; n];
for (j, col) in columns.iter().enumerate() {
let GeneratedType::Virtual { ref expr, .. } = col.generated_type() else {
continue;
};
let mut direct = BitSet::default();
collect_column_dependencies_of_gencol(expr, columns, &mut direct);
if direct.get(j) {
bail_parse_error!(
"generated column \"{}\" cannot reference itself",
col.name.as_deref().unwrap_or("?")
);
}
let direct_mask: ColumnMask = ColumnMask::from_iter(direct.iter());
direct_deps[j].union_with(&direct_mask);
// Record the reverse edge i -> j for every direct dependency i of column j.
for i in direct.iter() {
direct_dependents[i].set(j);
in_degree[j] += 1;
}
}
// Topological sort: start from columns without dependencies and release a column once
// all of its direct dependencies have been emitted.
let mut topological_sort: Vec<usize> = Vec::with_capacity(n);
let mut ready: Vec<usize> = (0..n).filter(|&i| in_degree[i] == 0).collect();
while let Some(i) = ready.pop() {
topological_sort.push(i);
for j in direct_dependents[i].iter() {
in_degree[j] -= 1;
if in_degree[j] == 0 {
ready.push(j);
}
}
}
// Columns that were never released still have unresolved dependencies, i.e. they are
// part of a cycle or depend on one.
if topological_sort.len() != n {
let cycle_names: Vec<&str> = (0..n)
.filter(|i| in_degree[*i] > 0)
.filter_map(|i| columns[i].name.as_deref())
.collect();
bail_parse_error!(
"circular dependency in generated columns: {}",
cycle_names.join(", ")
);
}
// Transitive dependencies: walking in topological order guarantees that the full set
// of every direct dependency is already final when it is merged in.
let mut dependencies = vec![ColumnMask::default(); n];
for &j in &topological_sort {
dependencies[j] = direct_deps[j].clone();
for i in direct_deps[j].iter() {
let snapshot = dependencies[i].clone();
dependencies[j].union_with(&snapshot);
}
}
// Transitive dependents: the same idea in reverse topological order.
let mut dependents = vec![ColumnMask::default(); n];
for &i in topological_sort.iter().rev() {
dependents[i] = direct_dependents[i].clone();
for j in direct_dependents[i].iter() {
let snapshot = dependents[j].clone();
dependents[i].union_with(&snapshot);
}
}
Ok(Self {
dependencies,
dependents,
topological_sort,
})
}
}
/// In-memory definition of a table stored in a b-tree.
#[derive(Clone, Debug)]
pub struct BTreeTable {
/// Root page of the table's b-tree.
pub root_page: i64,
/// Table name.
pub name: String,
/// Primary key columns with their sort order; empty when there is no primary key.
pub primary_key_columns: Vec<(String, SortOrder)>,
// Private: mutate through `columns_mut()` so that the derived fields below
// (`has_virtual_columns`, `logical_to_physical_map`, `column_dependencies`) are refreshed.
columns: Vec<Column>,
/// Set from `BTreeCharacteristics::HAS_ROWID`.
pub has_rowid: bool,
/// Set from `BTreeCharacteristics::STRICT`.
pub is_strict: bool,
/// Set from `BTreeCharacteristics::HAS_AUTOINCREMENT`.
pub has_autoincrement: bool,
/// Primary key and UNIQUE constraints of the table.
pub unique_sets: Vec<UniqueSet>,
/// Foreign keys declared on this table (this table is the child).
pub foreign_keys: Vec<Arc<ForeignKey>>,
/// CHECK constraints of the table.
pub check_constraints: Vec<CheckConstraint>,
/// Conflict resolution for the rowid alias column, if any.
pub rowid_alias_conflict_clause: Option<ResolveType>,
/// `true` when at least one column is a virtual generated column.
pub has_virtual_columns: bool,
/// Per-column offsets produced by `build_logical_to_physical_map`.
pub logical_to_physical_map: Vec<usize>,
// Generated-column dependency graph; reset on clone and whenever the columns are
// mutated through `columns_mut()`. Presumably filled lazily — the initializer is not
// visible in this part of the file.
column_dependencies: ResetOnClone<OnceLock<GeneratedColGraph>>,
}
/// Guard giving mutable access to a table's columns; when dropped it refreshes the table
/// fields that are derived from the column list.
pub struct ColumnsMut<'a> {
// The table whose columns are being edited.
table: &'a mut BTreeTable,
}
impl std::ops::Deref for ColumnsMut<'_> {
    type Target = Vec<Column>;

    /// Read access to the guarded table's column list.
    fn deref(&self) -> &Self::Target {
        &self.table.columns
    }
}
impl std::ops::DerefMut for ColumnsMut<'_> {
fn deref_mut(&mut self) -> &mut Vec<Column> {
&mut self.table.columns
}
}
impl Drop for ColumnsMut<'_> {
    /// Refreshes everything derived from the column list once editing is finished: the
    /// generated-column graph is discarded, and `has_virtual_columns` and
    /// `logical_to_physical_map` are recomputed.
    fn drop(&mut self) {
        let table = &mut *self.table;
        let has_virtual = table.columns.iter().any(|c| c.is_virtual_generated());
        let layout = BTreeTable::build_logical_to_physical_map(
            &table.columns,
            &table.primary_key_columns,
            table.has_rowid,
        );
        table.column_dependencies.0 = OnceLock::new();
        table.has_virtual_columns = has_virtual;
        table.logical_to_physical_map = layout;
    }
}
impl BTreeTable {
/// Creates a table definition and derives `has_virtual_columns` and
/// `logical_to_physical_map` from the supplied columns.
///
/// `characteristics` supplies the `has_rowid`, `is_strict` and `has_autoincrement` flags.
#[allow(clippy::too_many_arguments)]
pub fn new(
    root_page: i64,
    name: String,
    primary_key_columns: Vec<(String, SortOrder)>,
    columns: Vec<Column>,
    characteristics: BTreeCharacteristics,
    unique_sets: Vec<UniqueSet>,
    foreign_keys: Vec<Arc<ForeignKey>>,
    check_constraints: Vec<CheckConstraint>,
    rowid_alias_conflict_clause: Option<ResolveType>,
) -> Self {
    let has_rowid = characteristics.contains(BTreeCharacteristics::HAS_ROWID);
    let is_strict = characteristics.contains(BTreeCharacteristics::STRICT);
    let has_autoincrement = characteristics.contains(BTreeCharacteristics::HAS_AUTOINCREMENT);
    let has_virtual_columns = columns.iter().any(|col| col.is_virtual_generated());
    let logical_to_physical_map =
        Self::build_logical_to_physical_map(&columns, &primary_key_columns, has_rowid);
    Self {
        root_page,
        name,
        primary_key_columns,
        columns,
        has_rowid,
        is_strict,
        has_autoincrement,
        unique_sets,
        foreign_keys,
        check_constraints,
        rowid_alias_conflict_clause,
        has_virtual_columns,
        logical_to_physical_map,
        column_dependencies: Default::default(),
    }
}
/// The table's columns, in declaration order.
pub fn columns(&self) -> &[Column] {
    self.columns.as_slice()
}
/// Returns a guard for editing the columns; dropping the guard refreshes
/// `has_virtual_columns`, `logical_to_physical_map` and the generated-column graph.
pub fn columns_mut(&mut self) -> ColumnsMut<'_> {
ColumnsMut { table: self }
}
/// Returns a view of `table` for type checking.
///
/// The same `Arc` is returned when the table has no virtual columns, no array columns and
/// no custom types. Otherwise a modified copy is returned in which virtual generated
/// columns are removed, array columns are typed `BLOB`, and columns whose type resolves
/// through the schema are retyped to the upper-cased primitive type.
pub fn type_check_table_ref(table: &Arc<BTreeTable>, schema: &Schema) -> Arc<BTreeTable> {
    let strict = table.is_strict;
    let strip_virtual = table.has_virtual_columns();
    let needs_retyping = table
        .columns
        .iter()
        .any(|c| c.is_array() || schema.get_type_def(&c.ty_str, strict).is_some());
    if !(needs_retyping || strip_virtual) {
        return Arc::clone(table);
    }

    let mut copy = table.as_ref().clone();
    if strip_virtual {
        // NOTE(review): `logical_to_physical_map` is not rebuilt after this removal —
        // confirm consumers of the returned copy do not rely on it.
        copy.columns.retain(|c| !c.is_virtual_generated());
        copy.has_virtual_columns = false;
    }
    for col in copy.columns.iter_mut() {
        if col.is_array() {
            col.ty_str = "BLOB".to_string();
            continue;
        }
        if let Ok(Some(resolved)) = schema.resolve_type(&col.ty_str, strict) {
            col.ty_str = resolved.primitive.to_uppercase();
        }
    }
    Arc::new(copy)
}
/// Returns a view of `table` for type checking input values.
///
/// The same `Arc` is returned when the table has no virtual columns, no array columns and
/// no custom types. Otherwise a modified copy is returned in which:
/// * virtual generated columns are removed;
/// * columns outside `only_columns` (when given) and array columns are typed `ANY`;
/// * columns with a custom type definition are retyped to that definition's upper-cased
///   `value_input_type()`.
///
/// `only_columns` is indexed by position in the original column list; when virtual
/// columns are removed it is remapped to positions in the reduced list.
pub fn input_type_check_table_ref(
table: &Arc<BTreeTable>,
schema: &Schema,
only_columns: Option<&ColumnMask>,
) -> Arc<BTreeTable> {
let has_virtual = table.has_virtual_columns();
let has_custom = table
.columns
.iter()
.any(|c| c.is_array() || schema.get_type_def(&c.ty_str, table.is_strict).is_some());
if !has_custom && !has_virtual {
return Arc::clone(table);
}
let mut modified = (**table).clone();
let remapped_only_columns = if has_virtual {
// Translate the mask before the columns are removed: `physical` counts the
// non-virtual columns seen so far, i.e. the position after removal.
let remapped = only_columns.map(|only| {
let mut new_set = ColumnMask::default();
let mut physical = 0usize;
for (orig, col) in modified.columns.iter().enumerate() {
if col.is_virtual_generated() {
continue;
}
if only.get(orig) {
new_set.set(physical);
}
physical += 1;
}
new_set
});
// NOTE(review): `logical_to_physical_map` is not rebuilt after this removal —
// confirm consumers of the returned copy do not rely on it.
modified.columns.retain(|c| !c.is_virtual_generated());
modified.has_virtual_columns = false;
remapped
} else {
None
};
// Prefer the remapped mask; without virtual columns the caller's mask applies as is.
let effective_only = remapped_only_columns.as_ref().or(only_columns);
for (i, col) in modified.columns.iter_mut().enumerate() {
if let Some(only) = effective_only {
if !only.get(i) {
col.ty_str = "ANY".to_string();
continue;
}
}
if col.is_array() {
col.ty_str = "ANY".to_string();
} else if let Some(type_def) = schema.get_type_def(&col.ty_str, table.is_strict) {
col.ty_str = type_def.value_input_type().to_uppercase();
}
}
Arc::new(modified)
}
/// For STRICT tables, sets each column's type and base affinity from what its declared
/// type resolves to: array columns become `Blob`, and columns whose type resolves through
/// the schema take the type and affinity of the resolved primitive. Non-STRICT tables are
/// left untouched.
pub fn resolve_custom_type_affinities(&mut self, schema: &Schema) {
    if self.is_strict {
        for col in self.columns.iter_mut() {
            if col.is_array() {
                col.set_ty(Type::Blob);
                col.set_base_affinity(Affinity::Blob);
            } else if let Ok(Some(resolved)) = schema.resolve_type_unchecked(&col.ty_str) {
                let (base_ty, _) = type_from_name(&resolved.primitive);
                let base_affinity = Affinity::affinity(&resolved.primitive);
                col.set_ty(base_ty);
                col.set_base_affinity(base_affinity);
            }
        }
    }
}
/// For STRICT tables, copies the constraints of domain types onto the columns that use
/// them.
///
/// For every column whose type resolves to a domain, each type in the resolved chain
/// contributes its NOT NULL flag and its checks. Checks are rewritten with
/// `rewrite_value_to_column` for the column's name and appended to `check_constraints`;
/// unnamed checks are named `<type name>_<position>`.
pub fn propagate_domain_constraints(&mut self, schema: &Schema) {
    if !self.is_strict {
        return;
    }

    // Collected first and applied afterwards, because the columns are borrowed while
    // iterating.
    let mut inherited_checks = Vec::new();
    let mut force_notnull = Vec::new();
    for (idx, column) in self.columns.iter().enumerate() {
        let resolved = match schema.resolve_type_unchecked(&column.ty_str) {
            Ok(Some(resolved)) if resolved.is_domain() => resolved,
            _ => continue,
        };
        let column_name = column.name.as_deref().unwrap_or("").to_string();
        for type_def in &resolved.chain {
            if type_def.not_null {
                force_notnull.push(idx);
            }
            let checks = type_def
                .domain_checks
                .iter()
                .enumerate()
                .map(|(position, domain_check)| {
                    let expr = rewrite_value_to_column(&domain_check.check, &column_name);
                    let name = domain_check
                        .name
                        .clone()
                        .unwrap_or_else(|| format!("{}_{}", type_def.name, position));
                    CheckConstraint {
                        name: Some(name),
                        expr: *expr,
                        column: Some(column_name.clone()),
                    }
                });
            inherited_checks.extend(checks);
        }
    }

    for idx in force_notnull {
        self.columns[idx].set_notnull(true);
    }
    self.check_constraints.extend(inherited_checks);
}
/// Position and definition of the first column that is a rowid alias, if any.
pub fn get_rowid_alias_column(&self) -> Option<(usize, &Column)> {
    let position = self.columns.iter().position(|col| col.is_rowid_alias())?;
    Some((position, &self.columns[position]))
}
/// Whether the table has at least one virtual generated column (cached flag).
pub fn has_virtual_columns(&self) -> bool {
self.has_virtual_columns
}
/// Builds the `ColumnLayout` for this table via `ColumnLayout::from_btree`.
pub fn column_layout(&self) -> ColumnLayout {
ColumnLayout::from_btree(self)
}
/// Finds the first column named `name` (ASCII case-insensitive), returning its position
/// and definition. Columns without a name never match.
pub fn get_column(&self, name: &str) -> Option<(usize, &Column)> {
    for (position, column) in self.columns.iter().enumerate() {
        if let Some(column_name) = column.name.as_ref() {
            if column_name.eq_ignore_ascii_case(name) {
                return Some((position, column));
            }
        }
    }
    None
}
pub fn from_sql(sql: &str, root_page: i64) -> Result<BTreeTable> {
let mut parser = Parser::new(sql.as_bytes());
let cmd = parser.next_cmd()?;
match cmd {
Some(Cmd::Stmt(Stmt::CreateTable { tbl_name, body, .. })) => {
create_table(tbl_name.name.as_str(), &body, root_page)
}
_ => unreachable!("Expected CREATE TABLE statement"),
}
}
pub fn to_sql(&self) -> String {
let mut sql = format!("CREATE TABLE {} (", quote_ident(&self.name));
let needs_pk_inline = self.primary_key_columns.len() == 1;
for (i, column) in self.columns.iter().enumerate() {
if i > 0 {
sql.push_str(", ");
}
let column_name = column.name.as_ref().expect("column name is None");
sql.push_str("e_ident(column_name));
if !column.ty_str.is_empty() {
sql.push(' ');
sql.push_str(&column.ty_str);
if column.is_array() {
sql.push_str("[]");
}
}
if column.notnull()
&& (column.explicit_notnull() || !self.is_without_rowid_inline_pk(column))
{
sql.push_str(" NOT NULL");
}
if column.unique() {
sql.push_str(" UNIQUE");
}
if needs_pk_inline && column.primary_key() {
sql.push_str(" PRIMARY KEY");
}
if let Some(default) = &column.default {
sql.push_str(" DEFAULT ");
sql.push_str(&default.to_string());
}
if let GeneratedType::Virtual { original_sql, .. } = &column.generated_type() {
sql.push_str(" AS (");
sql.push_str(original_sql);
sql.push(')');
}
for check_constraint in &self.check_constraints {
if check_constraint.column.as_deref() == Some(column_name) {
sql.push(' ');
if let Some(name) = &check_constraint.name {
sql.push_str("CONSTRAINT ");
sql.push_str(&Name::exact(name.clone()).as_ident());
sql.push(' ');
}
sql.push_str(&check_constraint.sql());
}
}
}
let has_table_pk = !self.primary_key_columns.is_empty();
if !needs_pk_inline && has_table_pk {
sql.push_str(", PRIMARY KEY (");
for (i, col) in self.primary_key_columns.iter().enumerate() {
if i > 0 {
sql.push_str(", ");
}
sql.push_str(&col.0);
}
sql.push(')');
}
for fk in &self.foreign_keys {
sql.push_str(", FOREIGN KEY (");
for (i, col) in fk.child_columns.iter().enumerate() {
if i > 0 {
sql.push_str(", ");
}
sql.push_str(col);
}
sql.push_str(") REFERENCES ");
sql.push_str(&fk.parent_table);
sql.push('(');
for (i, col) in fk.parent_columns.iter().enumerate() {
if i > 0 {
sql.push_str(", ");
}
sql.push_str(col);
}
sql.push(')');
if fk.on_delete != RefAct::NoAction {
sql.push_str(" ON DELETE ");
sql.push_str(match fk.on_delete {
RefAct::SetNull => "SET NULL",
RefAct::SetDefault => "SET DEFAULT",
RefAct::Cascade => "CASCADE",
RefAct::Restrict => "RESTRICT",
_ => "",
});
}
if fk.on_update != RefAct::NoAction {
sql.push_str(" ON UPDATE ");
sql.push_str(match fk.on_update {
RefAct::SetNull => "SET NULL",
RefAct::SetDefault => "SET DEFAULT",
RefAct::Cascade => "CASCADE",
RefAct::Restrict => "RESTRICT",
_ => "",
});
}
if fk.deferred {
sql.push_str(" DEFERRABLE INITIALLY DEFERRED");
}
}
for check_constraint in &self.check_constraints {
if check_constraint.column.is_some() {
continue;
}
sql.push_str(", ");
if let Some(name) = &check_constraint.name {
sql.push_str("CONSTRAINT ");
sql.push_str(&Name::exact(name.clone()).as_ident());
sql.push(' ');
}
sql.push_str(&check_constraint.sql());
}
for unique_set in &self.unique_sets {
if unique_set.is_primary_key {
continue;
}
if unique_set.columns.len() == 1 {
let col_name = &unique_set.columns[0].0;
if let Some((_, col)) = self.get_column(col_name) {
if col.unique() {
continue;
}
}
}
sql.push_str(", UNIQUE (");
for (i, (col_name, _)) in unique_set.columns.iter().enumerate() {
if i > 0 {
sql.push_str(", ");
}
sql.push_str("e_ident(col_name));
}
sql.push(')');
}
sql.push(')');
if self.is_strict {
sql.push_str(" STRICT");
}
if !self.has_rowid {
if self.is_strict {
sql.push_str(", WITHOUT ROWID");
} else {
sql.push_str(" WITHOUT ROWID");
}
}
sql
}
/// True when `column` is the sole primary-key column of a WITHOUT ROWID table,
/// i.e. the key that `to_sql` writes inline on the column.
fn is_without_rowid_inline_pk(&self, column: &Column) -> bool {
    if self.has_rowid || self.primary_key_columns.len() != 1 {
        return false;
    }
    column.primary_key()
}
/// Returns the collation of every column, in column order.
pub fn column_collations(&self) -> Vec<CollationSeq> {
    let mut collations = Vec::with_capacity(self.columns.len());
    for column in &self.columns {
        collations.push(column.collation());
    }
    collations
}
/// Translates a logical column index into its entry of
/// `logical_to_physical_map`.
///
/// # Panics
/// Panics if `logical` is out of range for the map (plain slice indexing).
#[inline]
pub fn logical_to_physical_column(&self, logical: usize) -> usize {
self.logical_to_physical_map[logical]
}
/// Computes, for each logical column index, the slot it is assigned in the
/// physical ordering.
///
/// Slots are handed out in three passes:
/// 1. for tables without a rowid, the primary-key columns in key order
///    (names matched ignoring ASCII case; unknown names, virtual columns and
///    repeated names are skipped);
/// 2. every remaining non-virtual column in declaration order;
/// 3. the virtual generated columns, which take the trailing slots.
pub fn build_logical_to_physical_map(
    columns: &[Column],
    primary_key_columns: &[(String, SortOrder)],
    has_rowid: bool,
) -> Vec<usize> {
    // `usize::MAX` marks a column that has not been given a slot yet.
    const UNASSIGNED: usize = usize::MAX;
    let mut layout = vec![UNASSIGNED; columns.len()];
    let mut next_slot = 0;

    if !has_rowid {
        for (pk_name, _) in primary_key_columns {
            let key_position = columns.iter().position(|candidate| {
                candidate
                    .name
                    .as_deref()
                    .is_some_and(|name| name.eq_ignore_ascii_case(pk_name))
            });
            let Some(key_position) = key_position else {
                continue;
            };
            let already_placed = layout[key_position] != UNASSIGNED;
            if columns[key_position].is_virtual_generated() || already_placed {
                continue;
            }
            layout[key_position] = next_slot;
            next_slot += 1;
        }
    }

    for (slot, column) in layout.iter_mut().zip(columns) {
        if *slot == UNASSIGNED && !column.is_virtual_generated() {
            *slot = next_slot;
            next_slot += 1;
        }
    }

    // Whatever is still unassigned is a virtual column.
    for slot in layout.iter_mut().filter(|slot| **slot == UNASSIGNED) {
        *slot = next_slot;
        next_slot += 1;
    }
    layout
}
/// Resolves the column references inside every virtual generated column and
/// then builds the generated-column dependency graph.
///
/// # Errors
/// Propagates failures from `resolve_gencol_expr_columns` (for example a
/// reference to an unknown column) and from building the graph.
pub fn prepare_generated_columns(&mut self) -> Result<()> {
    {
        // Scoped so the guard is released before the graph is built.
        let mut columns = self.columns_mut();
        for idx in 0..columns.len() {
            // Only virtual generated columns carry an expression.
            let Some(mut resolved) = columns[idx].generated_expr().cloned() else {
                continue;
            };
            // Work on a copy: resolution needs to read the whole column list.
            resolve_gencol_expr_columns(&mut resolved, &columns)?;
            if let Some(slot) = columns[idx].generated_expr_mut() {
                *slot = resolved;
            }
        }
    }
    self.column_graph()?;
    Ok(())
}
/// Renumbers self-table column references in generated-column expressions
/// after the column at `dropped_index` has been removed.
///
/// References above `dropped_index` are decremented by one. Tables whose
/// `has_virtual_columns` flag is unset are left untouched.
///
/// # Errors
/// Returns an internal error if any generated expression still refers to the
/// dropped column.
pub fn shift_generated_column_indices_after_drop(
    &mut self,
    dropped_index: usize,
) -> Result<()> {
    if !self.has_virtual_columns {
        return Ok(());
    }
    for generated in self.columns.iter_mut() {
        let Some(expr) = generated.generated_expr_mut() else {
            continue;
        };
        walk_expr_mut(expr, &mut |node| {
            if let Expr::Column {
                table,
                column: referenced,
                ..
            } = node
            {
                if table.is_self_table() {
                    if *referenced == dropped_index {
                        return Err(LimboError::InternalError(
                            "dropped column remained referenced by generated column".to_string(),
                        ));
                    }
                    if *referenced > dropped_index {
                        *referenced -= 1;
                    }
                }
            }
            Ok(WalkControl::Continue)
        })?;
    }
    Ok(())
}
/// Returns the generated-column dependency graph, building and caching it in
/// `column_dependencies` on first use.
///
/// # Errors
/// Propagates the error from `GeneratedColGraph::build`; nothing is cached in
/// that case.
fn column_graph(&self) -> Result<&GeneratedColGraph> {
    let cache = &self.column_dependencies.0;
    if cache.get().is_none() {
        let built = GeneratedColGraph::build(&self.columns)?;
        // A failed `set` means a value is already stored; either way the read
        // below finds one.
        let _ = cache.set(built);
    }
    Ok(cache
        .get()
        .expect("column_dependencies was just initialized"))
}
/// Returns a view over the columns paired with a copy of the dependency
/// graph's `topological_sort` order.
pub(crate) fn columns_topo_sort(&self) -> Result<ColumnsTopologicalSort<'_>> {
    let graph = self.column_graph()?;
    let order = graph.topological_sort.to_vec();
    Ok(ColumnsTopologicalSort {
        columns: &self.columns,
        topological_sort: order,
    })
}
/// Test-only accessor: returns the cached dependency graph if it has already
/// been built, without triggering a build.
#[cfg(test)]
pub(crate) fn peek_column_dependencies(&self) -> Option<&GeneratedColGraph> {
self.column_dependencies.0.get()
}
/// Returns the updated columns together with every column the dependency
/// graph records as depending on them.
///
/// Indices beyond the graph's `dependents` table contribute only themselves.
pub(crate) fn columns_affected_by_update(
    &self,
    updated_cols: impl IntoIterator<Item = usize>,
) -> Result<ColumnMask> {
    let graph = self.column_graph()?;
    let mut affected = ColumnMask::default();
    for updated in updated_cols {
        affected.set(updated);
        if updated >= graph.dependents.len() {
            continue;
        }
        affected.union_with(&graph.dependents[updated]);
    }
    Ok(affected)
}
/// Returns the non-virtual columns needed to produce the `targets`.
///
/// A non-virtual target contributes itself; a virtual generated target
/// contributes the non-virtual entries of its `dependencies` set in the graph.
///
/// # Panics
/// Panics if a target index is out of range (plain indexing).
pub(crate) fn dependencies_of_columns(
    &self,
    targets: impl IntoIterator<Item = usize>,
) -> Result<ColumnMask> {
    let graph = self.column_graph()?;
    let mut required = ColumnMask::default();
    for target in targets {
        if self.columns[target].is_virtual_generated() {
            for dependency in graph.dependencies[target].iter() {
                if !self.columns[dependency].is_virtual_generated() {
                    required.set(dependency);
                }
            }
        } else {
            required.set(target);
        }
    }
    Ok(required)
}
}
/// A table's columns together with an ordering of their indices taken from the
/// generated-column dependency graph.
pub(crate) struct ColumnsTopologicalSort<'a> {
    /// The table's columns, indexed by logical position.
    columns: &'a [Column],
    /// Column indices in the order they should be visited.
    topological_sort: Vec<usize>,
}
impl<'a> ColumnsTopologicalSort<'a> {
    /// Yields `(column index, column)` pairs following `topological_sort`.
    pub fn iter(&self) -> impl Iterator<Item = (usize, &'a Column)> + '_ {
        let all_columns = self.columns;
        self.topological_sort
            .iter()
            .copied()
            .map(move |position| (position, &all_columns[position]))
    }
}
/// Describes a pseudo cursor by the number of columns it exposes.
#[derive(Debug, Default, Clone, Copy)]
pub struct PseudoCursorType {
    /// Number of columns.
    pub column_count: usize,
}
impl PseudoCursorType {
    /// Creates a pseudo cursor type with zero columns.
    pub fn new() -> Self {
        Self::default()
    }
    /// Creates a pseudo cursor type with as many columns as `columns` holds.
    pub fn new_with_columns(columns: impl AsRef<[Column]>) -> Self {
        let column_count = columns.as_ref().len();
        Self { column_count }
    }
}
/// A subquery that appears in a FROM clause.
#[derive(Debug, Clone)]
pub struct FromClauseSubquery {
/// Name the subquery is referred to by.
pub name: String,
/// The plan for the subquery itself.
pub plan: Box<Plan>,
/// Columns the subquery exposes.
pub columns: Vec<Column>,
/// NOTE(review): presumably the first register holding the subquery's result
/// columns once one has been assigned — confirm against the emitter.
pub result_columns_start_reg: Option<usize>,
/// NOTE(review): presumably the cursor over the materialized result, when the
/// subquery is materialized — confirm against the emitter.
pub materialized_cursor_id: Option<CursorID>,
/// CTE metadata; when `None`, the CTE accessors report no id and `false`.
pub cte: Option<FromClauseSubqueryCteMetadata>,
}
/// CTE-specific metadata attached to a [`FromClauseSubquery`].
#[derive(Debug, Clone, Copy)]
pub struct FromClauseSubqueryCteMetadata {
/// Identifier of the CTE.
pub id: usize,
/// When set, the subquery requires table materialization
/// (see `requires_table_materialization`).
pub shared_materialization: bool,
/// When set, the subquery requires table materialization
/// (see `requires_table_materialization`).
pub materialize_hint: bool,
}
impl FromClauseSubquery {
    /// The CTE id, if this subquery carries CTE metadata.
    pub fn cte_id(&self) -> Option<usize> {
        self.cte.as_ref().map(|meta| meta.id)
    }
    /// Whether the CTE metadata carries the materialize hint (`false` without
    /// metadata).
    pub fn materialize_hint(&self) -> bool {
        matches!(self.cte, Some(meta) if meta.materialize_hint)
    }
    /// Whether the CTE metadata marks the materialization as shared (`false`
    /// without metadata).
    pub fn shared_materialization(&self) -> bool {
        matches!(self.cte, Some(meta) if meta.shared_materialization)
    }
    /// Updates the shared-materialization flag; a no-op without CTE metadata.
    pub fn set_shared_materialization(&mut self, shared: bool) {
        if let Some(meta) = self.cte.as_mut() {
            meta.shared_materialization = shared;
        }
    }
    /// True when either the shared-materialization flag or the materialize hint
    /// is set.
    pub fn requires_table_materialization(&self) -> bool {
        self.materialize_hint() || self.shared_materialization()
    }
    /// True when the plan is a `Plan::Select` and table materialization is not
    /// required.
    pub fn supports_direct_index_materialization(&self) -> bool {
        if self.requires_table_materialization() {
            return false;
        }
        matches!(*self.plan, Plan::Select(_))
    }
}
/// Collects the normalized column names referenced by `expr`.
///
/// Delegates to `collect_column_dependencies_of_expr` with an empty column
/// list, so already-resolved `Expr::Column` nodes contribute no names.
fn collect_column_refs(expr: &Expr) -> HashSet<String> {
collect_column_dependencies_of_expr(expr, &[])
}
/// Collects the normalized names of the columns `expr` refers to.
///
/// Plain and qualified identifiers contribute their column part. Resolved
/// self-table `Expr::Column` nodes are translated back to a name through
/// `columns` (out-of-range or unnamed columns are ignored). Subquery-like
/// nodes are not descended into. Errors from the walk are discarded.
pub fn collect_column_dependencies_of_expr(expr: &Expr, columns: &[Column]) -> HashSet<String> {
    let mut names = HashSet::default();
    let _ = walk_expr(expr, &mut |node| {
        let control = match node {
            Expr::Subquery(_)
            | Expr::Exists(_)
            | Expr::InTable { .. }
            | Expr::SubqueryResult { .. } => WalkControl::SkipChildren,
            Expr::Id(ident) | Expr::Name(ident) => {
                names.insert(normalize_ident(ident.as_str()));
                WalkControl::Continue
            }
            Expr::Qualified(_, ident) | Expr::DoublyQualified(_, _, ident) => {
                names.insert(normalize_ident(ident.as_str()));
                WalkControl::Continue
            }
            Expr::Column { table, column, .. } if table.is_self_table() => {
                let resolved_name = columns.get(*column).and_then(|col| col.name.as_ref());
                if let Some(name) = resolved_name {
                    names.insert(normalize_ident(name));
                }
                WalkControl::Continue
            }
            _ => WalkControl::Continue,
        };
        Ok(control)
    });
    names
}
/// Marks in `out` the index of every column that the generated-column
/// expression `expr` refers to.
///
/// Resolved self-table references contribute their index directly; identifiers
/// are looked up by name in `columns` and ignored when unknown.
///
/// # Panics
/// Panics if the expression contains a subquery-like node.
fn collect_column_dependencies_of_gencol(expr: &Expr, columns: &[Column], out: &mut BitSet) {
    let _ = walk_expr(expr, &mut |node| {
        let referenced = match node {
            Expr::Subquery(_)
            | Expr::Exists(_)
            | Expr::InTable { .. }
            | Expr::SubqueryResult { .. } => {
                unreachable!("generated columns cannot contain subqueries")
            }
            Expr::Column { table, column, .. } if table.is_self_table() => Some(*column),
            Expr::Id(ident) | Expr::Name(ident) => {
                find_column_index_by_name(columns, ident.as_str())
            }
            Expr::Qualified(_, ident) | Expr::DoublyQualified(_, _, ident) => {
                find_column_index_by_name(columns, ident.as_str())
            }
            _ => None,
        };
        if let Some(index) = referenced {
            out.set(index);
        }
        Ok(WalkControl::Continue)
    });
}
/// Returns the index of the first column named `col_name`, ignoring ASCII case.
fn find_column_index_by_name(columns: &[Column], col_name: &str) -> Option<usize> {
    columns.iter().position(|candidate| {
        candidate
            .name
            .as_deref()
            .is_some_and(|name| name.eq_ignore_ascii_case(col_name))
    })
}
/// Replaces identifier nodes in a generated-column expression with resolved
/// self-table `Expr::Column` references.
///
/// `Id`, `Qualified` and `DoublyQualified` nodes are matched by column name
/// (normalized, then compared ignoring ASCII case) against `columns`.
///
/// # Errors
/// Returns a parse error `no such column: <name>` when a name does not match
/// any column.
pub fn resolve_gencol_expr_columns(gencol_expr: &mut Expr, columns: &[Column]) -> Result<()> {
    walk_expr_mut(gencol_expr, &mut |node| {
        let wanted = match node {
            Expr::Id(name) | Expr::Qualified(_, name) | Expr::DoublyQualified(_, _, name) => {
                normalize_ident(name.as_str())
            }
            _ => return Ok(WalkControl::Continue),
        };
        let position = columns
            .iter()
            .position(|candidate| {
                candidate
                    .name
                    .as_ref()
                    .is_some_and(|n| n.eq_ignore_ascii_case(&wanted))
            })
            .ok_or_else(|| LimboError::ParseError(format!("no such column: {wanted}")))?;
        *node = Expr::Column {
            database: None,
            table: TableInternalId::SELF_TABLE,
            column: position,
            is_rowid_alias: columns[position].is_rowid_alias(),
        };
        Ok(WalkControl::Continue)
    })?;
    Ok(())
}
/// Renders a generated-column expression as SQL text, spelling each resolved
/// self-table column reference with the name currently held in `columns`.
///
/// The input expression is left untouched; references whose index is out of
/// range or whose column has no name are rendered as they are.
pub fn render_gencol_expr_sql_with_new_names(expr: &Expr, columns: &[Column]) -> Result<String> {
    let mut renamed = expr.clone();
    walk_expr_mut(&mut renamed, &mut |node| -> Result<WalkControl> {
        let replacement = match node {
            Expr::Column { table, column, .. } if table.is_self_table() => columns
                .get(*column)
                .and_then(|col| col.name.as_ref())
                .map(|name| Expr::Id(Name::exact(name.clone()))),
            _ => None,
        };
        if let Some(by_name) = replacement {
            *node = by_name;
        }
        Ok(WalkControl::Continue)
    })?;
    Ok(renamed.to_string())
}
/// Checks that a function used in a generated column is allowed: not a window
/// call, resolvable for `arg_count` arguments, not an aggregate, deterministic.
fn validate_generated_function_call(name: &str, arg_count: usize, is_window: bool) -> Result<()> {
    if is_window {
        bail_parse_error!("window functions prohibited in generated columns");
    }
    let Some(func) = Func::resolve_function(name, arg_count)? else {
        return Err(LimboError::ParseError(format!(
            "could not resolve function {}",
            name
        )));
    };
    if matches!(func, Func::Agg(_)) {
        bail_parse_error!("aggregate functions prohibited in generated columns");
    }
    if !func.is_deterministic() {
        bail_parse_error!("non-deterministic functions prohibited in generated columns");
    }
    Ok(())
}

/// Validates that `expr` may be used as the expression of a generated column.
///
/// Rejected with a parse error: qualified names (the `.` operator), bind
/// parameters, subqueries, window functions, aggregate functions,
/// non-deterministic functions and functions that cannot be resolved.
/// Composite expressions are checked recursively; every other node kind is
/// accepted as is.
pub(crate) fn validate_generated_expr(expr: &Expr) -> Result<()> {
    use ast::Expr;
    match expr {
        Expr::Qualified(..) | Expr::DoublyQualified(..) => {
            bail_parse_error!("the \".\" operator prohibited in generated columns");
        }
        Expr::Variable(_) => {
            bail_parse_error!("bind parameters prohibited in generated columns");
        }
        Expr::Subquery(_) | Expr::InSelect { .. } | Expr::Exists(_) | Expr::InTable { .. } => {
            bail_parse_error!("subqueries prohibited in generated columns");
        }
        Expr::FunctionCall {
            name,
            args,
            filter_over,
            ..
        } => {
            validate_generated_function_call(
                name.as_str(),
                args.len(),
                filter_over.over_clause.is_some(),
            )?;
            args.iter()
                .try_for_each(|arg| validate_generated_expr(arg))?;
        }
        Expr::FunctionCallStar { name, filter_over } => {
            validate_generated_function_call(
                name.as_str(),
                0,
                filter_over.over_clause.is_some(),
            )?;
        }
        Expr::Binary(lhs, _, rhs) => {
            validate_generated_expr(lhs)?;
            validate_generated_expr(rhs)?;
        }
        // Wrappers around a single operand.
        Expr::Unary(_, inner)
        | Expr::Collate(inner, _)
        | Expr::IsNull(inner)
        | Expr::NotNull(inner)
        | Expr::Cast { expr: inner, .. } => {
            validate_generated_expr(inner)?;
        }
        Expr::Parenthesized(items) => {
            items
                .iter()
                .try_for_each(|item| validate_generated_expr(item))?;
        }
        Expr::Case {
            base,
            when_then_pairs,
            else_expr,
            ..
        } => {
            if let Some(operand) = base {
                validate_generated_expr(operand)?;
            }
            for (condition, outcome) in when_then_pairs {
                validate_generated_expr(condition)?;
                validate_generated_expr(outcome)?;
            }
            if let Some(fallback) = else_expr {
                validate_generated_expr(fallback)?;
            }
        }
        Expr::InList { lhs, rhs, .. } => {
            validate_generated_expr(lhs)?;
            rhs.iter()
                .try_for_each(|candidate| validate_generated_expr(candidate))?;
        }
        Expr::Between {
            lhs, start, end, ..
        } => {
            validate_generated_expr(lhs)?;
            validate_generated_expr(start)?;
            validate_generated_expr(end)?;
        }
        Expr::Like {
            lhs, rhs, escape, ..
        } => {
            validate_generated_expr(lhs)?;
            validate_generated_expr(rhs)?;
            if let Some(escape_expr) = escape {
                validate_generated_expr(escape_expr)?;
            }
        }
        _ => {}
    }
    Ok(())
}
/// Builds a [`BTreeTable`] from the parsed body of a `CREATE TABLE` statement.
///
/// Table-level constraints are processed first (PRIMARY KEY, UNIQUE,
/// FOREIGN KEY, CHECK), then each column definition with its column-level
/// constraints. Afterwards the function:
/// * clears rowid-alias flags for WITHOUT ROWID tables and composite keys,
/// * validates AUTOINCREMENT,
/// * merges and de-duplicates the UNIQUE sets,
/// * resolves generated-column expressions,
/// * forces primary-key columns of WITHOUT ROWID tables to NOT NULL, and
/// * computes the logical-to-physical column map.
///
/// Foreign keys receive a `decl_order`: column-level keys are numbered from 0
/// in declaration order, table-level keys continue after them.
///
/// # Errors
/// Returns a parse error for, among others: more than one primary key,
/// unsupported key expressions, mismatched foreign-key column counts, stored
/// generated columns, generated columns that are part of the primary key, have
/// a DEFAULT or reference themselves, misplaced AUTOINCREMENT, conflicting
/// ON CONFLICT clauses on identical UNIQUE sets, a WITHOUT ROWID table without
/// (or with an unknown) primary-key column, and `CREATE TABLE ... AS SELECT`.
pub fn create_table(tbl_name: &str, body: &CreateTableBody, root_page: i64) -> Result<BTreeTable> {
    let table_name = normalize_ident(tbl_name);
    trace!("Creating table {}", table_name);
    let has_rowid;
    let mut has_autoincrement = false;
    let mut primary_key_columns = vec![];
    let mut foreign_keys = vec![];
    let mut check_constraints = vec![];
    let mut cols: Vec<Column> = vec![];
    let is_strict: bool;
    // UNIQUE sets coming from column constraints and from table constraints are
    // kept apart so the column-level ones end up first in the final list.
    let mut unique_sets_columns: Vec<UniqueSet> = vec![];
    let mut unique_sets_constraints: Vec<UniqueSet> = vec![];
    match body {
        CreateTableBody::ColumnsAndConstraints {
            columns,
            constraints,
            options,
        } => {
            has_rowid = !options.contains_without_rowid();
            is_strict = options.contains_strict();
            // Table-level foreign keys are numbered after all column-level ones.
            let column_fk_count = columns
                .iter()
                .flat_map(|col| col.constraints.iter())
                .filter(|constraint| {
                    matches!(
                        &constraint.constraint,
                        ast::ColumnConstraint::ForeignKey { .. }
                    )
                })
                .count();
            let mut table_fk_order = column_fk_count;
            for c in constraints {
                if let ast::TableConstraint::PrimaryKey {
                    columns,
                    auto_increment,
                    conflict_clause,
                } = &c.constraint
                {
                    if !primary_key_columns.is_empty() {
                        crate::bail_parse_error!(
                            "table \"{}\" has more than one primary key",
                            tbl_name
                        );
                    }
                    if *auto_increment {
                        has_autoincrement = true;
                    }
                    for column in columns {
                        let col_name = match column.expr.as_ref() {
                            Expr::Id(id) => normalize_ident(id.as_str()),
                            Expr::Literal(Literal::String(value)) => {
                                value.trim_matches('\'').to_owned()
                            }
                            expr => {
                                bail_parse_error!("unsupported primary key expression: {}", expr)
                            }
                        };
                        primary_key_columns
                            .push((col_name, column.order.unwrap_or(SortOrder::Asc)));
                    }
                    unique_sets_constraints.push(UniqueSet {
                        columns: primary_key_columns.clone(),
                        is_primary_key: true,
                        conflict_clause: *conflict_clause,
                    });
                } else if let ast::TableConstraint::Unique {
                    columns,
                    conflict_clause,
                } = &c.constraint
                {
                    let mut unique_columns = Vec::with_capacity(columns.len());
                    for column in columns {
                        match column.expr.as_ref() {
                            Expr::Id(id) => unique_columns.push((
                                id.as_str().to_string(),
                                column.order.unwrap_or(SortOrder::Asc),
                            )),
                            Expr::Literal(Literal::String(value)) => unique_columns.push((
                                value.trim_matches('\'').to_owned(),
                                column.order.unwrap_or(SortOrder::Asc),
                            )),
                            expr => {
                                bail_parse_error!("unsupported unique key expression: {}", expr)
                            }
                        }
                    }
                    let unique_set = UniqueSet {
                        columns: unique_columns,
                        is_primary_key: false,
                        conflict_clause: *conflict_clause,
                    };
                    unique_sets_constraints.push(unique_set);
                } else if let ast::TableConstraint::ForeignKey {
                    columns,
                    clause,
                    defer_clause,
                } = &c.constraint
                {
                    let child_columns: Box<[String]> = columns
                        .iter()
                        .map(|ic| normalize_ident(ic.col_name.as_str()))
                        .collect();
                    let parent_table = normalize_ident(clause.tbl_name.as_str());
                    let parent_columns: Box<[String]> = clause
                        .columns
                        .iter()
                        .map(|ic| normalize_ident(ic.col_name.as_str()))
                        .collect();
                    // An empty parent list is allowed; a non-empty one must pair
                    // up with the child columns.
                    if !parent_columns.is_empty() && child_columns.len() != parent_columns.len() {
                        crate::bail_parse_error!(
                            "foreign key on \"{}\" has {} child column(s) but {} parent column(s)",
                            tbl_name,
                            child_columns.len(),
                            parent_columns.len()
                        );
                    }
                    // Only DEFERRABLE INITIALLY DEFERRED counts as deferred.
                    let deferred = match defer_clause {
                        Some(d) => {
                            d.deferrable
                                && matches!(
                                    d.init_deferred,
                                    Some(InitDeferredPred::InitiallyDeferred)
                                )
                        }
                        None => false,
                    };
                    let fk = ForeignKey {
                        parent_table,
                        parent_columns,
                        child_columns,
                        on_delete: clause
                            .args
                            .iter()
                            .find_map(|a| {
                                if let ast::RefArg::OnDelete(x) = a {
                                    Some(*x)
                                } else {
                                    None
                                }
                            })
                            .unwrap_or(RefAct::NoAction),
                        on_update: clause
                            .args
                            .iter()
                            .find_map(|a| {
                                if let ast::RefArg::OnUpdate(x) = a {
                                    Some(*x)
                                } else {
                                    None
                                }
                            })
                            .unwrap_or(RefAct::NoAction),
                        deferred,
                        decl_order: table_fk_order,
                    };
                    table_fk_order += 1;
                    foreign_keys.push(Arc::new(fk));
                } else if let ast::TableConstraint::Check(expr) = &c.constraint {
                    check_constraints.push(CheckConstraint::new(c.name.as_ref(), expr, None));
                }
            }
            // Set once a column-level `PRIMARY KEY DESC` has been seen; such a
            // column is not treated as a rowid alias.
            let mut primary_key_desc_columns_constraint = false;
            let mut column_fk_order = 0;
            for ast::ColumnDefinition {
                col_name,
                col_type,
                constraints,
            } in columns
            {
                let name = col_name.as_str().to_string();
                let ty_str = col_type
                    .as_ref()
                    .cloned()
                    .map(|ast::Type { name, .. }| name)
                    .unwrap_or_default();
                // Size arguments of the declared type, e.g. the `10` in `VARCHAR(10)`.
                let ty_params: Vec<Box<Expr>> = match col_type {
                    Some(ast::Type {
                        size: Some(ast::TypeSize::MaxSize(ref expr)),
                        ..
                    }) => vec![expr.clone()],
                    Some(ast::Type {
                        size: Some(ast::TypeSize::TypeSize(ref e1, ref e2)),
                        ..
                    }) => vec![e1.clone(), e2.clone()],
                    _ => Vec::new(),
                };
                let mut typename_exactly_integer = false;
                let ty = match col_type {
                    Some(data_type) => {
                        let (ty, ei) = type_from_name(&data_type.name);
                        typename_exactly_integer = ei;
                        ty
                    }
                    None => Type::Null,
                };
                let mut default = None;
                let mut generated: Option<Box<Expr>> = None;
                let mut primary_key = false;
                let mut notnull = false;
                let mut explicit_notnull = false;
                let mut notnull_conflict_clause = None;
                let mut order = SortOrder::Asc;
                let mut unique = false;
                let mut collation = None;
                for c_def in constraints {
                    match &c_def.constraint {
                        ast::ColumnConstraint::Check(expr) => {
                            check_constraints.push(CheckConstraint::new(
                                c_def.name.as_ref(),
                                expr,
                                Some(&name),
                            ));
                        }
                        ast::ColumnConstraint::Generated { expr, typ } => {
                            if typ
                                .as_ref()
                                .is_some_and(|t| matches!(t, ast::GeneratedColumnType::Stored))
                            {
                                bail_parse_error!("Stored generated columns are not supported");
                            }
                            validate_generated_expr(expr)?;
                            generated = Some(expr.clone());
                        }
                        ast::ColumnConstraint::PrimaryKey {
                            order: o,
                            auto_increment,
                            conflict_clause,
                            ..
                        } => {
                            if !primary_key_columns.is_empty() {
                                crate::bail_parse_error!(
                                    "table \"{}\" has more than one primary key",
                                    tbl_name
                                );
                            }
                            primary_key = true;
                            if *auto_increment {
                                has_autoincrement = true;
                            }
                            if let Some(o) = o {
                                order = *o;
                            }
                            unique_sets_columns.push(UniqueSet {
                                columns: vec![(name.clone(), order)],
                                is_primary_key: true,
                                conflict_clause: *conflict_clause,
                            });
                        }
                        ast::ColumnConstraint::NotNull {
                            nullable,
                            conflict_clause,
                            ..
                        } => {
                            notnull = !nullable;
                            explicit_notnull = !nullable;
                            notnull_conflict_clause = *conflict_clause;
                        }
                        ast::ColumnConstraint::Default(ref expr) => {
                            // A bare identifier used as a default is stored as a
                            // string literal.
                            default = Some(
                                translate_ident_to_string_literal(expr)
                                    .unwrap_or_else(|| expr.clone()),
                            );
                        }
                        ast::ColumnConstraint::Unique(conflict) => {
                            unique = true;
                            unique_sets_columns.push(UniqueSet {
                                columns: vec![(name.clone(), order)],
                                is_primary_key: false,
                                conflict_clause: *conflict,
                            });
                        }
                        ast::ColumnConstraint::Collate { ref collation_name } => {
                            collation = Some(CollationSeq::new(collation_name.as_str())?);
                        }
                        ast::ColumnConstraint::ForeignKey {
                            clause,
                            defer_clause,
                        } => {
                            if clause.columns.len() > 1 {
                                crate::bail_parse_error!(
                                    "foreign key on {} should reference only one column of table {}",
                                    name,
                                    clause.tbl_name.as_str()
                                );
                            }
                            let fk = ForeignKey {
                                parent_table: normalize_ident(clause.tbl_name.as_str()),
                                parent_columns: clause
                                    .columns
                                    .iter()
                                    .map(|c| normalize_ident(c.col_name.as_str()))
                                    .collect::<Vec<_>>()
                                    .into_boxed_slice(),
                                on_delete: clause
                                    .args
                                    .iter()
                                    .find_map(|arg| {
                                        if let ast::RefArg::OnDelete(act) = arg {
                                            Some(*act)
                                        } else {
                                            None
                                        }
                                    })
                                    .unwrap_or(RefAct::NoAction),
                                on_update: clause
                                    .args
                                    .iter()
                                    .find_map(|arg| {
                                        if let ast::RefArg::OnUpdate(act) = arg {
                                            Some(*act)
                                        } else {
                                            None
                                        }
                                    })
                                    .unwrap_or(RefAct::NoAction),
                                child_columns: Box::from([name.clone()]),
                                deferred: match defer_clause {
                                    Some(d) => {
                                        d.deferrable
                                            && matches!(
                                                d.init_deferred,
                                                Some(InitDeferredPred::InitiallyDeferred)
                                            )
                                    }
                                    None => false,
                                },
                                decl_order: column_fk_order,
                            };
                            column_fk_order += 1;
                            foreign_keys.push(Arc::new(fk));
                        }
                    }
                }
                if let Some(ref gen_expr) = generated {
                    if primary_key {
                        bail_parse_error!(
                            "generated column \"{}\" cannot be part of the PRIMARY KEY",
                            name
                        );
                    }
                    if default.is_some() {
                        bail_parse_error!(
                            "generated column \"{}\" cannot have a DEFAULT value",
                            name
                        );
                    }
                    // Both sides are normalized, so a direct set lookup suffices.
                    let referenced_cols = collect_column_refs(gen_expr);
                    let current_col_name = normalize_ident(&name);
                    if referenced_cols.contains(&current_col_name) {
                        bail_parse_error!("generated column \"{}\" cannot reference itself", name);
                    }
                }
                if primary_key {
                    primary_key_columns.push((name.clone(), order));
                    if order == SortOrder::Desc {
                        primary_key_desc_columns_constraint = true;
                    }
                } else if primary_key_columns
                    .iter()
                    .any(|(col_name, _)| col_name.eq_ignore_ascii_case(&name))
                {
                    // The column is named by a table-level PRIMARY KEY constraint.
                    if generated.is_some() {
                        crate::bail_parse_error!(
                            "generated column \"{}\" cannot be part of the PRIMARY KEY",
                            name
                        );
                    }
                    primary_key = true;
                }
                let mut col = Column::new(
                    Some(name),
                    ty_str,
                    default,
                    generated,
                    ty,
                    collation,
                    ColDef {
                        primary_key,
                        rowid_alias: typename_exactly_integer
                            && primary_key
                            && !primary_key_desc_columns_constraint,
                        notnull,
                        explicit_notnull,
                        unique,
                        hidden: false,
                        notnull_conflict_clause,
                    },
                );
                col.ty_params = ty_params;
                if let Some(t) = col_type.as_ref() {
                    if t.is_array() {
                        col.set_array_dimensions(t.array_dimensions);
                    }
                }
                cols.push(col);
            }
        }
        CreateTableBody::AsSelect(_) => {
            crate::bail_parse_error!("CREATE TABLE AS SELECT is not supported")
        }
    };
    // A rowid alias only exists on rowid tables with a single-column key.
    if !has_rowid || primary_key_columns.len() > 1 {
        for col in cols.iter_mut() {
            col.set_rowid_alias(false);
        }
    }
    if has_autoincrement {
        if primary_key_columns.len() != 1 {
            crate::bail_parse_error!("AUTOINCREMENT is only allowed on an INTEGER PRIMARY KEY");
        }
        let pk_col_name = &primary_key_columns[0].0;
        let pk_col = cols.iter().find(|c| {
            c.name
                .as_deref()
                .is_some_and(|n| n.eq_ignore_ascii_case(pk_col_name))
        });
        if let Some(col) = pk_col {
            if col.ty() != Type::Integer {
                crate::bail_parse_error!("AUTOINCREMENT is only allowed on an INTEGER PRIMARY KEY");
            }
        }
    }
    let mut unique_sets = unique_sets_columns
        .into_iter()
        .chain(unique_sets_constraints)
        .collect::<Vec<_>>();
    // Captured before the rowid-alias primary key set is dropped below.
    let rowid_alias_conflict_clause = unique_sets
        .iter()
        .find(|us| us.is_primary_key)
        .and_then(|us| us.conflict_clause);
    // The primary-key set made of just the rowid alias is removed from the
    // UNIQUE sets.
    for col in cols.iter() {
        if col.is_rowid_alias() {
            let unique_set_w_only_rowid_alias = unique_sets.iter().position(|us| {
                us.is_primary_key
                    && us.columns.len() == 1
                    && us
                        .columns
                        .first()
                        .unwrap()
                        .0
                        .eq_ignore_ascii_case(col.name.as_ref().unwrap())
            });
            if let Some(u) = unique_set_w_only_rowid_alias {
                unique_sets.remove(u);
            }
        }
    }
    let mut table = BTreeTable {
        root_page,
        name: table_name,
        has_rowid,
        primary_key_columns,
        has_autoincrement,
        columns: cols,
        is_strict,
        foreign_keys,
        unique_sets: {
            // Drop later sets that list the same columns in the same order as an
            // earlier one; two such sets may not disagree on ON CONFLICT.
            let mut i = 0;
            while i < unique_sets.len() {
                let mut j = i + 1;
                while j < unique_sets.len() {
                    let lengths_equal =
                        unique_sets[i].columns.len() == unique_sets[j].columns.len();
                    if lengths_equal
                        && unique_sets[i]
                            .columns
                            .iter()
                            .zip(unique_sets[j].columns.iter())
                            .all(|((a_name, _), (b_name, _))| a_name.eq_ignore_ascii_case(b_name))
                    {
                        if let (Some(a), Some(b)) = (
                            unique_sets[i].conflict_clause,
                            unique_sets[j].conflict_clause,
                        ) {
                            if a != b {
                                crate::bail_parse_error!(
                                    "conflicting ON CONFLICT clauses specified"
                                );
                            }
                        }
                        unique_sets.remove(j);
                    } else {
                        j += 1;
                    }
                }
                i += 1;
            }
            unique_sets
        },
        check_constraints,
        rowid_alias_conflict_clause,
        has_virtual_columns: false,
        logical_to_physical_map: Vec::new(),
        column_dependencies: Default::default(),
    };
    table.prepare_generated_columns()?;
    if !table.has_rowid {
        if table.primary_key_columns.is_empty() {
            crate::bail_parse_error!("PRIMARY KEY missing on table {}", table.name);
        }
        // Primary-key columns of a WITHOUT ROWID table are implicitly NOT NULL.
        for (pk_name, _) in &table.primary_key_columns {
            let Some((idx, col)) = table.get_column(pk_name) else {
                crate::bail_parse_error!(
                    "PRIMARY KEY column {pk_name} not found in table {}",
                    table.name
                );
            };
            if !col.notnull() {
                table.columns[idx].set_notnull(true);
            }
        }
    }
    table.logical_to_physical_map = BTreeTable::build_logical_to_physical_map(
        &table.columns,
        &table.primary_key_columns,
        table.has_rowid,
    );
    Ok(table)
}
/// Converts an identifier expression (`Expr::Name` or `Expr::Id`) into a string
/// literal built from `Name::as_literal`; returns `None` for any other
/// expression.
pub fn translate_ident_to_string_literal(expr: &Expr) -> Option<Box<Expr>> {
    let (Expr::Name(ident) | Expr::Id(ident)) = expr else {
        return None;
    };
    let literal = Expr::Literal(Literal::String(ident.as_literal()));
    Some(Box::new(literal))
}
/// Placeholder for building a pseudo cursor type from result columns.
///
/// Only an empty `columns` slice is supported and yields an empty
/// `PseudoCursorType`.
///
/// # Panics
/// Panics with `todo!` as soon as `columns` contains any element.
pub fn _build_pseudo_table(columns: &[ResultColumn]) -> PseudoCursorType {
    let table = PseudoCursorType::new();
    // Every variant is still unimplemented, so the first column already aborts.
    if let Some(first) = columns.first() {
        match first {
            ResultColumn::Expr(expr, _as_name) => {
                todo!("unsupported expression {:?}", expr);
            }
            ResultColumn::Star => {
                todo!();
            }
            ResultColumn::TableStar(_) => {
                todo!();
            }
        }
    }
    table
}
/// A foreign-key constraint declared on a (child) table.
#[derive(Debug, Clone)]
pub struct ForeignKey {
/// Referencing columns in the child table.
pub child_columns: Box<[String]>,
/// Normalized name of the referenced (parent) table.
pub parent_table: String,
/// Referenced columns in the parent table; empty when the declaration named
/// only the parent table.
pub parent_columns: Box<[String]>,
/// Action for `ON DELETE`; `RefAct::NoAction` when none was declared.
pub on_delete: RefAct,
/// Action for `ON UPDATE`; `RefAct::NoAction` when none was declared.
pub on_update: RefAct,
/// True only for `DEFERRABLE INITIALLY DEFERRED` constraints.
pub deferred: bool,
/// Declaration order: column-level keys are numbered first, table-level keys
/// continue after them.
pub decl_order: usize,
}
/// Builds the `foreign key mismatch` error for a child/parent table pair.
#[inline]
fn fk_mismatch_err(child: &str, parent: &str) -> crate::LimboError {
    let message = format!("foreign key mismatch - \"{child}\" referencing \"{parent}\"");
    crate::LimboError::ForeignKeyConstraint(message)
}
impl ForeignKey {
    /// Rejects foreign keys whose parent column list names one of the rowid
    /// spellings in `ROWID_STRS` (compared ignoring ASCII case).
    ///
    /// # Errors
    /// Returns a `ForeignKeyConstraint` error in that case.
    fn validate(&self) -> Result<()> {
        let is_rowid_name = |column: &String| {
            ROWID_STRS
                .iter()
                .any(|&rowid| rowid.eq_ignore_ascii_case(column))
        };
        if !self.parent_columns.iter().any(is_rowid_name) {
            return Ok(());
        }
        Err(crate::LimboError::ForeignKeyConstraint(format!(
            "foreign key mismatch referencing \"{}\"",
            self.parent_table
        )))
    }
}
/// A foreign key paired with column positions and parent-key information.
// NOTE(review): the field comments below are inferred from this type's methods
// and field names only — confirm against the code that builds this struct.
#[derive(Clone, Debug)]
pub struct ResolvedFkRef {
/// The table that declares the foreign key.
pub child_table: Arc<BTreeTable>,
/// The foreign-key declaration itself.
pub fk: Arc<ForeignKey>,
/// Presumably the parent column names after resolution.
pub parent_cols: Box<[String]>,
/// Column positions in the child table; compared against updated child
/// positions by `child_key_changed`.
pub child_pos: Box<[usize]>,
/// Column positions in the parent table; compared against affected parent
/// positions by `parent_key_may_change`.
pub parent_pos: Box<[usize]>,
/// When set, `parent_key_may_change` looks at the parent's rowid-alias column
/// instead of `parent_pos`.
pub parent_uses_rowid: bool,
/// Presumably the parent's unique index backing the referenced key, if any.
pub parent_unique_index: Option<Arc<Index>>,
}
impl ResolvedFkRef {
    /// Reports whether updating the given parent columns may change the key
    /// this foreign key refers to.
    ///
    /// When the parent key is the rowid, only an update of the rowid-alias
    /// column counts; without such a column the answer is always `true`.
    /// Otherwise the update is expanded to the columns it affects and checked
    /// against `parent_pos`.
    pub fn parent_key_may_change(
        &self,
        updated_parent_positions: &ColumnMask,
        parent_tbl: &BTreeTable,
    ) -> Result<bool> {
        if !self.parent_uses_rowid {
            let affected = parent_tbl.columns_affected_by_update(updated_parent_positions)?;
            return Ok(self.parent_pos.iter().any(|&pos| affected.get(pos)));
        }
        let may_change = match parent_tbl.get_rowid_alias_column() {
            Some((alias_idx, _)) => updated_parent_positions.get(alias_idx),
            None => true,
        };
        Ok(may_change)
    }
    /// Reports whether the update touches any child column of this foreign key.
    ///
    /// # Panics
    /// Panics if a single-column key names a column missing from `child_tbl`.
    pub fn child_key_changed(
        &self,
        updated_child_positions: &ColumnMask,
        child_tbl: &BTreeTable,
    ) -> bool {
        let touches_key_position = self
            .child_pos
            .iter()
            .any(|&pos| updated_child_positions.get(pos));
        if touches_key_position {
            return true;
        }
        // A single-column key is additionally checked through the rowid alias.
        match &*self.fk.child_columns {
            [only_column] => {
                let (idx, col) = child_tbl.get_column(only_column).unwrap();
                col.is_rowid_alias() && updated_child_positions.get(idx)
            }
            _ => false,
        }
    }
}
/// A table column definition.
#[derive(Debug, Clone)]
pub struct Column {
/// Column name, if any.
pub name: Option<String>,
/// Declared type name as written; empty when no type was declared.
pub ty_str: String,
/// Size arguments of the declared type, if any.
pub ty_params: Vec<Box<Expr>>,
/// DEFAULT expression, if any.
pub default: Option<Box<Expr>>,
/// Whether the column is generated, and from which expression.
generated_type: GeneratedType,
/// Bit-packed flags, type, collation, base affinity and array dimensions;
/// see the `F_*`, `*_SHIFT` and `*_MASK` constants.
raw: u16,
/// Value of `ColDef::explicit_notnull` the column was created with.
explicit_notnull: bool,
/// Conflict clause of the NOT NULL constraint, if any.
pub notnull_conflict_clause: Option<ResolveType>,
}
/// Boolean attributes and NOT NULL conflict clause passed to `Column::new`.
/// Everything defaults to `false` / `None`.
#[derive(Default)]
pub struct ColDef {
/// Sets the `F_PRIMARY_KEY` flag.
pub primary_key: bool,
/// Sets the `F_ROWID_ALIAS` flag.
pub rowid_alias: bool,
/// Sets the `F_NOTNULL` flag.
pub notnull: bool,
/// Stored as-is in `Column::explicit_notnull`.
pub explicit_notnull: bool,
/// Sets the `F_UNIQUE` flag.
pub unique: bool,
/// Sets the `F_HIDDEN` flag.
pub hidden: bool,
/// Stored as-is in `Column::notnull_conflict_clause`.
pub notnull_conflict_clause: Option<ResolveType>,
}
/// Whether, and how, a column's value is generated.
#[derive(Debug, Clone)]
pub enum GeneratedType {
/// A virtual generated column.
Virtual {
/// The generating expression.
expr: Box<Expr>,
/// SQL text of the expression, captured by `Column::new` via `to_string()`
/// when the column is created.
original_sql: String,
},
/// An ordinary, non-generated column.
NotGenerated,
}
// Bit layout of `Column::raw` (u16):
//   bits 0-4   boolean flags (F_*)
//   bits 5-7   column type
//   bits 8-9   collation sequence
//   bits 10-12 base affinity (0 = not set)
//   bits 13-15 array dimensions
/// Flag bit: column is part of the primary key.
const F_PRIMARY_KEY: u16 = 1;
/// Flag bit: column is an alias for the rowid.
const F_ROWID_ALIAS: u16 = 2;
/// Flag bit: column is NOT NULL.
const F_NOTNULL: u16 = 4;
/// Flag bit: column is UNIQUE.
const F_UNIQUE: u16 = 8;
/// Flag bit: column is hidden.
const F_HIDDEN: u16 = 16;
/// Position of the 3-bit type field.
const TYPE_SHIFT: u16 = 5;
const TYPE_MASK: u16 = 0b111 << TYPE_SHIFT;
/// Position of the 2-bit collation field.
const COLL_SHIFT: u16 = TYPE_SHIFT + 3;
const COLL_MASK: u16 = 0b11 << COLL_SHIFT;
/// Position of the 3-bit base-affinity field.
const BASE_AFF_SHIFT: u16 = COLL_SHIFT + 2;
const BASE_AFF_MASK: u16 = 0b111 << BASE_AFF_SHIFT;
/// Position of the 3-bit array-dimension field.
const ARRAY_DIM_SHIFT: u16 = 13;
const ARRAY_DIM_MASK: u16 = 0b111 << ARRAY_DIM_SHIFT;
impl Column {
/// Returns the column's affinity.
///
/// A base affinity stored in `raw` (see `set_base_affinity`) takes precedence;
/// when none is stored the affinity is derived from the declared type name.
pub fn affinity(&self) -> Affinity {
    match ((self.raw & BASE_AFF_MASK) >> BASE_AFF_SHIFT) as u8 {
        0 => Affinity::affinity(&self.ty_str),
        1 => Affinity::Integer,
        2 => Affinity::Text,
        3 => Affinity::Blob,
        4 => Affinity::Real,
        _ => Affinity::Numeric,
    }
}
/// Stores `affinity` in the base-affinity bits of `raw`, overriding the
/// affinity otherwise derived from the declared type name.
pub fn set_base_affinity(&mut self, affinity: Affinity) {
    // Code 0 is reserved for "no base affinity stored".
    let code: u16 = match affinity {
        Affinity::Integer => 1,
        Affinity::Text => 2,
        Affinity::Blob => 3,
        Affinity::Real => 4,
        Affinity::Numeric => 5,
    };
    let cleared = self.raw & !BASE_AFF_MASK;
    let encoded = (code << BASE_AFF_SHIFT) & BASE_AFF_MASK;
    self.raw = cleared | encoded;
}
/// Returns the column affinity, treating a column declared `ANY` (any ASCII
/// case) in a STRICT table as `Affinity::Blob`.
pub fn affinity_with_strict(&self, is_strict: bool) -> Affinity {
    let declared_any = self.ty_str.eq_ignore_ascii_case("ANY");
    match (is_strict, declared_any) {
        (true, true) => Affinity::Blob,
        _ => self.affinity(),
    }
}
/// Creates a non-generated column of type `Type::Text` with no collation and
/// all `ColDef` attributes at their defaults.
pub fn new_default_text(
    name: Option<String>,
    ty_str: String,
    default: Option<Box<Expr>>,
) -> Self {
    let generated = None;
    let collation = None;
    let attributes = ColDef::default();
    Self::new(
        name,
        ty_str,
        default,
        generated,
        Type::Text,
        collation,
        attributes,
    )
}
/// Creates a non-generated column of type `Type::Integer` with no collation
/// and all `ColDef` attributes at their defaults.
pub fn new_default_integer(
    name: Option<String>,
    ty_str: String,
    default: Option<Box<Expr>>,
) -> Self {
    let generated = None;
    let collation = None;
    let attributes = ColDef::default();
    Self::new(
        name,
        ty_str,
        default,
        generated,
        Type::Integer,
        collation,
        attributes,
    )
}
/// Creates a column.
///
/// * `generated` — expression of a virtual generated column; its SQL text is
///   captured via `to_string()` at this point.
/// * `ty`, `col` — packed into the type and collation bits of `raw`; no
///   collation bits are set when `col` is `None`.
/// * `coldef` — boolean attributes packed into the flag bits, plus the
///   explicit-NOT-NULL marker and the NOT NULL conflict clause.
///
/// `ty_params` starts out empty.
#[inline]
pub fn new(
    name: Option<String>,
    ty_str: String,
    default: Option<Box<Expr>>,
    generated: Option<Box<Expr>>,
    ty: Type,
    col: Option<CollationSeq>,
    coldef: ColDef,
) -> Self {
    let generated_type = generated.map_or(GeneratedType::NotGenerated, |expr| {
        let original_sql = expr.to_string();
        GeneratedType::Virtual { expr, original_sql }
    });

    let mut raw = (ty as u16) << TYPE_SHIFT;
    if let Some(collation) = col {
        raw |= (collation as u16) << COLL_SHIFT;
    }
    let flags = [
        (coldef.primary_key, F_PRIMARY_KEY),
        (coldef.rowid_alias, F_ROWID_ALIAS),
        (coldef.notnull, F_NOTNULL),
        (coldef.unique, F_UNIQUE),
        (coldef.hidden, F_HIDDEN),
    ];
    for (enabled, bit) in flags {
        if enabled {
            raw |= bit;
        }
    }

    Self {
        name,
        ty_str,
        ty_params: Vec::new(),
        default,
        generated_type,
        raw,
        explicit_notnull: coldef.explicit_notnull,
        notnull_conflict_clause: coldef.notnull_conflict_clause,
    }
}
/// Decodes the column type from the type bits of `raw`.
#[inline]
pub const fn ty(&self) -> Type {
    Type::from_bits(((self.raw & TYPE_MASK) >> TYPE_SHIFT) as u8)
}
/// Overwrites the type bits of `raw` with `ty`, leaving all other bits intact.
#[inline]
pub const fn set_ty(&mut self, ty: Type) {
    let type_bits = ((ty as u16) << TYPE_SHIFT) & TYPE_MASK;
    let other_bits = self.raw & !TYPE_MASK;
    self.raw = other_bits | type_bits;
}
/// Returns the collation only when one other than `CollationSeq::Unset` is
/// stored.
#[inline]
pub const fn collation_opt(&self) -> Option<CollationSeq> {
    match self.has_explicit_collation() {
        true => Some(self.collation()),
        false => None,
    }
}
/// Decodes the collation sequence from the collation bits of `raw`.
#[inline]
pub const fn collation(&self) -> CollationSeq {
    CollationSeq::from_bits(((self.raw & COLL_MASK) >> COLL_SHIFT) as u8)
}
/// Reports whether the stored collation bits differ from
/// [`CollationSeq::Unset`].
#[inline]
pub const fn has_explicit_collation(&self) -> bool {
    let stored = ((self.raw & COLL_MASK) >> COLL_SHIFT) as u8;
    let unset = CollationSeq::Unset as u8;
    stored != unset
}
/// Stores `c` in the collation bits of `raw`.
///
/// Passing `None` is a no-op: the currently stored collation is left
/// untouched.
#[inline]
pub const fn set_collation(&mut self, c: Option<CollationSeq>) {
    match c {
        Some(seq) => {
            let without_collation = self.raw & !COLL_MASK;
            let collation_bits = ((seq as u16) << COLL_SHIFT) & COLL_MASK;
            self.raw = without_collation | collation_bits;
        }
        None => {}
    }
}
/// Reports whether the PRIMARY KEY flag is set for this column.
///
/// Declared `const` for consistency with the sibling flag accessors
/// (`is_rowid_alias`, `notnull`, `unique`, `hidden`).
#[inline]
pub const fn primary_key(&self) -> bool {
    self.raw & F_PRIMARY_KEY != 0
}
/// Reports whether the rowid-alias flag is set for this column.
#[inline]
pub const fn is_rowid_alias(&self) -> bool {
    (self.raw & F_ROWID_ALIAS) != 0
}
/// Reports whether the NOT NULL flag is set for this column.
#[inline]
pub const fn notnull(&self) -> bool {
    (self.raw & F_NOTNULL) != 0
}
#[inline]
pub const fn explicit_notnull(&self) -> bool {
self.explicit_notnull
}
/// Reports whether the UNIQUE flag is set for this column.
#[inline]
pub const fn unique(&self) -> bool {
    (self.raw & F_UNIQUE) != 0
}
/// Reports whether the hidden flag is set for this column.
#[inline]
pub const fn hidden(&self) -> bool {
    (self.raw & F_HIDDEN) != 0
}
/// Fails with a parse error when this column is a generated column.
///
/// `verb_phrase` and `col_name` are only used to build the error message
/// (`cannot <verb_phrase> generated column "<col_name>"`).
pub fn ensure_not_generated(&self, verb_phrase: &str, col_name: &str) -> Result<()> {
    match self.generated_type {
        GeneratedType::NotGenerated => Ok(()),
        _ => {
            bail_parse_error!("cannot {} generated column \"{}\"", verb_phrase, col_name);
        }
    }
}
#[inline]
pub fn generated_type(&self) -> &GeneratedType {
&self.generated_type
}
/// Reports whether the column is anything other than
/// [`GeneratedType::NotGenerated`].
#[inline]
pub const fn is_generated(&self) -> bool {
    match self.generated_type {
        GeneratedType::NotGenerated => false,
        _ => true,
    }
}
/// Reports whether the column is a [`GeneratedType::Virtual`] column.
#[inline]
pub const fn is_virtual_generated(&self) -> bool {
    match self.generated_type {
        GeneratedType::Virtual { .. } => true,
        _ => false,
    }
}
/// Borrows the generating expression of a virtual generated column, or
/// returns `None` for a non-generated column.
#[inline]
pub fn generated_expr(&self) -> Option<&Expr> {
    match &self.generated_type {
        GeneratedType::NotGenerated => None,
        GeneratedType::Virtual { expr, .. } => Some(&**expr),
    }
}
/// Mutably borrows the generating expression of a virtual generated column,
/// or returns `None` for a non-generated column.
#[inline]
pub fn generated_expr_mut(&mut self) -> Option<&mut Expr> {
    match &mut self.generated_type {
        GeneratedType::NotGenerated => None,
        GeneratedType::Virtual { expr, .. } => Some(&mut **expr),
    }
}
/// Replaces the stored original SQL text of a virtual generated column.
///
/// Does nothing when the column is not generated.
#[inline]
pub fn set_generated_original_sql(&mut self, new_sql: String) {
    match &mut self.generated_type {
        GeneratedType::Virtual { original_sql, .. } => *original_sql = new_sql,
        GeneratedType::NotGenerated => {}
    }
}
/// Sets or clears the PRIMARY KEY flag.
#[inline]
pub const fn set_primary_key(&mut self, v: bool) {
    let mask = F_PRIMARY_KEY;
    self.set_flag(mask, v);
}
/// Sets or clears the rowid-alias flag.
#[inline]
pub const fn set_rowid_alias(&mut self, v: bool) {
    let mask = F_ROWID_ALIAS;
    self.set_flag(mask, v);
}
/// Sets or clears the NOT NULL flag.
#[inline]
pub const fn set_notnull(&mut self, v: bool) {
    let mask = F_NOTNULL;
    self.set_flag(mask, v);
}
/// Sets or clears the UNIQUE flag.
#[inline]
pub const fn set_unique(&mut self, v: bool) {
    let mask = F_UNIQUE;
    self.set_flag(mask, v);
}
/// Sets or clears the hidden flag.
#[inline]
pub const fn set_hidden(&mut self, v: bool) {
    let mask = F_HIDDEN;
    self.set_flag(mask, v);
}
/// Reports whether any array-dimension bit is set, i.e. the stored dimension
/// count is non-zero.
#[inline]
pub const fn is_array(&self) -> bool {
    let dimension_bits = self.raw & ARRAY_DIM_MASK;
    dimension_bits != 0
}
/// Decodes the array dimension count stored in the packed `raw` bits.
#[inline]
pub const fn array_dimensions(&self) -> u32 {
    let dimension_bits = self.raw & ARRAY_DIM_MASK;
    (dimension_bits >> ARRAY_DIM_SHIFT) as u32
}
/// Stores the array dimension count in the packed `raw` bits.
///
/// # Panics
///
/// Panics when `dims` is greater than 7.
#[inline]
pub fn set_array_dimensions(&mut self, dims: u32) {
    assert!(dims <= 7, "array dimensions must be <= 7");
    let without_dims = self.raw & !ARRAY_DIM_MASK;
    let dim_bits = (dims as u16) << ARRAY_DIM_SHIFT;
    self.raw = without_dims | dim_bits;
}
/// Sets every bit of `mask` in `raw` when `val` is true, clears them otherwise.
#[inline]
const fn set_flag(&mut self, mask: u16, val: bool) {
    self.raw = if val {
        self.raw | mask
    } else {
        self.raw & !mask
    };
}
}
/// Builds a [`Column`] from a parsed column definition.
///
/// Column constraints are folded into the column flags, the declared type
/// name is mapped to a storage [`Type`], and any type size arguments are kept
/// as type parameters. The only fallible step is resolving a `COLLATE` name.
impl TryFrom<&ColumnDefinition> for Column {
    type Error = crate::LimboError;

    fn try_from(value: &ColumnDefinition) -> crate::Result<Self> {
        let mut default = None;
        let mut generated = None;
        let mut collation = None;
        let mut notnull_conflict_clause = None;
        let (mut notnull, mut primary_key, mut unique) = (false, false, false);

        // A later constraint of the same kind overrides an earlier one.
        for named in &value.constraints {
            match &named.constraint {
                ast::ColumnConstraint::PrimaryKey { .. } => primary_key = true,
                ast::ColumnConstraint::Unique(..) => unique = true,
                ast::ColumnConstraint::NotNull {
                    conflict_clause, ..
                } => {
                    notnull = true;
                    notnull_conflict_clause = *conflict_clause;
                }
                ast::ColumnConstraint::Default(expr) => {
                    let rewritten =
                        translate_ident_to_string_literal(expr).unwrap_or_else(|| expr.clone());
                    default = Some(rewritten);
                }
                ast::ColumnConstraint::Collate { collation_name } => {
                    collation = Some(CollationSeq::new(collation_name.as_str())?);
                }
                ast::ColumnConstraint::Generated { expr, .. } => {
                    generated = Some(expr.clone());
                }
                _ => {}
            }
        }

        let declared_type = value.col_type.as_ref();
        let ty = declared_type.map_or(Type::Null, |data_type| {
            type_from_name(&data_type.name).0
        });
        let ty_str = declared_type
            .map(|t| t.name.to_string())
            .unwrap_or_default();
        // Size arguments such as `(n)` or `(p, s)` become type parameters.
        let ty_params: Vec<Box<turso_parser::ast::Expr>> =
            match declared_type.and_then(|t| t.size.as_ref()) {
                Some(ast::TypeSize::MaxSize(expr)) => vec![expr.clone()],
                Some(ast::TypeSize::TypeSize(e1, e2)) => vec![e1.clone(), e2.clone()],
                _ => Vec::new(),
            };
        let hidden = ty_str.contains("HIDDEN");

        let flags = ColDef {
            primary_key,
            rowid_alias: primary_key && ty == Type::Integer,
            notnull,
            explicit_notnull: notnull,
            unique,
            hidden,
            notnull_conflict_clause,
        };
        let mut col = Column::new(
            Some(value.col_name.as_str().to_string()),
            ty_str,
            default,
            generated,
            ty,
            collation,
            flags,
        );
        col.ty_params = ty_params;
        if let Some(array_type) = declared_type.filter(|t| t.is_array()) {
            col.set_array_dimensions(array_type.array_dimensions);
        }
        Ok(col)
    }
}
/// Storage type of a column.
///
/// The explicit `u8` discriminants are the values packed into a column's
/// bit field and decoded again by `Type::from_bits`, so they must stay in
/// sync with that function.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Type {
    /// No declared type; displayed as the empty string.
    Null = 0,
    Text = 1,
    Numeric = 2,
    Integer = 3,
    Real = 4,
    Blob = 5,
}
impl Type {
#[inline]
const fn from_bits(bits: u8) -> Self {
match bits {
0 => Type::Null,
1 => Type::Text,
2 => Type::Numeric,
3 => Type::Integer,
4 => Type::Real,
5 => Type::Blob,
_ => Type::Null,
}
}
}
impl fmt::Display for Type {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let s = match self {
Self::Null => "",
Self::Text => "TEXT",
Self::Numeric => "NUMERIC",
Self::Integer => "INTEGER",
Self::Real => "REAL",
Self::Blob => "BLOB",
};
write!(f, "{s}")
}
}
/// Builds the in-memory definition of the `sqlite_schema` table.
///
/// The table lives on root page 1, has a rowid, is not STRICT and has the
/// columns `type`, `name`, `tbl_name`, `rootpage` and `sql`.
pub fn sqlite_schema_table() -> BTreeTable {
    let text_column =
        |name: &str| Column::new_default_text(Some(name.to_string()), "TEXT".to_string(), None);
    let columns = vec![
        text_column("type"),
        text_column("name"),
        text_column("tbl_name"),
        Column::new_default_integer(Some("rootpage".to_string()), "INT".to_string(), None),
        text_column("sql"),
    ];
    let logical_to_physical_map = BTreeTable::build_logical_to_physical_map(&columns, &[], true);

    BTreeTable {
        name: "sqlite_schema".to_string(),
        root_page: 1,
        columns,
        logical_to_physical_map,
        has_rowid: true,
        is_strict: false,
        has_autoincrement: false,
        has_virtual_columns: false,
        primary_key_columns: vec![],
        unique_sets: vec![],
        foreign_keys: vec![],
        check_constraints: vec![],
        rowid_alias_conflict_clause: None,
        column_dependencies: Default::default(),
    }
}
/// In-memory description of an index on a B-tree table.
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct Index {
    /// Index name; the constructors in this file store it normalized.
    pub name: String,
    /// Name of the table the index belongs to.
    pub table_name: String,
    /// Root page of the index B-tree.
    pub root_page: i64,
    /// Indexed columns (or expressions), in index order.
    pub columns: Vec<IndexColumn>,
    /// Whether the index enforces uniqueness.
    pub unique: bool,
    // NOTE(review): every constructor in this impl sets `ephemeral` to false;
    // presumably set elsewhere for temporary indexes - confirm.
    pub ephemeral: bool,
    /// Copied from the owning table's `has_rowid`.
    pub has_rowid: bool,
    /// Predicate of a partial index, if any.
    pub where_clause: Option<Box<Expr>>,
    /// Attachment of a custom index module, present when the index was
    /// created with a `USING` clause.
    pub index_method: Option<Arc<dyn IndexMethodAttachment>>,
    /// Conflict resolution supplied for automatically created indexes.
    pub on_conflict: Option<ResolveType>,
}
/// One key column of an [`Index`].
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct IndexColumn {
    /// Column name.
    pub name: String,
    /// Sort order of this key column.
    pub order: SortOrder,
    /// Position of the column within the owning table's column list.
    pub pos_in_table: usize,
    /// Explicit collation of the column, if one was declared.
    pub collation: Option<CollationSeq>,
    /// Default value expression copied from the table column, if any.
    pub default: Option<Box<Expr>>,
    /// Expression for an expression-index column; `None` for a plain column.
    pub expr: Option<Box<Expr>>,
}
impl Index {
    /// Parses a `CREATE INDEX` statement and builds the corresponding index.
    ///
    /// * `syms` - symbol table used to look up a custom index module named in
    ///   a `USING` clause.
    /// * `sql` - the statement text.
    /// * `root_page` - root page to record on the resulting index.
    /// * `table` - the table the index is defined on.
    ///
    /// # Errors
    ///
    /// Returns an error when parsing fails, when `sql` is not a
    /// `CREATE INDEX` statement, when the indexed columns cannot be resolved,
    /// or when a custom index module is unknown, is combined with `WHERE` or
    /// `UNIQUE`, or refuses to attach.
    pub fn from_sql(
        syms: &SymbolTable,
        sql: &str,
        root_page: i64,
        table: &BTreeTable,
    ) -> Result<Index> {
        let mut parser = Parser::new(sql.as_bytes());
        let cmd = parser.next_cmd()?;
        match cmd {
            Some(Cmd::Stmt(Stmt::CreateIndex {
                idx_name,
                tbl_name,
                columns,
                unique,
                where_clause,
                using,
                with_clause,
                ..
            })) => {
                let index_name = normalize_ident(idx_name.name.as_str());
                let index_columns = resolve_sorted_columns(table, &columns)?;
                if let Some(using) = using {
                    if where_clause.is_some() {
                        bail_parse_error!("custom index module do not support partial indices");
                    }
                    if unique {
                        bail_parse_error!("custom index module do not support UNIQUE indices");
                    }
                    let parameters = resolve_index_method_parameters(with_clause)?;
                    let Some(module) = syms.index_methods.get(using.as_str()) else {
                        bail_parse_error!("unknown module name: '{}'", using);
                    };
                    let configuration = IndexMethodConfiguration {
                        table_name: table.name.clone(),
                        index_name: index_name.clone(),
                        columns: index_columns.clone(),
                        parameters,
                    };
                    let descriptor = module.attach(&configuration)?;
                    Ok(Index {
                        name: index_name,
                        table_name: normalize_ident(tbl_name.as_str()),
                        root_page,
                        columns: index_columns,
                        unique: false,
                        ephemeral: false,
                        has_rowid: table.has_rowid,
                        where_clause: None,
                        index_method: Some(descriptor),
                        on_conflict: None,
                    })
                } else {
                    Ok(Index {
                        name: index_name,
                        table_name: normalize_ident(tbl_name.as_str()),
                        root_page,
                        columns: index_columns,
                        unique,
                        ephemeral: false,
                        has_rowid: table.has_rowid,
                        where_clause,
                        index_method: None,
                        on_conflict: None,
                    })
                }
            }
            // The SQL text is external input, so report anything that is not
            // a CREATE INDEX statement as an error instead of panicking.
            _ => Err(crate::LimboError::ParseError(
                "Expected create index statement".to_string(),
            )),
        }
    }

    /// Reports whether at least one index column is an expression.
    pub fn is_expression_index(&self) -> bool {
        self.columns.iter().any(|c| c.expr.is_some())
    }

    /// Reports whether the index is attached to a custom index method whose
    /// definition has `backing_btree` set.
    pub fn is_backing_btree_index(&self) -> bool {
        self.index_method
            .as_ref()
            .is_some_and(|x| x.definition().backing_btree)
    }

    /// Builds the automatic unique index that backs a table's primary key.
    ///
    /// `auto_index` carries the index name and root page, `column_count` is
    /// the expected number of primary key columns, and `conflict_clause` is
    /// recorded as the index's `on_conflict`.
    ///
    /// # Errors
    ///
    /// Returns a parse error when a primary key column is missing from the
    /// table.
    ///
    /// # Panics
    ///
    /// Panics when the table has a rowid-alias column or no primary key
    /// columns (no separate primary key index exists in that case), or when
    /// the number of primary key columns differs from `column_count`.
    pub fn automatic_from_primary_key(
        table: &BTreeTable,
        auto_index: (String, i64),
        column_count: usize,
        conflict_clause: Option<ResolveType>,
    ) -> Result<Index> {
        let has_primary_key_index =
            table.get_rowid_alias_column().is_none() && !table.primary_key_columns.is_empty();
        assert!(has_primary_key_index);
        let (index_name, root_page) = auto_index;
        let mut primary_keys = Vec::with_capacity(column_count);
        for (col_name, order) in table.primary_key_columns.iter() {
            // One lookup yields both the position and the column definition.
            let Some((pos_in_table, column)) = table.get_column(col_name) else {
                return Err(crate::LimboError::ParseError(format!(
                    "Column {} not found in table {}",
                    col_name, table.name
                )));
            };
            primary_keys.push(IndexColumn {
                name: normalize_ident(col_name),
                order: *order,
                pos_in_table,
                collation: column.collation_opt(),
                default: column.default.clone(),
                expr: None,
            });
        }
        assert!(primary_keys.len() == column_count);
        Ok(Index {
            name: normalize_ident(index_name.as_str()),
            table_name: table.name.clone(),
            root_page,
            columns: primary_keys,
            unique: true,
            ephemeral: false,
            has_rowid: table.has_rowid,
            where_clause: None,
            index_method: None,
            on_conflict: conflict_clause,
        })
    }

    /// Builds the automatic unique index that backs a UNIQUE constraint.
    ///
    /// `column_indices_and_sort_orders` lists, in index order, the position of
    /// each constrained column within `table.columns` together with its sort
    /// order.
    ///
    /// # Errors
    ///
    /// Returns a parse error when a position is out of range for the table.
    ///
    /// # Panics
    ///
    /// Panics when a referenced column has no name.
    pub fn automatic_from_unique(
        table: &BTreeTable,
        auto_index: (String, i64),
        column_indices_and_sort_orders: Vec<(usize, SortOrder)>,
        conflict_clause: Option<ResolveType>,
    ) -> Result<Index> {
        let (index_name, root_page) = auto_index;
        let mut unique_cols = Vec::with_capacity(column_indices_and_sort_orders.len());
        for (pos, sort_order) in &column_indices_and_sort_orders {
            // Positions index directly into the column list; no scan needed.
            let Some(col) = table.columns.get(*pos) else {
                return Err(crate::LimboError::ParseError(format!(
                    "Unique constraint column not found in table {}",
                    table.name
                )));
            };
            unique_cols.push(IndexColumn {
                name: normalize_ident(col.name.as_ref().unwrap()),
                order: *sort_order,
                pos_in_table: *pos,
                collation: col.collation_opt(),
                default: col.default.clone(),
                expr: None,
            });
        }
        Ok(Index {
            name: normalize_ident(index_name.as_str()),
            table_name: table.name.clone(),
            root_page,
            columns: unique_cols,
            unique: true,
            ephemeral: false,
            has_rowid: table.has_rowid,
            where_clause: None,
            index_method: None,
            on_conflict: conflict_clause,
        })
    }

    /// Maps a column position in the table to the position of the first index
    /// column that refers to it, or `None` when the column is not indexed.
    pub fn column_table_pos_to_index_pos(&self, table_pos: usize) -> Option<usize> {
        self.columns
            .iter()
            .position(|c| c.pos_in_table == table_pos)
    }

    /// Returns the position of the first index column whose expression is
    /// equivalent to `expr`, or `None` when there is no such column.
    pub fn expression_to_index_pos(&self, expr: &Expr) -> Option<usize> {
        self.columns.iter().position(|c| {
            c.expr
                .as_ref()
                .is_some_and(|e| exprs_are_equivalent(e, expr))
        })
    }

    /// Checks that the partial-index `WHERE` clause is acceptable.
    ///
    /// Returns `true` when there is no `WHERE` clause. Otherwise the clause is
    /// walked and rejected (`false`) as soon as it contains:
    /// * a bare identifier that is neither a rowid name nor a column of
    ///   `table`;
    /// * a qualified column whose qualifier is not this index's table or whose
    ///   column does not exist in `table`;
    /// * a function call with an `OVER` clause, or one that does not resolve
    ///   to a deterministic function;
    /// * `EXISTS`, `IN (SELECT ...)`, a subquery, `RAISE` or a bound variable.
    pub fn validate_where_expr(&self, table: &Table, _resolver: &Resolver) -> bool {
        let Some(where_clause) = &self.where_clause else {
            return true;
        };
        let tbl_norm = self.table_name.as_str();
        let has_col = |name: &str| {
            table.columns().iter().any(|c| {
                c.name
                    .as_ref()
                    .is_some_and(|cn| cn.eq_ignore_ascii_case(name))
            })
        };
        let is_tbl = |ns: &str| normalize_ident(ns) == tbl_norm;
        let is_deterministic_fn = |name: &str, argc: usize| {
            let n = normalize_ident(name);
            Func::resolve_function(&n, argc).is_ok_and(|f| f.is_some_and(|f| f.is_deterministic()))
        };
        let mut ok = true;
        // The walk result is ignored on purpose: the verdict is carried in `ok`.
        let _ = walk_expr(where_clause.as_ref(), &mut |e: &Expr| -> crate::Result<
            WalkControl,
        > {
            if !ok {
                return Ok(WalkControl::SkipChildren);
            }
            match e {
                Expr::Literal(_) | Expr::RowId { .. } => {}
                Expr::Id(n) => {
                    let n = n.as_str();
                    if !ROWID_STRS.iter().any(|s| s.eq_ignore_ascii_case(n)) && !has_col(n) {
                        ok = false;
                    }
                }
                Expr::Qualified(ns, col) | Expr::DoublyQualified(_, ns, col) => {
                    if !is_tbl(ns.as_str()) || !has_col(col.as_str()) {
                        ok = false;
                    }
                }
                Expr::FunctionCall {
                    name, filter_over, ..
                }
                | Expr::FunctionCallStar {
                    name, filter_over, ..
                } => {
                    if filter_over.over_clause.is_some() {
                        ok = false;
                    } else {
                        let argc = match e {
                            Expr::FunctionCall { args, .. } => args.len(),
                            Expr::FunctionCallStar { .. } => 0,
                            _ => unreachable!(),
                        };
                        if !is_deterministic_fn(name.as_str(), argc) {
                            ok = false;
                        }
                    }
                }
                Expr::Exists(_)
                | Expr::InSelect { .. }
                | Expr::Subquery(_)
                | Expr::Raise { .. }
                | Expr::Variable(_) => {
                    ok = false;
                }
                _ => {}
            }
            Ok(if ok {
                WalkControl::Continue
            } else {
                WalkControl::SkipChildren
            })
        });
        ok
    }

    /// Clones the partial-index `WHERE` clause and binds it against
    /// `table_refs`.
    ///
    /// Returns `None` when the index has no `WHERE` clause or when binding
    /// fails.
    pub fn bind_where_expr(
        &self,
        table_refs: Option<&mut TableReferences>,
        resolver: &Resolver,
    ) -> Option<ast::Expr> {
        let mut expr = self.where_clause.as_ref()?.clone();
        bind_and_rewrite_expr(
            &mut expr,
            table_refs,
            None,
            resolver,
            BindingBehavior::ResultColumnsNotAllowed,
        )
        .ok()?;
        Some(*expr)
    }
}
#[cfg(test)]
mod tests {
use super::*;
/// A regular table keeps its rowid.
#[test]
pub fn test_has_rowid_true() -> Result<()> {
    let parsed = BTreeTable::from_sql(
        r#"CREATE TABLE t1 (a INTEGER PRIMARY KEY, b TEXT);"#,
        0,
    )?;
    assert!(parsed.has_rowid, "has_rowid should be set to true");
    Ok(())
}
/// `WITHOUT ROWID` clears `has_rowid`.
#[test]
pub fn test_has_rowid_false() -> Result<()> {
    let parsed = BTreeTable::from_sql(
        r#"CREATE TABLE t1 (a INTEGER PRIMARY KEY, b TEXT) WITHOUT ROWID;"#,
        0,
    )?;
    assert!(!parsed.has_rowid, "has_rowid should be set to false");
    Ok(())
}
/// A TEXT primary key is not a rowid alias.
#[test]
pub fn test_column_is_rowid_alias_single_text() -> Result<()> {
    let parsed = BTreeTable::from_sql(r#"CREATE TABLE t1 (a TEXT PRIMARY KEY, b TEXT);"#, 0)?;
    let (_, col_a) = parsed.get_column("a").unwrap();
    assert!(
        !col_a.is_rowid_alias(),
        "column 'a´ has type different than INTEGER so can't be a rowid alias"
    );
    Ok(())
}
/// An inline INTEGER PRIMARY KEY is a rowid alias.
#[test]
pub fn test_column_is_rowid_alias_single_integer() -> Result<()> {
    let parsed = BTreeTable::from_sql(r#"CREATE TABLE t1 (a INTEGER PRIMARY KEY, b TEXT);"#, 0)?;
    let (_, col_a) = parsed.get_column("a").unwrap();
    assert!(
        col_a.is_rowid_alias(),
        "column 'a´ should be a rowid alias"
    );
    Ok(())
}
/// A table-level single-column INTEGER primary key is also a rowid alias.
#[test]
pub fn test_column_is_rowid_alias_single_integer_separate_primary_key_definition() -> Result<()>
{
    let parsed = BTreeTable::from_sql(
        r#"CREATE TABLE t1 (a INTEGER, b TEXT, PRIMARY KEY(a));"#,
        0,
    )?;
    let (_, col_a) = parsed.get_column("a").unwrap();
    assert!(
        col_a.is_rowid_alias(),
        "column 'a´ should be a rowid alias"
    );
    Ok(())
}
/// Without a rowid, a table-level INTEGER primary key cannot alias it.
#[test]
pub fn test_column_is_rowid_alias_single_integer_separate_primary_key_definition_without_rowid(
) -> Result<()> {
    let parsed = BTreeTable::from_sql(
        r#"CREATE TABLE t1 (a INTEGER, b TEXT, PRIMARY KEY(a)) WITHOUT ROWID;"#,
        0,
    )?;
    let (_, col_a) = parsed.get_column("a").unwrap();
    assert!(
        !col_a.is_rowid_alias(),
        "column 'a´ shouldn't be a rowid alias because table has no rowid"
    );
    Ok(())
}
/// Without a rowid, an inline INTEGER PRIMARY KEY cannot alias it.
#[test]
pub fn test_column_is_rowid_alias_single_integer_without_rowid() -> Result<()> {
    let parsed = BTreeTable::from_sql(
        r#"CREATE TABLE t1 (a INTEGER PRIMARY KEY, b TEXT) WITHOUT ROWID;"#,
        0,
    )?;
    let (_, col_a) = parsed.get_column("a").unwrap();
    assert!(
        !col_a.is_rowid_alias(),
        "column 'a´ shouldn't be a rowid alias because table has no rowid"
    );
    Ok(())
}
/// Two inline PRIMARY KEY columns are rejected with a parse error.
#[test]
pub fn test_multiple_pk_forbidden() -> Result<()> {
    let sql = r#"CREATE TABLE t1 (a INTEGER PRIMARY KEY, b TEXT PRIMARY KEY);"#;
    let err = BTreeTable::from_sql(sql, 0).unwrap_err();
    let rejected_as_expected = matches!(
        err,
        LimboError::ParseError(msg) if msg.contains("table \"t1\" has more than one primary key")
    );
    assert!(rejected_as_expected);
    Ok(())
}
/// A member of a composite primary key is never a rowid alias.
#[test]
pub fn test_column_is_rowid_alias_separate_composite_primary_key_definition() -> Result<()> {
    let parsed = BTreeTable::from_sql(
        r#"CREATE TABLE t1 (a INTEGER, b TEXT, PRIMARY KEY(a, b));"#,
        0,
    )?;
    let (_, col_a) = parsed.get_column("a").unwrap();
    assert!(
        !col_a.is_rowid_alias(),
        "column 'a´ shouldn't be a rowid alias because table has composite primary key"
    );
    Ok(())
}
/// An inline PRIMARY KEY marks only its own column and is recorded ascending.
#[test]
pub fn test_primary_key_inline_single() -> Result<()> {
    let parsed = BTreeTable::from_sql(
        r#"CREATE TABLE t1 (a INTEGER PRIMARY KEY, b TEXT, c REAL);"#,
        0,
    )?;
    let (_, col_a) = parsed.get_column("a").unwrap();
    assert!(col_a.primary_key(), "column 'a' should be a primary key");
    let non_key_columns = [
        ("b", "column 'b' shouldn't be a primary key"),
        ("c", "column 'c' shouldn't be a primary key"),
    ];
    for (col_name, failure_msg) in non_key_columns {
        let (_, col) = parsed.get_column(col_name).unwrap();
        assert!(!col.primary_key(), "{failure_msg}");
    }
    let expected_pk = vec![("a".to_string(), SortOrder::Asc)];
    assert_eq!(
        expected_pk, parsed.primary_key_columns,
        "primary key column names should be ['a']"
    );
    Ok(())
}
/// Two inline PRIMARY KEY columns are rejected even with extra columns.
#[test]
pub fn test_primary_key_inline_multiple_forbidden() -> Result<()> {
    let sql = r#"CREATE TABLE t1 (a INTEGER PRIMARY KEY, b TEXT PRIMARY KEY, c REAL);"#;
    let err = BTreeTable::from_sql(sql, 0).unwrap_err();
    let rejected_as_expected = matches!(
        err,
        LimboError::ParseError(msg) if msg.contains("table \"t1\" has more than one primary key")
    );
    assert!(rejected_as_expected);
    Ok(())
}
/// Differing ON CONFLICT clauses for the same UNIQUE column are rejected.
#[test]
pub fn test_conflicting_on_conflict_unique_rejected() -> Result<()> {
    let sql =
        r#"CREATE TABLE t1 (a UNIQUE ON CONFLICT FAIL, b, UNIQUE(a) ON CONFLICT IGNORE);"#;
    let err = BTreeTable::from_sql(sql, 0).unwrap_err();
    let rejected_as_expected = matches!(
        err,
        LimboError::ParseError(msg) if msg.contains("conflicting ON CONFLICT clauses")
    );
    assert!(rejected_as_expected);
    Ok(())
}
/// Differing ON CONFLICT clauses for the same composite UNIQUE set are rejected.
#[test]
pub fn test_conflicting_on_conflict_composite_unique_rejected() -> Result<()> {
    let sql = r#"CREATE TABLE t1 (a, b, UNIQUE(a, b) ON CONFLICT FAIL, UNIQUE(a, b) ON CONFLICT IGNORE);"#;
    let err = BTreeTable::from_sql(sql, 0).unwrap_err();
    let rejected_as_expected = matches!(
        err,
        LimboError::ParseError(msg) if msg.contains("conflicting ON CONFLICT clauses")
    );
    assert!(rejected_as_expected);
    Ok(())
}
/// Repeating the same ON CONFLICT clause for one UNIQUE column is accepted.
#[test]
pub fn test_same_on_conflict_unique_allowed() -> Result<()> {
    let parsed = BTreeTable::from_sql(
        r#"CREATE TABLE t1 (a UNIQUE ON CONFLICT FAIL, b, UNIQUE(a) ON CONFLICT FAIL);"#,
        0,
    );
    assert!(parsed.is_ok());
    Ok(())
}
/// A single ON CONFLICT clause next to a plain duplicate UNIQUE is accepted.
#[test]
pub fn test_one_on_conflict_unique_allowed() -> Result<()> {
    let parsed = BTreeTable::from_sql(
        r#"CREATE TABLE t1 (a UNIQUE ON CONFLICT FAIL, b, UNIQUE(a));"#,
        0,
    );
    assert!(parsed.is_ok());
    Ok(())
}
/// A table-level `PRIMARY KEY(a desc)` marks only `a` and keeps the DESC order.
#[test]
pub fn test_primary_key_separate_single() -> Result<()> {
    let parsed = BTreeTable::from_sql(
        r#"CREATE TABLE t1 (a INTEGER, b TEXT, c REAL, PRIMARY KEY(a desc));"#,
        0,
    )?;
    let (_, col_a) = parsed.get_column("a").unwrap();
    assert!(col_a.primary_key(), "column 'a' should be a primary key");
    let non_key_columns = [
        ("b", "column 'b' shouldn't be a primary key"),
        ("c", "column 'c' shouldn't be a primary key"),
    ];
    for (col_name, failure_msg) in non_key_columns {
        let (_, col) = parsed.get_column(col_name).unwrap();
        assert!(!col.primary_key(), "{failure_msg}");
    }
    let expected_pk = vec![("a".to_string(), SortOrder::Desc)];
    assert_eq!(
        expected_pk, parsed.primary_key_columns,
        "primary key column names should be ['a']"
    );
    Ok(())
}
/// A table-level composite primary key marks every member column and records
/// each member's sort order.
#[test]
pub fn test_primary_key_separate_multiple() -> Result<()> {
    let sql = r#"CREATE TABLE t1 (a INTEGER, b TEXT, c REAL, PRIMARY KEY(a, b desc));"#;
    let table = BTreeTable::from_sql(sql, 0)?;
    let column = table.get_column("a").unwrap().1;
    assert!(column.primary_key(), "column 'a' should be a primary key");
    let column = table.get_column("b").unwrap().1;
    // Message previously read "shouldn be", which was ambiguous on failure.
    assert!(column.primary_key(), "column 'b' should be a primary key");
    let column = table.get_column("c").unwrap().1;
    assert!(
        !column.primary_key(),
        "column 'c' shouldn't be a primary key"
    );
    assert_eq!(
        vec![
            ("a".to_string(), SortOrder::Asc),
            ("b".to_string(), SortOrder::Desc)
        ],
        table.primary_key_columns,
        "primary key column names should be ['a', 'b']"
    );
    Ok(())
}
/// A single-quoted column name in `PRIMARY KEY('a')` resolves to column `a`.
#[test]
pub fn test_primary_key_separate_single_quoted() -> Result<()> {
    let parsed = BTreeTable::from_sql(
        r#"CREATE TABLE t1 (a INTEGER, b TEXT, c REAL, PRIMARY KEY('a'));"#,
        0,
    )?;
    let (_, col_a) = parsed.get_column("a").unwrap();
    assert!(col_a.primary_key(), "column 'a' should be a primary key");
    let non_key_columns = [
        ("b", "column 'b' shouldn't be a primary key"),
        ("c", "column 'c' shouldn't be a primary key"),
    ];
    for (col_name, failure_msg) in non_key_columns {
        let (_, col) = parsed.get_column(col_name).unwrap();
        assert!(!col.primary_key(), "{failure_msg}");
    }
    let expected_pk = vec![("a".to_string(), SortOrder::Asc)];
    assert_eq!(
        expected_pk, parsed.primary_key_columns,
        "primary key column names should be ['a']"
    );
    Ok(())
}
/// A double-quoted column name in `PRIMARY KEY("a")` resolves to column `a`.
#[test]
pub fn test_primary_key_separate_single_doubly_quoted() -> Result<()> {
    let parsed = BTreeTable::from_sql(
        r#"CREATE TABLE t1 (a INTEGER, b TEXT, c REAL, PRIMARY KEY("a"));"#,
        0,
    )?;
    let (_, col_a) = parsed.get_column("a").unwrap();
    assert!(col_a.primary_key(), "column 'a' should be a primary key");
    let non_key_columns = [
        ("b", "column 'b' shouldn't be a primary key"),
        ("c", "column 'c' shouldn't be a primary key"),
    ];
    for (col_name, failure_msg) in non_key_columns {
        let (_, col) = parsed.get_column(col_name).unwrap();
        assert!(!col.primary_key(), "{failure_msg}");
    }
    let expected_pk = vec![("a".to_string(), SortOrder::Asc)];
    assert_eq!(
        expected_pk, parsed.primary_key_columns,
        "primary key column names should be ['a']"
    );
    Ok(())
}
/// The DEFAULT expression is kept on the column and renders as written.
#[test]
pub fn test_default_value() -> Result<()> {
    let parsed = BTreeTable::from_sql(r#"CREATE TABLE t1 (a INTEGER DEFAULT 23);"#, 0)?;
    let (_, col_a) = parsed.get_column("a").unwrap();
    let rendered_default = col_a.default.as_ref().unwrap().to_string();
    assert_eq!(rendered_default, "23");
    Ok(())
}
/// NOT NULL sets the column's notnull flag.
#[test]
pub fn test_col_notnull() -> Result<()> {
    let parsed = BTreeTable::from_sql(r#"CREATE TABLE t1 (a INTEGER NOT NULL);"#, 0)?;
    let (_, col_a) = parsed.get_column("a").unwrap();
    assert!(col_a.notnull());
    Ok(())
}
/// Without NOT NULL the column's notnull flag stays clear.
#[test]
pub fn test_col_notnull_negative() -> Result<()> {
    let parsed = BTreeTable::from_sql(r#"CREATE TABLE t1 (a INTEGER);"#, 0)?;
    let (_, col_a) = parsed.get_column("a").unwrap();
    assert!(!col_a.notnull());
    Ok(())
}
/// The declared type string keeps the casing used in the SQL text.
#[test]
pub fn test_col_type_string_integer() -> Result<()> {
    let parsed = BTreeTable::from_sql(r#"CREATE TABLE t1 (a InTeGeR);"#, 0)?;
    let (_, col_a) = parsed.get_column("a").unwrap();
    assert_eq!(col_a.ty_str, "InTeGeR");
    Ok(())
}
/// The built-in schema table renders to the expected CREATE TABLE text.
#[test]
pub fn test_sqlite_schema() {
    let rendered = sqlite_schema_table().to_sql();
    let expected = r#"CREATE TABLE sqlite_schema (type TEXT, name TEXT, tbl_name TEXT, rootpage INT, sql TEXT)"#;
    assert_eq!(expected, rendered);
}
/// Column names are quoted in the rendered SQL only when they need to be.
#[test]
pub fn test_special_column_names() -> Result<()> {
    let cases = [
        ("foobar", "CREATE TABLE t (foobar TEXT)"),
        ("_table_name3", r#"CREATE TABLE t (_table_name3 TEXT)"#),
        ("special name", r#"CREATE TABLE t ("special name" TEXT)"#),
        ("foo&bar", r#"CREATE TABLE t ("foo&bar" TEXT)"#),
        (" name", r#"CREATE TABLE t (" name" TEXT)"#),
    ];
    for (column, expected_sql) in cases {
        // The input always quotes the name; the output decides on its own.
        let input_sql = format!(r#"CREATE TABLE t ("{column}" TEXT)"#);
        let rendered = BTreeTable::from_sql(&input_sql, 0)?.to_sql();
        assert_eq!(expected_sql, rendered);
    }
    Ok(())
}
/// Table names that need quoting round-trip unchanged through `to_sql`.
#[test]
fn test_special_table_names_are_quoted_in_to_sql() -> Result<()> {
    let cases = [
        (
            r#"CREATE TABLE "t t" (x TEXT)"#,
            r#"CREATE TABLE "t t" (x TEXT)"#,
        ),
        (
            r#"CREATE TABLE "123table" (x TEXT)"#,
            r#"CREATE TABLE "123table" (x TEXT)"#,
        ),
        (
            r#"CREATE TABLE "t""t" (x TEXT)"#,
            r#"CREATE TABLE "t""t" (x TEXT)"#,
        ),
    ];
    for (input_sql, expected_sql) in cases {
        let parsed = BTreeTable::from_sql(input_sql, 0)?;
        let rendered = parsed.to_sql();
        assert_eq!(rendered, expected_sql);
    }
    Ok(())
}
/// An INTEGER PRIMARY KEY is a rowid alias, so asking for a primary key
/// index must panic.
#[test]
#[should_panic]
fn test_automatic_index_single_column() {
    let parsed =
        BTreeTable::from_sql(r#"CREATE TABLE t1 (a INTEGER PRIMARY KEY, b TEXT);"#, 0).unwrap();
    let auto_index = ("sqlite_autoindex_t1_1".to_string(), 2);
    let _index = Index::automatic_from_primary_key(&parsed, auto_index, 1, None).unwrap();
}
/// A composite primary key yields a unique two-column automatic index.
#[test]
fn test_automatic_index_composite_key() -> Result<()> {
    let parsed = BTreeTable::from_sql(
        r#"CREATE TABLE t1 (a INTEGER, b TEXT, PRIMARY KEY(a, b));"#,
        0,
    )?;
    let auto_index = ("sqlite_autoindex_t1_1".to_string(), 2);
    let pk_index = Index::automatic_from_primary_key(&parsed, auto_index, 2, None)?;

    assert_eq!(pk_index.name, "sqlite_autoindex_t1_1");
    assert_eq!(pk_index.table_name, "t1");
    assert_eq!(pk_index.root_page, 2);
    assert!(pk_index.unique);
    assert_eq!(pk_index.columns.len(), 2);
    let (first, second) = (&pk_index.columns[0], &pk_index.columns[1]);
    assert_eq!(first.name, "a");
    assert_eq!(second.name, "b");
    assert!(matches!(first.order, SortOrder::Asc));
    assert!(matches!(second.order, SortOrder::Asc));
    Ok(())
}
/// Asking for a primary key index on a table without a primary key must panic.
#[test]
#[should_panic]
fn test_automatic_index_no_primary_key() {
    let parsed = BTreeTable::from_sql(r#"CREATE TABLE t1 (a INTEGER, b TEXT);"#, 0).unwrap();
    let auto_index = ("sqlite_autoindex_t1_1".to_string(), 2);
    Index::automatic_from_primary_key(&parsed, auto_index, 1, None).unwrap();
}
/// A primary key that names a column missing from the table yields an error.
#[test]
fn test_automatic_index_nonexistent_column() {
    let only_column =
        Column::new_default_integer(Some("a".to_string()), "INT".to_string(), None);
    let columns = vec![only_column];
    let logical_to_physical_map =
        BTreeTable::build_logical_to_physical_map(&columns, &[], true);
    // Hand-built table whose primary key refers to a column it does not have.
    let broken_table = BTreeTable {
        name: "t1".to_string(),
        root_page: 0,
        primary_key_columns: vec![("nonexistent".to_string(), SortOrder::Asc)],
        columns,
        logical_to_physical_map,
        has_rowid: true,
        is_strict: false,
        has_autoincrement: false,
        has_virtual_columns: false,
        unique_sets: vec![],
        foreign_keys: vec![],
        check_constraints: vec![],
        rowid_alias_conflict_clause: None,
        column_dependencies: Default::default(),
    };
    let auto_index = ("sqlite_autoindex_t1_1".to_string(), 2);
    let outcome = Index::automatic_from_primary_key(&broken_table, auto_index, 1, None);
    assert!(outcome.is_err());
}
/// A single UNIQUE column yields a unique one-column automatic index.
#[test]
fn test_automatic_index_unique_column() -> Result<()> {
    let parsed = BTreeTable::from_sql(r#"CREATE table t1 (x INTEGER, y INTEGER UNIQUE);"#, 0)?;
    let auto_index = ("sqlite_autoindex_t1_1".to_string(), 2);
    let unique_index =
        Index::automatic_from_unique(&parsed, auto_index, vec![(1, SortOrder::Asc)], None)?;

    assert_eq!(unique_index.name, "sqlite_autoindex_t1_1");
    assert_eq!(unique_index.table_name, "t1");
    assert_eq!(unique_index.root_page, 2);
    assert!(unique_index.unique);
    assert_eq!(unique_index.columns.len(), 1);
    let only = &unique_index.columns[0];
    assert_eq!(only.name, "y");
    assert!(matches!(only.order, SortOrder::Asc));
    Ok(())
}
/// A non-integer primary key and a UNIQUE column each get their own
/// automatic index.
#[test]
fn test_automatic_index_pkey_unique_column() -> Result<()> {
    let parsed = BTreeTable::from_sql(r#"CREATE TABLE t1 (x PRIMARY KEY, y UNIQUE);"#, 0)?;
    let pk_index = Index::automatic_from_primary_key(
        &parsed,
        ("sqlite_autoindex_t1_1".to_string(), 2),
        1,
        None,
    )?;
    let unique_index = Index::automatic_from_unique(
        &parsed,
        ("sqlite_autoindex_t1_2".to_string(), 3),
        vec![(1, SortOrder::Asc)],
        None,
    )?;

    assert_eq!(pk_index.name, "sqlite_autoindex_t1_1");
    assert_eq!(pk_index.table_name, "t1");
    assert_eq!(pk_index.root_page, 2);
    assert!(pk_index.unique);
    assert_eq!(pk_index.columns.len(), 1);
    assert_eq!(pk_index.columns[0].name, "x");
    assert!(matches!(pk_index.columns[0].order, SortOrder::Asc));

    assert_eq!(unique_index.name, "sqlite_autoindex_t1_2");
    assert_eq!(unique_index.table_name, "t1");
    assert_eq!(unique_index.root_page, 3);
    assert!(unique_index.unique);
    assert_eq!(unique_index.columns.len(), 1);
    assert_eq!(unique_index.columns[0].name, "y");
    assert!(matches!(unique_index.columns[0].order, SortOrder::Asc));
    Ok(())
}
/// A primary key, a UNIQUE column and a composite UNIQUE set produce three
/// automatic indexes with the expected names, root pages and columns.
#[test]
fn test_automatic_index_pkey_many_unique_columns() -> Result<()> {
    let parsed = BTreeTable::from_sql(
        r#"CREATE TABLE t1 (a PRIMARY KEY, b UNIQUE, c, d, UNIQUE(c, d));"#,
        0,
    )?;
    let expected_identity = [
        ("sqlite_autoindex_t1_1".to_string(), 2),
        ("sqlite_autoindex_t1_2".to_string(), 3),
        ("sqlite_autoindex_t1_3".to_string(), 4),
    ];
    let built = vec![
        Index::automatic_from_primary_key(
            &parsed,
            ("sqlite_autoindex_t1_1".to_string(), 2),
            1,
            None,
        )?,
        Index::automatic_from_unique(
            &parsed,
            ("sqlite_autoindex_t1_2".to_string(), 3),
            vec![(1, SortOrder::Asc)],
            None,
        )?,
        Index::automatic_from_unique(
            &parsed,
            ("sqlite_autoindex_t1_3".to_string(), 4),
            vec![(2, SortOrder::Asc), (3, SortOrder::Asc)],
            None,
        )?,
    ];
    assert!(built.len() == expected_identity.len());
    for (pos, (index, (index_name, root_page))) in
        built.iter().zip(expected_identity.iter()).enumerate()
    {
        assert_eq!(index.name, *index_name);
        assert_eq!(index.table_name, "t1");
        assert_eq!(index.root_page, *root_page);
        assert!(index.unique);
        match pos {
            0 => {
                assert_eq!(index.columns.len(), 1);
                assert_eq!(index.columns[0].name, "a");
            }
            1 => {
                assert_eq!(index.columns.len(), 1);
                assert_eq!(index.columns[0].name, "b");
            }
            2 => {
                assert_eq!(index.columns.len(), 2);
                assert_eq!(index.columns[0].name, "c");
                assert_eq!(index.columns[1].name, "d");
            }
            _ => {}
        }
        assert!(matches!(index.columns[0].order, SortOrder::Asc));
    }
    Ok(())
}
/// A table that declares the same UNIQUE set twice still parses, and one
/// automatic index can be built over that set.
#[test]
fn test_automatic_index_unique_set_dedup() -> Result<()> {
    let parsed = BTreeTable::from_sql(
        r#"CREATE TABLE t1 (a, b, UNIQUE(a, b), UNIQUE(a, b));"#,
        0,
    )?;
    let auto_index = ("sqlite_autoindex_t1_1".to_string(), 2);
    let key_columns = vec![(0, SortOrder::Asc), (1, SortOrder::Asc)];
    let unique_index = Index::automatic_from_unique(&parsed, auto_index, key_columns, None)?;

    assert_eq!(unique_index.name, "sqlite_autoindex_t1_1");
    assert_eq!(unique_index.table_name, "t1");
    assert_eq!(unique_index.root_page, 2);
    assert!(unique_index.unique);
    assert_eq!(unique_index.columns.len(), 2);
    let (first, second) = (&unique_index.columns[0], &unique_index.columns[1]);
    assert_eq!(first.name, "a");
    assert!(matches!(first.order, SortOrder::Asc));
    assert_eq!(second.name, "b");
    assert!(matches!(second.order, SortOrder::Asc));
    Ok(())
}
/// A column that is both PRIMARY KEY and UNIQUE gets a primary key index.
#[test]
fn test_automatic_index_primary_key_is_unique() -> Result<()> {
    let parsed = BTreeTable::from_sql(r#"CREATE TABLE t1 (a primary key unique);"#, 0)?;
    let auto_index = ("sqlite_autoindex_t1_1".to_string(), 2);
    let pk_index = Index::automatic_from_primary_key(&parsed, auto_index, 1, None)?;

    assert_eq!(pk_index.name, "sqlite_autoindex_t1_1");
    assert_eq!(pk_index.table_name, "t1");
    assert_eq!(pk_index.root_page, 2);
    assert!(pk_index.unique);
    assert_eq!(pk_index.columns.len(), 1);
    let only = &pk_index.columns[0];
    assert_eq!(only.name, "a");
    assert!(matches!(only.order, SortOrder::Asc));
    Ok(())
}
/// A composite primary key that is also declared UNIQUE gets a two-column
/// primary key index.
#[test]
fn test_automatic_index_primary_key_is_unique_and_composite() -> Result<()> {
    let parsed = BTreeTable::from_sql(
        r#"CREATE TABLE t1 (a, b, PRIMARY KEY(a, b), UNIQUE(a, b));"#,
        0,
    )?;
    let auto_index = ("sqlite_autoindex_t1_1".to_string(), 2);
    let pk_index = Index::automatic_from_primary_key(&parsed, auto_index, 2, None)?;

    assert_eq!(pk_index.name, "sqlite_autoindex_t1_1");
    assert_eq!(pk_index.table_name, "t1");
    assert_eq!(pk_index.root_page, 2);
    assert!(pk_index.unique);
    assert_eq!(pk_index.columns.len(), 2);
    let (first, second) = (&pk_index.columns[0], &pk_index.columns[1]);
    assert_eq!(first.name, "a");
    assert_eq!(second.name, "b");
    assert!(matches!(first.order, SortOrder::Asc));
    Ok(())
}
/// The STRICT keyword survives a parse / `to_sql` round trip.
#[test]
fn test_strict_table_to_sql() -> Result<()> {
    let parsed = BTreeTable::from_sql(
        r#"CREATE TABLE test_strict (id INTEGER, name TEXT) STRICT"#,
        0,
    )?;
    assert!(parsed.is_strict);

    let reconstructed_sql = parsed.to_sql();
    let mentions_strict = reconstructed_sql.contains("STRICT");
    assert!(
        mentions_strict,
        "Reconstructed SQL should contain STRICT keyword: {reconstructed_sql}"
    );
    assert_eq!(
        reconstructed_sql,
        "CREATE TABLE test_strict (id INTEGER, name TEXT) STRICT"
    );
    Ok(())
}
/// A non-strict table never gains a STRICT keyword when rendered.
#[test]
fn test_non_strict_table_to_sql() -> Result<()> {
    let parsed = BTreeTable::from_sql(
        r#"CREATE TABLE test_normal (id INTEGER, name TEXT)"#,
        0,
    )?;
    assert!(!parsed.is_strict);

    let reconstructed_sql = parsed.to_sql();
    let mentions_strict = reconstructed_sql.contains("STRICT");
    assert!(
        !mentions_strict,
        "Non-strict table SQL should not contain STRICT keyword: {reconstructed_sql}"
    );
    assert_eq!(
        reconstructed_sql,
        "CREATE TABLE test_normal (id INTEGER, name TEXT)"
    );
    Ok(())
}
/// WITHOUT ROWID is kept in the rendered SQL and its primary key column is
/// reported as NOT NULL.
#[test]
fn test_without_rowid_preserved_in_sql() -> Result<()> {
    let parsed = BTreeTable::from_sql(
        r#"CREATE TABLE t(code TEXT PRIMARY KEY, val TEXT) WITHOUT ROWID"#,
        0,
    )?;
    let (_, code_col) = parsed.get_column("code").unwrap();
    assert!(code_col.notnull());
    assert_eq!(
        parsed.to_sql(),
        "CREATE TABLE t (code TEXT PRIMARY KEY, val TEXT) WITHOUT ROWID"
    );
    Ok(())
}
/// Both table options are kept when STRICT and WITHOUT ROWID are combined.
#[test]
fn test_strict_without_rowid_preserved_in_sql() -> Result<()> {
    let parsed = BTreeTable::from_sql(
        r#"CREATE TABLE t(code TEXT PRIMARY KEY, val TEXT) STRICT, WITHOUT ROWID"#,
        0,
    )?;
    let (_, code_col) = parsed.get_column("code").unwrap();
    assert!(code_col.notnull());
    assert_eq!(
        parsed.to_sql(),
        "CREATE TABLE t (code TEXT PRIMARY KEY, val TEXT) STRICT, WITHOUT ROWID"
    );
    Ok(())
}
/// Builds the automatic indexes for a table with a doubly-declared column-level
/// `UNIQUE` on `a` and a column-level `PRIMARY KEY` on `b`, then checks each index's
/// name, owning table, root page, uniqueness flag and single key column.
#[test]
fn test_automatic_index_unique_and_a_pk() -> Result<()> {
    let sql = r#"CREATE TABLE t1 (a NUMERIC UNIQUE UNIQUE, b TEXT PRIMARY KEY)"#;
    let table = BTreeTable::from_sql(sql, 0)?;
    let mut indexes = vec![
        Index::automatic_from_unique(
            &table,
            ("sqlite_autoindex_t1_1".to_string(), 2),
            vec![(0, SortOrder::Asc)],
            None,
        )?,
        Index::automatic_from_primary_key(
            &table,
            ("sqlite_autoindex_t1_2".to_string(), 3),
            1,
            None,
        )?,
    ];
    // `assert_eq!` reports the actual length on failure, unlike a bare boolean `assert!`.
    assert_eq!(indexes.len(), 2);

    // `pop` takes from the back, so the primary-key index (on `b`) comes out first.
    let pk_index = indexes.pop().unwrap();
    assert_eq!(pk_index.name, "sqlite_autoindex_t1_2");
    assert_eq!(pk_index.table_name, "t1");
    assert_eq!(pk_index.root_page, 3);
    assert!(pk_index.unique);
    assert_eq!(pk_index.columns.len(), 1);
    assert_eq!(pk_index.columns[0].name, "b");
    assert!(matches!(pk_index.columns[0].order, SortOrder::Asc));

    // What remains is the index for the UNIQUE constraint on `a`.
    let unique_index = indexes.pop().unwrap();
    assert_eq!(unique_index.name, "sqlite_autoindex_t1_1");
    assert_eq!(unique_index.table_name, "t1");
    assert_eq!(unique_index.root_page, 2);
    assert!(unique_index.unique);
    assert_eq!(unique_index.columns.len(), 1);
    assert_eq!(unique_index.columns[0].name, "a");
    assert!(matches!(unique_index.columns[0].order, SortOrder::Asc));
    Ok(())
}
/// With `generated_columns_enabled` switched off, feeding `handle_schema_row` a table
/// definition that contains a generated column must fail with an error mentioning
/// "generated columns".
#[test]
fn test_schema_loading_rejects_gencol_without_flag() {
    let mut schema = Schema::new();
    schema.generated_columns_enabled = false;

    // `b AS (a*2)` is the generated column that should trigger the rejection.
    let create_sql = "CREATE TABLE t1(a INTEGER, b AS (a*2))";
    let outcome = schema.handle_schema_row(
        "table",
        "t1",
        "t1",
        2,
        Some(create_sql),
        &SymbolTable::default(),
        &mut Vec::new(),
        &mut HashMap::default(),
        &mut HashMap::default(),
        &mut HashMap::default(),
        &mut HashMap::default(),
        &|_| None,
    );

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("generated columns"));
}
/// Collects every position yielded by `mask.iter()` into a vector sorted in ascending
/// order, so tests can compare masks against plain `vec![..]` literals.
fn indices(mask: &ColumnMask) -> Vec<usize> {
    let mut v: Vec<usize> = mask.iter().collect();
    v.sort_unstable();
    v
}
/// Same result as [`indices`]; kept as a separate name for the call sites that
/// inspect the output of `dependencies_of_columns`. Delegates rather than duplicating
/// the collect-and-sort logic.
fn stored(bits: &ColumnMask) -> Vec<usize> {
    indices(bits)
}
/// For a table without generated columns, the affected set is exactly the updated
/// columns and a column's dependency set is just itself; an empty input yields an
/// empty dependency set.
#[test]
fn gencol_graph_no_virtual_columns() -> Result<()> {
    let table = BTreeTable::from_sql("CREATE TABLE t(a, b)", 0)?;

    let affected_by_a = table.columns_affected_by_update([0])?;
    assert_eq!(indices(&affected_by_a), vec![0]);
    let affected_by_both = table.columns_affected_by_update([0, 1])?;
    assert_eq!(indices(&affected_by_both), vec![0, 1]);

    let deps_of_a = table.dependencies_of_columns([0])?;
    assert_eq!(stored(&deps_of_a), vec![0]);
    let deps_of_nothing = table.dependencies_of_columns([])?;
    assert_eq!(stored(&deps_of_nothing), Vec::<usize>::new());
    Ok(())
}
/// Chain `a -> b -> c` of virtual columns: updating a column affects it and everything
/// downstream, and every column in the chain depends only on column 0.
#[test]
fn gencol_graph_linear_chain() -> Result<()> {
    let table =
        BTreeTable::from_sql("CREATE TABLE t(a, b AS (a) VIRTUAL, c AS (b) VIRTUAL)", 0)?;

    let from_head = table.columns_affected_by_update([0])?;
    assert_eq!(indices(&from_head), vec![0, 1, 2]);
    let from_middle = table.columns_affected_by_update([1])?;
    assert_eq!(indices(&from_middle), vec![1, 2]);

    // Walking back from any link of the chain ends at column 0.
    for column in [2, 1, 0] {
        let deps = table.dependencies_of_columns([column])?;
        assert_eq!(stored(&deps), vec![0]);
    }
    Ok(())
}
/// Diamond shape: `b` and `c` both read `a`, and `d` reads `b + c`. Updating `a`
/// affects all four columns; `d` and `b` each depend only on column 0.
#[test]
fn gencol_graph_diamond() -> Result<()> {
    let ddl = "CREATE TABLE t(a, b AS (a) VIRTUAL, c AS (a) VIRTUAL, d AS (b + c) VIRTUAL)";
    let table = BTreeTable::from_sql(ddl, 0)?;

    let affected = table.columns_affected_by_update([0])?;
    assert_eq!(indices(&affected), vec![0, 1, 2, 3]);

    // The bottom of the diamond (`d`) and one of its arms (`b`).
    for column in [3, 1] {
        let deps = table.dependencies_of_columns([column])?;
        assert_eq!(stored(&deps), vec![0]);
    }
    Ok(())
}
/// Virtual column `c` reads two plain columns (`a + b`): updating either one affects
/// `c`, and `c` depends on both.
#[test]
fn gencol_graph_multiple_stored_roots() -> Result<()> {
    let table = BTreeTable::from_sql("CREATE TABLE t(a, b, c AS (a + b) VIRTUAL)", 0)?;

    // Each root on its own pulls in the shared virtual column at index 2.
    for root in [0, 1] {
        let affected = table.columns_affected_by_update([root])?;
        assert_eq!(indices(&affected), vec![root, 2]);
    }

    let affected_by_both = table.columns_affected_by_update([0, 1])?;
    assert_eq!(indices(&affected_by_both), vec![0, 1, 2]);

    let deps_of_c = table.dependencies_of_columns([2])?;
    assert_eq!(stored(&deps_of_c), vec![0, 1]);
    Ok(())
}
/// Passing an empty iterator to either graph query yields an empty mask.
#[test]
fn gencol_graph_empty_input() -> Result<()> {
    let table = BTreeTable::from_sql("CREATE TABLE t(a, b AS (a) VIRTUAL)", 0)?;

    let affected = table.columns_affected_by_update(std::iter::empty())?;
    assert!(affected.is_empty());

    let dependencies = table.dependencies_of_columns(std::iter::empty())?;
    assert!(dependencies.is_empty());
    Ok(())
}
/// Two independent pairs (`a -> b` and `c -> d`): queries on one pair never report
/// columns from the other.
#[test]
fn gencol_graph_disjoint_components() -> Result<()> {
    let table = BTreeTable::from_sql(
        "CREATE TABLE t(a, b AS (a) VIRTUAL, c, d AS (c) VIRTUAL)",
        0,
    )?;
    // (plain column, virtual column derived from it)
    let components = [(0, 1), (2, 3)];

    for (root, derived) in components {
        let affected = table.columns_affected_by_update([root])?;
        assert_eq!(indices(&affected), vec![root, derived]);
    }
    for (root, derived) in components {
        let deps = table.dependencies_of_columns([derived])?;
        assert_eq!(stored(&deps), vec![root]);
    }
    Ok(())
}
/// 50-column chain where each `c{i}` is a virtual copy of `c{i-1}`: updating `c0`
/// affects all 50 columns and the last column depends only on column 0.
#[test]
fn gencol_graph_deep_chain() -> Result<()> {
    // ", c1 AS (c0) VIRTUAL, c2 AS (c1) VIRTUAL, ..." for columns 1 through 49.
    let virtual_columns: String = (1..50)
        .map(|i| format!(", c{i} AS (c{prev}) VIRTUAL", prev = i - 1))
        .collect();
    let sql = format!("CREATE TABLE t(c0{virtual_columns})");

    let table = BTreeTable::from_sql(&sql, 0)?;
    let affected = table.columns_affected_by_update([0])?;
    assert_eq!(affected.count(), 50);

    let deps_of_last = table.dependencies_of_columns([49])?;
    assert_eq!(stored(&deps_of_last), vec![0]);
    Ok(())
}
/// Same shape as the 50-column chain test but with 500 columns; per its name, the
/// point is that a chain this deep is handled without overflowing the stack.
#[test]
fn gencol_graph_very_deep_chain_no_stack_overflow() -> Result<()> {
    // Column 0 is plain; every later column is a virtual copy of its predecessor.
    let column_defs: Vec<String> = (0..500)
        .map(|i| match i {
            0 => String::from("c0"),
            _ => format!("c{i} AS (c{prev}) VIRTUAL", prev = i - 1),
        })
        .collect();
    let sql = format!("CREATE TABLE t({})", column_defs.join(", "));

    let table = BTreeTable::from_sql(&sql, 0)?;
    let affected = table.columns_affected_by_update([0])?;
    assert_eq!(affected.count(), 500);

    let deps_of_last = table.dependencies_of_columns([499])?;
    assert_eq!(stored(&deps_of_last), vec![0]);
    Ok(())
}
/// Passing `ROWID_SENTINEL` as the updated column yields a mask containing just that
/// sentinel: it is set in the result and nothing else is.
#[test]
fn gencol_graph_rowid_sentinel_passthrough() -> Result<()> {
    let table = BTreeTable::from_sql("CREATE TABLE t(a, b AS (a) VIRTUAL)", 0)?;
    let rowid_only = table.columns_affected_by_update([ROWID_SENTINEL])?;

    let sentinel_present = rowid_only.get(ROWID_SENTINEL);
    assert!(sentinel_present);
    let entries = rowid_only.count();
    assert_eq!(entries, 1);
    Ok(())
}
/// Checks that the `dependencies` and `dependents` tables of the column graph are
/// transposes of each other: every edge recorded in one appears reversed in the other.
#[test]
fn gencol_graph_transpose_duality() -> Result<()> {
    let ddl =
        "CREATE TABLE t(a, b AS (a) VIRTUAL, c AS (b) VIRTUAL, d AS (a + c) VIRTUAL)";
    let table = BTreeTable::from_sql(ddl, 0)?;
    let graph = table.column_graph()?;

    let column_count = graph.dependencies.len();
    for i in 0..column_count {
        // Forward direction: each dependency `j` of `i` must list `i` as a dependent.
        let depends_on = &graph.dependencies[i];
        for j in depends_on.iter() {
            assert!(
                graph.dependents[j].get(i),
                "transpose violated: {j} is in dependencies[{i}] but {i} is not in dependents[{j}]"
            );
        }
        // Reverse direction: each dependent `j` of `i` must list `i` as a dependency.
        let depended_on_by = &graph.dependents[i];
        for j in depended_on_by.iter() {
            assert!(
                graph.dependencies[j].get(i),
                "transpose violated: {j} is in dependents[{i}] but {i} is not in dependencies[{j}]"
            );
        }
    }
    Ok(())
}
/// Feeding the result of `columns_affected_by_update` back into itself must not add
/// any further columns.
#[test]
fn gencol_graph_idempotence() -> Result<()> {
    let table = BTreeTable::from_sql(
        "CREATE TABLE t(a, b, c AS (a) VIRTUAL, d AS (b + c) VIRTUAL)",
        0,
    )?;

    let first_pass = table.columns_affected_by_update([0, 1])?;
    let second_pass = table.columns_affected_by_update(first_pass.iter())?;

    let after_second = indices(&second_pass);
    let after_first = indices(&first_pass);
    assert_eq!(after_second, after_first);
    Ok(())
}
/// The affected set for updating columns 0 and 1 together must equal the union of
/// the affected sets obtained by updating each of them separately.
#[test]
fn gencol_graph_union_monotonicity() -> Result<()> {
    let ddl =
        "CREATE TABLE t(a, b, c AS (a) VIRTUAL, d AS (b) VIRTUAL, e AS (c + d) VIRTUAL)";
    let table = BTreeTable::from_sql(ddl, 0)?;

    // Union of the per-column results, built by hand.
    let mut expected = table.columns_affected_by_update([0])?;
    let affected_by_b = table.columns_affected_by_update([1])?;
    expected.union_with(&affected_by_b);

    // Result of asking about both columns at once.
    let combined = table.columns_affected_by_update([0, 1])?;
    assert_eq!(indices(&combined), indices(&expected));
    Ok(())
}
/// Two virtual columns that reference each other must make `from_sql` fail; either of
/// two error wordings is accepted.
#[test]
fn gencol_graph_cycle_rejected() {
    let parse_result = BTreeTable::from_sql(
        "CREATE TABLE t(stored, a AS (b) VIRTUAL, b AS (a) VIRTUAL)",
        0,
    );
    let err = parse_result.expect_err("cycle must be rejected");

    // Render the message once and test it against both accepted wordings.
    let message = err.to_string();
    let mentions_cycle = message.contains("circular dependency");
    let mentions_self_reference = message.contains("cannot reference itself");
    assert!(
        mentions_cycle || mentions_self_reference,
        "unexpected error: {err}"
    );
}
/// A three-column cycle (`a -> b -> c -> a`) must make `from_sql` fail with an error
/// mentioning "circular dependency".
#[test]
fn gencol_graph_three_cycle_rejected() {
    let parse_result = BTreeTable::from_sql(
        "CREATE TABLE t(stored, a AS (b) VIRTUAL, b AS (c) VIRTUAL, c AS (a) VIRTUAL)",
        0,
    );
    let err = parse_result.expect_err("cycle must be rejected");

    let message = err.to_string();
    assert!(message.contains("circular dependency"));
}
/// A virtual column whose expression names itself must make `from_sql` fail with an
/// error mentioning "cannot reference itself".
#[test]
fn gencol_graph_self_reference_rejected() {
    let parse_result = BTreeTable::from_sql("CREATE TABLE t(a, b AS (b) VIRTUAL)", 0);
    let err = parse_result.expect_err("self-reference must be rejected");

    let message = err.to_string();
    assert!(message.contains("cannot reference itself"));
}
/// After a query has populated the original's column-dependency cache, a clone starts
/// with no cached data while the original keeps its own; the clone then fills its
/// cache on its first query.
#[test]
// NOTE(review): presumably kept because the `clone()` below is the behaviour under
// test and must not be "simplified" away on clippy's advice; confirm.
#[allow(clippy::redundant_clone)]
fn gencol_graph_clone_invalidates_cache() -> Result<()> {
    let original = BTreeTable::from_sql("CREATE TABLE t(a, b AS (a) VIRTUAL)", 0)?;

    // Warm the cache on the original.
    let _ = original.columns_affected_by_update([0])?;
    assert!(original.peek_column_dependencies().is_some());

    // The copy starts cold; the original is left untouched.
    let copy = original.clone();
    assert!(copy.peek_column_dependencies().is_none());
    assert!(original.peek_column_dependencies().is_some());

    // The copy still answers correctly and is warm afterwards.
    let affected = copy.columns_affected_by_update([0])?;
    assert_eq!(indices(&affected), vec![0, 1]);
    assert!(copy.peek_column_dependencies().is_some());
    Ok(())
}
/// Calling `columns_mut()` clears the cached column-dependency data even if the
/// returned value is never used.
#[test]
fn gencol_graph_columns_mut_invalidates_cache() -> Result<()> {
    let mut table = BTreeTable::from_sql("CREATE TABLE t(a, b AS (a) VIRTUAL)", 0)?;

    // Warm the cache with a query.
    let _ = table.columns_affected_by_update([0])?;
    let warm = table.peek_column_dependencies().is_some();
    assert!(warm);

    // The call alone is expected to drop the cache.
    let _ = table.columns_mut();
    let cleared = table.peek_column_dependencies().is_none();
    assert!(cleared);
    Ok(())
}
}