use nonempty::NonEmpty;
use parking_lot::{Mutex, MutexGuard};
use spacetimedb_lib::{bsatn, ProductValue};
use std::ops::DerefMut;
use std::sync::Arc;
use crate::database_instance_context::DatabaseInstanceContext;
use crate::database_logger::{BacktraceProvider, LogLevel, Record};
use crate::db::datastore::locking_tx_datastore::{MutTxId, RowId};
use crate::db::datastore::traits::IndexDef;
use crate::error::{IndexError, NodesError};
use crate::execution_context::ExecutionContext;
use crate::util::ResultInspectExt;
use super::scheduler::{ScheduleError, ScheduledReducerId, Scheduler};
use super::timestamp::Timestamp;
use crate::vm::DbProgram;
use spacetimedb_lib::filter::CmpArgs;
use spacetimedb_lib::identity::AuthCtx;
use spacetimedb_lib::operator::OpQuery;
use spacetimedb_lib::relation::{FieldExpr, FieldName};
use spacetimedb_primitives::{ColId, TableId};
use spacetimedb_sats::buffer::BufWriter;
use spacetimedb_sats::{ProductType, Typespace};
use spacetimedb_vm::expr::{Code, ColumnOp};
/// Environment handed to a running module instance: bundles the database
/// context, the reducer scheduler and the slot holding the transaction that
/// is currently in progress.
#[derive(Clone)]
pub struct InstanceEnv {
/// Shared database instance context; the methods on this type read its
/// logger, address, identity and relational database.
pub dbic: Arc<DatabaseInstanceContext>,
/// Scheduler that `schedule` and `cancel_reducer` forward to.
pub scheduler: Scheduler,
/// Slot from which the database methods obtain the active transaction.
pub tx: TxSlot,
}
/// A shared slot that holds at most one `MutTxId` at a time.
///
/// The contents live behind an `Arc<Mutex<..>>`, so clones of a `TxSlot`
/// all refer to the same slot. A default-constructed slot is empty.
#[derive(Clone, Default)]
pub struct TxSlot {
// `None` while no transaction is installed.
inner: Arc<Mutex<Option<MutTxId>>>,
}
/// Byte sink that collects written bytes in a scratch buffer and cuts them
/// into boxed chunks when flushed.
#[derive(Default)]
struct ChunkedWriter {
// Chunks completed so far, in the order they were flushed.
chunks: Vec<Box<[u8]>>,
// Bytes written since the last flush; not yet part of any chunk.
scratch_space: Vec<u8>,
}
/// Lets a `ChunkedWriter` be used as an encoding target.
impl BufWriter for ChunkedWriter {
/// Appends `slice` to the scratch buffer. Writing never creates a chunk by
/// itself; chunks are only produced by the explicit flush methods.
fn put_slice(&mut self, slice: &[u8]) {
self.scratch_space.extend_from_slice(slice);
}
}
impl ChunkedWriter {
    /// Turns whatever is pending in the scratch buffer into a new chunk,
    /// regardless of how many bytes there are.
    ///
    /// Does nothing when the scratch buffer is empty. The scratch buffer is
    /// cleared afterwards but keeps its allocation for subsequent writes.
    pub fn force_flush(&mut self) {
        if self.scratch_space.is_empty() {
            return;
        }
        let chunk: Box<[u8]> = Box::from(self.scratch_space.as_slice());
        self.chunks.push(chunk);
        self.scratch_space.clear();
    }

    /// Flushes only once the scratch buffer has grown beyond
    /// `ITER_CHUNK_SIZE` (64 KiB); smaller buffers are left to accumulate.
    pub fn flush(&mut self) {
        const ITER_CHUNK_SIZE: usize = 64 * 1024;
        if self.scratch_space.len() <= ITER_CHUNK_SIZE {
            return;
        }
        self.force_flush();
    }

    /// Consumes the writer and returns every chunk in order, with any bytes
    /// still pending in the scratch buffer appended as the final chunk.
    pub fn into_chunks(self) -> Vec<Box<[u8]>> {
        let Self {
            mut chunks,
            scratch_space,
        } = self;
        if !scratch_space.is_empty() {
            // The scratch buffer is moved, not copied, into the last chunk.
            chunks.push(scratch_space.into_boxed_slice());
        }
        chunks
    }
}
impl InstanceEnv {
/// Builds an environment around `dbic` and `scheduler`, starting with an
/// empty transaction slot.
pub fn new(dbic: Arc<DatabaseInstanceContext>, scheduler: Scheduler) -> Self {
    let tx = TxSlot::default();
    Self { dbic, scheduler, tx }
}
/// Forwards a scheduling request for `reducer` with `args` and `time` to the
/// scheduler.
///
/// Returns the id handed back by the scheduler, or its `ScheduleError`.
/// The tracing span records the reducer name and skips all other arguments.
#[tracing::instrument(skip_all, fields(reducer=reducer))]
pub fn schedule(
&self,
reducer: String,
args: Vec<u8>,
time: Timestamp,
) -> Result<ScheduledReducerId, ScheduleError> {
self.scheduler.schedule(reducer, args, time)
}
/// Asks the scheduler to cancel the scheduled reducer identified by `id`.
#[tracing::instrument(skip_all)]
pub fn cancel_reducer(&self, id: ScheduledReducerId) {
self.scheduler.cancel(id)
}
/// Returns a mutable handle to the transaction currently stored in `self.tx`.
///
/// # Errors
///
/// Returns `GetTxError` when the slot holds no transaction.
fn get_tx(&self) -> Result<impl DerefMut<Target = MutTxId> + '_, GetTxError> {
self.tx.get()
}
/// Writes `record` at `level` to the database's logger and mirrors its
/// message to the host's trace log, prefixed with the abbreviated database
/// address.
#[tracing::instrument(skip_all)]
pub fn console_log(&self, level: LogLevel, record: &Record, bt: &dyn BacktraceProvider) {
self.dbic.logger.write(level, record, bt);
log::trace!("MOD({}): {}", self.dbic.address.to_abbreviated_hex(), record.message);
}
pub fn insert(&self, ctx: &ExecutionContext, table_id: TableId, buffer: &[u8]) -> Result<ProductValue, NodesError> {
let stdb = &*self.dbic.relational_db;
let tx = &mut *self.get_tx()?;
let ret = stdb
.insert_bytes_as_row(tx, table_id, buffer)
.inspect_err_(|e| match e {
crate::error::DBError::Index(IndexError::UniqueConstraintViolation {
constraint_name: _,
table_name: _,
col_names: _,
value: _,
}) => {}
_ => {
let res = stdb.table_name_from_id(ctx, tx, table_id);
if let Ok(Some(table_name)) = res {
log::debug!("insert(table: {table_name}, table_id: {table_id}): {e}")
} else {
log::debug!("insert(table_id: {table_id}): {e}")
}
}
})?;
Ok(ret)
}
/// Deletes every row of `table_id` whose column `col_id` equals the encoded
/// `value`, returning whatever count the datastore's `delete` reports.
///
/// The matching row ids are gathered first and then handed to the datastore
/// in a single delete call.
///
/// # Errors
///
/// Fails when no transaction is active, when `value` cannot be decoded for
/// the column, or when the lookup itself fails.
#[tracing::instrument(skip(self, ctx, value))]
pub fn delete_by_col_eq(
    &self,
    ctx: &ExecutionContext,
    table_id: TableId,
    col_id: ColId,
    value: &[u8],
) -> Result<u32, NodesError> {
    let stdb = &*self.dbic.relational_db;
    let tx = &mut *self.get_tx()?;
    let eq_value = stdb.decode_column(tx, table_id, col_id, value)?;

    let mut doomed = Vec::new();
    for row in stdb.iter_by_col_eq(ctx, tx, table_id, col_id, eq_value)? {
        doomed.push(RowId(*row.id()));
    }

    Ok(stdb.delete(tx, table_id, doomed))
}
/// Decodes `relation` against the row schema of `table_id` and asks the
/// datastore to delete the decoded rows, returning the count it reports.
///
/// # Errors
///
/// Fails when no transaction is active, when the table's row schema cannot
/// be fetched, or with `NodesError::DecodeRow` when `relation` does not
/// decode.
#[tracing::instrument(skip(self, relation))]
pub fn delete_by_rel(&self, table_id: TableId, relation: &[u8]) -> Result<u32, NodesError> {
    let stdb = &*self.dbic.relational_db;
    let tx = &mut *self.get_tx()?;
    let row_ty = stdb.row_schema_for_table(tx, table_id)?;

    // The decoder advances the slice it is given, so hand it a local cursor.
    let mut reader = relation;
    let rows = match ProductValue::decode_smallvec(&row_ty, &mut reader) {
        Ok(rows) => rows,
        Err(err) => return Err(NodesError::DecodeRow(err)),
    };

    Ok(stdb.delete_by_rel(tx, table_id, rows))
}
/// Resolves `table_name` to its `TableId` within the active transaction.
///
/// # Errors
///
/// Fails when no transaction is active, when the lookup itself fails, or
/// with `NodesError::TableNotFound` when no table has that name.
#[tracing::instrument(skip_all)]
pub fn get_table_id(&self, table_name: String) -> Result<TableId, NodesError> {
    let stdb = &*self.dbic.relational_db;
    let tx = &mut *self.get_tx()?;
    let lookup = stdb.table_id_from_name(tx, &table_name)?;
    match lookup {
        Some(table_id) => Ok(table_id),
        None => Err(NodesError::TableNotFound),
    }
}
/// Creates an index named `index_name` over the columns `col_ids` of
/// `table_id` within the active transaction.
///
/// The index is marked unique when the attributes of the indexed columns
/// say so.
///
/// # Errors
///
/// * Fails when no transaction is active.
/// * `NodesError::BadIndexType` for every `index_type` other than `0`.
///   This includes `1` (hash indexes), which is recognised but not yet
///   supported; it is reported as an error instead of panicking because the
///   value is supplied by the caller.
/// * Any error from looking up the column attributes or creating the index.
///
/// # Panics
///
/// Panics if `col_ids` is empty.
// NOTE(review): the empty-`col_ids` case should probably become an error as
// well; no suitable `NodesError` variant is visible from this function.
#[tracing::instrument(skip_all)]
pub fn create_index(
    &self,
    index_name: String,
    table_id: TableId,
    index_type: u8,
    col_ids: Vec<u8>,
) -> Result<(), NodesError> {
    let stdb = &*self.dbic.relational_db;
    let tx = &mut *self.get_tx()?;

    match index_type {
        0 => {}
        // 1 would be a hash index (not yet supported); anything else is
        // unknown. Neither can be created, so both are rejected.
        _ => return Err(NodesError::BadIndexType(index_type)),
    }

    let cols = NonEmpty::from_slice(&col_ids)
        .expect("Attempt to create an index with zero columns")
        .map(Into::into);

    let is_unique = stdb.column_attrs(tx, table_id, &cols)?.is_unique();

    let index = IndexDef {
        table_id,
        cols,
        name: index_name,
        is_unique,
    };

    stdb.create_index(tx, index)?;

    Ok(())
}
/// Finds every row of `table_id` whose column `col_id` equals the encoded
/// `value` and returns the matches BSATN-encoded back to back in a single
/// buffer.
///
/// # Errors
///
/// Fails when no transaction is active, when `value` cannot be decoded for
/// the column, or when the lookup itself fails.
#[tracing::instrument(skip_all)]
pub fn iter_by_col_eq(
    &self,
    ctx: &ExecutionContext,
    table_id: TableId,
    col_id: ColId,
    value: &[u8],
) -> Result<Vec<u8>, NodesError> {
    let stdb = &*self.dbic.relational_db;
    let tx = &mut *self.get_tx()?;
    let needle = stdb.decode_column(tx, table_id, col_id, value)?;

    let mut encoded: Vec<u8> = Vec::new();
    stdb.iter_by_col_eq(ctx, tx, table_id, col_id, needle)?
        .for_each(|row| bsatn::to_writer(&mut encoded, row.view()).unwrap());

    Ok(encoded)
}
/// Encodes all rows of `table_id` into a list of byte chunks.
///
/// The first chunk holds only the encoded row type of the table. The rows
/// follow, packed into chunks that are cut whenever the pending bytes
/// exceed the writer's flush threshold.
///
/// # Errors
///
/// Fails when no transaction is active, or when the row schema or the row
/// iterator cannot be obtained.
#[tracing::instrument(skip_all)]
pub fn iter_chunks(&self, ctx: &ExecutionContext, table_id: TableId) -> Result<Vec<Box<[u8]>>, NodesError> {
    let stdb = &*self.dbic.relational_db;
    let tx = &mut *self.get_tx()?;
    let mut writer = ChunkedWriter::default();

    // The row type always goes out first, in a chunk of its own.
    stdb.row_schema_for_table(tx, table_id)?.encode(&mut writer);
    writer.force_flush();

    for row in stdb.iter(ctx, tx, table_id)? {
        row.view().encode(&mut writer);
        // Only cuts a chunk once enough bytes have piled up.
        writer.flush();
    }

    Ok(writer.into_chunks())
}
/// Encodes the rows of `table_id` that satisfy the serialized `filter` into
/// a list of byte chunks.
///
/// The first chunk holds only the encoded row type of the table. The
/// matching rows follow, packed into chunks that are cut whenever the
/// pending bytes exceed the writer's flush threshold.
///
/// # Errors
///
/// Fails when no transaction is active, when the table schema cannot be
/// fetched, or with `NodesError::DecodeFilter` when `filter` does not decode
/// against the table's row type.
///
/// # Panics
///
/// Panics on filters containing a unary expression (not yet supported) and
/// when the query evaluation yields anything other than a table.
#[tracing::instrument(skip_all)]
pub fn iter_filtered_chunks(
&self,
ctx: &ExecutionContext,
table_id: TableId,
filter: &[u8],
) -> Result<Vec<Box<[u8]>>, NodesError> {
use spacetimedb_lib::filter;
// Translates a decoded filter expression into the VM's `ColumnOp`,
// addressing the fields of `table_name` by position.
fn filter_to_column_op(table_name: &str, filter: filter::Expr) -> ColumnOp {
match filter {
// Comparison: the left side is always a field, the right side is
// either another field or a literal value.
filter::Expr::Cmp(filter::Cmp {
op,
args: CmpArgs { lhs_field, rhs },
}) => ColumnOp::Cmp {
op: OpQuery::Cmp(op),
lhs: Box::new(ColumnOp::Field(FieldExpr::Name(FieldName::positional(
table_name,
lhs_field as usize,
)))),
rhs: Box::new(ColumnOp::Field(match rhs {
filter::Rhs::Field(rhs_field) => {
FieldExpr::Name(FieldName::positional(table_name, rhs_field as usize))
}
filter::Rhs::Value(rhs_value) => FieldExpr::Value(rhs_value),
})),
},
// Logical connective: translate both operands recursively.
filter::Expr::Logic(filter::Logic { lhs, op, rhs }) => ColumnOp::Cmp {
op: OpQuery::Logic(op),
lhs: Box::new(filter_to_column_op(table_name, *lhs)),
rhs: Box::new(filter_to_column_op(table_name, *rhs)),
},
// NOTE(review): this panics on a filter that arrives as input bytes;
// consider returning an error once a suitable variant exists.
filter::Expr::Unary(_) => todo!("unary operations are not yet supported"),
}
}
let mut chunked_writer = ChunkedWriter::default();
let stdb = &self.dbic.relational_db;
let tx = &mut *self.tx.get()?;
let schema = stdb.schema_for_table(tx, table_id)?;
let row_type = ProductType::from(&*schema);
// The row type always goes out first, in a chunk of its own.
row_type.encode(&mut chunked_writer);
chunked_writer.force_flush();
// Decode the filter against the columns of this table's row type.
let filter = filter::Expr::from_bytes(
&Typespace::default(),
&row_type.elements,
filter,
)
.map_err(NodesError::DecodeFilter)?;
// Build a select query over the table and evaluate it with the VM.
let q = spacetimedb_vm::dsl::query(&*schema).with_select(filter_to_column_op(&schema.table_name, filter));
let p = &mut DbProgram::new(ctx, stdb, tx, AuthCtx::for_current(self.dbic.identity));
let results = match spacetimedb_vm::eval::run_ast(p, q.into()) {
Code::Table(table) => table,
// NOTE(review): presumably the VM can also report evaluation failures
// through a non-table `Code`; confirm that cannot reach this arm.
_ => unreachable!("query should always return a table"),
};
for row in results.data {
row.data.encode(&mut chunked_writer);
// Only cuts a chunk once enough bytes have piled up.
chunked_writer.flush();
}
Ok(chunked_writer.into_chunks())
}
}
impl TxSlot {
/// Installs `tx` in the slot while `f` runs, then takes it back out.
///
/// Returns the transaction removed from the slot together with the result
/// of `f`. The mutex is only locked briefly to store and to remove the
/// transaction; it is not held while `f` executes.
///
/// # Panics
///
/// Panics if the slot already holds a transaction when called, or if the
/// slot is found empty after `f` returns.
pub fn set<T>(&self, tx: MutTxId, f: impl FnOnce() -> T) -> (MutTxId, T) {
let prev = self.inner.lock().replace(tx);
assert!(prev.is_none(), "reentrant TxSlot::set");
let remove_tx = || self.inner.lock().take();
let res = {
// If `f` unwinds, still empty the slot so the transaction is not left
// behind in it.
scopeguard::defer_on_unwind! { remove_tx(); }
f()
};
let tx = remove_tx().expect("tx was removed during transaction");
(tx, res)
}
/// Locks the slot and returns a guard that dereferences to the stored
/// transaction. The lock stays held for as long as the returned guard is
/// alive.
///
/// # Errors
///
/// Returns `GetTxError` when the slot is empty.
pub fn get(&self) -> Result<impl DerefMut<Target = MutTxId> + '_, GetTxError> {
MutexGuard::try_map(self.inner.lock(), |map| map.as_mut()).map_err(|_| GetTxError)
}
}
/// Error produced by `TxSlot::get` when the slot holds no transaction.
#[derive(Debug)]
pub struct GetTxError;
/// Lets `?` turn a missing transaction into `NodesError::NotInTransaction`.
impl From<GetTxError> for NodesError {
fn from(_: GetTxError) -> Self {
NodesError::NotInTransaction
}
}