use crate::{util::normalize_ident, Result};
use crate::{LimboError, VirtualTable};
use core::fmt;
use fallible_iterator::FallibleIterator;
use limbo_sqlite3_parser::ast::{Expr, Literal, SortOrder, TableOptions};
use limbo_sqlite3_parser::{
ast::{Cmd, CreateTableBody, QualifiedName, ResultColumn, Stmt},
lexer::sql::Parser,
};
use std::collections::HashMap;
use std::rc::Rc;
use std::sync::Arc;
use tracing::trace;
/// In-memory catalog of the tables and indexes known to a database.
pub struct Schema {
    /// Tables by name. The `Schema` methods insert and look up entries under
    /// `normalize_ident(name)`.
    pub tables: HashMap<String, Arc<Table>>,
    /// Indexes grouped under the normalized name of the table they are defined on.
    pub indexes: HashMap<String, Vec<Arc<Index>>>,
}
impl Schema {
    /// Creates a schema that contains only the built-in `sqlite_schema` table and no indexes.
    pub fn new() -> Self {
        let mut tables: HashMap<String, Arc<Table>> = HashMap::new();
        let indexes: HashMap<String, Vec<Arc<Index>>> = HashMap::new();
        #[allow(clippy::arc_with_non_send_sync)]
        tables.insert(
            "sqlite_schema".to_string(),
            Arc::new(Table::BTree(sqlite_schema_table().into())),
        );
        Self { tables, indexes }
    }

    /// Returns `true` when no registered index (on any table) is called `name`.
    ///
    /// The comparison is exact: `name` is not normalized here.
    pub fn is_unique_idx_name(&self, name: &str) -> bool {
        !self
            .indexes
            .values()
            .flatten()
            .any(|index| index.name == name)
    }

    /// Registers a B-tree table under its normalized name, replacing any previous entry.
    pub fn add_btree_table(&mut self, table: Rc<BTreeTable>) {
        let name = normalize_ident(&table.name);
        self.tables.insert(name, Table::BTree(table).into());
    }

    /// Registers a virtual table under its normalized name, replacing any previous entry.
    pub fn add_virtual_table(&mut self, table: Rc<VirtualTable>) {
        let name = normalize_ident(&table.name);
        self.tables.insert(name, Table::Virtual(table).into());
    }

    /// Looks up a table of any kind by (normalized) name.
    pub fn get_table(&self, name: &str) -> Option<Arc<Table>> {
        let name = normalize_ident(name);
        self.tables.get(&name).cloned()
    }

    /// Removes the table registered under `table_name`, if any. Indexes are left untouched.
    pub fn remove_table(&mut self, table_name: &str) {
        let name = normalize_ident(table_name);
        self.tables.remove(&name);
    }

    /// Looks up a table by (normalized) name and returns it only if it is a B-tree table.
    pub fn get_btree_table(&self, name: &str) -> Option<Rc<BTreeTable>> {
        let name = normalize_ident(name);
        self.tables.get(&name).and_then(|table| table.btree())
    }

    /// Appends `index` to the list of indexes of the table it names.
    pub fn add_index(&mut self, index: Arc<Index>) {
        let table_name = normalize_ident(&index.table_name);
        // `index` is already owned, so it is moved into the list instead of being cloned.
        self.indexes.entry(table_name).or_default().push(index)
    }

    /// Returns the indexes registered for `table_name`, or an empty slice when there are none.
    pub fn get_indices(&self, table_name: &str) -> &[Arc<Index>] {
        let name = normalize_ident(table_name);
        self.indexes
            .get(&name)
            .map(Vec::as_slice)
            .unwrap_or_default()
    }

    /// Drops every index registered for `table_name`.
    pub fn remove_indices_for_table(&mut self, table_name: &str) {
        let name = normalize_ident(table_name);
        self.indexes.remove(&name);
    }
}
/// A table as seen by the schema: one of the three supported table kinds.
///
/// Cloning is cheap because each variant only holds an `Rc`.
#[derive(Clone, Debug)]
pub enum Table {
    /// A table described by a [`BTreeTable`].
    BTree(Rc<BTreeTable>),
    /// A [`PseudoTable`]: only a list of columns, with no name or root page.
    Pseudo(Rc<PseudoTable>),
    /// A table described by a `VirtualTable`.
    Virtual(Rc<VirtualTable>),
}
impl Table {
    /// Returns the root page of a B-tree table.
    ///
    /// # Panics
    /// Panics with `unimplemented!()` for pseudo and virtual tables.
    pub fn get_root_page(&self) -> usize {
        let Table::BTree(btree) = self else {
            unimplemented!()
        };
        btree.root_page
    }

    /// Returns the table name; pseudo tables have none and yield `""`.
    pub fn get_name(&self) -> &str {
        match self {
            Self::Pseudo(_) => "",
            Self::BTree(btree) => &btree.name,
            Self::Virtual(vtab) => &vtab.name,
        }
    }

    /// Returns the column at position `index`, or `None` when out of range.
    pub fn get_column_at(&self, index: usize) -> Option<&Column> {
        self.columns().get(index)
    }

    /// Returns the column list of the underlying table, whatever its kind.
    pub fn columns(&self) -> &Vec<Column> {
        match self {
            Self::BTree(btree) => &btree.columns,
            Self::Pseudo(pseudo) => &pseudo.columns,
            Self::Virtual(vtab) => &vtab.columns,
        }
    }

    /// Returns a new handle to the B-tree table, or `None` for the other kinds.
    pub fn btree(&self) -> Option<Rc<BTreeTable>> {
        if let Self::BTree(btree) = self {
            Some(Rc::clone(btree))
        } else {
            None
        }
    }

    /// Returns a new handle to the virtual table, or `None` for the other kinds.
    pub fn virtual_table(&self) -> Option<Rc<VirtualTable>> {
        if let Self::Virtual(vtab) = self {
            Some(Rc::clone(vtab))
        } else {
            None
        }
    }
}
/// Two `Table` values are equal only when they are the same kind and share the very same
/// allocation (pointer identity); structurally identical tables in separate `Rc`s differ.
impl PartialEq for Table {
    fn eq(&self, other: &Self) -> bool {
        match self {
            Self::BTree(lhs) => matches!(other, Self::BTree(rhs) if Rc::ptr_eq(lhs, rhs)),
            Self::Pseudo(lhs) => matches!(other, Self::Pseudo(rhs) if Rc::ptr_eq(lhs, rhs)),
            Self::Virtual(lhs) => matches!(other, Self::Virtual(rhs) if Rc::ptr_eq(lhs, rhs)),
        }
    }
}
/// Schema description of a table built from a `CREATE TABLE` statement (see `create_table`).
#[derive(Debug)]
pub struct BTreeTable {
    /// Root page supplied by the caller of `BTreeTable::from_sql`; `sqlite_schema` uses 1.
    pub root_page: usize,
    /// Table name; `create_table` stores it normalized.
    pub name: String,
    /// Names of the primary-key columns, from inline and table-level PRIMARY KEY constraints.
    pub primary_key_column_names: Vec<String>,
    /// Column definitions in declaration order.
    pub columns: Vec<Column>,
    /// `false` when the table was declared `WITHOUT ROWID`.
    pub has_rowid: bool,
    /// `true` when the table was declared `STRICT`.
    pub is_strict: bool,
}
impl BTreeTable {
pub fn get_rowid_alias_column(&self) -> Option<(usize, &Column)> {
if self.primary_key_column_names.len() == 1 {
let (idx, col) = self.get_column(&self.primary_key_column_names[0]).unwrap();
if self.column_is_rowid_alias(col) {
return Some((idx, col));
}
}
None
}
pub fn column_is_rowid_alias(&self, col: &Column) -> bool {
col.is_rowid_alias
}
pub fn get_column(&self, name: &str) -> Option<(usize, &Column)> {
let name = normalize_ident(name);
for (i, column) in self.columns.iter().enumerate() {
if column.name.as_ref().map_or(false, |n| *n == name) {
return Some((i, column));
}
}
None
}
pub fn from_sql(sql: &str, root_page: usize) -> Result<BTreeTable> {
let mut parser = Parser::new(sql.as_bytes());
let cmd = parser.next()?;
match cmd {
Some(Cmd::Stmt(Stmt::CreateTable { tbl_name, body, .. })) => {
create_table(tbl_name, *body, root_page)
}
_ => todo!("Expected CREATE TABLE statement"),
}
}
#[cfg(test)]
pub fn to_sql(&self) -> String {
let mut sql = format!("CREATE TABLE {} (\n", self.name);
for (i, column) in self.columns.iter().enumerate() {
if i > 0 {
sql.push_str(",\n");
}
sql.push_str(" ");
sql.push_str(column.name.as_ref().expect("column name is None"));
sql.push(' ');
sql.push_str(&column.ty.to_string());
}
sql.push_str(");\n");
sql
}
}
/// A table that consists of nothing but a list of columns (no name, no root page).
#[derive(Debug, Default)]
pub struct PseudoTable {
    /// Column definitions in the order they were added.
    pub columns: Vec<Column>,
}
impl PseudoTable {
    /// Creates a pseudo table without any columns.
    pub fn new() -> Self {
        Self::new_with_columns(Vec::new())
    }

    /// Creates a pseudo table that owns the given columns.
    pub fn new_with_columns(columns: Vec<Column>) -> Self {
        Self { columns }
    }

    /// Appends a column named `name` (normalized) of type `ty`.
    ///
    /// The declared type string is the upper-cased display form of `ty`; the column is never
    /// a rowid alias, is nullable and has no default value.
    pub fn add_column(&mut self, name: &str, ty: Type, primary_key: bool) {
        let ty_str = ty.to_string().to_uppercase();
        let column = Column {
            name: Some(normalize_ident(name)),
            ty,
            ty_str,
            primary_key,
            is_rowid_alias: false,
            notnull: false,
            default: None,
        };
        self.columns.push(column);
    }

    /// Finds a column by (normalized) name, returning its position and definition.
    pub fn get_column(&self, name: &str) -> Option<(usize, &Column)> {
        let wanted = normalize_ident(name);
        self.columns
            .iter()
            .enumerate()
            .find(|(_, candidate)| matches!(&candidate.name, Some(n) if *n == wanted))
    }
}
/// Builds a [`BTreeTable`] from the parsed pieces of a `CREATE TABLE` statement.
///
/// Column and primary-key names are stored in normalized form so that names coming from
/// inline `PRIMARY KEY` constraints, table-level `PRIMARY KEY(...)` constraints and the
/// column list itself all compare equal.
///
/// A column is flagged as a rowid alias only when its declared type is exactly `INTEGER`
/// (case-insensitively), it is the sole primary-key column, and the table is not
/// `WITHOUT ROWID`.
///
/// # Panics
/// Panics (`todo!`) for `CREATE TABLE ... AS SELECT` and for primary-key expressions that
/// are neither an identifier nor a string literal.
fn create_table(
    tbl_name: QualifiedName,
    body: CreateTableBody,
    root_page: usize,
) -> Result<BTreeTable> {
    let table_name = normalize_ident(&tbl_name.name.0);
    trace!("Creating table {}", table_name);
    let mut has_rowid = true;
    let mut primary_key_column_names = vec![];
    let mut cols = vec![];
    let is_strict: bool;
    match body {
        CreateTableBody::ColumnsAndConstraints {
            columns,
            constraints,
            options,
        } => {
            is_strict = options.contains(TableOptions::STRICT);
            // Table-level PRIMARY KEY(...) constraints are collected first so that the column
            // loop below can mark the referenced columns.
            if let Some(constraints) = constraints {
                for c in constraints {
                    if let limbo_sqlite3_parser::ast::TableConstraint::PrimaryKey {
                        columns, ..
                    } = c.constraint
                    {
                        for column in columns {
                            primary_key_column_names.push(match column.expr {
                                Expr::Id(id) => normalize_ident(&id.0),
                                // PRIMARY KEY('a'): strip the quotes, then normalize like any
                                // other identifier so it matches the stored column name.
                                Expr::Literal(Literal::String(value)) => {
                                    normalize_ident(value.trim_matches('\''))
                                }
                                _ => {
                                    todo!("Unsupported primary key expression");
                                }
                            });
                        }
                    }
                }
            }
            for (col_name, col_def) in columns {
                // Normalize once; the same string is used for the column itself and for the
                // primary-key bookkeeping, so the two can never disagree.
                let name = normalize_ident(&col_name.0);
                let mut typename_exactly_integer = false;
                let (ty, ty_str) = match col_def.col_type {
                    Some(data_type) => {
                        let s = data_type.name.as_str();
                        let type_name = s.to_uppercase();
                        // The five canonical type names are stored upper-cased; anything else
                        // keeps the spelling used in the statement.
                        let ty_str = if matches!(
                            type_name.as_str(),
                            "TEXT" | "INT" | "INTEGER" | "BLOB" | "REAL"
                        ) {
                            type_name.clone()
                        } else {
                            s.to_string()
                        };
                        // Substring rules, checked in this order.
                        if type_name.contains("INT") {
                            typename_exactly_integer = type_name == "INTEGER";
                            (Type::Integer, ty_str)
                        } else if type_name.contains("CHAR")
                            || type_name.contains("CLOB")
                            || type_name.contains("TEXT")
                        {
                            (Type::Text, ty_str)
                        } else if type_name.contains("BLOB") {
                            (Type::Blob, ty_str)
                        } else if type_name.is_empty() {
                            (Type::Blob, "".to_string())
                        } else if type_name.contains("REAL")
                            || type_name.contains("FLOA")
                            || type_name.contains("DOUB")
                        {
                            (Type::Real, ty_str)
                        } else {
                            (Type::Numeric, ty_str)
                        }
                    }
                    None => (Type::Null, "".to_string()),
                };
                let mut default = None;
                let mut primary_key = false;
                let mut notnull = false;
                for c_def in &col_def.constraints {
                    match &c_def.constraint {
                        limbo_sqlite3_parser::ast::ColumnConstraint::PrimaryKey { .. } => {
                            primary_key = true;
                        }
                        limbo_sqlite3_parser::ast::ColumnConstraint::NotNull { .. } => {
                            notnull = true;
                        }
                        limbo_sqlite3_parser::ast::ColumnConstraint::Default(expr) => {
                            default = Some(expr.clone())
                        }
                        _ => {}
                    }
                }
                if primary_key {
                    primary_key_column_names.push(name.clone());
                } else if primary_key_column_names.contains(&name) {
                    primary_key = true;
                }
                cols.push(Column {
                    name: Some(name),
                    ty,
                    ty_str,
                    primary_key,
                    is_rowid_alias: typename_exactly_integer && primary_key,
                    notnull,
                    default,
                });
            }
            if options.contains(TableOptions::WITHOUT_ROWID) {
                has_rowid = false;
            }
        }
        CreateTableBody::AsSelect(_) => todo!(),
    };
    // A rowid alias requires a rowid and a single-column primary key.
    if !has_rowid || primary_key_column_names.len() > 1 {
        for col in cols.iter_mut() {
            col.is_rowid_alias = false;
        }
    }
    Ok(BTreeTable {
        root_page,
        name: table_name,
        has_rowid,
        primary_key_column_names,
        columns: cols,
        is_strict,
    })
}
/// Placeholder for building a [`PseudoTable`] from a list of result columns.
///
/// Returns an empty pseudo table when `columns` is empty.
///
/// # Panics
/// Every kind of result column is still unsupported, so any non-empty input panics with
/// `todo!`.
pub fn _build_pseudo_table(columns: &[ResultColumn]) -> PseudoTable {
    let pseudo = PseudoTable::new();
    for result_column in columns {
        match result_column {
            ResultColumn::Star => {
                todo!();
            }
            ResultColumn::TableStar(_) => {
                todo!();
            }
            ResultColumn::Expr(expr, _as_name) => {
                todo!("unsupported expression {:?}", expr);
            }
        }
    }
    pseudo
}
/// Definition of a single table column.
#[derive(Debug, Clone)]
pub struct Column {
    /// Column name; the constructors in this file store it normalized. `None` means unnamed.
    pub name: Option<String>,
    /// Type category derived from the declared type name.
    pub ty: Type,
    /// Declared type name; empty when the column was declared without a type.
    pub ty_str: String,
    /// Whether the column is part of the primary key.
    pub primary_key: bool,
    /// Whether the column is flagged as an alias of the rowid.
    pub is_rowid_alias: bool,
    /// Whether the column carries a `NOT NULL` constraint.
    pub notnull: bool,
    /// Expression of the `DEFAULT` constraint, if any.
    pub default: Option<Expr>,
}
impl Column {
    /// Returns the affinity derived from the declared type string (`ty_str`) by the
    /// free function `affinity`.
    pub fn affinity(&self) -> Affinity {
        affinity(&self.ty_str)
    }
}
/// Derives a column affinity from a declared type name.
///
/// The name is matched case-insensitively (ASCII) against substrings, and the first rule
/// that matches wins:
///
/// 1. contains `INT` → [`Affinity::Integer`]
/// 2. contains `CHAR`, `CLOB` or `TEXT` → [`Affinity::Text`]
/// 3. contains `BLOB` or `ANY`, or is empty → [`Affinity::Blob`]
/// 4. contains `REAL`, `FLOA` or `DOUB` → [`Affinity::Real`]
/// 5. anything else → [`Affinity::Numeric`]
///
/// Case-insensitivity matters because declared type names are stored with the spelling used
/// in the `CREATE TABLE` statement (e.g. `bigint`, `varchar(10)`).
pub fn affinity(datatype: &str) -> Affinity {
    let upper = datatype.to_ascii_uppercase();
    let contains_any = |needles: &[&str]| needles.iter().any(|needle| upper.contains(*needle));

    if contains_any(&["INT"]) {
        Affinity::Integer
    } else if contains_any(&["CHAR", "CLOB", "TEXT"]) {
        Affinity::Text
    } else if upper.is_empty() || contains_any(&["BLOB", "ANY"]) {
        Affinity::Blob
    } else if contains_any(&["REAL", "FLOA", "DOUB"]) {
        Affinity::Real
    } else {
        Affinity::Numeric
    }
}
/// Type category of a column, derived from its declared type name.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Type {
    /// Used by `create_table` for columns declared without a type; displays as an empty string.
    Null,
    /// Displays as `TEXT`.
    Text,
    /// Displays as `NUMERIC`.
    Numeric,
    /// Displays as `INTEGER`.
    Integer,
    /// Displays as `REAL`.
    Real,
    /// Displays as `BLOB`.
    Blob,
}
/// Column affinity; each variant maps to one of the `SQLITE_AFF_*` character codes.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Affinity {
    /// Encoded as `SQLITE_AFF_INTEGER`.
    Integer,
    /// Encoded as `SQLITE_AFF_TEXT`.
    Text,
    /// Encoded as `SQLITE_AFF_NONE`.
    Blob,
    /// Encoded as `SQLITE_AFF_REAL`.
    Real,
    /// Encoded as `SQLITE_AFF_NUMERIC`.
    Numeric,
}
// One-character affinity codes; `Affinity::aff_mask` and `Affinity::from_char` convert
// between these codes and `Affinity` values.

/// Code for `Affinity::Blob`.
pub const SQLITE_AFF_NONE: char = 'A';
/// Code for `Affinity::Text`.
pub const SQLITE_AFF_TEXT: char = 'B';
/// Code for `Affinity::Numeric`.
pub const SQLITE_AFF_NUMERIC: char = 'C';
/// Code for `Affinity::Integer`.
pub const SQLITE_AFF_INTEGER: char = 'D';
/// Code for `Affinity::Real`.
pub const SQLITE_AFF_REAL: char = 'E';
impl Affinity {
    /// Returns the one-character `SQLITE_AFF_*` code for this affinity.
    pub fn aff_mask(&self) -> char {
        match *self {
            Affinity::Blob => SQLITE_AFF_NONE,
            Affinity::Text => SQLITE_AFF_TEXT,
            Affinity::Numeric => SQLITE_AFF_NUMERIC,
            Affinity::Integer => SQLITE_AFF_INTEGER,
            Affinity::Real => SQLITE_AFF_REAL,
        }
    }

    /// Decodes a `SQLITE_AFF_*` character code (the inverse of [`Affinity::aff_mask`]).
    ///
    /// # Errors
    /// Returns an `InternalError` for any character that is not one of the five codes.
    pub fn from_char(char: char) -> Result<Self> {
        let decoded = match char {
            SQLITE_AFF_NONE => Affinity::Blob,
            SQLITE_AFF_TEXT => Affinity::Text,
            SQLITE_AFF_NUMERIC => Affinity::Numeric,
            SQLITE_AFF_INTEGER => Affinity::Integer,
            SQLITE_AFF_REAL => Affinity::Real,
            _ => {
                return Err(LimboError::InternalError(format!(
                    "Invalid affinity character: {}",
                    char
                )))
            }
        };
        Ok(decoded)
    }
}
/// Renders the upper-case type name; `Type::Null` renders as an empty string.
impl fmt::Display for Type {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let keyword = match self {
            Self::Integer => "INTEGER",
            Self::Real => "REAL",
            Self::Text => "TEXT",
            Self::Blob => "BLOB",
            Self::Numeric => "NUMERIC",
            Self::Null => "",
        };
        f.write_str(keyword)
    }
}
pub fn sqlite_schema_table() -> BTreeTable {
BTreeTable {
root_page: 1,
name: "sqlite_schema".to_string(),
has_rowid: true,
is_strict: false,
primary_key_column_names: vec![],
columns: vec![
Column {
name: Some("type".to_string()),
ty: Type::Text,
ty_str: "TEXT".to_string(),
primary_key: false,
is_rowid_alias: false,
notnull: false,
default: None,
},
Column {
name: Some("name".to_string()),
ty: Type::Text,
ty_str: "TEXT".to_string(),
primary_key: false,
is_rowid_alias: false,
notnull: false,
default: None,
},
Column {
name: Some("tbl_name".to_string()),
ty: Type::Text,
ty_str: "TEXT".to_string(),
primary_key: false,
is_rowid_alias: false,
notnull: false,
default: None,
},
Column {
name: Some("rootpage".to_string()),
ty: Type::Integer,
ty_str: "INT".to_string(),
primary_key: false,
is_rowid_alias: false,
notnull: false,
default: None,
},
Column {
name: Some("sql".to_string()),
ty: Type::Text,
ty_str: "TEXT".to_string(),
primary_key: false,
is_rowid_alias: false,
notnull: false,
default: None,
},
],
}
}
/// Schema description of an index on a table.
#[allow(dead_code)]
#[derive(Debug)]
pub struct Index {
    /// Index name; the constructors in this file store it normalized.
    pub name: String,
    /// Name of the table the index is defined on.
    pub table_name: String,
    /// Root page supplied by the caller when the index description was built.
    pub root_page: usize,
    /// Indexed columns, in index order.
    pub columns: Vec<IndexColumn>,
    /// Whether the index is unique; always `true` for automatic primary-key indexes.
    pub unique: bool,
}
/// One column of an [`Index`].
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct IndexColumn {
    /// Normalized column name.
    pub name: String,
    /// Sort order of the column; the constructors default to ascending.
    pub order: SortOrder,
    /// Position of the column within the table's column list.
    pub pos_in_table: usize,
}
impl Index {
    /// Parses `sql` and builds the index description from its first statement, resolving
    /// every indexed column against `table`.
    ///
    /// # Errors
    /// Returns the parser error if `sql` cannot be parsed, and an `InternalError` when the
    /// first statement is missing or is not a `CREATE INDEX`, or when an indexed column does
    /// not exist in `table`.
    pub fn from_sql(sql: &str, root_page: usize, table: &BTreeTable) -> Result<Index> {
        let mut parser = Parser::new(sql.as_bytes());
        let cmd = parser.next()?;
        match cmd {
            Some(Cmd::Stmt(Stmt::CreateIndex {
                idx_name,
                tbl_name,
                columns,
                unique,
                ..
            })) => {
                let index_name = normalize_ident(&idx_name.name.0);
                let mut index_columns = Vec::with_capacity(columns.len());
                for col in columns {
                    let name = normalize_ident(&col.expr.to_string());
                    let Some((pos_in_table, _)) = table.get_column(&name) else {
                        return Err(crate::LimboError::InternalError(format!(
                            "Column {} is in index {} but not found in table {}",
                            name, index_name, table.name
                        )));
                    };
                    index_columns.push(IndexColumn {
                        name,
                        // Columns without an explicit order are ascending.
                        order: col.order.unwrap_or(SortOrder::Asc),
                        pos_in_table,
                    });
                }
                Ok(Index {
                    name: index_name,
                    table_name: normalize_ident(&tbl_name.0),
                    root_page,
                    columns: index_columns,
                    unique,
                })
            }
            // The function already returns `Result`, so report bad input instead of panicking.
            _ => Err(crate::LimboError::InternalError(
                "Expected create index statement".to_string(),
            )),
        }
    }

    /// Builds the unique index that backs the primary key of `table`.
    ///
    /// Every primary-key column becomes an ascending index column, in primary-key order.
    ///
    /// # Errors
    /// Returns an `InternalError` when the table has no primary key, or when a primary-key
    /// column name does not match any column of the table.
    pub fn automatic_from_primary_key(
        table: &BTreeTable,
        index_name: &str,
        root_page: usize,
    ) -> Result<Index> {
        if table.primary_key_column_names.is_empty() {
            return Err(crate::LimboError::InternalError(
                "Cannot create automatic index for table without primary key".to_string(),
            ));
        }
        let index_columns = table
            .primary_key_column_names
            .iter()
            .map(|col_name| {
                let Some((pos_in_table, _)) = table.get_column(col_name) else {
                    return Err(crate::LimboError::InternalError(format!(
                        "Column {} is in index {} but not found in table {}",
                        col_name, index_name, table.name
                    )));
                };
                Ok(IndexColumn {
                    name: normalize_ident(col_name),
                    order: SortOrder::Asc,
                    pos_in_table,
                })
            })
            .collect::<Result<Vec<_>>>()?;
        Ok(Index {
            name: normalize_ident(index_name),
            table_name: table.name.clone(),
            root_page,
            columns: index_columns,
            unique: true,
        })
    }

    /// Maps a column position in the table to its position within this index, or `None`
    /// when the column is not part of the index.
    pub fn column_table_pos_to_index_pos(&self, table_pos: usize) -> Option<usize> {
        self.columns
            .iter()
            .position(|c| c.pos_in_table == table_pos)
    }
}
/// Unit tests for building table and index descriptions from SQL.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::LimboError;

    // --- rowid presence -------------------------------------------------------------------

    #[test]
    pub fn test_has_rowid_true() -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a INTEGER PRIMARY KEY, b TEXT);"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        assert!(table.has_rowid, "has_rowid should be set to true");
        Ok(())
    }

    #[test]
    pub fn test_has_rowid_false() -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a INTEGER PRIMARY KEY, b TEXT) WITHOUT ROWID;"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        assert!(!table.has_rowid, "has_rowid should be set to false");
        Ok(())
    }

    // --- rowid alias detection ------------------------------------------------------------

    #[test]
    pub fn test_column_is_rowid_alias_single_text() -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a TEXT PRIMARY KEY, b TEXT);"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        let column = table.get_column("a").unwrap().1;
        assert!(
            !table.column_is_rowid_alias(column),
            "column 'a' has type different than INTEGER so can't be a rowid alias"
        );
        Ok(())
    }

    #[test]
    pub fn test_column_is_rowid_alias_single_integer() -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a INTEGER PRIMARY KEY, b TEXT);"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        let column = table.get_column("a").unwrap().1;
        assert!(
            table.column_is_rowid_alias(column),
            "column 'a' should be a rowid alias"
        );
        Ok(())
    }

    #[test]
    pub fn test_column_is_rowid_alias_single_integer_separate_primary_key_definition() -> Result<()>
    {
        let sql = r#"CREATE TABLE t1 (a INTEGER, b TEXT, PRIMARY KEY(a));"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        let column = table.get_column("a").unwrap().1;
        assert!(
            table.column_is_rowid_alias(column),
            "column 'a' should be a rowid alias"
        );
        Ok(())
    }

    #[test]
    pub fn test_column_is_rowid_alias_single_integer_separate_primary_key_definition_without_rowid(
    ) -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a INTEGER, b TEXT, PRIMARY KEY(a)) WITHOUT ROWID;"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        let column = table.get_column("a").unwrap().1;
        assert!(
            !table.column_is_rowid_alias(column),
            "column 'a' shouldn't be a rowid alias because table has no rowid"
        );
        Ok(())
    }

    #[test]
    pub fn test_column_is_rowid_alias_single_integer_without_rowid() -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a INTEGER PRIMARY KEY, b TEXT) WITHOUT ROWID;"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        let column = table.get_column("a").unwrap().1;
        assert!(
            !table.column_is_rowid_alias(column),
            "column 'a' shouldn't be a rowid alias because table has no rowid"
        );
        Ok(())
    }

    #[test]
    pub fn test_column_is_rowid_alias_inline_composite_primary_key() -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a INTEGER PRIMARY KEY, b TEXT PRIMARY KEY);"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        let column = table.get_column("a").unwrap().1;
        assert!(
            !table.column_is_rowid_alias(column),
            "column 'a' shouldn't be a rowid alias because table has composite primary key"
        );
        Ok(())
    }

    #[test]
    pub fn test_column_is_rowid_alias_separate_composite_primary_key_definition() -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a INTEGER, b TEXT, PRIMARY KEY(a, b));"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        let column = table.get_column("a").unwrap().1;
        assert!(
            !table.column_is_rowid_alias(column),
            "column 'a' shouldn't be a rowid alias because table has composite primary key"
        );
        Ok(())
    }

    // --- primary key bookkeeping ----------------------------------------------------------

    #[test]
    pub fn test_primary_key_inline_single() -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a INTEGER PRIMARY KEY, b TEXT, c REAL);"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        let column = table.get_column("a").unwrap().1;
        assert!(column.primary_key, "column 'a' should be a primary key");
        let column = table.get_column("b").unwrap().1;
        assert!(!column.primary_key, "column 'b' shouldn't be a primary key");
        let column = table.get_column("c").unwrap().1;
        assert!(!column.primary_key, "column 'c' shouldn't be a primary key");
        assert_eq!(
            vec!["a"],
            table.primary_key_column_names,
            "primary key column names should be ['a']"
        );
        Ok(())
    }

    #[test]
    pub fn test_primary_key_inline_multiple() -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a INTEGER PRIMARY KEY, b TEXT PRIMARY KEY, c REAL);"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        let column = table.get_column("a").unwrap().1;
        assert!(column.primary_key, "column 'a' should be a primary key");
        let column = table.get_column("b").unwrap().1;
        assert!(column.primary_key, "column 'b' should be a primary key");
        let column = table.get_column("c").unwrap().1;
        assert!(!column.primary_key, "column 'c' shouldn't be a primary key");
        assert_eq!(
            vec!["a", "b"],
            table.primary_key_column_names,
            "primary key column names should be ['a', 'b']"
        );
        Ok(())
    }

    #[test]
    pub fn test_primary_key_separate_single() -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a INTEGER, b TEXT, c REAL, PRIMARY KEY(a));"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        let column = table.get_column("a").unwrap().1;
        assert!(column.primary_key, "column 'a' should be a primary key");
        let column = table.get_column("b").unwrap().1;
        assert!(!column.primary_key, "column 'b' shouldn't be a primary key");
        let column = table.get_column("c").unwrap().1;
        assert!(!column.primary_key, "column 'c' shouldn't be a primary key");
        assert_eq!(
            vec!["a"],
            table.primary_key_column_names,
            "primary key column names should be ['a']"
        );
        Ok(())
    }

    #[test]
    pub fn test_primary_key_separate_multiple() -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a INTEGER, b TEXT, c REAL, PRIMARY KEY(a, b));"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        let column = table.get_column("a").unwrap().1;
        assert!(column.primary_key, "column 'a' should be a primary key");
        let column = table.get_column("b").unwrap().1;
        assert!(column.primary_key, "column 'b' should be a primary key");
        let column = table.get_column("c").unwrap().1;
        assert!(!column.primary_key, "column 'c' shouldn't be a primary key");
        assert_eq!(
            vec!["a", "b"],
            table.primary_key_column_names,
            "primary key column names should be ['a', 'b']"
        );
        Ok(())
    }

    #[test]
    pub fn test_primary_key_separate_single_quoted() -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a INTEGER, b TEXT, c REAL, PRIMARY KEY('a'));"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        let column = table.get_column("a").unwrap().1;
        assert!(column.primary_key, "column 'a' should be a primary key");
        let column = table.get_column("b").unwrap().1;
        assert!(!column.primary_key, "column 'b' shouldn't be a primary key");
        let column = table.get_column("c").unwrap().1;
        assert!(!column.primary_key, "column 'c' shouldn't be a primary key");
        assert_eq!(
            vec!["a"],
            table.primary_key_column_names,
            "primary key column names should be ['a']"
        );
        Ok(())
    }

    #[test]
    pub fn test_primary_key_separate_single_doubly_quoted() -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a INTEGER, b TEXT, c REAL, PRIMARY KEY("a"));"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        let column = table.get_column("a").unwrap().1;
        assert!(column.primary_key, "column 'a' should be a primary key");
        let column = table.get_column("b").unwrap().1;
        assert!(!column.primary_key, "column 'b' shouldn't be a primary key");
        let column = table.get_column("c").unwrap().1;
        assert!(!column.primary_key, "column 'c' shouldn't be a primary key");
        assert_eq!(
            vec!["a"],
            table.primary_key_column_names,
            "primary key column names should be ['a']"
        );
        Ok(())
    }

    // --- column constraints and declared type strings -------------------------------------

    #[test]
    pub fn test_default_value() -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a INTEGER DEFAULT 23);"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        let column = table.get_column("a").unwrap().1;
        let default = column.default.clone().unwrap();
        assert_eq!(default.to_string(), "23");
        Ok(())
    }

    #[test]
    pub fn test_col_notnull() -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a INTEGER NOT NULL);"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        let column = table.get_column("a").unwrap().1;
        assert!(column.notnull);
        Ok(())
    }

    #[test]
    pub fn test_col_notnull_negative() -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a INTEGER);"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        let column = table.get_column("a").unwrap().1;
        assert!(!column.notnull);
        Ok(())
    }

    #[test]
    pub fn test_col_type_string_integer() -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a InTeGeR);"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        let column = table.get_column("a").unwrap().1;
        assert_eq!(column.ty_str, "INTEGER");
        Ok(())
    }

    #[test]
    pub fn test_col_type_string_int() -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a InT);"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        let column = table.get_column("a").unwrap().1;
        assert_eq!(column.ty_str, "INT");
        Ok(())
    }

    #[test]
    pub fn test_col_type_string_blob() -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a bLoB);"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        let column = table.get_column("a").unwrap().1;
        assert_eq!(column.ty_str, "BLOB");
        Ok(())
    }

    #[test]
    pub fn test_col_type_string_empty() -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a);"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        let column = table.get_column("a").unwrap().1;
        assert_eq!(column.ty_str, "");
        Ok(())
    }

    #[test]
    pub fn test_col_type_string_some_nonsense() -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a someNonsenseName);"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        let column = table.get_column("a").unwrap().1;
        assert_eq!(column.ty_str, "someNonsenseName");
        Ok(())
    }

    // --- built-in schema table ------------------------------------------------------------

    #[test]
    pub fn test_sqlite_schema() {
        // NOTE(review): `to_sql` pushes a space before every column name; confirm that the
        // leading whitespace of the lines in this literal matches what `to_sql` emits.
        let expected = r#"CREATE TABLE sqlite_schema (
type TEXT,
name TEXT,
tbl_name TEXT,
rootpage INTEGER,
sql TEXT);
"#;
        let actual = sqlite_schema_table().to_sql();
        assert_eq!(expected, actual);
    }

    // --- automatic primary-key indexes ----------------------------------------------------

    #[test]
    fn test_automatic_index_single_column() -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a INTEGER PRIMARY KEY, b TEXT);"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        let index = Index::automatic_from_primary_key(&table, "sqlite_autoindex_t1_1", 2)?;
        assert_eq!(index.name, "sqlite_autoindex_t1_1");
        assert_eq!(index.table_name, "t1");
        assert_eq!(index.root_page, 2);
        assert!(index.unique);
        assert_eq!(index.columns.len(), 1);
        assert_eq!(index.columns[0].name, "a");
        assert!(matches!(index.columns[0].order, SortOrder::Asc));
        Ok(())
    }

    #[test]
    fn test_automatic_index_composite_key() -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a INTEGER, b TEXT, PRIMARY KEY(a, b));"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        let index = Index::automatic_from_primary_key(&table, "sqlite_autoindex_t1_1", 2)?;
        assert_eq!(index.name, "sqlite_autoindex_t1_1");
        assert_eq!(index.table_name, "t1");
        assert_eq!(index.root_page, 2);
        assert!(index.unique);
        assert_eq!(index.columns.len(), 2);
        assert_eq!(index.columns[0].name, "a");
        assert_eq!(index.columns[1].name, "b");
        assert!(matches!(index.columns[0].order, SortOrder::Asc));
        assert!(matches!(index.columns[1].order, SortOrder::Asc));
        Ok(())
    }

    #[test]
    fn test_automatic_index_no_primary_key() -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a INTEGER, b TEXT);"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        let result = Index::automatic_from_primary_key(&table, "sqlite_autoindex_t1_1", 2);
        assert!(result.is_err());
        assert!(matches!(
            result.unwrap_err(),
            LimboError::InternalError(msg) if msg.contains("without primary key")
        ));
        Ok(())
    }

    #[test]
    fn test_automatic_index_nonexistent_column() -> Result<()> {
        // Built by hand so that the primary key can name a column that does not exist.
        let table = BTreeTable {
            root_page: 0,
            name: "t1".to_string(),
            has_rowid: true,
            is_strict: false,
            primary_key_column_names: vec!["nonexistent".to_string()],
            columns: vec![Column {
                name: Some("a".to_string()),
                ty: Type::Integer,
                ty_str: "INT".to_string(),
                primary_key: false,
                is_rowid_alias: false,
                notnull: false,
                default: None,
            }],
        };
        let result = Index::automatic_from_primary_key(&table, "sqlite_autoindex_t1_1", 2);
        assert!(result.is_err());
        assert!(matches!(
            result.unwrap_err(),
            LimboError::InternalError(msg) if msg.contains("not found in table")
        ));
        Ok(())
    }
}