use parking_lot::{Mutex, MutexGuard};
use smallvec::SmallVec;
use spacetimedb_sats::bsatn::ser::BsatnError;
use spacetimedb_table::table::{RowRef, UniqueConstraintViolation};
use spacetimedb_vm::relation::RelValue;
use std::ops::DerefMut;
use std::sync::Arc;
use super::scheduler::{ScheduleError, ScheduledReducerId, Scheduler};
use super::timestamp::Timestamp;
use crate::database_instance_context::DatabaseInstanceContext;
use crate::database_logger::{BacktraceProvider, LogLevel, Record};
use crate::db::datastore::locking_tx_datastore::MutTxId;
use crate::error::{IndexError, NodesError};
use crate::execution_context::ExecutionContext;
use crate::vm::{build_query, TxMode};
use spacetimedb_lib::filter::CmpArgs;
use spacetimedb_lib::operator::OpQuery;
use spacetimedb_lib::ProductValue;
use spacetimedb_primitives::{ColId, ColListBuilder, TableId};
use spacetimedb_sats::db::def::{IndexDef, IndexType};
use spacetimedb_sats::relation::{FieldExpr, FieldName};
use spacetimedb_sats::Typespace;
use spacetimedb_vm::expr::{ColumnOp, NoInMemUsed, QueryExpr};
#[derive(Clone)]
pub struct InstanceEnv {
pub dbic: Arc<DatabaseInstanceContext>,
pub scheduler: Scheduler,
pub tx: TxSlot,
}
#[derive(Clone, Default)]
pub struct TxSlot {
inner: Arc<Mutex<Option<MutTxId>>>,
ctx: Arc<Mutex<Option<ExecutionContext>>>,
}
#[derive(Default)]
struct ChunkedWriter {
chunks: Vec<Box<[u8]>>,
scratch_space: Vec<u8>,
}
impl ChunkedWriter {
fn write_row_ref_to_scratch(&mut self, row: RowRef<'_>) -> Result<(), BsatnError> {
row.to_bsatn_extend(&mut self.scratch_space)
}
fn write_rel_value_to_scratch(&mut self, row: &RelValue<'_>) -> Result<(), BsatnError> {
row.to_bsatn_extend(&mut self.scratch_space)
}
pub fn flush(&mut self) {
const ITER_CHUNK_SIZE: usize = 64 * 1024;
if self.scratch_space.len() > ITER_CHUNK_SIZE {
self.chunks.push(self.scratch_space.as_slice().into());
self.scratch_space.clear();
}
}
pub fn into_chunks(mut self) -> Vec<Box<[u8]>> {
if !self.scratch_space.is_empty() {
self.chunks.push(self.scratch_space.into());
}
self.chunks
}
}
impl InstanceEnv {
pub fn new(dbic: Arc<DatabaseInstanceContext>, scheduler: Scheduler) -> Self {
Self {
dbic,
scheduler,
tx: TxSlot::default(),
}
}
#[tracing::instrument(skip_all, fields(reducer=reducer))]
pub fn schedule(
&self,
reducer: String,
args: Vec<u8>,
time: Timestamp,
) -> Result<ScheduledReducerId, ScheduleError> {
self.scheduler.schedule(reducer, args, time)
}
#[tracing::instrument(skip_all)]
pub fn cancel_reducer(&self, id: ScheduledReducerId) {
self.scheduler.cancel(id)
}
fn get_tx(&self) -> Result<impl DerefMut<Target = MutTxId> + '_, GetTxError> {
self.tx.get()
}
pub fn get_ctx(&self) -> Result<impl DerefMut<Target = ExecutionContext> + '_, GetTxError> {
self.tx.get_ctx()
}
#[tracing::instrument(skip_all)]
pub fn console_log(&self, level: LogLevel, record: &Record, bt: &dyn BacktraceProvider) {
self.dbic.logger.write(level, record, bt);
log::trace!("MOD({}): {}", self.dbic.address.to_abbreviated_hex(), record.message);
}
pub fn insert(&self, ctx: &ExecutionContext, table_id: TableId, buffer: &[u8]) -> Result<ProductValue, NodesError> {
let stdb = &*self.dbic.relational_db;
let tx = &mut *self.get_tx()?;
let ret = stdb
.insert_bytes_as_row(tx, table_id, buffer)
.inspect_err(|e| match e {
crate::error::DBError::Index(IndexError::UniqueConstraintViolation(UniqueConstraintViolation {
constraint_name: _,
table_name: _,
cols: _,
value: _,
})) => {}
_ => {
let res = stdb.table_name_from_id_mut(ctx, tx, table_id);
if let Ok(Some(table_name)) = res {
log::debug!("insert(table: {table_name}, table_id: {table_id}): {e}")
} else {
log::debug!("insert(table_id: {table_id}): {e}")
}
}
})?;
Ok(ret)
}
pub fn delete_by_col_eq(
&self,
ctx: &ExecutionContext,
table_id: TableId,
col_id: ColId,
value: &[u8],
) -> Result<u32, NodesError> {
let stdb = &*self.dbic.relational_db;
let tx = &mut *self.get_tx()?;
let eq_value = &stdb.decode_column(tx, table_id, col_id, value)?;
let rows_to_delete = stdb
.iter_by_col_eq_mut(ctx, tx, table_id, col_id, eq_value)?
.map(|row_ref| row_ref.pointer())
.collect::<SmallVec<[_; 1]>>();
Ok(stdb.delete(tx, table_id, rows_to_delete))
}
#[tracing::instrument(skip(self, relation))]
pub fn delete_by_rel(&self, table_id: TableId, relation: &[u8]) -> Result<u32, NodesError> {
let stdb = &*self.dbic.relational_db;
let tx = &mut *self.get_tx()?;
let row_ty = stdb.row_schema_for_table(tx, table_id)?;
let relation = ProductValue::decode_smallvec(&row_ty, &mut &*relation).map_err(NodesError::DecodeRow)?;
Ok(stdb.delete_by_rel(tx, table_id, relation))
}
#[tracing::instrument(skip_all)]
pub fn get_table_id(&self, table_name: &str) -> Result<TableId, NodesError> {
let stdb = &*self.dbic.relational_db;
let tx = &mut *self.get_tx()?;
let table_id = stdb
.table_id_from_name_mut(tx, table_name)?
.ok_or(NodesError::TableNotFound)?;
Ok(table_id)
}
#[tracing::instrument(skip_all)]
pub fn create_index(
&self,
index_name: Box<str>,
table_id: TableId,
index_type: u8,
col_ids: Vec<u8>,
) -> Result<(), NodesError> {
let stdb = &*self.dbic.relational_db;
let tx = &mut *self.get_tx()?;
let index_type = IndexType::try_from(index_type).map_err(|_| NodesError::BadIndexType(index_type))?;
match index_type {
IndexType::BTree => {}
IndexType::Hash => {
todo!("Hash indexes not yet supported")
}
};
let columns = col_ids
.into_iter()
.map(Into::into)
.collect::<ColListBuilder>()
.build()
.expect("Attempt to create an index with zero columns");
let is_unique = stdb.column_constraints(tx, table_id, &columns)?.has_unique();
let index = IndexDef {
columns,
index_name,
is_unique,
index_type,
};
stdb.create_index(tx, table_id, index)?;
Ok(())
}
pub fn iter_by_col_eq(
&self,
ctx: &ExecutionContext,
table_id: TableId,
col_id: ColId,
value: &[u8],
) -> Result<Vec<u8>, NodesError> {
let stdb = &*self.dbic.relational_db;
let tx = &mut *self.get_tx()?;
let value = &stdb.decode_column(tx, table_id, col_id, value)?;
let results = stdb.iter_by_col_eq_mut(ctx, tx, table_id, col_id, value)?;
let mut bytes = Vec::new();
for result in results {
result.to_bsatn_extend(&mut bytes).unwrap();
}
Ok(bytes)
}
#[tracing::instrument(skip_all)]
pub fn iter_chunks(&self, ctx: &ExecutionContext, table_id: TableId) -> Result<Vec<Box<[u8]>>, NodesError> {
let mut chunked_writer = ChunkedWriter::default();
let stdb = &*self.dbic.relational_db;
let tx = &mut *self.tx.get()?;
for row in stdb.iter_mut(ctx, tx, table_id)? {
chunked_writer.write_row_ref_to_scratch(row).unwrap();
chunked_writer.flush();
}
Ok(chunked_writer.into_chunks())
}
pub fn iter_filtered_chunks(
&self,
ctx: &ExecutionContext,
table_id: TableId,
filter: &[u8],
) -> Result<Vec<Box<[u8]>>, NodesError> {
use spacetimedb_lib::filter;
fn filter_to_column_op(table_id: TableId, filter: filter::Expr) -> ColumnOp {
match filter {
filter::Expr::Cmp(filter::Cmp {
op,
args: CmpArgs { lhs_field, rhs },
}) => ColumnOp::Cmp {
op: OpQuery::Cmp(op),
lhs: Box::new(ColumnOp::Field(FieldExpr::Name(FieldName::new(
table_id,
lhs_field.into(),
)))),
rhs: Box::new(ColumnOp::Field(match rhs {
filter::Rhs::Field(rhs_field) => FieldExpr::Name(FieldName::new(table_id, rhs_field.into())),
filter::Rhs::Value(rhs_value) => FieldExpr::Value(rhs_value),
})),
},
filter::Expr::Logic(filter::Logic { lhs, op, rhs }) => ColumnOp::Cmp {
op: OpQuery::Logic(op),
lhs: Box::new(filter_to_column_op(table_id, *lhs)),
rhs: Box::new(filter_to_column_op(table_id, *rhs)),
},
filter::Expr::Unary(_) => todo!("unary operations are not yet supported"),
}
}
let stdb = &self.dbic.relational_db;
let tx = &mut *self.tx.get()?;
let schema = stdb.schema_for_table_mut(tx, table_id)?;
let row_type = schema.get_row_type();
let filter = filter::Expr::from_bytes(
&Typespace::default(),
&row_type.elements,
filter,
)
.map_err(NodesError::DecodeFilter)?;
let query = QueryExpr::new(schema.as_ref())
.with_select(filter_to_column_op(table_id, filter))
.optimize(&|table_id, table_name| stdb.row_count(table_id, table_name));
let tx: TxMode = tx.into();
let mut query = build_query(ctx, stdb, &tx, &query, &mut NoInMemUsed)?;
let mut chunked_writer = ChunkedWriter::default();
while let Some(row) = query.next()? {
chunked_writer.write_rel_value_to_scratch(&row).unwrap();
chunked_writer.flush();
}
Ok(chunked_writer.into_chunks())
}
}
impl TxSlot {
pub fn set<T>(
&mut self,
ctx: ExecutionContext,
tx: MutTxId,
f: impl FnOnce() -> T,
) -> (ExecutionContext, MutTxId, T) {
self.ctx.lock().replace(ctx);
let prev = self.inner.lock().replace(tx);
assert!(prev.is_none(), "reentrant TxSlot::set");
let remove_tx = || self.inner.lock().take();
let remove_ctx = || self.ctx.lock().take();
let res = {
scopeguard::defer_on_unwind! { remove_ctx(); remove_tx();}
f()
};
let ctx = remove_ctx().expect("ctx was removed during transaction");
let tx = remove_tx().expect("tx was removed during transaction");
(ctx, tx, res)
}
pub fn get(&self) -> Result<impl DerefMut<Target = MutTxId> + '_, GetTxError> {
MutexGuard::try_map(self.inner.lock(), |map| map.as_mut()).map_err(|_| GetTxError)
}
pub fn get_ctx(&self) -> Result<impl DerefMut<Target = ExecutionContext> + '_, GetTxError> {
MutexGuard::try_map(self.ctx.lock(), |map| map.as_mut()).map_err(|_| GetTxError)
}
}
#[derive(Debug)]
pub struct GetTxError;
impl From<GetTxError> for NodesError {
fn from(_: GetTxError) -> Self {
NodesError::NotInTransaction
}
}