#![deny(unsafe_op_in_unsafe_fn)]
use crate::query_builder::{FromWhere, HasCols, LeftSemiJoin, RawQuery, RightSemiJoin, Table as QbTable};
use crate::table::IndexAlgo;
use crate::{sys, AnonymousViewContext, IterBuf, ReducerContext, ReducerResult, SpacetimeType, Table, ViewContext};
use spacetimedb_lib::bsatn::EncodeError;
use spacetimedb_lib::db::raw_def::v10::{
CaseConversionPolicy, ExplicitNames as RawExplicitNames, RawModuleDefV10Builder,
};
pub use spacetimedb_lib::db::raw_def::v9::Lifecycle as LifecycleReducer;
use spacetimedb_lib::db::raw_def::v9::{RawIndexAlgorithm, TableType, ViewResultHeader};
use spacetimedb_lib::de::{self, Deserialize, DeserializeOwned, Error as _, SeqProductAccess};
use spacetimedb_lib::sats::typespace::TypespaceBuilder;
use spacetimedb_lib::sats::{impl_deserialize, impl_serialize, ProductTypeElement};
use spacetimedb_lib::ser::{Serialize, SerializeSeqProduct};
use spacetimedb_lib::{bsatn, AlgebraicType, ConnectionId, Identity, ProductType, RawModuleDef, Timestamp};
use spacetimedb_primitives::*;
use std::convert::Infallible;
use std::fmt;
use std::marker::PhantomData;
use std::sync::{Mutex, OnceLock};
pub use sys::raw::{BytesSink, BytesSource};
#[cfg(feature = "unstable")]
use crate::{ProcedureContext, ProcedureResult};
/// Conversion of a container of `T` into a plain `Vec<T>`.
pub trait IntoVec<T> {
    /// Consumes `self` and returns its elements as a vector.
    fn into_vec(self) -> Vec<T>;
}

/// A vector is already in the target form and is handed back untouched.
impl<T> IntoVec<T> for Vec<T> {
    fn into_vec(self) -> Vec<T> {
        self
    }
}

/// `Some(x)` becomes a one-element vector, `None` becomes an empty one.
impl<T> IntoVec<T> for Option<T> {
    fn into_vec(self) -> Vec<T> {
        match self {
            Some(value) => vec![value],
            None => Vec::new(),
        }
    }
}
pub fn invoke_reducer<'a, A: Args<'a>>(
reducer: impl Reducer<'a, A>,
ctx: &ReducerContext,
args: &'a [u8],
) -> Result<(), Box<str>> {
let SerDeArgs(args) = bsatn::from_slice(args).expect("unable to decode args");
reducer.invoke(ctx, args)
}
/// Decodes `args` as the BSATN-encoded argument tuple `A`, runs `procedure` with it,
/// and converts the returned value into a [`ProcedureResult`].
///
/// # Panics
///
/// Panics with "unable to decode args" if `args` cannot be decoded as `A`.
#[cfg(feature = "unstable")]
pub fn invoke_procedure<'a, A: Args<'a>, Ret: IntoProcedureResult>(
    procedure: impl Procedure<'a, A, Ret>,
    ctx: &mut ProcedureContext,
    args: &'a [u8],
) -> ProcedureResult {
    let decoded: SerDeArgs<A> = bsatn::from_slice(args).expect("unable to decode args");
    let returned = procedure.invoke(ctx, decoded.0);
    returned.to_result()
}
/// A function that can be invoked as a reducer with the argument tuple `A`.
///
/// The `on_unimplemented` attribute below customizes the compiler error shown
/// when a function does not satisfy this trait.
#[expect(clippy::duplicated_attributes, reason = "false positive")]
#[diagnostic::on_unimplemented(
message = "invalid reducer signature",
label = "this reducer signature is not valid",
note = "",
note = "reducer signatures must match the following pattern:",
note = " `Fn(&ReducerContext, [T1, ...]) [-> Result<(), impl Display>]`",
note = "where each `Ti` type implements `SpacetimeType`.",
note = ""
)]
pub trait Reducer<'de, A: Args<'de>> {
/// Runs the reducer with `ctx` and the already-decoded `args`.
fn invoke(&self, ctx: &ReducerContext, args: A) -> ReducerResult;
}
pub fn invoke_view<'a, A: Args<'a>, T: ViewReturn>(
view: impl View<'a, A, T>,
ctx: ViewContext,
args: &'a [u8],
) -> Vec<u8> {
let SerDeArgs(args) = bsatn::from_slice(args).expect("unable to decode args");
let retn = view.invoke(&ctx, args);
let mut buf = IterBuf::take();
retn.to_writer(&mut buf).expect("unable to encode view return value");
std::mem::take(&mut *buf)
}
/// A function that can be invoked as a view taking a [`ViewContext`],
/// the argument tuple `A`, and returning `T`.
///
/// The `on_unimplemented` attribute below customizes the compiler error shown
/// when a function does not satisfy this trait.
#[expect(clippy::duplicated_attributes, reason = "false positive")]
#[diagnostic::on_unimplemented(
message = "invalid view signature",
label = "this view signature is not valid",
note = "",
note = "view signatures must match:",
note = " `Fn(&ViewContext, [T1, ...]) -> Vec<Tn> | Option<Tn>`",
note = "where each `Ti` implements `SpacetimeType`.",
note = ""
)]
pub trait View<'de, A: Args<'de>, T: ViewReturn> {
/// Runs the view with `ctx` and the already-decoded `args`.
fn invoke(&self, ctx: &ViewContext, args: A) -> T;
}
pub fn invoke_anonymous_view<'a, A: Args<'a>, T: ViewReturn>(
view: impl AnonymousView<'a, A, T>,
ctx: AnonymousViewContext,
args: &'a [u8],
) -> Vec<u8> {
let SerDeArgs(args) = bsatn::from_slice(args).expect("unable to decode args");
let retn = view.invoke(&ctx, args);
let mut buf = IterBuf::take();
retn.to_writer(&mut buf).expect("unable to encode view return value");
std::mem::take(&mut *buf)
}
/// A function that can be invoked as a view taking an [`AnonymousViewContext`],
/// the argument tuple `A`, and returning `T`.
///
/// The `on_unimplemented` attribute below customizes the compiler error shown
/// when a function does not satisfy this trait.
#[expect(clippy::duplicated_attributes, reason = "false positive")]
#[diagnostic::on_unimplemented(
message = "invalid anonymous view signature",
label = "this view signature is not valid",
note = "",
note = "anonymous view signatures must match:",
note = " `Fn(&AnonymousViewContext, [T1, ...]) -> Vec<Tn> | Option<Tn>`",
note = "where each `Ti` implements `SpacetimeType`.",
note = ""
)]
pub trait AnonymousView<'de, A: Args<'de>, T: ViewReturn> {
/// Runs the view with `ctx` and the already-decoded `args`.
fn invoke(&self, ctx: &AnonymousViewContext, args: A) -> T;
}
/// Static metadata describing an exported function (reducer, procedure or view).
///
/// The registration functions in this file read these items to describe the
/// function in the module definition and to record its invoke pointer.
pub trait FnInfo: ExplicitNames {
/// The type of the function pointer stored in [`Self::INVOKE`],
/// e.g. `ReducerFn` or `ViewFn` as required by the registration functions.
type Invoke;
/// A type-level tag for the kind of function.
///
#[cfg_attr(
feature = "unstable",
doc = "One of [`FnKindReducer`], [`FnKindProcedure`] or [`FnKindView`]."
)]
#[cfg_attr(not(feature = "unstable"), doc = "Either [`FnKindReducer`] or [`FnKindView`].")]
type FnKind;
/// The name under which the function is registered.
const NAME: &'static str;
/// The lifecycle event this function handles, if any. Defaults to `None`.
const LIFECYCLE: Option<LifecycleReducer> = None;
/// Names of the function's arguments; `Args::schema` uses them as element names.
const ARG_NAMES: &'static [Option<&'static str>];
/// The function pointer used to call this function at runtime.
const INVOKE: Self::Invoke;
/// The return type of the function, if it has one. Defaults to `None`.
fn return_type(_ts: &mut impl TypespaceBuilder) -> Option<AlgebraicType> {
None
}
}
/// A function that can be invoked as a procedure with the argument tuple `A`,
/// returning `Ret`.
#[cfg(feature = "unstable")]
pub trait Procedure<'de, A: Args<'de>, Ret: IntoProcedureResult> {
/// Runs the procedure with `ctx` and the already-decoded `args`.
fn invoke(&self, ctx: &mut ProcedureContext, args: A) -> Ret;
}
/// An argument list of an exported function, (de)serializable as a sequence product.
///
/// Implemented for tuples by `impl_reducer_procedure_view!`.
pub trait Args<'de>: Sized {
/// The number of arguments in the list.
const LEN: usize;
/// Reads the arguments, in order, from the sequence product `prod`.
fn visit_seq_product<A: SeqProductAccess<'de>>(prod: A) -> Result<Self, A::Error>;
/// Writes the arguments, in order, as elements of `prod`.
fn serialize_seq_product<S: SerializeSeqProduct>(&self, prod: &mut S) -> Result<(), S::Error>;
/// Returns the product type of the arguments, with element names taken from `I::ARG_NAMES`.
fn schema<I: FnInfo>(typespace: &mut impl TypespaceBuilder) -> ProductType;
}
/// Conversion of a reducer's return value into the uniform `Result<(), Box<str>>`.
#[diagnostic::on_unimplemented(
    message = "`{Self}` is not a valid reducer return type",
    note = "reducers cannot return values -- you can only return `()` or `Result<(), impl Display>`"
)]
pub trait IntoReducerResult {
    /// Consumes the return value, yielding `Ok(())` or an error message.
    fn into_result(self) -> Result<(), Box<str>>;
}

/// A reducer returning `()` always succeeds.
impl IntoReducerResult for () {
    #[inline]
    fn into_result(self) -> Result<(), Box<str>> {
        Ok(())
    }
}

/// A reducer returning `Result<(), E>` reports the `Display` rendering of its error.
impl<E: fmt::Display> IntoReducerResult for Result<(), E> {
    #[inline]
    fn into_result(self) -> Result<(), Box<str>> {
        match self {
            Ok(()) => Ok(()),
            Err(err) => Err(err.to_string().into_boxed_str()),
        }
    }
}
/// Conversion of a procedure's return value into a [`ProcedureResult`]
/// by BSATN-serializing it.
///
/// Blanket-implemented below for every `SpacetimeType + Serialize` type.
#[cfg(feature = "unstable")]
#[diagnostic::on_unimplemented(
message = "The procedure return type `{Self}` does not implement `SpacetimeType`",
note = "if you own the type, try adding `#[derive(SpacetimeType)]` to its definition"
)]
pub trait IntoProcedureResult: SpacetimeType + Serialize {
/// Serializes `self` with BSATN.
///
/// Panics with "Failed to serialize procedure result" if serialization fails.
#[inline]
fn to_result(&self) -> ProcedureResult {
bsatn::to_vec(&self).expect("Failed to serialize procedure result")
}
}
#[cfg(feature = "unstable")]
impl<T: SpacetimeType + Serialize> IntoProcedureResult for T {}
/// Marker trait implemented only for `&ReducerContext`.
///
/// It exists to produce the custom diagnostic below when a reducer's
/// first argument has the wrong type.
#[diagnostic::on_unimplemented(
message = "the first argument of a reducer must be `&ReducerContext`",
label = "first argument must be `&ReducerContext`"
)]
pub trait ReducerContextArg {
// NOTE(review): presumably referenced by generated code to force the bound check — confirm.
#[doc(hidden)]
const _ITEM: () = ();
}
impl ReducerContextArg for &ReducerContext {}
/// Marker trait implemented for every `SpacetimeType`.
///
/// It exists to produce the custom diagnostic below when a reducer
/// argument's type does not implement `SpacetimeType`.
#[diagnostic::on_unimplemented(
message = "the reducer argument `{Self}` does not implement `SpacetimeType`",
note = "if you own the type, try adding `#[derive(SpacetimeType)]` to its definition"
)]
pub trait ReducerArg {
// NOTE(review): presumably referenced by generated code to force the bound check — confirm.
#[doc(hidden)]
const _ITEM: () = ();
}
impl<T: SpacetimeType> ReducerArg for T {}
/// Marker trait implemented only for `&mut ProcedureContext`.
///
/// It exists to produce the custom diagnostic below when a procedure's
/// first argument has the wrong type.
#[cfg(feature = "unstable")]
#[diagnostic::on_unimplemented(
message = "the first argument of a procedure must be `&mut ProcedureContext`",
label = "first argument must be `&mut ProcedureContext`"
)]
pub trait ProcedureContextArg {
// NOTE(review): presumably referenced by generated code to force the bound check — confirm.
#[doc(hidden)]
const _ITEM: () = ();
}
#[cfg(feature = "unstable")]
impl ProcedureContextArg for &mut ProcedureContext {}
/// Marker trait implemented for every `SpacetimeType`.
///
/// It exists to produce the custom diagnostic below when a procedure
/// argument's type does not implement `SpacetimeType`.
#[cfg(feature = "unstable")]
#[diagnostic::on_unimplemented(
message = "the procedure argument `{Self}` does not implement `SpacetimeType`",
note = "if you own the type, try adding `#[derive(SpacetimeType)]` to its definition"
)]
pub trait ProcedureArg {
// NOTE(review): presumably referenced by generated code to force the bound check — confirm.
#[doc(hidden)]
const _ITEM: () = ();
}
#[cfg(feature = "unstable")]
impl<T: SpacetimeType> ProcedureArg for T {}
/// Marker trait implemented for the two view context types,
/// `ViewContext` and `AnonymousViewContext`.
///
/// It exists to produce the custom diagnostic below when a view's
/// first parameter has the wrong type.
// NOTE(review): the impls are on the owned context types, not on references;
// presumably the caller strips the `&` before checking this bound — confirm.
#[diagnostic::on_unimplemented(
message = "The first parameter of a `#[view]` must be `&ViewContext` or `&AnonymousViewContext`"
)]
pub trait ViewContextArg {
#[doc(hidden)]
const _ITEM: () = ();
}
impl ViewContextArg for ViewContext {}
impl ViewContextArg for AnonymousViewContext {}
/// Marker trait implemented for every `SpacetimeType`.
///
/// It exists to produce the custom diagnostic below when a view
/// argument's type does not implement `SpacetimeType`.
#[diagnostic::on_unimplemented(
message = "the view argument `{Self}` does not implement `SpacetimeType`",
note = "if you own the type, try adding `#[derive(SpacetimeType)]` to its definition"
)]
pub trait ViewArg {
// NOTE(review): presumably referenced by generated code to force the bound check — confirm.
#[doc(hidden)]
const _ITEM: () = ();
}
impl<T: SpacetimeType> ViewArg for T {}
/// A value that a view may return and that can be encoded into a byte buffer.
///
/// Implemented in this file for `Vec<T>`, `Option<T>`, `RawQuery<T>`
/// and several query-builder types.
#[diagnostic::on_unimplemented(message = "Views must return `Vec<T>` or `Option<T>` where `T` is a `SpacetimeType`")]
pub trait ViewReturn {
#[doc(hidden)]
const _ITEM: () = ();
/// Consumes the value and appends its encoding to `w`.
fn to_writer(self, w: &mut Vec<u8>) -> Result<(), EncodeError>;
}
/// Rows are encoded as a `RowData` header followed by the vector itself.
impl<T: SpacetimeType + Serialize> ViewReturn for Vec<T> {
    fn to_writer(self, buf: &mut Vec<u8>) -> Result<(), EncodeError> {
        let header = ViewResultHeader::RowData;
        bsatn::to_writer(buf, &header)?;
        bsatn::to_writer(buf, &self)
    }
}
/// An optional row is encoded as a `RowData` header followed by a slice
/// of zero or one rows.
impl<T: SpacetimeType + Serialize> ViewReturn for Option<T> {
    fn to_writer(self, buf: &mut Vec<u8>) -> Result<(), EncodeError> {
        let header = ViewResultHeader::RowData;
        bsatn::to_writer(buf, &header)?;
        let rows = self.as_slice();
        bsatn::to_writer(buf, rows)
    }
}
/// A raw query is encoded as a `RawSql` header carrying the query's SQL text.
impl<T: SpacetimeType + Serialize> ViewReturn for RawQuery<T> {
    fn to_writer(self, buf: &mut Vec<u8>) -> Result<(), EncodeError> {
        let sql = self.sql().to_string();
        let header = ViewResultHeader::RawSql(sql);
        bsatn::to_writer(buf, &header)
    }
}
/// Builds the query and encodes the built query.
impl<T: HasCols + SpacetimeType + Serialize> ViewReturn for QbTable<T> {
    fn to_writer(self, buf: &mut Vec<u8>) -> Result<(), EncodeError> {
        let query = self.build();
        query.to_writer(buf)
    }
}
/// Builds the query and encodes the built query.
impl<T: HasCols + SpacetimeType + Serialize> ViewReturn for FromWhere<T> {
    fn to_writer(self, buf: &mut Vec<u8>) -> Result<(), EncodeError> {
        let query = self.build();
        query.to_writer(buf)
    }
}
/// Builds the query and encodes the built query.
impl<L: HasCols + SpacetimeType + Serialize> ViewReturn for LeftSemiJoin<L> {
    fn to_writer(self, buf: &mut Vec<u8>) -> Result<(), EncodeError> {
        let query = self.build();
        query.to_writer(buf)
    }
}
/// Builds the query and encodes the built query.
impl<R: HasCols + SpacetimeType + Serialize, L: HasCols> ViewReturn for RightSemiJoin<R, L> {
    fn to_writer(self, buf: &mut Vec<u8>) -> Result<(), EncodeError> {
        let query = self.build();
        query.to_writer(buf)
    }
}
/// Type-level tag for a kind of view, parameterized by the view's context type `Ctx`.
///
/// Holds no data; `Ctx` is only recorded through `PhantomData`.
pub struct ViewKind<Ctx> {
_marker: PhantomData<Ctx>,
}
/// Maps a [`ViewKind`] to the function-pointer type used to invoke views of that kind.
pub trait ViewKindTrait {
/// The function-pointer type for this kind of view.
type InvokeFn;
}
// Views taking a `ViewContext` are invoked through `ViewFn`.
impl ViewKindTrait for ViewKind<ViewContext> {
type InvokeFn = ViewFn;
}
// Views taking an `AnonymousViewContext` are invoked through `AnonymousFn`.
impl ViewKindTrait for ViewKind<AnonymousViewContext> {
type InvokeFn = AnonymousFn;
}
/// Selects the right view-invocation function based on the context type `Ctx`.
///
/// Holds no data; both `invoke` functions are associated functions that simply
/// forward to `invoke_view` or `invoke_anonymous_view`.
pub struct ViewDispatcher<Ctx> {
_marker: PhantomData<Ctx>,
}
impl ViewDispatcher<ViewContext> {
/// Forwards to [`invoke_view`].
#[inline]
pub fn invoke<'a, A, T, V>(view: V, ctx: ViewContext, args: &'a [u8]) -> Vec<u8>
where
A: Args<'a>,
T: ViewReturn,
V: View<'a, A, T>,
{
invoke_view(view, ctx, args)
}
}
impl ViewDispatcher<AnonymousViewContext> {
/// Forwards to [`invoke_anonymous_view`].
#[inline]
pub fn invoke<'a, A, T, V>(view: V, ctx: AnonymousViewContext, args: &'a [u8]) -> Vec<u8>
where
A: Args<'a>,
T: ViewReturn,
V: AnonymousView<'a, A, T>,
{
invoke_anonymous_view(view, ctx, args)
}
}
/// Selects the right view-registration function based on the context type `Ctx`.
///
/// Holds no data; both `register` functions are associated functions that simply
/// forward to `register_view` or `register_anonymous_view`.
pub struct ViewRegistrar<Ctx> {
_marker: PhantomData<Ctx>,
}
impl ViewRegistrar<ViewContext> {
/// Forwards to [`register_view`].
#[inline]
pub fn register<'a, A, I, T, V>(view: V)
where
A: Args<'a>,
T: ViewReturn,
I: FnInfo<Invoke = ViewFn>,
V: View<'a, A, T>,
{
register_view::<A, I, T>(view)
}
}
impl ViewRegistrar<AnonymousViewContext> {
/// Forwards to [`register_anonymous_view`].
#[inline]
pub fn register<'a, A, I, T, V>(view: V)
where
A: Args<'a>,
T: ViewReturn,
I: FnInfo<Invoke = AnonymousFn>,
V: AnonymousView<'a, A, T>,
{
register_anonymous_view::<A, I, T>(view)
}
}
/// Compile-time check that `_x` is a function that may be scheduled by a table
/// whose row type is `Row`.
///
/// All the checking happens in the trait bounds; at runtime the argument is
/// merely forgotten.
pub const fn scheduled_typecheck<'de, Row, FnKind>(_x: impl ExportFunctionForScheduledTable<'de, Row, FnKind>)
where
Row: SpacetimeType + Serialize + Deserialize<'de>,
{
// `forget` rather than an implicit drop.
// NOTE(review): presumably because dropping a generic value is not allowed in a `const fn` — confirm.
core::mem::forget(_x);
}
/// Type-level tag for reducers.
///
/// Contains an `Infallible` field, so no value of this type can be constructed.
pub struct FnKindReducer {
_never: Infallible,
}
/// Type-level tag for procedures returning `Ret`.
///
/// Contains an `Infallible` field, so no value of this type can be constructed.
#[cfg(feature = "unstable")]
pub struct FnKindProcedure<Ret> {
_never: Infallible,
_ret_ty: PhantomData<fn() -> Ret>,
}
/// Type-level tag for views.
///
/// Contains an `Infallible` field, so no value of this type can be constructed.
pub struct FnKindView {
_never: Infallible,
}
/// Marker trait for functions that can be scheduled by a table with row type `TableRow`.
///
/// `FnKind` is a type-level tag identifying the kind of function.
#[cfg_attr(
feature = "unstable",
doc = "It will be one of [`FnKindReducer`] or [`FnKindProcedure`] in modules that compile successfully."
)]
#[cfg_attr(
not(feature = "unstable"),
doc = "It will be [`FnKindReducer`] in modules that compile successfully."
)]
#[diagnostic::on_unimplemented(
message = "invalid signature for scheduled table reducer or procedure",
note = "views cannot be scheduled",
note = "the scheduled function must take `{TableRow}` as its sole argument",
note = "e.g: `fn scheduled_reducer(ctx: &ReducerContext, arg: {TableRow})`",
// note = "or `fn scheduled_procedure(ctx: &mut ProcedureContext, arg: {TableRow})`"
)]
pub trait ExportFunctionForScheduledTable<'de, TableRow, FnKind> {}
// Any reducer whose sole argument is the table row qualifies.
impl<'de, TableRow: SpacetimeType + Serialize + Deserialize<'de>, F: Reducer<'de, (TableRow,)>>
ExportFunctionForScheduledTable<'de, TableRow, FnKindReducer> for F
{
}
// Any procedure whose sole argument is the table row qualifies.
#[cfg(feature = "unstable")]
impl<
'de,
TableRow: SpacetimeType + Serialize + Deserialize<'de>,
Ret: SpacetimeType + Serialize + Deserialize<'de>,
F: Procedure<'de, (TableRow,), Ret>,
> ExportFunctionForScheduledTable<'de, TableRow, FnKindProcedure<Ret>> for F
{
}
/// A `TypespaceBuilder` that must never actually be asked to add a type.
pub struct DummyTypespace;
impl TypespaceBuilder for DummyTypespace {
/// Always panics via `unreachable!()`.
fn add(
&mut self,
_: std::any::TypeId,
_: Option<&'static str>,
_: impl FnOnce(&mut Self) -> spacetimedb_lib::AlgebraicType,
) -> spacetimedb_lib::AlgebraicType {
unreachable!()
}
}
/// Marker trait implemented for every `SpacetimeType`.
///
/// It exists to produce the custom diagnostic below when a table
/// column's type does not implement `SpacetimeType`.
#[diagnostic::on_unimplemented(
message = "the column type `{Self}` does not implement `SpacetimeType`",
note = "table column types all must implement `SpacetimeType`",
note = "if you own the type, try adding `#[derive(SpacetimeType)]` to its definition"
)]
pub trait TableColumn {
// NOTE(review): presumably referenced by generated code to force the bound check — confirm.
#[doc(hidden)]
const _ITEM: () = ();
}
impl<T: SpacetimeType> TableColumn for T {}
/// Compile-time check that `T` is an allowed primary key type for a scheduled table.
///
/// The check is entirely in the trait bound; the function body is empty.
pub const fn assert_scheduled_table_primary_key<T: ScheduledTablePrimaryKey>() {}
// Private module: `Sealed` cannot be named from outside this module,
// so traits requiring it cannot be implemented elsewhere.
mod sealed {
pub trait Sealed {}
}
/// Types usable as the primary key of a scheduled table; implemented only for `u64`.
#[diagnostic::on_unimplemented(
message = "scheduled table primary key must be a `u64`",
label = "should be `u64`, not `{Self}`"
)]
pub trait ScheduledTablePrimaryKey: sealed::Sealed {}
impl sealed::Sealed for u64 {}
impl ScheduledTablePrimaryKey for u64 {}
/// Unit marker type.
// NOTE(review): no use of this type is visible in this file; presumably it
// stands in for the context argument in generated code — confirm.
pub struct ContextArg;
/// Product visitor that deserializes an argument list `A` from a sequence product.
struct ArgsVisitor<A> {
_marker: PhantomData<A>,
}
impl<'de, A: Args<'de>> de::ProductVisitor<'de> for ArgsVisitor<A> {
type Output = A;
// Argument lists are unnamed products.
fn product_name(&self) -> Option<&str> {
None
}
// The expected number of elements is the number of arguments.
fn product_len(&self) -> usize {
A::LEN
}
fn product_kind(&self) -> de::ProductKind {
de::ProductKind::ReducerArgs
}
// Positional decoding is delegated to the `Args` implementation.
fn visit_seq_product<Acc: SeqProductAccess<'de>>(self, prod: Acc) -> Result<Self::Output, Acc::Error> {
A::visit_seq_product(prod)
}
// Arguments can only be decoded positionally; named products are rejected.
fn visit_named_product<Acc: de::NamedProductAccess<'de>>(self, _prod: Acc) -> Result<Self::Output, Acc::Error> {
Err(Acc::Error::named_products_not_supported())
}
}
// Implements `Args` for tuples, and `Reducer`, `Procedure` (with the `unstable`
// feature), `View` and `AnonymousView` for functions taking those tuples'
// elements as arguments after the context argument.
macro_rules! impl_reducer_procedure_view {
// Entry point: emit the impls for the full identifier list, then recurse on
// the list without its first identifier, so every shorter arity down to the
// empty tuple is covered as well.
($($T1:ident $(, $T:ident)*)?) => {
impl_reducer_procedure_view!(@impl $($T1 $(, $T)*)?);
$(impl_reducer_procedure_view!($($T),*);)?
};
// Emits all impls for one specific arity.
(@impl $($T:ident),*) => {
impl<'de, $($T: SpacetimeType + Deserialize<'de> + Serialize),*> Args<'de> for ($($T,)*) {
const LEN: usize = impl_reducer_procedure_view!(@count $($T)*);
#[allow(non_snake_case)]
#[allow(unused)]
fn visit_seq_product<Acc: SeqProductAccess<'de>>(mut prod: Acc) -> Result<Self, Acc::Error> {
let vis = ArgsVisitor { _marker: PhantomData::<Self> };
// `i` is re-bound after each element so that a missing-field error
// reports the index of the element that was absent.
let i = 0;
$(
let $T = prod.next_element::<$T>()?.ok_or_else(|| de::Error::missing_field(i, None, &vis))?;
let i = i + 1;
)*
Ok(($($T,)*))
}
#[allow(non_snake_case)]
fn serialize_seq_product<Ser: SerializeSeqProduct>(&self, _prod: &mut Ser) -> Result<(), Ser::Error> {
let ($($T,)*) = self;
$(_prod.serialize_element($T)?;)*
Ok(())
}
#[inline]
#[allow(non_snake_case, irrefutable_let_patterns)]
fn schema<Info: FnInfo>(_typespace: &mut impl TypespaceBuilder) -> ProductType {
// Bind the trailing entries of `ARG_NAMES` to the type identifiers
// (reused here as variable names); panics if the slice does not match the pattern.
let [.., $($T),*] = Info::ARG_NAMES else { panic!() };
ProductType::new(vec![
$(ProductTypeElement {
name: $T.map(Into::into),
algebraic_type: <$T>::make_type(_typespace),
}),*
].into())
}
}
// Any `Fn(&ReducerContext, T...) -> Ret` with a valid return type is a reducer.
impl<'de, Func, Ret, $($T: SpacetimeType + Deserialize<'de> + Serialize),*> Reducer<'de, ($($T,)*)> for Func
where
Func: Fn(&ReducerContext, $($T),*) -> Ret,
Ret: IntoReducerResult
{
#[allow(non_snake_case)]
fn invoke(&self, ctx: &ReducerContext, args: ($($T,)*)) -> Result<(), Box<str>> {
let ($($T,)*) = args;
self(ctx, $($T),*).into_result()
}
}
// Any `Fn(&mut ProcedureContext, T...) -> Ret` with a valid return type is a procedure.
#[cfg(feature = "unstable")]
impl<'de, Func, Ret, $($T: SpacetimeType + Deserialize<'de> + Serialize),*> Procedure<'de, ($($T,)*), Ret> for Func
where
Func: Fn(&mut ProcedureContext, $($T),*) -> Ret,
Ret: IntoProcedureResult,
{
#[allow(non_snake_case)]
fn invoke(&self, ctx: &mut ProcedureContext, args: ($($T,)*)) -> Ret {
let ($($T,)*) = args;
self(ctx, $($T),*)
}
}
// Any `Fn(&ViewContext, T...) -> Retn` with a valid return type is a view.
impl<'de, Func, Retn, $($T),*>
View<'de, ($($T,)*), Retn> for Func
where
$($T: SpacetimeType + Deserialize<'de> + Serialize,)*
Func: Fn(&ViewContext, $($T),*) -> Retn,
Retn: ViewReturn,
{
#[allow(non_snake_case)]
fn invoke(&self, ctx: &ViewContext, args: ($($T,)*)) -> Retn {
let ($($T,)*) = args;
self(ctx, $($T),*)
}
}
// Any `Fn(&AnonymousViewContext, T...) -> Retn` with a valid return type is an anonymous view.
impl<'de, Func, Retn, $($T),*>
AnonymousView<'de, ($($T,)*), Retn> for Func
where
$($T: SpacetimeType + Deserialize<'de> + Serialize,)*
Func: Fn(&AnonymousViewContext, $($T),*) -> Retn,
Retn: ViewReturn,
{
#[allow(non_snake_case)]
fn invoke(&self, ctx: &AnonymousViewContext, args: ($($T,)*)) -> Retn {
let ($($T,)*) = args;
self(ctx, $($T),*)
}
}
};
// Counts identifiers by expanding to `0 + 1 + 1 + ...`, one `1` per identifier.
(@count $($T:ident)*) => {
0 $(+ impl_reducer_procedure_view!(@drop $T 1))*
};
// Discards the first token and yields the second.
(@drop $a:tt $b:tt) => { $b };
}
// Generate the impls for every arity from 32 arguments down to 0.
impl_reducer_procedure_view!(
A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T, U, V, W, X, Y, Z, AA, AB, AC, AD, AE, AF
);
/// Wrapper giving an argument list `A` `Serialize` / `Deserialize` impls
/// in terms of its `Args` implementation.
struct SerDeArgs<A>(A);
// Deserialize as a product, driving `ArgsVisitor` for `A`.
impl_deserialize!(
[A: Args<'de>] SerDeArgs<A>,
de => de.deserialize_product(ArgsVisitor { _marker: PhantomData }).map(Self)
);
// Serialize as a sequence product of `A::LEN` elements.
impl_serialize!(['de, A: Args<'de>] SerDeArgs<A>, (self, ser) => {
let mut prod = ser.serialize_seq_product(A::LEN)?;
self.0.serialize_seq_product(&mut prod)?;
prod.end()
});
/// Static information about a row-level security rule.
pub trait RowLevelSecurityInfo {
/// The SQL text of the rule.
const SQL: &'static str;
}
/// Alias-like trait for the closures stored in `DESCRIBERS`:
/// `FnMut(&mut ModuleBuilder) + Send + 'static`.
trait DescriberFn: FnMut(&mut ModuleBuilder) + Send + 'static {}
impl<F: FnMut(&mut ModuleBuilder) + Send + 'static> DescriberFn for F {}
/// Appends `f` to the global `DESCRIBERS` list.
///
/// Panics if the `DESCRIBERS` mutex is poisoned.
fn register_describer(f: impl DescriberFn) {
DESCRIBERS.lock().unwrap().push(Box::new(f))
}
/// Registers a describer that adds the type `T` to the module's typespace.
pub fn register_reftype<T: SpacetimeType>() {
register_describer(|module| {
// Only the side effect on the typespace is wanted; the returned type is discarded.
T::make_type(&mut module.inner);
})
}
/// Registers a describer that adds the table `T` to the module definition:
/// its schedule (if any), type, access, event flag, unique constraints, indexes,
/// primary key, sequences, default column values, and explicit names.
///
/// The describer panics if `T::Row::make_type` does not yield a type reference.
pub fn register_table<T: Table>() {
    register_describer(|module| {
        let row_type = T::Row::make_type(&mut module.inner);
        let product_type_ref = *row_type.as_ref().unwrap();

        if let Some(schedule) = T::SCHEDULE {
            module.inner.add_schedule(
                T::TABLE_NAME,
                schedule.scheduled_at_column,
                schedule.reducer_or_procedure_name,
            );
        }

        let table = module
            .inner
            .build_table(T::TABLE_NAME, product_type_ref)
            .with_type(TableType::User)
            .with_access(T::TABLE_ACCESS)
            .with_event(T::IS_EVENT);

        // Thread the builder through each group of settings in turn.
        let table = T::UNIQUE_COLUMNS
            .iter()
            .fold(table, |builder, &col| builder.with_unique_constraint(col));
        let table = T::INDEXES.iter().fold(table, |builder, index| {
            builder.with_index(index.algo.into(), index.source_name, index.accessor_name)
        });
        let table = match T::PRIMARY_KEY {
            Some(primary_key) => table.with_primary_key(primary_key),
            None => table,
        };
        let mut table = T::SEQUENCES
            .iter()
            .fold(table, |builder, &col| builder.with_column_sequence(col));
        for col in T::get_default_col_values().iter_mut() {
            table = table.with_default_column_value(col.col_id, col.value.clone());
        }

        table.finish();
        module.inner.add_explicit_names(T::explicit_names());
    })
}
impl From<IndexAlgo<'_>> for RawIndexAlgorithm {
fn from(algo: IndexAlgo<'_>) -> RawIndexAlgorithm {
match algo {
IndexAlgo::BTree { columns } => RawIndexAlgorithm::BTree {
columns: columns.iter().copied().collect(),
},
IndexAlgo::Hash { columns } => RawIndexAlgorithm::Hash {
columns: columns.iter().copied().collect(),
},
IndexAlgo::Direct { column } => RawIndexAlgorithm::Direct { column: column.into() },
}
}
}
/// Registers a describer that adds the reducer described by `I` to the module
/// definition and records its invoke pointer.
///
/// The function argument is unused at runtime; it only ties `A` to an actual reducer.
pub fn register_reducer<'a, A: Args<'a>, I: FnInfo<Invoke = ReducerFn>>(_: impl Reducer<'a, A>) {
    register_describer(|module| {
        let params = A::schema::<I>(&mut module.inner);
        match I::LIFECYCLE {
            Some(lifecycle) => module.inner.add_lifecycle_reducer(lifecycle, I::NAME, params),
            None => module.inner.add_reducer(I::NAME, params),
        };
        module.reducers.push(I::INVOKE);
        module.inner.add_explicit_names(I::explicit_names());
    })
}
/// Registers a describer that adds the procedure described by `I` to the module
/// definition and records its invoke pointer.
///
/// The function argument is unused at runtime; it only ties `A` and `Ret`
/// to an actual procedure.
#[cfg(feature = "unstable")]
pub fn register_procedure<'a, A, Ret, I>(_: impl Procedure<'a, A, Ret>)
where
A: Args<'a>,
Ret: SpacetimeType + Serialize,
I: FnInfo<Invoke = ProcedureFn>,
{
register_describer(|module| {
// Parameter types are added to the typespace before the return type.
let params = A::schema::<I>(&mut module.inner);
let ret_ty = <Ret as SpacetimeType>::make_type(&mut module.inner);
module.inner.add_procedure(I::NAME, params, ret_ty);
module.procedures.push(I::INVOKE);
module.inner.add_explicit_names(I::explicit_names());
})
}
/// Registers a describer that adds the view described by `I` to the module
/// definition and records its invoke pointer in `views`.
///
/// The view's index is its position in `views` at the time the describer runs.
/// The describer panics if `I::return_type` yields `None`.
pub fn register_view<'a, A, I, T>(_: impl View<'a, A, T>)
where
    A: Args<'a>,
    I: FnInfo<Invoke = ViewFn>,
    T: ViewReturn,
{
    register_describer(|module| {
        let params = A::schema::<I>(&mut module.inner);
        let return_type = I::return_type(&mut module.inner).unwrap();
        let view_index = module.views.len();
        module
            .inner
            .add_view(I::NAME, view_index, true, false, params, return_type);
        module.views.push(I::INVOKE);
        module.inner.add_explicit_names(I::explicit_names());
    })
}
/// Registers a describer that adds the anonymous view described by `I` to the
/// module definition and records its invoke pointer in `views_anon`.
///
/// The view's index is its position in `views_anon` at the time the describer runs.
/// The describer panics if `I::return_type` yields `None`.
pub fn register_anonymous_view<'a, A, I, T>(_: impl AnonymousView<'a, A, T>)
where
    A: Args<'a>,
    I: FnInfo<Invoke = AnonymousFn>,
    T: ViewReturn,
{
    register_describer(|module| {
        let params = A::schema::<I>(&mut module.inner);
        let return_type = I::return_type(&mut module.inner).unwrap();
        let view_index = module.views_anon.len();
        module
            .inner
            .add_view(I::NAME, view_index, true, true, params, return_type);
        module.views_anon.push(I::INVOKE);
        module.inner.add_explicit_names(I::explicit_names());
    })
}
/// Registers a describer that adds the row-level security rule `sql`
/// to the module definition.
pub fn register_row_level_security(sql: &'static str) {
register_describer(|module| {
module.inner.add_row_level_security(sql);
})
}
/// Registers a describer that sets the module's case conversion policy to `policy`.
#[doc(hidden)]
pub fn register_case_conversion_policy(policy: CaseConversionPolicy) {
register_describer(move |module| {
module.inner.set_case_conversion_policy(policy);
})
}
/// Accumulates the module definition and the invoke pointers of all exported
/// functions while the registered describers run.
#[derive(Default)]
pub struct ModuleBuilder {
/// The module definition under construction.
inner: RawModuleDefV10Builder,
/// Invoke pointers of reducers, in registration order.
reducers: Vec<ReducerFn>,
/// Invoke pointers of procedures, in registration order.
#[cfg(feature = "unstable")]
procedures: Vec<ProcedureFn>,
/// Invoke pointers of views taking a `ViewContext`, in registration order.
views: Vec<ViewFn>,
/// Invoke pointers of views taking an `AnonymousViewContext`, in registration order.
views_anon: Vec<AnonymousFn>,
}
/// Describer closures added by `register_describer` and run by `__describe_module__`.
static DESCRIBERS: Mutex<Vec<Box<dyn DescriberFn>>> = Mutex::new(Vec::new());
/// Signature of a reducer entry point: context and encoded arguments in, result out.
pub type ReducerFn = fn(&ReducerContext, &[u8]) -> ReducerResult;
/// Reducer table; set once by `__describe_module__` and indexed by `__call_reducer__`.
static REDUCERS: OnceLock<Vec<ReducerFn>> = OnceLock::new();
/// Signature of a procedure entry point: context and encoded arguments in, result out.
#[cfg(feature = "unstable")]
pub type ProcedureFn = fn(&mut ProcedureContext, &[u8]) -> ProcedureResult;
/// Procedure table; set once by `__describe_module__` and indexed by `__call_procedure__`.
#[cfg(feature = "unstable")]
static PROCEDURES: OnceLock<Vec<ProcedureFn>> = OnceLock::new();
/// Signature of a view entry point: context and encoded arguments in, encoded result out.
pub type ViewFn = fn(ViewContext, &[u8]) -> Vec<u8>;
/// View table; set once by `__describe_module__` and indexed by `__call_view__`.
static VIEWS: OnceLock<Vec<ViewFn>> = OnceLock::new();
/// Signature of an anonymous view entry point: context and encoded arguments in, encoded result out.
pub type AnonymousFn = fn(AnonymousViewContext, &[u8]) -> Vec<u8>;
/// Anonymous view table; set once by `__describe_module__` and indexed by `__call_view_anon__`.
static ANONYMOUS_VIEWS: OnceLock<Vec<AnonymousFn>> = OnceLock::new();
/// Exported entry point that builds the module definition, publishes the
/// function tables, and writes the BSATN-encoded definition to `description`.
///
/// Panics if the `DESCRIBERS` mutex is poisoned, if serialization fails,
/// or if any of the function tables has already been set.
#[unsafe(no_mangle)]
extern "C" fn __describe_module__(description: BytesSink) {
    // Run every registered describer against a fresh builder.
    let mut module = ModuleBuilder::default();
    DESCRIBERS
        .lock()
        .unwrap()
        .iter_mut()
        .for_each(|describer| describer(&mut module));

    // Serialize the finished definition.
    let module_def = RawModuleDef::V10(module.inner.finish());
    let bytes = bsatn::to_vec(&module_def).expect("unable to serialize typespace");

    // Publish the function tables used by the `__call_*__` entry points.
    REDUCERS.set(module.reducers).ok().unwrap();
    #[cfg(feature = "unstable")]
    PROCEDURES.set(module.procedures).ok().unwrap();
    VIEWS.set(module.views).ok().unwrap();
    ANONYMOUS_VIEWS.set(module.views_anon).ok().unwrap();

    write_to_sink(description, &bytes);
}
#[unsafe(no_mangle)]
extern "C" fn __call_reducer__(
id: usize,
sender_0: u64,
sender_1: u64,
sender_2: u64,
sender_3: u64,
conn_id_0: u64,
conn_id_1: u64,
timestamp: u64,
args: BytesSource,
error: BytesSink,
) -> i16 {
let sender = reconstruct_sender_identity(sender_0, sender_1, sender_2, sender_3);
let conn_id = reconstruct_connection_id(conn_id_0, conn_id_1);
let timestamp = Timestamp::from_micros_since_unix_epoch(timestamp as i64);
let ctx = ReducerContext::new(crate::Local {}, sender, conn_id, timestamp);
let reducers = REDUCERS.get().unwrap();
let res = with_read_args(args, |args| reducers[id](&ctx, args));
convert_err_to_errno(res, error)
}
/// Reassembles an [`Identity`] from the four `u64` words it was split into,
/// reinterpreting them as 32 bytes in memory order.
fn reconstruct_sender_identity(sender_0: u64, sender_1: u64, sender_2: u64, sender_3: u64) -> Identity {
    let words = [sender_0, sender_1, sender_2, sender_3];
    let bytes = bytemuck::must_cast::<[u64; 4], [u8; 32]>(words);
    Identity::from_byte_array(bytes)
}
/// Reassembles a [`ConnectionId`] from the two `u64` words it was split into.
///
/// Returns `None` when the result equals `ConnectionId::ZERO`.
fn reconstruct_connection_id(conn_id_0: u64, conn_id_1: u64) -> Option<ConnectionId> {
    let words = [conn_id_0, conn_id_1];
    let bytes = bytemuck::must_cast::<[u64; 2], [u8; 16]>(words);
    let conn_id = ConnectionId::from_le_byte_array(bytes);
    if conn_id == ConnectionId::ZERO {
        None
    } else {
        Some(conn_id)
    }
}
/// Maps a reducer result to a status code.
///
/// Returns `0` for `Ok`. For `Err`, writes the message bytes to `out`
/// and returns the `HOST_CALL_FAILURE` errno.
fn convert_err_to_errno(res: Result<(), Box<str>>, out: BytesSink) -> i16 {
    let Err(msg) = res else {
        return 0;
    };
    write_to_sink(out, msg.as_bytes());
    errno::HOST_CALL_FAILURE.get() as i16
}
/// Exported entry point that runs the procedure with index `id`
/// and writes its encoded result to `result_sink`. Always returns `0`.
///
/// The sender identity and connection id arrive split into `u64` words and are
/// reassembled here.
///
/// Panics if the procedure table has not been set or `id` is out of bounds.
#[cfg(feature = "unstable")]
#[unsafe(no_mangle)]
extern "C" fn __call_procedure__(
    id: usize,
    sender_0: u64,
    sender_1: u64,
    sender_2: u64,
    sender_3: u64,
    conn_id_0: u64,
    conn_id_1: u64,
    timestamp: u64,
    args: BytesSource,
    result_sink: BytesSink,
) -> i16 {
    let mut ctx = ProcedureContext::new(
        reconstruct_sender_identity(sender_0, sender_1, sender_2, sender_3),
        reconstruct_connection_id(conn_id_0, conn_id_1),
        Timestamp::from_micros_since_unix_epoch(timestamp as i64),
    );
    let table = PROCEDURES.get().unwrap();
    let encoded = with_read_args(args, |bytes| table[id](&mut ctx, bytes));
    write_to_sink(result_sink, &encoded);
    0
}
/// Exported entry point that runs the anonymous view with index `id`
/// and writes its encoded result to `sink`. Always returns `2`.
///
/// Panics if the anonymous view table has not been set or `id` is out of bounds.
#[unsafe(no_mangle)]
extern "C" fn __call_view_anon__(id: usize, args: BytesSource, sink: BytesSink) -> i16 {
    let views = ANONYMOUS_VIEWS.get().unwrap();
    let encoded = with_read_args(args, |bytes| views[id](AnonymousViewContext::default(), bytes));
    write_to_sink(sink, &encoded);
    // NOTE(review): the meaning of status `2` is defined by the host ABI — confirm.
    2
}
/// Exported entry point that runs the view with index `id` on behalf of the
/// given sender and writes its encoded result to `sink`. Always returns `2`.
///
/// The sender identity arrives split into four `u64` words and is reassembled here.
///
/// Panics if the view table has not been set or `id` is out of bounds.
#[unsafe(no_mangle)]
extern "C" fn __call_view__(
    id: usize,
    sender_0: u64,
    sender_1: u64,
    sender_2: u64,
    sender_3: u64,
    args: BytesSource,
    sink: BytesSink,
) -> i16 {
    let words = [sender_0, sender_1, sender_2, sender_3];
    let sender_bytes = bytemuck::must_cast::<[u64; 4], [u8; 32]>(words);
    let sender = Identity::from_byte_array(sender_bytes);
    let views = VIEWS.get().unwrap();
    let encoded = with_read_args(args, |bytes| views[id](ViewContext::new(sender), bytes));
    write_to_sink(sink, &encoded);
    // NOTE(review): the meaning of status `2` is defined by the host ABI — confirm.
    2
}
/// Reads all bytes of `args` into a scratch buffer and runs `logic` on them.
///
/// When `args` is `BytesSource::INVALID`, `logic` is run on an empty slice
/// and no buffer is taken.
fn with_read_args<R>(args: BytesSource, logic: impl FnOnce(&[u8]) -> R) -> R {
    if args != BytesSource::INVALID {
        let mut scratch = IterBuf::take();
        read_bytes_source_into(args, &mut scratch);
        logic(&scratch)
    } else {
        logic(&[])
    }
}
// Errno values as plain `u16` constants so they can be used as `match` patterns
// (see `write_to_sink` and `read_bytes_source_into`).
const NO_SPACE: u16 = errno::NO_SPACE.get();
const NO_SUCH_BYTES: u16 = errno::NO_SUCH_BYTES.get();
/// Fetches the JWT associated with `connection_id` as a string.
///
/// Returns `None` if `sys::get_jwt` yields nothing or an invalid source.
///
/// # Panics
///
/// Panics if the bytes read are not valid UTF-8.
pub fn get_jwt(connection_id: ConnectionId) -> Option<String> {
    let mut scratch = IterBuf::take();
    let source = sys::get_jwt(connection_id.as_le_byte_array())?;
    if source != BytesSource::INVALID {
        read_bytes_source_into(source, &mut scratch);
        let jwt = std::str::from_utf8(&scratch).unwrap();
        Some(jwt.to_string())
    } else {
        None
    }
}
/// Reads all remaining bytes of `source` and appends them to `buf`.
///
/// The remaining length is queried up front so that `buf` can be grown once;
/// the read loop still copes with the host delivering the data in pieces.
///
/// # Panics
///
/// Panics with "invalid source passed" if the host reports `source` as unknown.
pub(crate) fn read_bytes_source_into(source: BytesSource, buf: &mut Vec<u8>) {
    const INVALID: i16 = NO_SUCH_BYTES as i16;

    let len = {
        let mut len = 0;
        // SAFETY: `len` is a live local, so the raw pointer is valid for writes
        // for the duration of the call.
        let ret = unsafe { sys::raw::bytes_source_remaining_length(source, &raw mut len) };
        match ret {
            0 => len,
            INVALID => panic!("invalid source passed"),
            _ => unreachable!(),
        }
    };
    // `Vec::reserve` takes the number of *additional* elements, so request room
    // for exactly the bytes still to be read. (The previous expression,
    // `buf.len().saturating_sub(len)`, reserved nothing for an empty `buf`.)
    buf.reserve(len as usize);

    loop {
        // Let the host write directly into the vector's spare capacity.
        let spare = buf.spare_capacity_mut();
        let spare_len = spare.len();
        let mut buf_len = spare_len;
        let buf_ptr = spare.as_mut_ptr().cast();
        // SAFETY: `buf_ptr` points to `spare_len` writable bytes and `buf_len`
        // tells the host how many of them it may fill.
        let ret = unsafe { sys::raw::bytes_source_read(source, buf_ptr, &mut buf_len) };
        if ret <= 0 {
            // SAFETY: on these return codes the host has initialized the first
            // `buf_len` bytes of the spare capacity and `buf_len <= spare_len`.
            // NOTE(review): this relies on the host ABI contract — confirm.
            unsafe { buf.set_len(buf.len() + buf_len) };
        }
        match ret {
            // The source is exhausted.
            -1 => break,
            // The spare capacity was filled completely; make more room.
            0 if spare_len == buf_len => buf.reserve(1024),
            // The host wrote less than it could have; just try again.
            0 => {}
            INVALID => panic!("invalid source passed"),
            _ => unreachable!(),
        }
    }
}
/// Writes all of `buf` to `sink`, retrying with the remainder after partial writes.
///
/// At least one write call is made, even for an empty `buf`.
///
/// # Panics
///
/// Panics if the host reports the sink as invalid or out of space.
fn write_to_sink(sink: BytesSink, mut buf: &[u8]) {
    loop {
        // In: number of bytes offered. Out: number of bytes the host accepted.
        let mut written = buf.len();
        let status = unsafe { sys::raw::bytes_sink_write(sink, buf.as_ptr(), &mut written) };
        match status {
            0 => {
                buf = &buf[written..];
                if buf.is_empty() {
                    return;
                }
            }
            NO_SUCH_BYTES => panic!("invalid sink passed"),
            NO_SPACE => panic!("no space left at sink"),
            _ => unreachable!(),
        }
    }
}
// Generates an exported `extern "C"` function that registers the type `$ty`
// via `register_reftype`. The export name is
// `__preinit__20_register_describer_` followed by `$name`.
// NOTE(review): presumably the host calls `__preinit__*` exports before
// describing the module — confirm against the host ABI.
#[macro_export]
#[doc(hidden)]
macro_rules! __make_register_reftype {
($ty:ty, $name:literal) => {
// Wrapped in an anonymous const so the function name cannot clash.
const _: () = {
#[unsafe(export_name = concat!("__preinit__20_register_describer_", $name))]
extern "C" fn __register_describer() {
$crate::rt::register_reftype::<$ty>()
}
};
};
}
/// Serializes `args` with BSATN and passes them, together with the reducer
/// name `R2::NAME`, to `sys::volatile_nonatomic_schedule_immediate`.
///
/// `_reducer` is unused at runtime; it only ties `A` to an actual reducer.
///
/// Panics if the arguments cannot be serialized.
#[cfg(feature = "unstable")]
#[doc(hidden)]
pub fn volatile_nonatomic_schedule_immediate<'de, A: Args<'de>, R: Reducer<'de, A>, R2: FnInfo<Invoke = ReducerFn>>(
_reducer: R,
args: A,
) {
let arg_bytes = bsatn::to_vec(&SerDeArgs(args)).unwrap();
sys::volatile_nonatomic_schedule_immediate(R2::NAME, &arg_bytes)
}
/// Reads all bytes of `source` and BSATN-decodes them as a `T`.
///
/// # Panics
///
/// Panics with a message naming `T` if decoding fails.
#[cfg_attr(not(feature = "unstable"), allow(unused))]
pub(crate) fn read_bytes_source_as<T: DeserializeOwned + 'static>(source: BytesSource) -> T {
    let mut scratch = IterBuf::take();
    read_bytes_source_into(source, &mut scratch);
    match bsatn::from_slice::<T>(&scratch) {
        Ok(value) => value,
        Err(err) => panic!("Failed to BSATN-deserialize `{}`: {err:#?}", std::any::type_name::<T>()),
    }
}
/// Source of the explicit names that the registration functions add to the
/// module definition.
pub trait ExplicitNames {
/// Returns the explicit names; the default implementation returns
/// `RawExplicitNames::default()`.
fn explicit_names() -> RawExplicitNames {
RawExplicitNames::default()
}
}