use crate::db::cursor::{IndexCursor, TableCursor};
use crate::db::datastore::locking_tx_datastore::IterByColRange;
use crate::db::relational_db::{MutTx, RelationalDB, Tx};
use crate::execution_context::ExecutionContext;
use core::ops::RangeBounds;
use itertools::Itertools;
use spacetimedb_lib::identity::AuthCtx;
use spacetimedb_lib::relation::RowCount;
use spacetimedb_primitives::*;
use spacetimedb_sats::db::def::TableDef;
use spacetimedb_sats::relation::{DbTable, FieldExpr, FieldExprRef, Header, Relation};
use spacetimedb_sats::{AlgebraicValue, ProductValue};
use spacetimedb_vm::errors::ErrorVm;
use spacetimedb_vm::eval::IterRows;
use spacetimedb_vm::expr::*;
use spacetimedb_vm::iterators::RelIter;
use spacetimedb_vm::program::{ProgramVm, Sources};
use spacetimedb_vm::rel_ops::{EmptyRelOps, RelOps};
use spacetimedb_vm::relation::{MemTable, RelValue, Table};
use std::ops::Bound;
use std::sync::Arc;
/// The kind of transaction a query or statement is executed under.
///
/// Holds a borrow of either a mutable or a read-only transaction so that the
/// code in this module can dispatch to the matching `RelationalDB` method.
pub enum TxMode<'a> {
/// Exclusive borrow of a mutable transaction; required by every write path in this module.
MutTx(&'a mut MutTx),
/// Shared borrow of a read-only transaction; the write paths in this module treat it as unreachable.
Tx(&'a Tx),
}
impl<'a> From<&'a mut MutTx> for TxMode<'a> {
fn from(tx: &'a mut MutTx) -> Self {
TxMode::MutTx(tx)
}
}
impl<'a> From<&'a Tx> for TxMode<'a> {
fn from(tx: &'a Tx) -> Self {
TxMode::Tx(tx)
}
}
/// Returns `false` when the range described by `lower` and `upper` is
/// trivially empty, and `true` otherwise.
///
/// A range is trivially empty when both bounds carry a value and either
/// - `lower > upper` (an inverted range, including the fully inclusive
///   `[hi, lo]` case), or
/// - `lower == upper` and at least one of the two bounds is excluded
///   (`[a, a)`, `(a, a]` and `(a, a)` contain no value).
///
/// A range with an `Unbounded` side is always reported as satisfiable.
///
/// Callers use this to short-circuit to an empty result instead of asking an
/// index for the range. This matters because ordered-map range lookups such as
/// `BTreeMap::range` are documented to panic when start > end, or when
/// start == end with both bounds excluded.
fn bound_is_satisfiable(lower: &Bound<AlgebraicValue>, upper: &Bound<AlgebraicValue>) -> bool {
    match (lower, upper) {
        // `[a, a]` contains exactly `a`; only an inverted range is empty.
        (Bound::Included(lower), Bound::Included(upper)) => lower <= upper,
        // With at least one excluded endpoint, equal endpoints leave nothing in between.
        (Bound::Included(lower), Bound::Excluded(upper))
        | (Bound::Excluded(lower), Bound::Included(upper))
        | (Bound::Excluded(lower), Bound::Excluded(upper)) => lower < upper,
        // A half-open or fully open range can always contain a value.
        (Bound::Unbounded, _) | (_, Bound::Unbounded) => true,
    }
}
pub fn build_query<'a>(
ctx: &'a ExecutionContext,
stdb: &'a RelationalDB,
tx: &'a TxMode<'a>,
query: &'a QueryExpr,
sources: &mut impl SourceProvider<'a>,
) -> Result<Box<IterRows<'a>>, ErrorVm> {
let db_table = query.source.is_db_table();
let mut result = None;
for op in &query.query {
result = Some(match op {
Query::IndexScan(IndexScan { table, columns, bounds }) if db_table => {
if !bound_is_satisfiable(&bounds.0, &bounds.1) {
Box::new(EmptyRelOps::new(table.head.clone())) as Box<IterRows<'a>>
} else {
let bounds = (bounds.start_bound(), bounds.end_bound());
iter_by_col_range(ctx, stdb, tx, table, columns.clone(), bounds)?
}
}
Query::IndexScan(index_scan) => {
let result = result
.take()
.map(Ok)
.unwrap_or_else(|| get_table(ctx, stdb, tx, &query.source, sources))?;
let cols = &index_scan.columns;
let bounds = &index_scan.bounds;
if !bound_is_satisfiable(&bounds.0, &bounds.1) {
Box::new(EmptyRelOps::new(index_scan.table.head.clone())) as Box<IterRows<'a>>
} else if cols.is_singleton() {
let head = cols.head().idx();
let iter = result.select(move |row| Ok(bounds.contains(&*row.read_column(head).unwrap())));
Box::new(iter) as Box<IterRows<'a>>
} else {
let start_bound = bounds.0.as_ref().map(|av| &av.as_product().unwrap().elements);
let end_bound = bounds.1.as_ref().map(|av| &av.as_product().unwrap().elements);
let iter = result.select(move |row| {
Ok(cols.iter().enumerate().all(|(idx, col)| {
let start_bound = start_bound.map(|pv| &pv[idx]);
let end_bound = end_bound.map(|pv| &pv[idx]);
let read_col = row.read_column(col.idx()).unwrap();
(start_bound, end_bound).contains(&*read_col)
}))
});
Box::new(iter)
}
}
Query::IndexJoin(IndexJoin {
probe_side,
probe_field,
index_side,
index_select,
index_col,
return_index_rows,
}) => {
if result.is_some() {
return Err(anyhow::anyhow!("Invalid query: `IndexJoin` must be the first operator").into());
}
let index_table = index_side.table_id().unwrap();
let index_header = index_side.head();
let probe_side = build_query(ctx, stdb, tx, probe_side, sources)?;
let probe_col = probe_side
.head()
.column_pos(*probe_field)
.expect("query compiler should have ensured the column exist");
Box::new(IndexSemiJoin {
ctx,
db: stdb,
tx,
probe_side,
probe_col,
index_header,
index_select,
index_table,
index_col: *index_col,
index_iter: None,
return_index_rows: *return_index_rows,
})
}
Query::Select(cmp) => {
let result = result
.take()
.map(Ok)
.unwrap_or_else(|| get_table(ctx, stdb, tx, &query.source, sources))?;
let header = result.head().clone();
let iter = result.select(move |row| cmp.compare(row, &header));
Box::new(iter)
}
Query::Project(cols, _) => {
let result = result
.take()
.map(Ok)
.unwrap_or_else(|| get_table(ctx, stdb, tx, &query.source, sources))?;
if cols.is_empty() {
result
} else {
let header = result.head().clone();
let iter = result.project(cols, move |cols, row| {
Ok(RelValue::Projection(row.project_owned(cols, &header)?))
})?;
Box::new(iter)
}
}
Query::JoinInner(join) => {
let result = result
.take()
.map(Ok)
.unwrap_or_else(|| get_table(ctx, stdb, tx, &query.source, sources))?;
let iter = join_inner(ctx, stdb, tx, result, join, sources)?;
Box::new(iter)
}
})
}
result
.map(Ok)
.unwrap_or_else(|| get_table(ctx, stdb, tx, &query.source, sources))
}
/// Joins `lhs` with the query described by `rhs` on equality of
/// `rhs.col_lhs` (read from the left row) and `rhs.col_rhs` (read from the
/// right row).
///
/// When `rhs.semi` is set, only the left row is emitted and the output header
/// is the left header; otherwise the right row is appended to the left row and
/// the output header is the left header extended with the right one.
///
/// # Errors
/// Propagates errors from building the right-hand query and from the
/// underlying `RelOps::join_inner` constructor.
fn join_inner<'a>(
    ctx: &'a ExecutionContext,
    db: &'a RelationalDB,
    tx: &'a TxMode<'a>,
    lhs: impl RelOps<'a> + 'a,
    rhs: &'a JoinExpr,
    sources: &mut impl SourceProvider<'a>,
) -> Result<impl RelOps<'a> + 'a, ErrorVm> {
    let is_semi = rhs.semi;
    let lhs_field = FieldExprRef::Name(rhs.col_lhs);
    let rhs_field = FieldExprRef::Name(rhs.col_rhs);
    let lhs_key = [lhs_field];
    let rhs_key = [rhs_field];

    let rhs_rows = build_query(ctx, db, tx, &rhs.rhs, sources)?;

    // Every `move` closure below owns its own handle to the header it reads.
    let lhs_key_head = lhs.head().clone();
    let rhs_key_head = rhs_rows.head().clone();
    let lhs_cmp_head = lhs.head().clone();
    let rhs_cmp_head = rhs_rows.head().clone();

    let joined_head = match is_semi {
        true => lhs_cmp_head.clone(),
        false => Arc::new(lhs_cmp_head.extend(&rhs_cmp_head)),
    };

    lhs.join_inner(
        rhs_rows,
        joined_head,
        // Key extractors for the two sides.
        move |row| Ok(row.project(&lhs_key, &lhs_key_head)?),
        move |row| Ok(row.project(&rhs_key, &rhs_key_head)?),
        // Join predicate: the two key columns must be equal.
        move |left, right| {
            let left_value = left.get(lhs_field, &lhs_cmp_head)?;
            let right_value = right.get(rhs_field, &rhs_cmp_head)?;
            Ok(left_value == right_value)
        },
        // Output row: left only for a semi-join, left ++ right otherwise.
        move |left, right| match is_semi {
            true => left,
            false => left.extend(right),
        },
    )
}
/// Opens the base source of a query as a row iterator.
///
/// An in-memory source is taken out of `sources`; a database table is
/// scanned in full through the transaction kind held by `tx`.
///
/// # Errors
/// Propagates errors from opening the table iterator or building the cursor.
///
/// # Panics
/// Panics (inside `in_mem_to_rel_ops`) if an in-memory source is not present
/// in `sources`.
fn get_table<'a>(
    ctx: &'a ExecutionContext,
    stdb: &'a RelationalDB,
    tx: &'a TxMode,
    query: &SourceExpr,
    sources: &mut impl SourceProvider<'a>,
) -> Result<Box<IterRows<'a>>, ErrorVm> {
    match query {
        SourceExpr::InMemory {
            source_id,
            header,
            row_count,
            ..
        } => Ok(in_mem_to_rel_ops(sources, *source_id, header.clone(), *row_count)),
        SourceExpr::DbTable(db_table) => {
            let rows = match tx {
                TxMode::MutTx(tx) => stdb.iter_mut(ctx, tx, db_table.table_id)?,
                TxMode::Tx(tx) => stdb.iter(ctx, tx, db_table.table_id)?,
            };
            let cursor = TableCursor::new(db_table.clone(), rows)?;
            Ok(Box::new(cursor) as Box<IterRows<'_>>)
        }
    }
}
/// Takes the in-memory source `source_id` out of `sources` and wraps it in a
/// `RelIter` with the given header and row count.
///
/// # Panics
/// Panics if `sources` has no source for `source_id`.
fn in_mem_to_rel_ops<'a>(
    sources: &mut impl SourceProvider<'a>,
    source_id: SourceId,
    head: Arc<Header>,
    rc: RowCount,
) -> Box<IterRows<'a>> {
    let Some(rows) = sources.take_source(source_id) else {
        panic!("Query plan specifies in-mem table for {source_id:?}, but found a `DbTable` or nothing")
    };
    let rel_ops: Box<IterRows<'a>> = Box::new(RelIter::new(head, rc, rows));
    rel_ops
}
/// Opens an iterator over the rows of `table` whose `columns` fall within
/// `range`, using the transaction kind held by `tx`, and wraps it in an
/// `IndexCursor`.
///
/// # Errors
/// Propagates errors from the range lookup and from building the cursor.
fn iter_by_col_range<'a>(
    ctx: &'a ExecutionContext,
    db: &'a RelationalDB,
    tx: &'a TxMode,
    table: &'a DbTable,
    columns: ColList,
    range: impl RangeBounds<AlgebraicValue> + 'a,
) -> Result<Box<dyn RelOps<'a> + 'a>, ErrorVm> {
    let table_id = table.table_id;
    let rows = match tx {
        TxMode::MutTx(tx) => db.iter_by_col_range_mut(ctx, tx, table_id, columns, range)?,
        TxMode::Tx(tx) => db.iter_by_col_range(ctx, tx, table_id, columns, range)?,
    };
    let cursor = IndexCursor::new(table, rows)?;
    Ok(Box::new(cursor) as Box<IterRows<'_>>)
}
/// Row iterator that joins a probe-side iterator against an index of a
/// database table: for each probe row, the value of `probe_col` is looked up
/// in `index_col` of `index_table`.
pub struct IndexSemiJoin<'a, 'c, Rhs: RelOps<'a>> {
/// Iterator producing the rows used to probe the index.
pub probe_side: Rhs,
/// Column of each probe row whose value is looked up in the index.
pub probe_col: ColId,
/// Header of the index-side table; used to evaluate `index_select` and
/// returned from `head()` when `return_index_rows` is set.
pub index_header: &'c Arc<Header>,
/// Optional predicate that index rows must satisfy to count as a match.
pub index_select: &'c Option<ColumnOp>,
/// Table whose index is probed.
pub index_table: TableId,
/// Column of `index_table` that is probed.
pub index_col: ColId,
/// If `true`, matching index rows are yielded; otherwise the probe row is yielded.
pub return_index_rows: bool,
/// Index iterator saved from the most recent successful probe, so that
/// remaining matches can be drained on later `next()` calls.
pub index_iter: Option<IterByColRange<'a, AlgebraicValue>>,
/// Database the index lookups run against.
pub db: &'a RelationalDB,
/// Transaction the index lookups run under.
pub tx: &'a TxMode<'a>,
// Execution context forwarded to every index lookup.
ctx: &'a ExecutionContext,
}
impl<'a, Rhs: RelOps<'a>> IndexSemiJoin<'a, '_, Rhs> {
    /// Evaluates the optional `index_select` predicate against `index_row`.
    ///
    /// Every row passes when no predicate is configured.
    fn filter(&self, index_row: &RelValue<'_>) -> Result<bool, ErrorVm> {
        match self.index_select {
            Some(predicate) => Ok(predicate.compare(index_row, self.index_header)?),
            None => Ok(true),
        }
    }

    /// Chooses which side of a match is emitted.
    ///
    /// The probe row is returned only when one is supplied and
    /// `return_index_rows` is `false`; in every other case the index row is
    /// returned.
    fn map(&self, index_row: RelValue<'a>, probe_row: Option<RelValue<'a>>) -> RelValue<'a> {
        match probe_row {
            Some(probe) if !self.return_index_rows => probe,
            _ => index_row,
        }
    }
}
/// `RelOps` implementation that performs the index probe lazily, producing at
/// most one output row per `next` call.
impl<'a, Rhs: RelOps<'a>> RelOps<'a> for IndexSemiJoin<'a, '_, Rhs> {
/// Header of the rows this operator yields: the index-side header when
/// `return_index_rows` is set, otherwise the probe side's header.
fn head(&self) -> &Arc<Header> {
if self.return_index_rows {
self.index_header
} else {
self.probe_side.head()
}
}
/// Produces the next matching row, or `Ok(None)` once the probe side is exhausted.
///
/// # Errors
/// Propagates errors from the probe side, from opening the index range and
/// from evaluating `index_select`.
fn next(&mut self) -> Result<Option<RelValue<'a>>, ErrorVm> {
// When index rows are the output, first drain what is left of the index
// iterator saved by the previous successful probe: one probe row may
// match several index rows.
if self.return_index_rows {
while let Some(value) = self.index_iter.as_mut().and_then(|iter| iter.next()) {
let value = RelValue::Row(value);
if self.filter(&value)? {
return Ok(Some(self.map(value, None)));
}
}
}
// Otherwise pull probe rows until one of them has a match in the index.
let table_id = self.index_table;
let col_id = self.index_col;
while let Some(mut row) = self.probe_side.next()? {
// Probe rows for which the column cannot be read are skipped.
if let Some(value) = row.read_or_take_column(self.probe_col.idx()) {
let mut index_iter = match self.tx {
TxMode::MutTx(tx) => self.db.iter_by_col_range_mut(self.ctx, tx, table_id, col_id, value)?,
TxMode::Tx(tx) => self.db.iter_by_col_range(self.ctx, tx, table_id, col_id, value)?,
};
while let Some(value) = index_iter.next() {
let value = RelValue::Row(value);
if self.filter(&value)? {
// Keep the partially consumed iterator so the next call can resume from it.
self.index_iter = Some(index_iter);
return Ok(Some(self.map(value, Some(row))));
}
}
}
}
Ok(None)
}
}
/// A `ProgramVm` implementation that evaluates `CrudExpr`s against a
/// `RelationalDB` within a given transaction.
pub struct DbProgram<'db, 'tx> {
// Execution context forwarded to the database calls.
ctx: &'tx ExecutionContext,
/// Database the statements run against.
pub(crate) db: &'db RelationalDB,
/// Transaction the statements run under; write statements require `TxMode::MutTx`.
pub(crate) tx: &'tx mut TxMode<'tx>,
/// Identity information used for the authorization check in `eval_query`.
pub(crate) auth: AuthCtx,
}
impl<'db, 'tx> DbProgram<'db, 'tx> {
/// Creates a program that runs statements against `db` under `tx` with the given `auth`.
pub fn new(ctx: &'tx ExecutionContext, db: &'db RelationalDB, tx: &'tx mut TxMode<'tx>, auth: AuthCtx) -> Self {
Self { ctx, db, tx, auth }
}
/// Evaluates `query` and materializes the result as `Code::Table`.
///
/// In-memory sources referenced by the query are taken out of `sources`
/// and their rows are fed to the pipeline as `RelValue::Projection`.
///
/// # Errors
/// Propagates errors from building the query and from collecting its rows.
fn _eval_query<const N: usize>(&mut self, query: &QueryExpr, sources: Sources<'_, N>) -> Result<Code, ErrorVm> {
let table_access = query.source.table_access();
tracing::trace!(table = query.source.table_name());
let result = build_query(self.ctx, self.db, self.tx, query, &mut |id| {
sources.take(id).map(|mt| mt.into_iter().map(RelValue::Projection))
})?;
let head = result.head().clone();
let rows = result.collect_vec(|row| row.into_product_value())?;
Ok(Code::Table(MemTable::new(head, table_access, rows)))
}
/// Inserts `rows` into the database table `table` and returns `Code::Pass`.
///
/// # Errors
/// Returns an error when `table` is a `MemTable`, or when an insert fails.
///
/// # Panics
/// Panics if the program holds a read-only transaction.
fn _execute_insert(&mut self, table: &Table, rows: Vec<ProductValue>) -> Result<Code, ErrorVm> {
match self.tx {
TxMode::MutTx(tx) => match table {
Table::MemTable(_) => Err(ErrorVm::Other(anyhow::anyhow!("How deal with mutating values?"))),
Table::DbTable(x) => {
for row in rows {
self.db.insert(tx, x.table_id, row)?;
}
Ok(Code::Pass)
}
},
TxMode::Tx(_) => unreachable!("mutable operation is invalid with read tx"),
}
}
/// Deletes `rows` from the database table `table` and returns the value
/// reported by `delete_by_rel` as `Code::Value`.
///
/// # Errors
/// Returns an error when `table` is a `MemTable`.
///
/// # Panics
/// Panics if the program holds a read-only transaction.
fn _execute_delete(&mut self, table: &Table, rows: Vec<ProductValue>) -> Result<Code, ErrorVm> {
match self.tx {
TxMode::MutTx(tx) => match table {
Table::MemTable(_) => Err(ErrorVm::Other(anyhow::anyhow!("How deal with mutating values?"))),
Table::DbTable(t) => {
let count = self.db.delete_by_rel(tx, t.table_id, rows);
Ok(Code::Value(count.into()))
}
},
TxMode::Tx(_) => unreachable!("mutable operation is invalid with read tx"),
}
}
/// Evaluates `query` and deletes every row it returns from the query's source table.
///
/// If evaluation yields something other than `Code::Table`, that value is returned unchanged.
///
/// # Panics
/// Panics if `sources.take_table` yields no table for `query.source`.
fn _delete_query<const N: usize>(&mut self, query: &QueryExpr, sources: Sources<'_, N>) -> Result<Code, ErrorVm> {
// The target table is resolved before the query runs.
let table = sources
.take_table(&query.source)
.expect("Cannot delete from a `MemTable`");
let result = self._eval_query(query, sources)?;
match result {
Code::Table(result) => self._execute_delete(&table, result.data),
_ => Ok(result),
}
}
/// Creates the table described by `table` and returns `Code::Pass`.
///
/// # Panics
/// Panics if the program holds a read-only transaction.
fn _create_table(&mut self, table: TableDef) -> Result<Code, ErrorVm> {
match self.tx {
TxMode::MutTx(tx) => {
self.db.create_table(tx, table)?;
Ok(Code::Pass)
}
TxMode::Tx(_) => unreachable!("mutable operation is invalid with read tx"),
}
}
/// Drops the object of the given `kind` named `name`, if it exists, and returns `Code::Pass`.
///
/// A name that does not resolve to an id is silently ignored.
///
/// # Panics
/// Panics if the program holds a read-only transaction.
fn _drop(&mut self, name: &str, kind: DbType) -> Result<Code, ErrorVm> {
match self.tx {
TxMode::MutTx(tx) => {
match kind {
DbType::Table => {
if let Some(id) = self.db.table_id_from_name_mut(tx, name)? {
self.db.drop_table(self.ctx, tx, id)?;
}
}
DbType::Index => {
if let Some(id) = self.db.index_id_from_name(tx, name)? {
self.db.drop_index(tx, id)?;
}
}
DbType::Sequence => {
if let Some(id) = self.db.sequence_id_from_name(tx, name)? {
self.db.drop_sequence(tx, id)?;
}
}
DbType::Constraint => {
if let Some(id) = self.db.constraint_id_from_name(tx, name)? {
self.db.drop_constraint(tx, id)?;
}
}
}
Ok(Code::Pass)
}
TxMode::Tx(_) => unreachable!("mutable operation is invalid with read tx"),
}
}
/// Sets the database configuration entry `name` to `value` and returns `Code::Pass`.
fn _set_config(&mut self, name: String, value: AlgebraicValue) -> Result<Code, ErrorVm> {
self.db.set_config(&name, value)?;
Ok(Code::Pass)
}
/// Reads the database configuration entry `name` and returns it as `Code::Table`.
fn _read_config(&self, name: String) -> Result<Code, ErrorVm> {
let config = self.db.read_config();
Ok(Code::Table(config.read_key_into_table(&name)?))
}
}
impl ProgramVm for DbProgram<'_, '_> {
fn eval_query<const N: usize>(&mut self, query: CrudExpr, sources: Sources<'_, N>) -> Result<Code, ErrorVm> {
query.check_auth(self.auth.owner, self.auth.caller)?;
match query {
CrudExpr::Query(query) => self._eval_query(&query, sources),
CrudExpr::Insert { source, rows } => {
let src = sources.take_table(&source).unwrap();
self._execute_insert(&src, rows)
}
CrudExpr::Update {
delete,
mut assignments,
} => {
let result = self._eval_query(&delete, sources)?;
let Code::Table(deleted) = result else {
return Ok(result);
};
let table = sources.take_table(&delete.source).unwrap();
self._execute_delete(&table, deleted.data.clone())?;
let exprs: Vec<Option<FieldExpr>> = table
.head()
.fields
.iter()
.map(|col| assignments.remove(&col.field))
.collect();
let insert_rows = deleted
.data
.into_iter()
.map(|row| {
let elements = row
.into_iter()
.zip(&exprs)
.map(|(val, expr)| {
if let Some(FieldExpr::Value(assigned)) = expr {
assigned.clone()
} else {
val
}
})
.collect();
ProductValue { elements }
})
.collect_vec();
self._execute_insert(&table, insert_rows)
}
CrudExpr::Delete { query } => self._delete_query(&query, sources),
CrudExpr::CreateTable { table } => self._create_table(table),
CrudExpr::Drop { name, kind, .. } => self._drop(&name, kind),
CrudExpr::SetVar { name, value } => self._set_config(name, value),
CrudExpr::ReadVar { name } => self._read_config(name),
}
}
}
/// Exposes a full-table cursor as a relational operator.
impl<'a> RelOps<'a> for TableCursor<'a> {
    /// Header of the scanned table.
    fn head(&self) -> &Arc<Header> {
        &self.table.head
    }

    /// Yields the next table row wrapped as `RelValue::Row`, or `None` when the scan is done.
    fn next(&mut self) -> Result<Option<RelValue<'a>>, ErrorVm> {
        match self.iter.next() {
            Some(row) => Ok(Some(RelValue::Row(row))),
            None => Ok(None),
        }
    }
}
/// Exposes an index-range cursor as a relational operator.
impl<'a, R: RangeBounds<AlgebraicValue>> RelOps<'a> for IndexCursor<'a, R> {
    /// Header of the table the index belongs to.
    fn head(&self) -> &Arc<Header> {
        &self.table.head
    }

    /// Yields the next row in the range wrapped as `RelValue::Row`, or `None` when exhausted.
    fn next(&mut self) -> Result<Option<RelValue<'a>>, ErrorVm> {
        match self.iter.next() {
            Some(row) => Ok(Some(RelValue::Row(row))),
            None => Ok(None),
        }
    }
}
// Tests that run queries through `DbProgram` against a test database,
// covering joins on a user table and selects on the system catalog tables.
#[cfg(test)]
pub(crate) mod tests {
use super::*;
use crate::db::datastore::system_tables::{
st_columns_schema, st_indexes_schema, st_sequences_schema, st_table_schema, StColumnFields, StColumnRow,
StIndexFields, StIndexRow, StSequenceFields, StSequenceRow, StTableFields, StTableRow, ST_COLUMNS_ID,
ST_COLUMNS_NAME, ST_INDEXES_ID, ST_INDEXES_NAME, ST_SEQUENCES_ID, ST_SEQUENCES_NAME, ST_TABLES_ID,
ST_TABLES_NAME,
};
use crate::db::relational_db::tests_utils::TestDB;
use crate::execution_context::ExecutionContext;
use spacetimedb_lib::error::ResultTest;
use spacetimedb_sats::db::auth::{StAccess, StTableType};
use spacetimedb_sats::db::def::{ColumnDef, IndexDef, IndexType, TableSchema};
use spacetimedb_sats::relation::FieldName;
use spacetimedb_sats::{product, AlgebraicType, ProductType, ProductValue};
use spacetimedb_vm::eval::run_ast;
use spacetimedb_vm::eval::test_helpers::{mem_table, mem_table_one_u64, scalar};
use spacetimedb_vm::operator::OpCmp;
/// Creates a user table named `table_name` with one column per element of
/// `schema`, inserts `rows` into it and returns the resulting table schema.
///
/// Unnamed schema elements get their position as the column name.
pub(crate) fn create_table_with_rows(
db: &RelationalDB,
tx: &mut MutTx,
table_name: &str,
schema: ProductType,
rows: &[ProductValue],
) -> ResultTest<Arc<TableSchema>> {
let columns: Vec<_> = Vec::from(schema.elements)
.into_iter()
.enumerate()
.map(|(i, e)| ColumnDef {
col_name: e.name.unwrap_or_else(|| i.to_string().into()),
col_type: e.algebraic_type,
})
.collect();
let table_id = db.create_table(
tx,
TableDef::new(table_name.into(), columns)
.with_type(StTableType::User)
.with_access(StAccess::for_name(table_name)),
)?;
let schema = db.schema_for_table_mut(tx, table_id)?;
for row in rows {
db.insert(tx, table_id, row.clone())?;
}
Ok(schema)
}
/// Creates the `inventory(inventory_id: U64, name: String)` table holding the
/// single row `(1, "health")`; returns the table schema and that row.
fn create_inv_table(db: &RelationalDB, tx: &mut MutTx) -> ResultTest<(Arc<TableSchema>, ProductValue)> {
let schema_ty = ProductType::from([("inventory_id", AlgebraicType::U64), ("name", AlgebraicType::String)]);
let row = product!(1u64, "health");
let schema = create_table_with_rows(db, tx, "inventory", schema_ty.clone(), &[row.clone()])?;
Ok((schema, row))
}
/// Inner join of `inventory` with an in-memory table on column 0 yields the
/// inventory row extended with the matching in-memory row.
#[test]
fn test_db_query_inner_join() -> ResultTest<()> {
let stdb = TestDB::durable()?;
let ctx = ExecutionContext::default();
let (schema, _) = stdb.with_auto_commit(&ctx, |tx| create_inv_table(&stdb, tx))?;
let table_id = schema.table_id;
// NOTE(review): presumably a one-row table holding `1u64` (the expected output below depends on it) — confirm against the helper.
let data = mem_table_one_u64(u32::MAX.into());
let rhs = *data.get_field_pos(0).unwrap();
let mut sources = SourceSet::<_, 1>::empty();
let rhs_source_expr = sources.add_mem_table(data);
// Last argument `false`: a regular (non-semi) join.
let q =
QueryExpr::new(&*schema).with_join_inner(rhs_source_expr, FieldName::new(table_id, 0.into()), rhs, false);
let result = stdb.with_read_only(&ctx, |tx| {
let mut tx_mode = (&*tx).into();
let p = &mut DbProgram::new(&ctx, &stdb, &mut tx_mode, AuthCtx::for_testing());
match run_ast(p, q.into(), sources) {
Code::Table(x) => x,
x => panic!("invalid result {x}"),
}
});
// The expected result: left row followed by the right row.
let inv = ProductType::from([AlgebraicType::U64, AlgebraicType::String, AlgebraicType::U64]);
let row = product![1u64, "health", 1u64];
let input = mem_table(table_id, inv, vec![row]);
assert_eq!(result.data, input.data, "Inventory");
Ok(())
}
/// Semi-join of `inventory` with an in-memory table on column 0 yields only
/// the inventory row.
#[test]
fn test_db_query_semijoin() -> ResultTest<()> {
let stdb = TestDB::durable()?;
let ctx = ExecutionContext::default();
let (schema, row) = stdb.with_auto_commit(&ctx, |tx| create_inv_table(&stdb, tx))?;
let table_id = schema.table_id;
let data = mem_table_one_u64(u32::MAX.into());
let rhs = *data.get_field_pos(0).unwrap();
let mut sources = SourceSet::<_, 1>::empty();
let rhs_source_expr = sources.add_mem_table(data);
// Last argument `true`: semi-join, so only left-side columns are returned.
let q =
QueryExpr::new(&*schema).with_join_inner(rhs_source_expr, FieldName::new(table_id, 0.into()), rhs, true);
let result = stdb.with_read_only(&ctx, |tx| {
let mut tx_mode = (&*tx).into();
let p = &mut DbProgram::new(&ctx, &stdb, &mut tx_mode, AuthCtx::for_testing());
match run_ast(p, q.into(), sources) {
Code::Table(x) => x,
x => panic!("invalid result {x}"),
}
});
// The expected result: the original inventory row, unchanged.
let input = mem_table(schema.table_id, schema.get_row_type().clone(), vec![row]);
assert_eq!(result.data, input.data, "Inventory");
Ok(())
}
/// Runs `q` in a read-only transaction with no in-memory sources and asserts
/// that the result is a table with the header of `schema` containing exactly `row`.
/// `name` is only used in the assertion message.
fn check_catalog(db: &RelationalDB, name: &str, row: ProductValue, q: QueryExpr, schema: &TableSchema) {
let ctx = ExecutionContext::default();
let result = db.with_read_only(&ctx, |tx| {
let tx_mode = &mut (&*tx).into();
let p = &mut DbProgram::new(&ctx, db, tx_mode, AuthCtx::for_testing());
run_ast(p, q.into(), [].into())
});
let input = MemTable::from_iter(Header::from(schema).into(), [row]);
assert_eq!(result, Code::Table(input), "{}", name);
}
/// Selecting the tables catalog by its own name returns its own catalog row.
#[test]
fn test_query_catalog_tables() -> ResultTest<()> {
let stdb = TestDB::durable()?;
let schema = st_table_schema();
let q = QueryExpr::new(&schema).with_select_cmp(
OpCmp::Eq,
FieldName::new(ST_TABLES_ID, StTableFields::TableName.into()),
scalar(ST_TABLES_NAME),
);
let st_table_row = StTableRow {
table_id: ST_TABLES_ID,
table_name: ST_TABLES_NAME.into(),
table_type: StTableType::System,
table_access: StAccess::Public,
}
.into();
check_catalog(&stdb, ST_TABLES_NAME, st_table_row, q, &schema);
Ok(())
}
/// Two chained selects on the columns catalog (by table id, then by column
/// position) return the row describing the catalog's own `table_id` column.
#[test]
fn test_query_catalog_columns() -> ResultTest<()> {
let stdb = TestDB::durable()?;
let schema = st_columns_schema();
let q = QueryExpr::new(&schema)
.with_select_cmp(
OpCmp::Eq,
FieldName::new(ST_COLUMNS_ID, StColumnFields::TableId.into()),
scalar(ST_COLUMNS_ID),
)
.with_select_cmp(
OpCmp::Eq,
FieldName::new(ST_COLUMNS_ID, StColumnFields::ColPos.into()),
scalar(StColumnFields::TableId as u32),
);
let st_column_row = StColumnRow {
table_id: ST_COLUMNS_ID,
col_pos: StColumnFields::TableId.col_id(),
col_name: StColumnFields::TableId.col_name(),
col_type: AlgebraicType::U32,
}
.into();
check_catalog(&stdb, ST_COLUMNS_NAME, st_column_row, q, &schema);
Ok(())
}
/// After creating a unique btree index `idx_1` on `inventory`, selecting the
/// indexes catalog by that name returns the matching catalog row.
#[test]
fn test_query_catalog_indexes() -> ResultTest<()> {
let db = TestDB::durable()?;
let ctx = ExecutionContext::default();
let (schema, _) = db.with_auto_commit(&ctx, |tx| create_inv_table(&db, tx))?;
let table_id = schema.table_id;
let index = IndexDef::btree("idx_1".into(), ColId(0), true);
let index_id = db.with_auto_commit(&ctx, |tx| db.create_index(tx, table_id, index))?;
let indexes_schema = st_indexes_schema();
let q = QueryExpr::new(&indexes_schema).with_select_cmp(
OpCmp::Eq,
FieldName::new(ST_INDEXES_ID, StIndexFields::IndexName.into()),
scalar("idx_1"),
);
let st_index_row = StIndexRow {
index_id,
index_name: "idx_1".into(),
table_id,
columns: ColList::new(0.into()),
is_unique: true,
index_type: IndexType::BTree,
}
.into();
check_catalog(&db, ST_INDEXES_NAME, st_index_row, q, &indexes_schema);
Ok(())
}
/// Selecting the sequences catalog by its own table id returns the row of
/// the sequence attached to that catalog table.
#[test]
fn test_query_catalog_sequences() -> ResultTest<()> {
let db = TestDB::durable()?;
let schema = st_sequences_schema();
let q = QueryExpr::new(&schema).with_select_cmp(
OpCmp::Eq,
FieldName::new(ST_SEQUENCES_ID, StSequenceFields::TableId.into()),
scalar(ST_SEQUENCES_ID),
);
// NOTE(review): the literal ids and counters below presumably mirror the bootstrap state of a fresh database — confirm if the system tables change.
let st_sequence_row = StSequenceRow {
sequence_id: 3.into(),
sequence_name: "seq_st_sequence_sequence_id_primary_key_auto".into(),
table_id: 2.into(),
col_pos: 0.into(),
increment: 1,
start: 4,
min_value: 1,
max_value: i128::MAX,
allocated: 4096,
}
.into();
check_catalog(&db, ST_SEQUENCES_NAME, st_sequence_row, q, &schema);
Ok(())
}
}