use crate::{
callbacks::DbCallbacks,
client_cache::ClientCache,
db_connection::DbContextImpl,
subscription::{OnEndedCallback, SubscriptionHandleImpl},
Event, ReducerEvent,
__codegen::InternalError,
};
use bytes::Bytes;
use spacetimedb_client_api_messages::websocket::{self as ws, common::RowListLen as _, v2::BsatnRowList};
use spacetimedb_lib::{bsatn, de::DeserializeOwned};
use std::fmt::Debug;
/// Associates an implementing type with the [`SpacetimeModule`] it belongs to.
pub trait InModule {
/// The module this type is part of.
type Module: SpacetimeModule;
}
/// Default-constructible value exposing a [`QueryTableAccessor`] through its `from` field.
///
/// NOTE(review): presumably the starting point for building typed queries —
/// confirm against the code that consumes it.
#[derive(Default)]
pub struct QueryBuilder {
/// Accessor value; [`QueryTableAccessor`] is declared as a unit struct.
pub from: QueryTableAccessor,
}
/// Unit struct stored in the `from` field of [`QueryBuilder`].
///
/// NOTE(review): no methods are attached to it in the visible code; presumably
/// they are provided elsewhere (e.g. by generated bindings) — confirm.
#[derive(Default)]
pub struct QueryTableAccessor;
/// Ties together the family of types that make up one module's client bindings.
///
/// Every associated type whose bounding trait has a `Module` associated type is
/// pinned to `Module = Self`, so all of them refer back to this same module type.
pub trait SpacetimeModule: Debug + Send + Sync + 'static {
/// The module's connection type; see [`DbConnection`].
type DbConnection: DbConnection<Module = Self>;
/// The module's context type implementing [`EventContext`].
type EventContext: EventContext<Module = Self>;
/// The module's context type implementing [`ReducerEventContext`].
type ReducerEventContext: ReducerEventContext<Module = Self>;
/// The module's context type implementing [`ProcedureEventContext`].
type ProcedureEventContext: ProcedureEventContext<Module = Self>;
/// The module's context type implementing [`SubscriptionEventContext`].
type SubscriptionEventContext: SubscriptionEventContext<Module = Self>;
/// The module's context type implementing [`ErrorContext`].
type ErrorContext: ErrorContext<Module = Self>;
/// The module's type implementing [`Reducer`].
type Reducer: Reducer<Module = Self>;
/// NOTE(review): presumably the handle through which tables are accessed — confirm.
type DbView: InModule<Module = Self> + Send + 'static;
/// NOTE(review): presumably the handle through which reducers are invoked — confirm.
type Reducers: InModule<Module = Self> + Send + 'static;
/// NOTE(review): presumably the handle through which procedures are invoked — confirm.
type Procedures: InModule<Module = Self> + Send + 'static;
/// The module's parsed update type; see [`DbUpdate`].
type DbUpdate: DbUpdate<Module = Self>;
/// The diff type returned by [`DbUpdate::apply_to_client_cache`], parameterized by a borrow lifetime.
type AppliedDiff<'r>: AppliedDiff<'r, Module = Self>;
/// The module's subscription handle type; see [`SubscriptionHandle`].
type SubscriptionHandle: SubscriptionHandle<Module = Self>;
/// A default-constructible query builder type for this module.
type QueryBuilder: Default + Send + 'static;
/// Receives mutable access to the module's [`ClientCache`].
///
/// NOTE(review): presumably registers each of the module's tables with the
/// cache; implementations are not visible here — confirm.
fn register_tables(client_cache: &mut ClientCache<Self>);
/// Static list of table names.
///
/// NOTE(review): presumably names every table the module defines — confirm.
const ALL_TABLE_NAMES: &'static [&'static str];
}
pub trait DbUpdate:
TryFrom<ws::v2::TransactionUpdate, Error = crate::Error> + Default + Debug + InModule + Send + 'static
where
Self::Module: SpacetimeModule<DbUpdate = Self>,
{
fn apply_to_client_cache(
&self,
cache: &mut ClientCache<Self::Module>,
) -> <Self::Module as SpacetimeModule>::AppliedDiff<'_>;
fn parse_update(update: ws::v2::TransactionUpdate) -> crate::Result<Self> {
Self::try_from(update).map_err(|source| {
InternalError::failed_parse(std::any::type_name::<Self>(), "TransactionUpdate")
.with_cause(source)
.into()
})
}
fn parse_initial_rows(rows: ws::v2::QueryRows) -> crate::Result<Self>;
fn parse_unsubscribe_rows(rows: ws::v2::QueryRows) -> crate::Result<Self>;
}
/// The result of applying a [`DbUpdate`] to the client cache, borrowing for `'r`.
///
/// Implementors must be the `AppliedDiff<'r>` type of their own module.
pub trait AppliedDiff<'r>: InModule + Send
where
    Self::Module: SpacetimeModule<AppliedDiff<'r> = Self>,
{
    /// Receives the module's event context and mutable access to its row callbacks.
    ///
    /// NOTE(review): presumably fires the registered row callbacks for each
    /// change in this diff, passing `event` — implementations are not visible here.
    fn invoke_row_callbacks(
        &self,
        event: &<Self::Module as SpacetimeModule>::EventContext,
        callbacks: &mut DbCallbacks<Self::Module>,
    );
}
/// The connection type of a module.
///
/// Implementors must be the `DbConnection` type of their own module.
pub trait DbConnection: InModule + Send + 'static
where
Self::Module: SpacetimeModule<DbConnection = Self>,
{
/// Builds the connection from a [`DbContextImpl`] for the same module.
fn new(imp: DbContextImpl<Self::Module>) -> Self;
}
/// Common shape of all context types: a [`DbContextImpl`] paired with an event value.
///
/// The more specific context traits in this file fix the `Event` type.
pub trait AbstractEventContext: InModule + Send + 'static {
/// The kind of event carried by this context.
type Event;
/// Returns a reference to the event carried by this context.
fn event(&self) -> &Self::Event;
/// Builds a context from a [`DbContextImpl`] and the event it should carry.
fn new(imp: DbContextImpl<Self::Module>, event: Self::Event) -> Self;
}
/// An [`AbstractEventContext`] whose event is an [`Event`] over the module's `Reducer` type.
///
/// Declares no items of its own. Implementors must be the `EventContext` type
/// of their own module.
pub trait EventContext:
AbstractEventContext<Event = Event<<<Self as InModule>::Module as SpacetimeModule>::Reducer>>
where
Self::Module: SpacetimeModule<EventContext = Self>,
{
}
/// An [`AbstractEventContext`] whose event is a [`ReducerEvent`] over the module's `Reducer` type.
///
/// Declares no items of its own. Implementors must be the
/// `ReducerEventContext` type of their own module.
pub trait ReducerEventContext:
AbstractEventContext<Event = ReducerEvent<<<Self as InModule>::Module as SpacetimeModule>::Reducer>>
where
Self::Module: SpacetimeModule<ReducerEventContext = Self>,
{
}
/// An [`AbstractEventContext`] that carries no event data (`Event = ()`).
///
/// Declares no items of its own. Implementors must be the
/// `ProcedureEventContext` type of their own module.
pub trait ProcedureEventContext: AbstractEventContext<Event = ()>
where
Self::Module: SpacetimeModule<ProcedureEventContext = Self>,
{
}
/// An [`AbstractEventContext`] that carries no event data (`Event = ()`).
///
/// Declares no items of its own. Implementors must be the
/// `SubscriptionEventContext` type of their own module.
pub trait SubscriptionEventContext: AbstractEventContext<Event = ()>
where
Self::Module: SpacetimeModule<SubscriptionEventContext = Self>,
{
}
/// An [`AbstractEventContext`] whose event is an optional `crate::Error`.
///
/// Declares no items of its own. Implementors must be the `ErrorContext` type
/// of their own module.
pub trait ErrorContext: AbstractEventContext<Event = Option<crate::Error>>
where
Self::Module: SpacetimeModule<ErrorContext = Self>,
{
}
pub trait Reducer: InModule + std::fmt::Debug + Clone + Send + 'static
where
Self::Module: SpacetimeModule<Reducer = Self>,
{
fn reducer_name(&self) -> &'static str;
fn args_bsatn(&self) -> Result<Vec<u8>, bsatn::EncodeError>;
}
/// A cloneable handle to a subscription.
///
/// Implementors must be the `SubscriptionHandle` type of their own module.
pub trait SubscriptionHandle: InModule + Clone + Send + 'static
where
Self::Module: SpacetimeModule<SubscriptionHandle = Self>,
{
/// Wraps a [`SubscriptionHandleImpl`] for the same module.
fn new(imp: SubscriptionHandleImpl<Self::Module>) -> Self;
/// NOTE(review): presumably `true` once the subscription has ended — confirm against implementors.
fn is_ended(&self) -> bool;
/// NOTE(review): presumably `true` while the subscription is applied and not yet ended — confirm.
fn is_active(&self) -> bool;
/// Consumes the handle to unsubscribe, taking an [`OnEndedCallback`].
///
/// NOTE(review): presumably `on_end` runs once the unsubscribe has taken
/// effect — confirm against implementors.
fn unsubscribe_then(self, on_end: OnEndedCallback<Self::Module>) -> crate::Result<()>;
/// Consumes the handle to unsubscribe, without registering a callback.
fn unsubscribe(self) -> crate::Result<()>;
}
/// A row value paired with a byte buffer.
///
/// When built by `TableUpdate::parse_row`, `bsatn` holds exactly the bytes
/// that `row` was decoded from.
#[derive(Debug)]
pub struct WithBsatn<Row> {
/// The row's bytes.
pub bsatn: Bytes,
/// The row value.
pub row: Row,
}
/// Rows inserted into and deleted from one table, each kept with its raw bytes.
#[derive(Debug)]
pub struct TableUpdate<Row> {
/// Rows to be treated as inserts.
pub inserts: Vec<WithBsatn<Row>>,
/// Rows to be treated as deletes.
pub deletes: Vec<WithBsatn<Row>>,
}
impl<Row> Default for TableUpdate<Row> {
fn default() -> Self {
Self {
inserts: Default::default(),
deletes: Default::default(),
}
}
}
impl<Row> TableUpdate<Row> {
pub fn append(&mut self, mut other: TableUpdate<Row>) {
self.inserts.append(&mut other.inserts);
self.deletes.append(&mut other.deletes);
}
pub(crate) fn is_empty(&self) -> bool {
self.inserts.is_empty() && self.deletes.is_empty()
}
pub fn into_event_diff(&self) -> crate::client_cache::TableAppliedDiff<'_, Row> {
crate::client_cache::TableAppliedDiff::from_event_inserts(&self.inserts)
}
}
impl<Row: DeserializeOwned + Debug> TableUpdate<Row> {
pub fn parse_table_update(raw_updates: ws::v2::TableUpdate) -> crate::Result<TableUpdate<Row>> {
let mut inserts = Vec::new();
let mut deletes = Vec::new();
for update in raw_updates.rows {
match update {
ws::v2::TableUpdateRows::EventTable(update) => {
Self::parse_from_row_list(&mut inserts, &update.events)?;
}
ws::v2::TableUpdateRows::PersistentTable(update) => {
Self::parse_from_row_list(&mut deletes, &update.deletes)?;
Self::parse_from_row_list(&mut inserts, &update.inserts)?;
}
}
}
Ok(Self { inserts, deletes })
}
fn parse_from_row_list(sink: &mut Vec<WithBsatn<Row>>, raw_rows: &ws::common::BsatnRowList) -> crate::Result<()> {
sink.reserve(raw_rows.len());
for raw_row in raw_rows {
sink.push(Self::parse_row(raw_row)?);
}
Ok(())
}
fn parse_row(bytes: Bytes) -> crate::Result<WithBsatn<Row>> {
let parsed = bsatn::from_slice::<Row>(&bytes).map_err(|source| {
InternalError::failed_parse(std::any::type_name::<Row>(), "row data").with_cause(source)
})?;
Ok(WithBsatn {
bsatn: bytes,
row: parsed,
})
}
}
/// Decodes every row in `rows` and returns them as the `inserts` of a
/// [`TableUpdate`] whose `deletes` are empty.
///
/// # Errors
///
/// Returns the first row-decoding error encountered.
pub fn parse_row_list_as_inserts<Row>(rows: BsatnRowList) -> crate::Result<TableUpdate<Row>>
where
    Row: DeserializeOwned + Debug,
{
    let mut update = TableUpdate::<Row>::default();
    TableUpdate::<Row>::parse_from_row_list(&mut update.inserts, &rows)?;
    Ok(update)
}
/// Decodes every row in `rows` and returns them as the `deletes` of a
/// [`TableUpdate`] whose `inserts` are empty.
///
/// # Errors
///
/// Returns the first row-decoding error encountered.
pub fn parse_row_list_as_deletes<Row>(rows: BsatnRowList) -> crate::Result<TableUpdate<Row>>
where
    Row: DeserializeOwned + Debug,
{
    let mut update = TableUpdate::<Row>::default();
    TableUpdate::<Row>::parse_from_row_list(&mut update.deletes, &rows)?;
    Ok(update)
}
/// Flattens a transaction update into its table updates.
///
/// Consumes `tx_update` and yields the `tables` of each entry in
/// `query_sets`, in order, by value.
pub fn transaction_update_iter_table_updates(
    tx_update: ws::v2::TransactionUpdate,
) -> impl Iterator<Item = ws::v2::TableUpdate> {
    // Turning each boxed slice into a `Vec` first guarantees by-value
    // iteration; `.into_iter()` called directly on a boxed slice can resolve
    // to the by-reference slice iterator on older editions.
    tx_update
        .query_sets
        .into_vec()
        .into_iter()
        .flat_map(|query_set_update| query_set_update.tables.into_vec())
}