1use super::{ArgsTuple, InvalidReducerArguments, ReducerArgs, ReducerCallResult, ReducerId, ReducerOutcome, Scheduler};
2use crate::client::messages::{OneOffQueryResponseMessage, SerializableMessage};
3use crate::client::{ClientActorId, ClientConnectionSender};
4use crate::database_logger::{LogLevel, Record};
5use crate::db::relational_db::RelationalDB;
6use crate::energy::EnergyQuanta;
7use crate::error::DBError;
8use crate::estimation::estimate_rows_scanned;
9use crate::hash::Hash;
10use crate::identity::Identity;
11use crate::messages::control_db::Database;
12use crate::module_host_context::ModuleCreationContext;
13use crate::replica_context::ReplicaContext;
14use crate::sql::ast::SchemaViewer;
15use crate::sql::parser::RowLevelExpr;
16use crate::subscription::execute_plan;
17use crate::subscription::module_subscription_actor::ModuleSubscriptions;
18use crate::subscription::tx::DeltaTx;
19use crate::util::jobs::{JobCore, JobThread, JobThreadClosed, WeakJobThread};
20use crate::vm::check_row_limit;
21use crate::worker_metrics::WORKER_METRICS;
22use anyhow::Context;
23use bytes::Bytes;
24use derive_more::From;
25use indexmap::IndexSet;
26use itertools::Itertools;
27use prometheus::{Histogram, IntGauge};
28use spacetimedb_client_api_messages::websocket::{ByteListLen, Compression, OneOffTable, QueryUpdate, WebsocketFormat};
29use spacetimedb_data_structures::error_stream::ErrorStream;
30use spacetimedb_data_structures::map::{HashCollectionExt as _, IntMap};
31use spacetimedb_datastore::execution_context::{ExecutionContext, ReducerContext, Workload, WorkloadType};
32use spacetimedb_datastore::locking_tx_datastore::MutTxId;
33use spacetimedb_datastore::traits::{IsolationLevel, Program, TxData};
34use spacetimedb_execution::pipelined::PipelinedProject;
35use spacetimedb_lib::db::raw_def::v9::Lifecycle;
36use spacetimedb_lib::identity::{AuthCtx, RequestId};
37use spacetimedb_lib::metrics::ExecutionMetrics;
38use spacetimedb_lib::ConnectionId;
39use spacetimedb_lib::Timestamp;
40use spacetimedb_primitives::TableId;
41use spacetimedb_query::compile_subscription;
42use spacetimedb_sats::ProductValue;
43use spacetimedb_schema::auto_migrate::AutoMigrateError;
44use spacetimedb_schema::def::deserialize::ReducerArgsDeserializeSeed;
45use spacetimedb_schema::def::{ModuleDef, ReducerDef, TableDef};
46use spacetimedb_schema::schema::{Schema, TableSchema};
47use spacetimedb_vm::relation::RelValue;
48use std::fmt;
49use std::sync::{Arc, Weak};
50use std::time::{Duration, Instant};
51
/// A set of per-table row updates, typically the net effect of one
/// committed transaction.
#[derive(Debug, Default, Clone, From)]
pub struct DatabaseUpdate {
    pub tables: Vec<DatabaseTableUpdate>,
}
56
57impl FromIterator<DatabaseTableUpdate> for DatabaseUpdate {
58 fn from_iter<T: IntoIterator<Item = DatabaseTableUpdate>>(iter: T) -> Self {
59 DatabaseUpdate {
60 tables: iter.into_iter().collect(),
61 }
62 }
63}
64
65impl DatabaseUpdate {
66 pub fn is_empty(&self) -> bool {
67 if self.tables.len() == 0 {
68 return true;
69 }
70 false
71 }
72
73 pub fn from_writes(tx_data: &TxData) -> Self {
74 let mut map: IntMap<TableId, DatabaseTableUpdate> = IntMap::new();
75 let new_update = |table_id, table_name: &str| DatabaseTableUpdate {
76 table_id,
77 table_name: table_name.into(),
78 inserts: [].into(),
79 deletes: [].into(),
80 };
81 for (table_id, table_name, rows) in tx_data.inserts_with_table_name() {
82 map.entry(*table_id)
83 .or_insert_with(|| new_update(*table_id, table_name))
84 .inserts = rows.clone();
85 }
86 for (table_id, table_name, rows) in tx_data.deletes_with_table_name() {
87 map.entry(*table_id)
88 .or_insert_with(|| new_update(*table_id, table_name))
89 .deletes = rows.clone();
90 }
91 DatabaseUpdate {
92 tables: map.into_values().collect(),
93 }
94 }
95
96 pub fn num_rows(&self) -> usize {
98 self.tables.iter().map(|t| t.inserts.len() + t.deletes.len()).sum()
99 }
100}
101
/// The rows inserted into and deleted from a single table.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DatabaseTableUpdate {
    pub table_id: TableId,
    pub table_name: Box<str>,
    // Shared slices, so cloning a table update is a refcount bump, not a row copy.
    pub inserts: Arc<[ProductValue]>,
    pub deletes: Arc<[ProductValue]>,
}
112
/// Like [`DatabaseUpdate`], but holding borrowed [`RelValue`] rows
/// instead of owned `ProductValue`s.
#[derive(Debug)]
pub struct DatabaseUpdateRelValue<'a> {
    pub tables: Vec<DatabaseTableUpdateRelValue<'a>>,
}
117
/// Like [`DatabaseTableUpdate`], but holding borrowed [`RelValue`] rows.
#[derive(PartialEq, Debug)]
pub struct DatabaseTableUpdateRelValue<'a> {
    pub table_id: TableId,
    pub table_name: Box<str>,
    pub updates: UpdatesRelValue<'a>,
}
124
/// Deleted and inserted rows of a single table, as borrowed [`RelValue`]s.
#[derive(Default, PartialEq, Debug)]
pub struct UpdatesRelValue<'a> {
    pub deletes: Vec<RelValue<'a>>,
    pub inserts: Vec<RelValue<'a>>,
}
130
131impl UpdatesRelValue<'_> {
132 pub fn has_updates(&self) -> bool {
134 !(self.deletes.is_empty() && self.inserts.is_empty())
135 }
136
137 pub fn encode<F: WebsocketFormat>(&self) -> (F::QueryUpdate, u64, usize) {
138 let (deletes, nr_del) = F::encode_list(self.deletes.iter());
139 let (inserts, nr_ins) = F::encode_list(self.inserts.iter());
140 let num_rows = nr_del + nr_ins;
141 let num_bytes = deletes.num_bytes() + inserts.num_bytes();
142 let qu = QueryUpdate { deletes, inserts };
143 let cqu = F::into_query_update(qu, Compression::None);
148 (cqu, num_rows, num_bytes)
149 }
150}
151
/// The outcome of executing a module event such as a reducer call.
#[derive(Debug, Clone)]
pub enum EventStatus {
    /// The event committed, producing the contained database update.
    Committed(DatabaseUpdate),
    /// The event failed with the contained error message.
    Failed(String),
    /// The event was aborted because the energy budget was exhausted.
    OutOfEnergy,
}
158
159impl EventStatus {
160 pub fn database_update(&self) -> Option<&DatabaseUpdate> {
161 match self {
162 EventStatus::Committed(upd) => Some(upd),
163 _ => None,
164 }
165 }
166}
167
/// A call to a reducer: its name, id, and (already-parsed) arguments.
#[derive(Debug, Clone, Default)]
pub struct ModuleFunctionCall {
    pub reducer: String,
    pub reducer_id: ReducerId,
    pub args: ArgsTuple,
}
174
/// A record of one executed module event: who called which reducer, when,
/// with what outcome, and at what cost.
#[derive(Debug, Clone)]
pub struct ModuleEvent {
    pub timestamp: Timestamp,
    pub caller_identity: Identity,
    // `None` when the caller did not supply a connection id.
    pub caller_connection_id: Option<ConnectionId>,
    pub function_call: ModuleFunctionCall,
    pub status: EventStatus,
    pub energy_quanta_used: EnergyQuanta,
    pub host_execution_duration: Duration,
    pub request_id: Option<RequestId>,
    pub timer: Option<Instant>,
}
187
/// Static information about a loaded module and the database it serves.
pub struct ModuleInfo {
    /// The module's full definition (tables, reducers, types, ...).
    pub module_def: ModuleDef,
    /// Identity of the database's owner.
    pub owner_identity: Identity,
    /// Identity of the database itself.
    pub database_identity: Identity,
    /// Hash identifying the module program.
    pub module_hash: Hash,
    /// Broadcast channel on which database log lines are published.
    pub log_tx: tokio::sync::broadcast::Sender<bytes::Bytes>,
    /// Subscription manager for clients of this database.
    pub subscriptions: ModuleSubscriptions,
    /// Per-database Prometheus metrics.
    pub metrics: ModuleMetrics,
}
207
208impl fmt::Debug for ModuleInfo {
209 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
210 f.debug_struct("ModuleInfo")
211 .field("module_def", &self.module_def)
212 .field("owner_identity", &self.owner_identity)
213 .field("database_identity", &self.database_identity)
214 .field("module_hash", &self.module_hash)
215 .finish()
216 }
217}
218
/// Per-database worker metrics, resolved once at module creation so hot
/// paths don't pay the label lookup.
#[derive(Debug)]
pub struct ModuleMetrics {
    pub connected_clients: IntGauge,
    pub ws_clients_spawned: IntGauge,
    pub ws_clients_aborted: IntGauge,
    pub request_round_trip_subscribe: Histogram,
    pub request_round_trip_unsubscribe: Histogram,
    pub request_round_trip_sql: Histogram,
}
228
229impl ModuleMetrics {
230 fn new(db: &Identity) -> Self {
231 let connected_clients = WORKER_METRICS.connected_clients.with_label_values(db);
232 let ws_clients_spawned = WORKER_METRICS.ws_clients_spawned.with_label_values(db);
233 let ws_clients_aborted = WORKER_METRICS.ws_clients_aborted.with_label_values(db);
234 let request_round_trip_subscribe =
235 WORKER_METRICS
236 .request_round_trip
237 .with_label_values(&WorkloadType::Subscribe, db, "");
238 let request_round_trip_unsubscribe =
239 WORKER_METRICS
240 .request_round_trip
241 .with_label_values(&WorkloadType::Unsubscribe, db, "");
242 let request_round_trip_sql = WORKER_METRICS
243 .request_round_trip
244 .with_label_values(&WorkloadType::Sql, db, "");
245 Self {
246 connected_clients,
247 ws_clients_spawned,
248 ws_clients_aborted,
249 request_round_trip_subscribe,
250 request_round_trip_unsubscribe,
251 request_round_trip_sql,
252 }
253 }
254}
255
256impl ModuleInfo {
257 pub fn new(
260 module_def: ModuleDef,
261 owner_identity: Identity,
262 database_identity: Identity,
263 module_hash: Hash,
264 log_tx: tokio::sync::broadcast::Sender<bytes::Bytes>,
265 subscriptions: ModuleSubscriptions,
266 ) -> Arc<Self> {
267 let metrics = ModuleMetrics::new(&database_identity);
268 Arc::new(ModuleInfo {
269 module_def,
270 owner_identity,
271 database_identity,
272 module_hash,
273 log_tx,
274 subscriptions,
275 metrics,
276 })
277 }
278}
279
/// An ordered set of reducer names, where a reducer's insertion index is its [`ReducerId`].
pub struct ReducersMap(IndexSet<Box<str>>);
283
284impl<'a> FromIterator<&'a str> for ReducersMap {
285 fn from_iter<T: IntoIterator<Item = &'a str>>(iter: T) -> Self {
286 Self(iter.into_iter().map_into().collect())
287 }
288}
289
290impl fmt::Debug for ReducersMap {
291 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
292 self.0.fmt(f)
293 }
294}
295
296impl ReducersMap {
297 pub fn lookup_id(&self, reducer_name: &str) -> Option<ReducerId> {
299 self.0.get_index_of(reducer_name).map(ReducerId::from)
300 }
301
302 pub fn lookup_name(&self, reducer_id: ReducerId) -> Option<&str> {
304 let result = self.0.get_index(reducer_id.0 as _)?;
305 Some(&**result)
306 }
307}
308
/// A runtime capable of instantiating modules from a creation context.
pub trait ModuleRuntime {
    /// Creates a module actor from the given creation context.
    fn make_actor(&self, mcc: ModuleCreationContext<'_>) -> anyhow::Result<impl Module>;
}
314
/// The object-safe portion of [`Module`]; used as `Arc<dyn DynModule>`
/// inside [`ModuleHost`].
pub trait DynModule: Send + Sync + 'static {
    /// The replica context this module runs against.
    fn replica_ctx(&self) -> &Arc<ReplicaContext>;
    /// The scheduler driving this module's scheduled reducers.
    fn scheduler(&self) -> &Scheduler;
}
319
/// A loaded module, able to produce instances on which reducers run.
pub trait Module: DynModule {
    type Instance: ModuleInstance;
    type InitialInstances<'a>: IntoIterator<Item = Self::Instance> + 'a;
    /// Instances that already exist at module start-up, if any.
    fn initial_instances(&mut self) -> Self::InitialInstances<'_>;
    /// Static metadata describing this module.
    fn info(&self) -> Arc<ModuleInfo>;
    /// Creates a fresh instance of this module.
    fn create_instance(&self) -> Self::Instance;
}
327
/// A single instance of a module, on which database updates and reducer
/// calls are executed.
pub trait ModuleInstance: Send + 'static {
    /// Whether this instance has trapped and must be replaced before reuse.
    fn trapped(&self) -> bool;

    /// Updates the database to the module schema in `program`, migrating
    /// from the schema described by `old_module_info`.
    fn update_database(
        &mut self,
        program: Program,
        old_module_info: Arc<ModuleInfo>,
    ) -> anyhow::Result<UpdateDatabaseResult>;

    /// Calls the reducer described by `params`, optionally within the
    /// already-open transaction `tx`.
    fn call_reducer(&mut self, tx: Option<MutTxId>, params: CallReducerParams) -> ReducerCallResult;
}
340
341pub fn create_table_from_def(
343 stdb: &RelationalDB,
344 tx: &mut MutTxId,
345 module_def: &ModuleDef,
346 table_def: &TableDef,
347) -> anyhow::Result<()> {
348 let schema = TableSchema::from_module_def(module_def, table_def, (), TableId::SENTINEL);
349 stdb.create_table(tx, schema)
350 .with_context(|| format!("failed to create table {}", &table_def.name))?;
351 Ok(())
352}
353
/// Initializes a freshly-created database for a module.
///
/// In one transaction: creates every table declared by `module_def`,
/// installs all row-level security rules, and marks the database
/// initialized with `program`. Then invokes the module's `init` lifecycle
/// reducer, if it declares one, within that same transaction.
///
/// Returns the `init` reducer's result, or `None` if the module declares
/// no `init` reducer.
fn init_database(
    replica_ctx: &ReplicaContext,
    module_def: &ModuleDef,
    inst: &mut dyn ModuleInstance,
    program: Program,
) -> anyhow::Result<Option<ReducerCallResult>> {
    log::debug!("init database");
    let timestamp = Timestamp::now();
    let stdb = &*replica_ctx.relational_db;
    let logger = replica_ctx.logger.system_logger();

    let tx = stdb.begin_mut_tx(IsolationLevel::Serializable, Workload::Internal);
    let auth_ctx = AuthCtx::for_current(replica_ctx.database.owner_identity);
    // On error the transaction is rolled back; on success it is handed on
    // (NOT yet committed) so `init` can run inside it.
    let (tx, ()) = stdb
        .with_auto_rollback(tx, |tx| {
            // Sort by name so table creation order is deterministic.
            let mut table_defs: Vec<_> = module_def.tables().collect();
            table_defs.sort_by(|a, b| a.name.cmp(&b.name));

            for def in table_defs {
                logger.info(&format!("Creating table `{}`", &def.name));
                create_table_from_def(stdb, tx, module_def, def)?;
            }
            // Row-level security rules are created after the tables they reference.
            for rls in module_def.row_level_security() {
                logger.info(&format!("Creating row level security `{}`", rls.sql));

                let rls = RowLevelExpr::build_row_level_expr(tx, &auth_ctx, rls)
                    .with_context(|| format!("failed to create row-level security: `{}`", rls.sql))?;
                let table_id = rls.def.table_id;
                let sql = rls.def.sql.clone();
                stdb.create_row_level_security(tx, rls.def)
                    .with_context(|| format!("failed to create row-level security for table `{table_id}`: `{sql}`",))?;
            }

            stdb.set_initialized(tx, replica_ctx.host_type, program)?;

            anyhow::Ok(())
        })
        .inspect_err(|e| log::error!("{e:?}"))?;

    let rcr = match module_def.lifecycle_reducer(Lifecycle::Init) {
        None => {
            // No `init` reducer: just commit the schema-creation transaction.
            if let Some((tx_data, tx_metrics, reducer)) = stdb.commit_tx(tx)? {
                stdb.report_mut_tx_metrics(reducer, tx_metrics, Some(tx_data));
            }
            None
        }

        Some((reducer_id, _)) => {
            logger.info("Invoking `init` reducer");
            let caller_identity = replica_ctx.database.owner_identity;
            // `call_reducer` takes ownership of `tx`, so `init` runs in the
            // same transaction that created the schema.
            Some(inst.call_reducer(
                Some(tx),
                CallReducerParams {
                    timestamp,
                    caller_identity,
                    caller_connection_id: ConnectionId::ZERO,
                    client: None,
                    request_id: None,
                    timer: None,
                    reducer_id,
                    args: ArgsTuple::nullary(),
                },
            ))
        }
    };

    logger.info("Database initialized");
    Ok(rcr)
}
425
/// Everything needed to invoke a single reducer call on a module instance.
pub struct CallReducerParams {
    pub timestamp: Timestamp,
    pub caller_identity: Identity,
    pub caller_connection_id: ConnectionId,
    // The calling client's sender, when the call originated from a connected client.
    pub client: Option<Arc<ClientConnectionSender>>,
    pub request_id: Option<RequestId>,
    pub timer: Option<Instant>,
    pub reducer_id: ReducerId,
    pub args: ArgsTuple,
}
436
/// Wraps a module instance, replacing it with a freshly-created one from
/// `module` whenever the current instance traps.
struct AutoReplacingModuleInstance<T: Module> {
    inst: T::Instance,
    module: Arc<T>,
}
443
444impl<T: Module> AutoReplacingModuleInstance<T> {
445 fn check_trap(&mut self) {
446 if self.inst.trapped() {
447 self.inst = self.module.create_instance()
448 }
449 }
450}
451
452impl<T: Module> ModuleInstance for AutoReplacingModuleInstance<T> {
453 fn trapped(&self) -> bool {
454 self.inst.trapped()
455 }
456 fn update_database(
457 &mut self,
458 program: Program,
459 old_module_info: Arc<ModuleInfo>,
460 ) -> anyhow::Result<UpdateDatabaseResult> {
461 let ret = self.inst.update_database(program, old_module_info);
462 self.check_trap();
463 ret
464 }
465 fn call_reducer(&mut self, tx: Option<MutTxId>, params: CallReducerParams) -> ReducerCallResult {
466 let ret = self.inst.call_reducer(tx, params);
467 self.check_trap();
468 ret
469 }
470}
471
/// A cloneable handle to a running module, through which reducers are
/// called and the database is initialized/updated.
#[derive(Clone)]
pub struct ModuleHost {
    pub info: Arc<ModuleInfo>,
    module: Arc<dyn DynModule>,
    // Invoked if a closure queued via `call` panics on the job thread.
    on_panic: Arc<dyn Fn() + Send + Sync + 'static>,
    // Handle to the job thread that owns the module instance.
    job_tx: JobThread<dyn ModuleInstance>,
}
480
481impl fmt::Debug for ModuleHost {
482 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
483 f.debug_struct("ModuleHost")
484 .field("info", &self.info)
485 .field("module", &Arc::as_ptr(&self.module))
486 .finish()
487 }
488}
489
/// A weak counterpart of [`ModuleHost`] that does not keep the module alive.
pub struct WeakModuleHost {
    info: Arc<ModuleInfo>,
    inner: Weak<dyn DynModule>,
    on_panic: Weak<dyn Fn() + Send + Sync + 'static>,
    tx: WeakJobThread<dyn ModuleInstance>,
}
496
/// The result of attempting to update a database to a new module version.
#[derive(Debug)]
pub enum UpdateDatabaseResult {
    /// The new module requires no schema changes.
    NoUpdateNeeded,
    /// The automatic migration was applied.
    UpdatePerformed,
    /// The schema change cannot be migrated automatically.
    AutoMigrateError(ErrorStream<AutoMigrateError>),
    /// The migration was attempted but failed while executing.
    ErrorExecutingMigration(anyhow::Error),
}
504impl UpdateDatabaseResult {
505 pub fn was_successful(&self) -> bool {
507 matches!(
508 self,
509 UpdateDatabaseResult::UpdatePerformed | UpdateDatabaseResult::NoUpdateNeeded
510 )
511 }
512}
513
/// Error returned when the module's job thread has already shut down.
#[derive(thiserror::Error, Debug)]
#[error("no such module")]
pub struct NoSuchModule;
517
/// Errors that can occur when calling a reducer through [`ModuleHost`].
#[derive(thiserror::Error, Debug)]
pub enum ReducerCallError {
    #[error(transparent)]
    Args(#[from] InvalidReducerArguments),
    #[error(transparent)]
    NoSuchModule(#[from] NoSuchModule),
    #[error("no such reducer")]
    NoSuchReducer,
    #[error("no such scheduled reducer")]
    ScheduleReducerNotFound,
    // Lifecycle reducers (init, client_connected, ...) are invoked by the
    // host only, never directly by clients.
    #[error("can't directly call special {0:?} lifecycle reducer")]
    LifecycleReducer(Lifecycle),
}
531
/// Errors that can occur while initializing a database for a module.
#[derive(thiserror::Error, Debug)]
pub enum InitDatabaseError {
    #[error(transparent)]
    Args(#[from] InvalidReducerArguments),
    #[error(transparent)]
    NoSuchModule(#[from] NoSuchModule),
    #[error(transparent)]
    Other(anyhow::Error),
}
541
/// Errors that can occur while handling a new client connection.
#[derive(thiserror::Error, Debug)]
pub enum ClientConnectedError {
    #[error(transparent)]
    ReducerCall(#[from] ReducerCallError),
    #[error("Failed to insert `st_client` row for module without client_connected reducer: {0}")]
    DBError(#[from] DBError),
    #[error("Connection rejected by `client_connected` reducer: {0}")]
    Rejected(String),
    #[error("Insufficient energy balance to run `client_connected` reducer")]
    OutOfEnergy,
}
553
554impl ModuleHost {
    /// Creates a `ModuleHost` from `module`, spawning its instance worker
    /// on `core`.
    ///
    /// `on_panic` is invoked if a queued closure panics on the job thread.
    pub(super) fn new(module: impl Module, on_panic: impl Fn() + Send + Sync + 'static, core: JobCore) -> Self {
        let info = module.info();
        let module = Arc::new(module);
        let on_panic = Arc::new(on_panic);

        let module_clone = module.clone();
        // The job thread owns an instance that transparently replaces itself
        // when it traps; see `AutoReplacingModuleInstance`.
        let job_tx = core.start(
            move || AutoReplacingModuleInstance {
                inst: module_clone.create_instance(),
                module: module_clone,
            },
            |x| x as &mut dyn ModuleInstance,
        );
        ModuleHost {
            info,
            module,
            on_panic,
            job_tx,
        }
    }
575
    /// Returns the static metadata describing this module.
    #[inline]
    pub fn info(&self) -> &ModuleInfo {
        &self.info
    }
580
    /// Returns the subscription manager for this module's database.
    #[inline]
    pub fn subscriptions(&self) -> &ModuleSubscriptions {
        &self.info.subscriptions
    }
585
    /// Runs `f` on this module's dedicated job thread.
    ///
    /// `label` is used for queue metrics. Fails if the module's job thread
    /// has been closed.
    pub async fn on_module_thread<F, R>(&self, label: &str, f: F) -> Result<R, anyhow::Error>
    where
        F: FnOnce() -> R + Send + 'static,
        R: Send + 'static,
    {
        self.call(label, |_| f())
            .await
            .map_err(|_| anyhow::Error::from(NoSuchModule))
    }
601
    /// Queues `f` on the module's job thread, giving it exclusive access to
    /// the module instance.
    ///
    /// Records queue-length and wait-time metrics labeled by `label`, and
    /// invokes `on_panic` if the closure unwinds. Returns [`NoSuchModule`]
    /// if the job thread is closed.
    async fn call<F, R>(&self, label: &str, f: F) -> Result<R, NoSuchModule>
    where
        F: FnOnce(&mut dyn ModuleInstance) -> R + Send + 'static,
        R: Send + 'static,
    {
        // Time spent waiting for the instance, labeled per reducer/label.
        let queue_timer = WORKER_METRICS
            .reducer_wait_time
            .with_label_values(&self.info.database_identity, label)
            .start_timer();
        let queue_length_gauge = WORKER_METRICS
            .instance_queue_length
            .with_label_values(&self.info.database_identity);
        queue_length_gauge.inc();
        {
            let queue_length = queue_length_gauge.get();
            WORKER_METRICS
                .instance_queue_length_histogram
                .with_label_values(&self.info.database_identity)
                .observe(queue_length as f64);
        }
        // Dropped inside the job closure, i.e. exactly when the job is
        // dequeued and starts running — that is when waiting ends.
        let timer_guard = scopeguard::guard((), move |_| {
            queue_length_gauge.dec();
            queue_timer.stop_and_record();
        });

        scopeguard::defer_on_unwind!({
            log::warn!("reducer {label} panicked");
            (self.on_panic)();
        });
        self.job_tx
            .run(move |inst| {
                drop(timer_guard);
                f(inst)
            })
            .await
            .map_err(|_: JobThreadClosed| NoSuchModule)
    }
653
    /// Disconnects the given client: removes its subscriptions, then runs
    /// the `client_disconnected` lifecycle logic (or its fallback).
    ///
    /// Errors are logged rather than returned, since the client is already gone.
    pub async fn disconnect_client(&self, client_id: ClientActorId) {
        log::trace!("disconnecting client {client_id}");
        let this = self.clone();
        if let Err(e) = self
            .call("disconnect_client", move |inst| {
                this.subscriptions().remove_subscriber(client_id);
                this.call_identity_disconnected_inner(client_id.identity, client_id.connection_id, inst)
            })
            .await
        {
            log::error!("Error from client_disconnected transaction: {e}");
        }
    }
669
    /// Handles a new client connection.
    ///
    /// If the module declares a `client_connected` lifecycle reducer, it is
    /// invoked: a committed run accepts the connection, a failed run or an
    /// exhausted energy budget rejects it. Otherwise the connection's
    /// `st_client` row is inserted directly in a fallback transaction.
    pub async fn call_identity_connected(
        &self,
        caller_identity: Identity,
        caller_connection_id: ConnectionId,
    ) -> Result<(), ClientConnectedError> {
        let me = self.clone();
        self.call("call_identity_connected", move |inst| {
            let reducer_lookup = me.info.module_def.lifecycle_reducer(Lifecycle::OnConnect);

            if let Some((reducer_id, reducer_def)) = reducer_lookup {
                let reducer_outcome = me.call_reducer_inner_with_inst(
                    caller_identity,
                    Some(caller_connection_id),
                    None,
                    None,
                    None,
                    reducer_id,
                    reducer_def,
                    ReducerArgs::Nullary,
                    inst,
                )?;

                match reducer_outcome.outcome {
                    ReducerOutcome::Committed => Ok(()),

                    // A failed `client_connected` reducer rejects the connection.
                    ReducerOutcome::Failed(message) => Err(ClientConnectedError::Rejected(message)),
                    ReducerOutcome::BudgetExceeded => Err(ClientConnectedError::OutOfEnergy),
                }
            } else {
                // NOTE(review): `reducer_lookup` is always `None` on this branch,
                // so this always resolves to the fallback name.
                let reducer_name = reducer_lookup
                    .as_ref()
                    .map(|(_, def)| &*def.name)
                    .unwrap_or("__identity_connected__");

                let workload = Workload::Reducer(ReducerContext {
                    name: reducer_name.to_owned(),
                    caller_identity,
                    caller_connection_id,
                    timestamp: Timestamp::now(),
                    arg_bsatn: Bytes::new(),
                });

                // No `client_connected` reducer: record the connection in
                // `st_client` ourselves so it is still tracked.
                let stdb = me.module.replica_ctx().relational_db.clone();
                stdb.with_auto_commit(workload, |mut_tx| {
                    mut_tx
                        .insert_st_client(caller_identity, caller_connection_id)
                        .map_err(DBError::from)
                })
                .inspect_err(|e| {
                    log::error!(
                        "`call_identity_connected`: fallback transaction to insert into `st_client` failed: {e:#?}"
                    )
                })
                .map_err(Into::into)
            }
        })
        .await
        .map_err(Into::<ReducerCallError>::into)?
    }
763
    /// Runs the `client_disconnected` lifecycle reducer on `inst`, if the
    /// module declares one; otherwise — or if the reducer fails or runs out
    /// of energy — deletes the client's `st_client` row directly in a
    /// fallback transaction so the client is not left marked as connected.
    pub fn call_identity_disconnected_inner(
        &self,
        caller_identity: Identity,
        caller_connection_id: ConnectionId,
        inst: &mut dyn ModuleInstance,
    ) -> Result<(), ReducerCallError> {
        let me = self.clone();
        let reducer_lookup = me.info.module_def.lifecycle_reducer(Lifecycle::OnDisconnect);

        // Fallback: remove the `st_client` row ourselves.
        let fallback = || {
            let reducer_name = reducer_lookup
                .as_ref()
                .map(|(_, def)| &*def.name)
                .unwrap_or("__identity_disconnected__");

            let workload = Workload::Reducer(ReducerContext {
                name: reducer_name.to_owned(),
                caller_identity,
                caller_connection_id,
                timestamp: Timestamp::now(),
                arg_bsatn: Bytes::new(),
            });
            let stdb = me.module.replica_ctx().relational_db.clone();
            let database_identity = me.info.database_identity;
            stdb.with_auto_commit(workload, |mut_tx| {
                mut_tx
                    .delete_st_client(caller_identity, caller_connection_id, database_identity)
                    .map_err(DBError::from)
            })
            .map_err(|err| {
                log::error!(
                    "`call_identity_disconnected`: fallback transaction to delete from `st_client` failed: {err}"
                );
                InvalidReducerArguments {
                    err: err.into(),
                    reducer: reducer_name.into(),
                }
                .into()
            })
        };

        if let Some((reducer_id, reducer_def)) = reducer_lookup {
            let result = me.call_reducer_inner_with_inst(
                caller_identity,
                Some(caller_connection_id),
                None,
                None,
                None,
                reducer_id,
                reducer_def,
                ReducerArgs::Nullary,
                inst,
            );

            match result {
                // The call itself errored; still clean up `st_client`.
                Err(e) => {
                    log::error!("call_reducer_inner of client_disconnected failed: {e:#?}");
                    fallback()
                }
                // The reducer ran but did not commit; still clean up `st_client`.
                Ok(ReducerCallResult {
                    outcome: ReducerOutcome::Failed(_) | ReducerOutcome::BudgetExceeded,
                    ..
                }) => fallback(),

                Ok(ReducerCallResult {
                    outcome: ReducerOutcome::Committed,
                    ..
                }) => Ok(()),
            }
        } else {
            fallback()
        }
    }
851
    /// Handles a client disconnection on the module thread; see
    /// [`Self::call_identity_disconnected_inner`] for the actual logic.
    pub async fn call_identity_disconnected(
        &self,
        caller_identity: Identity,
        caller_connection_id: ConnectionId,
    ) -> Result<(), ReducerCallError> {
        let me = self.clone();
        self.call("call_identity_disconnected", move |inst| {
            me.call_identity_disconnected_inner(caller_identity, caller_connection_id, inst)
        })
        .await
        .map_err(Into::<ReducerCallError>::into)?
    }
879
    /// Deserializes `args` against `reducer_def`'s parameter types, then
    /// runs the reducer on the module thread in a fresh transaction.
    async fn call_reducer_inner(
        &self,
        caller_identity: Identity,
        caller_connection_id: Option<ConnectionId>,
        client: Option<Arc<ClientConnectionSender>>,
        request_id: Option<RequestId>,
        timer: Option<Instant>,
        reducer_id: ReducerId,
        reducer_def: &ReducerDef,
        args: ReducerArgs,
    ) -> Result<ReducerCallResult, ReducerCallError> {
        let reducer_seed = ReducerArgsDeserializeSeed(self.info.module_def.typespace().with_type(reducer_def));
        let args = args.into_tuple(reducer_seed)?;
        // Callers without a connection are represented by the zero id.
        let caller_connection_id = caller_connection_id.unwrap_or(ConnectionId::ZERO);

        self.call(&reducer_def.name, move |inst| {
            inst.call_reducer(
                None,
                CallReducerParams {
                    timestamp: Timestamp::now(),
                    caller_identity,
                    caller_connection_id,
                    client,
                    request_id,
                    timer,
                    reducer_id,
                    args,
                },
            )
        })
        .await
        .map_err(Into::into)
    }
    /// Like [`Self::call_reducer_inner`], but runs synchronously on an
    /// already-acquired module instance instead of queueing on the job thread.
    fn call_reducer_inner_with_inst(
        &self,
        caller_identity: Identity,
        caller_connection_id: Option<ConnectionId>,
        client: Option<Arc<ClientConnectionSender>>,
        request_id: Option<RequestId>,
        timer: Option<Instant>,
        reducer_id: ReducerId,
        reducer_def: &ReducerDef,
        args: ReducerArgs,
        module_instance: &mut dyn ModuleInstance,
    ) -> Result<ReducerCallResult, ReducerCallError> {
        let reducer_seed = ReducerArgsDeserializeSeed(self.info.module_def.typespace().with_type(reducer_def));
        let args = args.into_tuple(reducer_seed)?;
        // Callers without a connection are represented by the zero id.
        let caller_connection_id = caller_connection_id.unwrap_or(ConnectionId::ZERO);

        Ok(module_instance.call_reducer(
            None,
            CallReducerParams {
                timestamp: Timestamp::now(),
                caller_identity,
                caller_connection_id,
                client,
                request_id,
                timer,
                reducer_id,
                args,
            },
        ))
    }
943
    /// Calls the reducer named `reducer_name` with `args` on behalf of
    /// `caller_identity`.
    ///
    /// Lifecycle reducers cannot be invoked through this entry point.
    /// Client-facing failures (unknown reducer, bad arguments) are also
    /// written to the database log, since they usually indicate a stale
    /// client schema.
    pub async fn call_reducer(
        &self,
        caller_identity: Identity,
        caller_connection_id: Option<ConnectionId>,
        client: Option<Arc<ClientConnectionSender>>,
        request_id: Option<RequestId>,
        timer: Option<Instant>,
        reducer_name: &str,
        args: ReducerArgs,
    ) -> Result<ReducerCallResult, ReducerCallError> {
        let res = async {
            let (reducer_id, reducer_def) = self
                .info
                .module_def
                .reducer_full(reducer_name)
                .ok_or(ReducerCallError::NoSuchReducer)?;
            if let Some(lifecycle) = reducer_def.lifecycle {
                return Err(ReducerCallError::LifecycleReducer(lifecycle));
            }
            self.call_reducer_inner(
                caller_identity,
                caller_connection_id,
                client,
                request_id,
                timer,
                reducer_id,
                reducer_def,
                args,
            )
            .await
        }
        .await;

        let log_message = match &res {
            Err(ReducerCallError::NoSuchReducer) => Some(format!(
                "External attempt to call nonexistent reducer \"{reducer_name}\" failed. Have you run `spacetime generate` recently?"
            )),
            Err(ReducerCallError::Args(_)) => Some(format!(
                "External attempt to call reducer \"{reducer_name}\" failed, invalid arguments.\n\
                This is likely due to a mismatched client schema, have you run `spacetime generate` recently?",
            )),
            _ => None,
        };
        if let Some(log_message) = log_message {
            self.inject_logs(LogLevel::Error, &log_message)
        }

        res
    }
993
    /// Calls a scheduled reducer.
    ///
    /// `call_reducer_params` inspects the freshly-opened transaction and
    /// decides which reducer (if any) to run; returning `Ok(None)` means
    /// the scheduled reducer no longer exists.
    pub async fn call_scheduled_reducer(
        &self,
        call_reducer_params: impl FnOnce(&MutTxId) -> anyhow::Result<Option<CallReducerParams>> + Send + 'static,
    ) -> Result<ReducerCallResult, ReducerCallError> {
        let db = self.module.replica_ctx().relational_db.clone();
        const REDUCER: &str = "scheduled_reducer";
        let module = self.info.clone();
        self.call(REDUCER, move |inst: &mut dyn ModuleInstance| {
            let mut tx = db.begin_mut_tx(IsolationLevel::Serializable, Workload::Internal);

            match call_reducer_params(&mut tx) {
                Ok(Some(params)) => {
                    let reducer_def = module
                        .module_def
                        .get_reducer_by_id(params.reducer_id)
                        .ok_or(ReducerCallError::ScheduleReducerNotFound)?;
                    let reducer = &*reducer_def.name;

                    // Relabel the transaction's context so metrics/logs are
                    // attributed to the scheduled reducer, not `Internal`.
                    tx.ctx = ExecutionContext::with_workload(
                        tx.ctx.database_identity(),
                        Workload::Reducer(ReducerContext {
                            name: reducer.into(),
                            caller_identity: params.caller_identity,
                            caller_connection_id: params.caller_connection_id,
                            timestamp: Timestamp::now(),
                            arg_bsatn: params.args.get_bsatn().clone(),
                        }),
                    );

                    Ok(inst.call_reducer(Some(tx), params))
                }
                Ok(None) => Err(ReducerCallError::ScheduleReducerNotFound),
                Err(err) => Err(ReducerCallError::Args(InvalidReducerArguments {
                    err,
                    reducer: REDUCER.into(),
                })),
            }
        })
        .await
        .unwrap_or_else(|e| Err(e.into()))
    }
1040
1041 pub fn subscribe_to_logs(&self) -> anyhow::Result<tokio::sync::broadcast::Receiver<bytes::Bytes>> {
1042 Ok(self.info().log_tx.subscribe())
1043 }
1044
    /// Initializes this module's database with `program` on the module
    /// thread; see the free function [`init_database`] for the details.
    pub async fn init_database(&self, program: Program) -> Result<Option<ReducerCallResult>, InitDatabaseError> {
        let replica_ctx = self.module.replica_ctx().clone();
        let info = self.info.clone();
        self.call("<init_database>", move |inst| {
            init_database(&replica_ctx, &info.module_def, inst, program)
        })
        .await?
        .map_err(InitDatabaseError::Other)
    }
1054
    /// Updates the database to the module schema in `program`, running the
    /// migration on the module thread.
    pub async fn update_database(
        &self,
        program: Program,
        old_module_info: Arc<ModuleInfo>,
    ) -> Result<UpdateDatabaseResult, anyhow::Error> {
        self.call("<update_database>", move |inst| {
            inst.update_database(program, old_module_info)
        })
        .await?
    }
1065
    /// Shuts down the module: closes the scheduler and the job thread,
    /// then waits for both to finish.
    pub async fn exit(&self) {
        self.module.scheduler().close();
        self.job_tx.close();
        self.exited().await;
    }
1071
    /// Waits until both the scheduler and the job thread have shut down.
    pub async fn exited(&self) {
        tokio::join!(self.module.scheduler().closed(), self.job_tx.closed());
    }
1075
1076 pub fn inject_logs(&self, log_level: LogLevel, message: &str) {
1077 self.replica_ctx().logger.write(
1078 log_level,
1079 &Record {
1080 ts: chrono::Utc::now(),
1081 target: None,
1082 filename: Some("external"),
1083 line_number: None,
1084 message,
1085 },
1086 &(),
1087 )
1088 }
1089
    /// Executes a one-off SQL query on behalf of `caller_identity` and
    /// sends the result (or the error) to `client` as a message built by
    /// `into_message`.
    ///
    /// The query is compiled and executed on the module thread inside a
    /// read-only transaction, subject to the database's row-scan limit.
    #[tracing::instrument(level = "trace", skip_all)]
    pub async fn one_off_query<F: WebsocketFormat>(
        &self,
        caller_identity: Identity,
        query: String,
        client: Arc<ClientConnectionSender>,
        message_id: Vec<u8>,
        timer: Instant,
        into_message: impl FnOnce(OneOffQueryResponseMessage<F>) -> SerializableMessage + Send + 'static,
    ) -> Result<(), anyhow::Error> {
        let replica_ctx = self.replica_ctx();
        let db = replica_ctx.relational_db.clone();
        let subscriptions = replica_ctx.subscriptions.clone();
        let auth = AuthCtx::new(replica_ctx.owner_identity, caller_identity);
        log::debug!("One-off query: {query}");
        let metrics = self
            .on_module_thread("one_off_query", move || {
                db.with_read_only(Workload::Sql, |tx| {
                    // Compile, optimize and execute; any error is captured
                    // and reported back to the client below instead of
                    // aborting the send.
                    let result: Result<(OneOffTable<F>, ExecutionMetrics), anyhow::Error> = (|| {
                        // Restrict schema visibility to what `auth` may see.
                        let tx = SchemaViewer::new(tx, &auth);

                        let (
                            plans,
                            _,
                            table_name,
                            _,
                        ) = compile_subscription(&query, &tx, &auth)?;

                        let optimized = plans
                            .into_iter()
                            .map(|plan| plan.optimize())
                            .collect::<Result<Vec<_>, _>>()?;

                        // Reject queries estimated to scan too many rows.
                        check_row_limit(
                            &optimized,
                            &db,
                            &tx,
                            |plan, tx| estimate_rows_scanned(tx, plan),
                            &auth,
                        )?;

                        let optimized = optimized
                            .into_iter()
                            .map(PipelinedProject::from)
                            .collect::<Vec<_>>();

                        execute_plan::<_, F>(&optimized, &DeltaTx::from(&*tx))
                            .map(|(rows, _, metrics)| (OneOffTable { table_name, rows }, metrics))
                            .context("One-off queries are not allowed to modify the database")
                    })();

                    let total_host_execution_duration = timer.elapsed().into();
                    // Success and failure are both answered with a message;
                    // only successes yield metrics to record.
                    let (message, metrics): (SerializableMessage, Option<ExecutionMetrics>) = match result {
                        Ok((rows, metrics)) => (
                            into_message(OneOffQueryResponseMessage {
                                message_id,
                                error: None,
                                results: vec![rows],
                                total_host_execution_duration,
                            }),
                            Some(metrics),
                        ),
                        Err(err) => (
                            into_message(OneOffQueryResponseMessage {
                                message_id,
                                error: Some(format!("{err}")),
                                results: vec![],
                                total_host_execution_duration,
                            }),
                            None,
                        ),
                    };

                    subscriptions.send_client_message(client, message, tx)?;
                    Ok::<Option<ExecutionMetrics>, anyhow::Error>(metrics)
                })
            })
            .await??;

        // Successful queries contribute to the SQL workload counters.
        if let Some(metrics) = metrics {
            replica_ctx
                .relational_db
                .exec_counters_for(WorkloadType::Sql)
                .record(&metrics);
        }

        Ok(())
    }
1192
1193 pub fn clear_table(&self, table_name: &str) -> Result<(), anyhow::Error> {
1197 let db = &*self.replica_ctx().relational_db;
1198
1199 db.with_auto_commit(Workload::Internal, |tx| {
1200 let tables = db.get_all_tables_mut(tx)?;
1201 if let Some(table_id) = tables
1204 .iter()
1205 .find_map(|t| (&*t.table_name == table_name).then_some(t.table_id))
1206 {
1207 db.clear_table(tx, table_id)?;
1208 }
1209 Ok(())
1210 })
1211 }
1212
1213 pub fn downgrade(&self) -> WeakModuleHost {
1214 WeakModuleHost {
1215 info: self.info.clone(),
1216 inner: Arc::downgrade(&self.module),
1217 on_panic: Arc::downgrade(&self.on_panic),
1218 tx: self.job_tx.downgrade(),
1219 }
1220 }
1221
    /// The control-database record describing this database.
    pub fn database_info(&self) -> &Database {
        &self.replica_ctx().database
    }
1225
    /// The replica context this module host runs against.
    pub(crate) fn replica_ctx(&self) -> &ReplicaContext {
        self.module.replica_ctx()
    }
1229}
1230
1231impl WeakModuleHost {
1232 pub fn upgrade(&self) -> Option<ModuleHost> {
1233 let inner = self.inner.upgrade()?;
1234 let on_panic = self.on_panic.upgrade()?;
1235 let tx = self.tx.upgrade()?;
1236 Some(ModuleHost {
1237 info: self.info.clone(),
1238 module: inner,
1239 on_panic,
1240 job_tx: tx,
1241 })
1242 }
1243}