use crate::translate::collate::CollationSeq;
use crate::translate::plan::SelectPlan;
use crate::VirtualTable;
use crate::{util::normalize_ident, Result};
use fallible_iterator::FallibleIterator;
use limbo_sqlite3_parser::ast::{Expr, Literal, SortOrder, TableOptions};
use limbo_sqlite3_parser::{
ast::{Cmd, CreateTableBody, QualifiedName, ResultColumn, Stmt},
lexer::sql::Parser,
};
use std::collections::BTreeSet;
use std::rc::Rc;
use tracing::trace;
use super::column::{Column, Type};
#[derive(Clone, Debug)]
pub struct BTreeTable {
pub root_page: usize,
pub name: String,
pub primary_key_columns: Vec<(String, SortOrder)>,
pub columns: Vec<Column>,
pub has_rowid: bool,
pub is_strict: bool,
pub unique_sets: Option<Vec<Vec<(String, SortOrder)>>>,
pub foreign_keys: Vec<ForeignKeyDef>,
}
impl BTreeTable {
pub fn get_rowid_alias_column(&self) -> Option<(usize, &Column)> {
if self.primary_key_columns.len() == 1 {
let (idx, col) = self.get_column(&self.primary_key_columns[0].0)?;
if self.column_is_rowid_alias(col) {
return Some((idx, col));
}
}
None
}
pub fn column_is_rowid_alias(&self, col: &Column) -> bool {
col.is_rowid_alias
}
pub fn get_column(&self, name: &str) -> Option<(usize, &Column)> {
let name = normalize_ident(name);
self.columns
.iter()
.enumerate()
.find(|(_, column)| column.name.as_ref() == Some(&name))
}
pub fn from_sql(sql: &str, root_page: usize) -> Result<BTreeTable> {
let mut parser = Parser::new(sql.as_bytes());
let cmd = parser.next()?;
match cmd {
Some(Cmd::Stmt(Stmt::CreateTable { tbl_name, body, .. })) => {
create_table(tbl_name, *body, root_page)
}
_ => todo!("Expected CREATE TABLE statement"),
}
}
pub fn to_sql(&self) -> String {
let mut sql = format!("CREATE TABLE {} (", self.name);
for (i, column) in self.columns.iter().enumerate() {
if i > 0 {
sql.push(',');
}
sql.push(' ');
sql.push_str(column.name.as_ref().expect("column name is None"));
sql.push(' ');
sql.push_str(&column.ty.to_string());
if column.unique {
sql.push_str(" UNIQUE");
}
if column.primary_key {
sql.push_str(" PRIMARY KEY");
}
if let Some(default) = &column.default {
sql.push_str(" DEFAULT ");
sql.push_str(&default.to_string());
}
}
sql.push_str(" )");
sql
}
pub fn column_collations(&self) -> Vec<Option<CollationSeq>> {
self.columns.iter().map(|column| column.collation).collect()
}
}
#[derive(Clone, Debug)]
pub struct ForeignKeyDef {
pub id: u32,
pub from_cols: Vec<String>,
pub to_table: String,
pub to_cols: Vec<String>,
pub on_update: String,
pub on_delete: String,
pub match_clause: String,
}
#[derive(Debug, Clone)]
pub struct FromClauseSubquery {
pub name: String,
pub plan: Box<SelectPlan>,
pub columns: Vec<Column>,
pub result_columns_start_reg: Option<usize>,
}
#[derive(Debug, Default)]
pub struct PseudoTable {
pub columns: Vec<Column>,
}
impl PseudoTable {
pub fn new() -> Self {
Self { columns: vec![] }
}
pub fn new_with_columns(columns: Vec<Column>) -> Self {
Self { columns }
}
pub fn add_column(&mut self, name: &str, ty: Type, primary_key: bool) {
self.columns.push(Column {
name: Some(normalize_ident(name)),
ty,
ty_str: ty.to_string().to_uppercase(),
primary_key,
is_rowid_alias: false,
notnull: false,
default: None,
unique: false,
collation: None,
});
}
pub fn get_column(&self, name: &str) -> Option<(usize, &Column)> {
let name = normalize_ident(name);
for (i, column) in self.columns.iter().enumerate() {
if column.name.as_ref().map_or(false, |n| *n == name) {
return Some((i, column));
}
}
None
}
}
#[derive(Clone, Debug)]
pub enum Table {
BTree(Rc<BTreeTable>),
Pseudo(Rc<PseudoTable>),
Virtual(Rc<VirtualTable>),
FromClauseSubquery(FromClauseSubquery),
}
impl Table {
pub fn get_root_page(&self) -> usize {
match self {
Table::BTree(table) => table.root_page,
Table::Pseudo(_) => unimplemented!(),
Table::Virtual(_) => unimplemented!(),
Table::FromClauseSubquery(_) => unimplemented!(),
}
}
pub fn get_name(&self) -> &str {
match self {
Self::BTree(table) => &table.name,
Self::Pseudo(_) => "",
Self::Virtual(table) => &table.name,
Self::FromClauseSubquery(from_clause_subquery) => &from_clause_subquery.name,
}
}
pub fn get_column_at(&self, index: usize) -> Option<&Column> {
match self {
Self::BTree(table) => table.columns.get(index),
Self::Pseudo(table) => table.columns.get(index),
Self::Virtual(table) => table.columns.get(index),
Self::FromClauseSubquery(from_clause_subquery) => {
from_clause_subquery.columns.get(index)
}
}
}
pub fn columns(&self) -> &Vec<Column> {
match self {
Self::BTree(table) => &table.columns,
Self::Pseudo(table) => &table.columns,
Self::Virtual(table) => &table.columns,
Self::FromClauseSubquery(from_clause_subquery) => &from_clause_subquery.columns,
}
}
pub fn btree(&self) -> Option<Rc<BTreeTable>> {
match self {
Self::BTree(table) => Some(table.clone()),
Self::Pseudo(_) => None,
Self::Virtual(_) => None,
Self::FromClauseSubquery(_) => None,
}
}
pub fn virtual_table(&self) -> Option<Rc<VirtualTable>> {
match self {
Self::Virtual(table) => Some(table.clone()),
_ => None,
}
}
}
impl PartialEq for Table {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(Self::BTree(a), Self::BTree(b)) => Rc::ptr_eq(a, b),
(Self::Pseudo(a), Self::Pseudo(b)) => Rc::ptr_eq(a, b),
(Self::Virtual(a), Self::Virtual(b)) => Rc::ptr_eq(a, b),
_ => false,
}
}
}
#[derive(Debug, Eq)]
pub(super) struct UniqueColumnProps {
pub(super) column_name: String,
pub(super) order: SortOrder,
}
impl PartialEq for UniqueColumnProps {
fn eq(&self, other: &Self) -> bool {
self.column_name.eq(&other.column_name)
}
}
impl PartialOrd for UniqueColumnProps {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
impl Ord for UniqueColumnProps {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.column_name.cmp(&other.column_name)
}
}
pub(super) fn ref_act_to_str(act: limbo_sqlite3_parser::ast::RefAct) -> &'static str {
use limbo_sqlite3_parser::ast::RefAct;
match act {
RefAct::SetNull => "SET NULL",
RefAct::SetDefault => "SET DEFAULT",
RefAct::Cascade => "CASCADE",
RefAct::Restrict => "RESTRICT",
RefAct::NoAction => "NO ACTION",
}
}
pub(super) fn build_fk_def(
id: u32,
from_cols: Vec<String>,
clause: &limbo_sqlite3_parser::ast::ForeignKeyClause,
) -> ForeignKeyDef {
use limbo_sqlite3_parser::ast::RefArg;
let to_table = normalize_ident(&clause.tbl_name.0);
let to_cols: Vec<String> = clause
.columns
.as_deref()
.unwrap_or(&[])
.iter()
.map(|c| normalize_ident(&c.col_name.0))
.collect();
let mut on_update = "NO ACTION".to_string();
let mut on_delete = "NO ACTION".to_string();
let mut match_clause = "NONE".to_string();
for arg in &clause.args {
match arg {
RefArg::OnUpdate(act) => on_update = ref_act_to_str(*act).to_string(),
RefArg::OnDelete(act) => on_delete = ref_act_to_str(*act).to_string(),
RefArg::Match(name) => match_clause = name.0.clone(),
RefArg::OnInsert(_) => {}
}
}
ForeignKeyDef {
id,
from_cols,
to_table,
to_cols,
on_update,
on_delete,
match_clause,
}
}
pub(super) fn create_table(
tbl_name: QualifiedName,
body: CreateTableBody,
root_page: usize,
) -> Result<BTreeTable> {
let table_name = normalize_ident(&tbl_name.name.0);
trace!("Creating table {}", table_name);
let mut has_rowid = true;
let mut primary_key_columns = vec![];
let mut cols = vec![];
let is_strict: bool;
let mut unique_sets: Vec<BTreeSet<UniqueColumnProps>> = vec![];
let mut raw_fks: Vec<(Vec<String>, limbo_sqlite3_parser::ast::ForeignKeyClause)> = vec![];
match body {
CreateTableBody::ColumnsAndConstraints {
columns,
constraints,
options,
} => {
is_strict = options.contains(TableOptions::STRICT);
if let Some(constraints) = constraints {
for c in constraints {
match c.constraint {
limbo_sqlite3_parser::ast::TableConstraint::PrimaryKey {
columns, ..
} => {
for column in columns {
let col_name = match column.expr {
Expr::Id(id) => normalize_ident(&id.0),
Expr::Literal(Literal::String(value)) => {
value.trim_matches('\'').to_owned()
}
_ => {
todo!("Unsupported primary key expression");
}
};
primary_key_columns
.push((col_name, column.order.unwrap_or(SortOrder::Asc)));
}
}
limbo_sqlite3_parser::ast::TableConstraint::Unique {
columns,
conflict_clause,
} => {
if conflict_clause.is_some() {
unimplemented!("ON CONFLICT not implemented");
}
let unique_set = columns
.into_iter()
.map(|column| {
let column_name = match column.expr {
Expr::Id(id) => normalize_ident(&id.0),
_ => {
todo!("Unsupported unique expression");
}
};
UniqueColumnProps {
column_name,
order: column.order.unwrap_or(SortOrder::Asc),
}
})
.collect();
unique_sets.push(unique_set);
}
limbo_sqlite3_parser::ast::TableConstraint::ForeignKey {
columns,
clause,
..
} => {
let from_cols: Vec<String> = columns
.iter()
.map(|c| normalize_ident(&c.col_name.0))
.collect();
raw_fks.push((from_cols, clause));
}
limbo_sqlite3_parser::ast::TableConstraint::Check(_) => {}
}
}
}
for (col_name, col_def) in columns {
let name = col_name.0.to_string();
let (ty, ty_str) = match col_def.col_type {
Some(data_type) => {
let s = data_type.name.as_str();
let ty_str = if matches!(
s.to_uppercase().as_str(),
"TEXT" | "INT" | "INTEGER" | "BLOB" | "REAL"
) {
s.to_uppercase().to_string()
} else {
s.to_string()
};
let type_name = ty_str.to_uppercase();
if type_name.contains("INT") {
(Type::Integer, ty_str)
} else if type_name.contains("CHAR")
|| type_name.contains("CLOB")
|| type_name.contains("TEXT")
{
(Type::Text, ty_str)
} else if type_name.contains("BLOB") {
(Type::Blob, ty_str)
} else if type_name.is_empty() {
(Type::Blob, "".to_string())
} else if type_name.contains("REAL")
|| type_name.contains("FLOA")
|| type_name.contains("DOUB")
{
(Type::Real, ty_str)
} else {
(Type::Numeric, ty_str)
}
}
None => (Type::Null, "".to_string()),
};
let mut default = None;
let mut primary_key = false;
let mut notnull = false;
let mut order = SortOrder::Asc;
let mut unique = false;
let mut collation = None;
for c_def in &col_def.constraints {
match &c_def.constraint {
limbo_sqlite3_parser::ast::ColumnConstraint::PrimaryKey {
order: o,
..
} => {
primary_key = true;
if let Some(o) = o {
order = o.clone();
}
}
limbo_sqlite3_parser::ast::ColumnConstraint::NotNull { .. } => {
notnull = true;
}
limbo_sqlite3_parser::ast::ColumnConstraint::Default(expr) => {
default = Some(expr.clone());
}
limbo_sqlite3_parser::ast::ColumnConstraint::Unique(on_conflict) => {
if on_conflict.is_some() {
unimplemented!("ON CONFLICT not implemented");
}
unique = true;
}
limbo_sqlite3_parser::ast::ColumnConstraint::Collate { collation_name } => {
collation = Some(CollationSeq::new(collation_name.0.as_str())?);
}
limbo_sqlite3_parser::ast::ColumnConstraint::ForeignKey {
clause, ..
} => {
raw_fks.push((vec![normalize_ident(&name)], clause.clone()));
}
_ => {}
}
}
if primary_key {
primary_key_columns.push((normalize_ident(&name), order));
}
cols.push(Column {
name: Some(normalize_ident(&name)),
ty,
ty_str,
primary_key,
is_rowid_alias: false,
notnull,
default,
unique,
collation,
});
}
if options.contains(TableOptions::WITHOUT_ROWID) {
has_rowid = false;
}
}
CreateTableBody::AsSelect(_) => todo!(),
};
let single_int_rowid_alias = has_rowid && primary_key_columns.len() == 1;
for col in cols.iter_mut() {
let is_pk = col.name.as_ref().is_some_and(|name| {
primary_key_columns
.iter()
.any(|(pk_name, _)| pk_name == name)
});
if is_pk {
col.primary_key = true;
}
col.is_rowid_alias =
single_int_rowid_alias && is_pk && col.ty == Type::Integer && col.ty_str == "INTEGER";
}
let foreign_keys: Vec<ForeignKeyDef> = raw_fks
.into_iter()
.enumerate()
.map(|(idx, (from_cols, clause))| build_fk_def(idx as u32, from_cols, &clause))
.collect();
Ok(BTreeTable {
root_page,
name: table_name,
has_rowid,
primary_key_columns,
columns: cols,
is_strict,
unique_sets: if unique_sets.is_empty() {
None
} else {
unique_sets.dedup();
Some(
unique_sets
.into_iter()
.map(|set| {
set.into_iter()
.map(|UniqueColumnProps { column_name, order }| (column_name, order))
.collect()
})
.collect(),
)
},
foreign_keys,
})
}
pub fn _build_pseudo_table(columns: &[ResultColumn]) -> PseudoTable {
let table = PseudoTable::new();
for column in columns {
match column {
ResultColumn::Expr(expr, _as_name) => {
todo!("unsupported expression {:?}", expr);
}
ResultColumn::Star => {
todo!();
}
ResultColumn::TableStar(_) => {
todo!();
}
}
}
table
}