1use super::{ArgsTuple, InvalidReducerArguments, ReducerArgs, ReducerCallResult, ReducerId, ReducerOutcome, Scheduler};
2use crate::client::messages::{OneOffQueryResponseMessage, SerializableMessage};
3use crate::client::{ClientActorId, ClientConnectionSender};
4use crate::database_logger::{LogLevel, Record};
5use crate::db::datastore::locking_tx_datastore::MutTxId;
6use crate::db::datastore::traits::{IsolationLevel, Program, TxData};
7use crate::energy::EnergyQuanta;
8use crate::error::DBError;
9use crate::estimation::estimate_rows_scanned;
10use crate::execution_context::{ExecutionContext, ReducerContext, Workload, WorkloadType};
11use crate::hash::Hash;
12use crate::identity::Identity;
13use crate::messages::control_db::Database;
14use crate::replica_context::ReplicaContext;
15use crate::sql::ast::SchemaViewer;
16use crate::sql::parser::RowLevelExpr;
17use crate::subscription::execute_plan;
18use crate::subscription::module_subscription_actor::ModuleSubscriptions;
19use crate::subscription::tx::DeltaTx;
20use crate::util::asyncify;
21use crate::util::jobs::{JobCore, JobThread, JobThreadClosed, WeakJobThread};
22use crate::vm::check_row_limit;
23use crate::worker_metrics::WORKER_METRICS;
24use anyhow::Context;
25use bytes::Bytes;
26use derive_more::From;
27use indexmap::IndexSet;
28use itertools::Itertools;
29use prometheus::{Histogram, IntGauge};
30use spacetimedb_client_api_messages::websocket::{ByteListLen, Compression, OneOffTable, QueryUpdate, WebsocketFormat};
31use spacetimedb_data_structures::error_stream::ErrorStream;
32use spacetimedb_data_structures::map::{HashCollectionExt as _, IntMap};
33use spacetimedb_execution::pipelined::PipelinedProject;
34use spacetimedb_lib::db::raw_def::v9::Lifecycle;
35use spacetimedb_lib::identity::{AuthCtx, RequestId};
36use spacetimedb_lib::metrics::ExecutionMetrics;
37use spacetimedb_lib::ConnectionId;
38use spacetimedb_lib::Timestamp;
39use spacetimedb_primitives::TableId;
40use spacetimedb_query::compile_subscription;
41use spacetimedb_sats::ProductValue;
42use spacetimedb_schema::auto_migrate::AutoMigrateError;
43use spacetimedb_schema::def::deserialize::ReducerArgsDeserializeSeed;
44use spacetimedb_schema::def::{ModuleDef, ReducerDef};
45use spacetimedb_schema::schema::{Schema, TableSchema};
46use spacetimedb_vm::relation::RelValue;
47use std::fmt;
48use std::sync::{Arc, Weak};
49use std::time::{Duration, Instant};
50
51#[derive(Debug, Default, Clone, From)]
52pub struct DatabaseUpdate {
53 pub tables: Vec<DatabaseTableUpdate>,
54}
55
56impl FromIterator<DatabaseTableUpdate> for DatabaseUpdate {
57 fn from_iter<T: IntoIterator<Item = DatabaseTableUpdate>>(iter: T) -> Self {
58 DatabaseUpdate {
59 tables: iter.into_iter().collect(),
60 }
61 }
62}
63
64impl DatabaseUpdate {
65 pub fn is_empty(&self) -> bool {
66 if self.tables.len() == 0 {
67 return true;
68 }
69 false
70 }
71
72 pub fn from_writes(tx_data: &TxData) -> Self {
73 let mut map: IntMap<TableId, DatabaseTableUpdate> = IntMap::new();
74 let new_update = |table_id, table_name: &str| DatabaseTableUpdate {
75 table_id,
76 table_name: table_name.into(),
77 inserts: [].into(),
78 deletes: [].into(),
79 };
80 for (table_id, table_name, rows) in tx_data.inserts_with_table_name() {
81 map.entry(*table_id)
82 .or_insert_with(|| new_update(*table_id, table_name))
83 .inserts = rows.clone();
84 }
85 for (table_id, table_name, rows) in tx_data.deletes_with_table_name() {
86 map.entry(*table_id)
87 .or_insert_with(|| new_update(*table_id, table_name))
88 .deletes = rows.clone();
89 }
90 DatabaseUpdate {
91 tables: map.into_values().collect(),
92 }
93 }
94
95 pub fn num_rows(&self) -> usize {
97 self.tables.iter().map(|t| t.inserts.len() + t.deletes.len()).sum()
98 }
99}
100
101#[derive(Debug, Clone, PartialEq, Eq)]
102pub struct DatabaseTableUpdate {
103 pub table_id: TableId,
104 pub table_name: Box<str>,
105 pub inserts: Arc<[ProductValue]>,
109 pub deletes: Arc<[ProductValue]>,
110}
111
112#[derive(Debug)]
113pub struct DatabaseUpdateRelValue<'a> {
114 pub tables: Vec<DatabaseTableUpdateRelValue<'a>>,
115}
116
117#[derive(PartialEq, Debug)]
118pub struct DatabaseTableUpdateRelValue<'a> {
119 pub table_id: TableId,
120 pub table_name: Box<str>,
121 pub updates: UpdatesRelValue<'a>,
122}
123
124#[derive(Default, PartialEq, Debug)]
125pub struct UpdatesRelValue<'a> {
126 pub deletes: Vec<RelValue<'a>>,
127 pub inserts: Vec<RelValue<'a>>,
128}
129
130impl UpdatesRelValue<'_> {
131 pub fn has_updates(&self) -> bool {
133 !(self.deletes.is_empty() && self.inserts.is_empty())
134 }
135
136 pub fn encode<F: WebsocketFormat>(&self) -> (F::QueryUpdate, u64, usize) {
137 let (deletes, nr_del) = F::encode_list(self.deletes.iter());
138 let (inserts, nr_ins) = F::encode_list(self.inserts.iter());
139 let num_rows = nr_del + nr_ins;
140 let num_bytes = deletes.num_bytes() + inserts.num_bytes();
141 let qu = QueryUpdate { deletes, inserts };
142 let cqu = F::into_query_update(qu, Compression::None);
147 (cqu, num_rows, num_bytes)
148 }
149}
150
151#[derive(Debug, Clone)]
152pub enum EventStatus {
153 Committed(DatabaseUpdate),
154 Failed(String),
155 OutOfEnergy,
156}
157
158impl EventStatus {
159 pub fn database_update(&self) -> Option<&DatabaseUpdate> {
160 match self {
161 EventStatus::Committed(upd) => Some(upd),
162 _ => None,
163 }
164 }
165}
166
167#[derive(Debug, Clone, Default)]
168pub struct ModuleFunctionCall {
169 pub reducer: String,
170 pub reducer_id: ReducerId,
171 pub args: ArgsTuple,
172}
173
174#[derive(Debug, Clone)]
175pub struct ModuleEvent {
176 pub timestamp: Timestamp,
177 pub caller_identity: Identity,
178 pub caller_connection_id: Option<ConnectionId>,
179 pub function_call: ModuleFunctionCall,
180 pub status: EventStatus,
181 pub energy_quanta_used: EnergyQuanta,
182 pub host_execution_duration: Duration,
183 pub request_id: Option<RequestId>,
184 pub timer: Option<Instant>,
185}
186
187#[derive(Debug)]
189pub struct ModuleInfo {
190 pub module_def: ModuleDef,
194 pub owner_identity: Identity,
196 pub database_identity: Identity,
198 pub module_hash: Hash,
200 pub log_tx: tokio::sync::broadcast::Sender<bytes::Bytes>,
202 pub subscriptions: ModuleSubscriptions,
204 pub metrics: ModuleMetrics,
206}
207
208#[derive(Debug)]
209pub struct ModuleMetrics {
210 pub connected_clients: IntGauge,
211 pub ws_clients_spawned: IntGauge,
212 pub ws_clients_aborted: IntGauge,
213 pub request_round_trip_subscribe: Histogram,
214 pub request_round_trip_unsubscribe: Histogram,
215 pub request_round_trip_sql: Histogram,
216}
217
218impl ModuleMetrics {
219 fn new(db: &Identity) -> Self {
220 let connected_clients = WORKER_METRICS.connected_clients.with_label_values(db);
221 let ws_clients_spawned = WORKER_METRICS.ws_clients_spawned.with_label_values(db);
222 let ws_clients_aborted = WORKER_METRICS.ws_clients_aborted.with_label_values(db);
223 let request_round_trip_subscribe =
224 WORKER_METRICS
225 .request_round_trip
226 .with_label_values(&WorkloadType::Subscribe, db, "");
227 let request_round_trip_unsubscribe =
228 WORKER_METRICS
229 .request_round_trip
230 .with_label_values(&WorkloadType::Unsubscribe, db, "");
231 let request_round_trip_sql = WORKER_METRICS
232 .request_round_trip
233 .with_label_values(&WorkloadType::Sql, db, "");
234 Self {
235 connected_clients,
236 ws_clients_spawned,
237 ws_clients_aborted,
238 request_round_trip_subscribe,
239 request_round_trip_unsubscribe,
240 request_round_trip_sql,
241 }
242 }
243}
244
245impl ModuleInfo {
246 pub fn new(
249 module_def: ModuleDef,
250 owner_identity: Identity,
251 database_identity: Identity,
252 module_hash: Hash,
253 log_tx: tokio::sync::broadcast::Sender<bytes::Bytes>,
254 subscriptions: ModuleSubscriptions,
255 ) -> Arc<Self> {
256 let metrics = ModuleMetrics::new(&database_identity);
257 Arc::new(ModuleInfo {
258 module_def,
259 owner_identity,
260 database_identity,
261 module_hash,
262 log_tx,
263 subscriptions,
264 metrics,
265 })
266 }
267}
268
269pub struct ReducersMap(IndexSet<Box<str>>);
272
273impl<'a> FromIterator<&'a str> for ReducersMap {
274 fn from_iter<T: IntoIterator<Item = &'a str>>(iter: T) -> Self {
275 Self(iter.into_iter().map_into().collect())
276 }
277}
278
279impl fmt::Debug for ReducersMap {
280 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
281 self.0.fmt(f)
282 }
283}
284
285impl ReducersMap {
286 pub fn lookup_id(&self, reducer_name: &str) -> Option<ReducerId> {
288 self.0.get_index_of(reducer_name).map(ReducerId::from)
289 }
290
291 pub fn lookup_name(&self, reducer_id: ReducerId) -> Option<&str> {
293 let result = self.0.get_index(reducer_id.0 as _)?;
294 Some(&**result)
295 }
296}
297
298pub trait DynModule: Send + Sync + 'static {
299 fn replica_ctx(&self) -> &Arc<ReplicaContext>;
300 fn scheduler(&self) -> &Scheduler;
301}
302
303pub trait Module: DynModule {
304 type Instance: ModuleInstance;
305 type InitialInstances<'a>: IntoIterator<Item = Self::Instance> + 'a;
306 fn initial_instances(&mut self) -> Self::InitialInstances<'_>;
307 fn info(&self) -> Arc<ModuleInfo>;
308 fn create_instance(&self) -> Self::Instance;
309}
310
311pub trait ModuleInstance: Send + 'static {
312 fn trapped(&self) -> bool;
313
314 fn update_database(
316 &mut self,
317 program: Program,
318 old_module_info: Arc<ModuleInfo>,
319 ) -> anyhow::Result<UpdateDatabaseResult>;
320
321 fn call_reducer(&mut self, tx: Option<MutTxId>, params: CallReducerParams) -> ReducerCallResult;
322}
323
324fn init_database(
326 replica_ctx: &ReplicaContext,
327 module_def: &ModuleDef,
328 inst: &mut dyn ModuleInstance,
329 program: Program,
330) -> anyhow::Result<Option<ReducerCallResult>> {
331 log::debug!("init database");
332 let timestamp = Timestamp::now();
333 let stdb = &*replica_ctx.relational_db;
334 let logger = replica_ctx.logger.system_logger();
335
336 let tx = stdb.begin_mut_tx(IsolationLevel::Serializable, Workload::Internal);
337 let auth_ctx = AuthCtx::for_current(replica_ctx.database.owner_identity);
338 let (tx, ()) = stdb
339 .with_auto_rollback(tx, |tx| {
340 let mut table_defs: Vec<_> = module_def.tables().collect();
341 table_defs.sort_by(|a, b| a.name.cmp(&b.name));
342
343 for def in table_defs {
344 let table_name = &def.name;
345 logger.info(&format!("Creating table `{table_name}`"));
346 let schema = TableSchema::from_module_def(module_def, def, (), TableId::SENTINEL);
347 stdb.create_table(tx, schema)
348 .with_context(|| format!("failed to create table {table_name}"))?;
349 }
350 for rls in module_def.row_level_security() {
352 logger.info(&format!("Creating row level security `{}`", rls.sql));
353
354 let rls = RowLevelExpr::build_row_level_expr(tx, &auth_ctx, rls)
355 .with_context(|| format!("failed to create row-level security: `{}`", rls.sql))?;
356 let table_id = rls.def.table_id;
357 let sql = rls.def.sql.clone();
358 stdb.create_row_level_security(tx, rls.def)
359 .with_context(|| format!("failed to create row-level security for table `{table_id}`: `{sql}`",))?;
360 }
361
362 stdb.set_initialized(tx, replica_ctx.host_type, program)?;
363
364 anyhow::Ok(())
365 })
366 .inspect_err(|e| log::error!("{e:?}"))?;
367
368 let rcr = match module_def.lifecycle_reducer(Lifecycle::Init) {
369 None => {
370 if let Some((tx_data, tx_metrics, reducer)) = stdb.commit_tx(tx)? {
371 stdb.report(&reducer, &tx_metrics, Some(&tx_data));
372 }
373 None
374 }
375
376 Some((reducer_id, _)) => {
377 logger.info("Invoking `init` reducer");
378 let caller_identity = replica_ctx.database.owner_identity;
379 Some(inst.call_reducer(
380 Some(tx),
381 CallReducerParams {
382 timestamp,
383 caller_identity,
384 caller_connection_id: ConnectionId::ZERO,
385 client: None,
386 request_id: None,
387 timer: None,
388 reducer_id,
389 args: ArgsTuple::nullary(),
390 },
391 ))
392 }
393 };
394
395 logger.info("Database initialized");
396 Ok(rcr)
397}
398
399pub struct CallReducerParams {
400 pub timestamp: Timestamp,
401 pub caller_identity: Identity,
402 pub caller_connection_id: ConnectionId,
403 pub client: Option<Arc<ClientConnectionSender>>,
404 pub request_id: Option<RequestId>,
405 pub timer: Option<Instant>,
406 pub reducer_id: ReducerId,
407 pub args: ArgsTuple,
408}
409
410struct AutoReplacingModuleInstance<T: Module> {
413 inst: T::Instance,
414 module: Arc<T>,
415}
416
417impl<T: Module> AutoReplacingModuleInstance<T> {
418 fn check_trap(&mut self) {
419 if self.inst.trapped() {
420 self.inst = self.module.create_instance()
421 }
422 }
423}
424
425impl<T: Module> ModuleInstance for AutoReplacingModuleInstance<T> {
426 fn trapped(&self) -> bool {
427 self.inst.trapped()
428 }
429 fn update_database(
430 &mut self,
431 program: Program,
432 old_module_info: Arc<ModuleInfo>,
433 ) -> anyhow::Result<UpdateDatabaseResult> {
434 let ret = self.inst.update_database(program, old_module_info);
435 self.check_trap();
436 ret
437 }
438 fn call_reducer(&mut self, tx: Option<MutTxId>, params: CallReducerParams) -> ReducerCallResult {
439 let ret = self.inst.call_reducer(tx, params);
440 self.check_trap();
441 ret
442 }
443}
444
445#[derive(Clone)]
446pub struct ModuleHost {
447 pub info: Arc<ModuleInfo>,
448 module: Arc<dyn DynModule>,
449 on_panic: Arc<dyn Fn() + Send + Sync + 'static>,
451 job_tx: JobThread<dyn ModuleInstance>,
452}
453
454impl fmt::Debug for ModuleHost {
455 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
456 f.debug_struct("ModuleHost")
457 .field("info", &self.info)
458 .field("module", &Arc::as_ptr(&self.module))
459 .finish()
460 }
461}
462
463pub struct WeakModuleHost {
464 info: Arc<ModuleInfo>,
465 inner: Weak<dyn DynModule>,
466 on_panic: Weak<dyn Fn() + Send + Sync + 'static>,
467 tx: WeakJobThread<dyn ModuleInstance>,
468}
469
470#[derive(Debug)]
471pub enum UpdateDatabaseResult {
472 NoUpdateNeeded,
473 UpdatePerformed,
474 AutoMigrateError(ErrorStream<AutoMigrateError>),
475 ErrorExecutingMigration(anyhow::Error),
476}
477impl UpdateDatabaseResult {
478 pub fn was_successful(&self) -> bool {
480 matches!(
481 self,
482 UpdateDatabaseResult::UpdatePerformed | UpdateDatabaseResult::NoUpdateNeeded
483 )
484 }
485}
486
487#[derive(thiserror::Error, Debug)]
488#[error("no such module")]
489pub struct NoSuchModule;
490
491#[derive(thiserror::Error, Debug)]
492pub enum ReducerCallError {
493 #[error(transparent)]
494 Args(#[from] InvalidReducerArguments),
495 #[error(transparent)]
496 NoSuchModule(#[from] NoSuchModule),
497 #[error("no such reducer")]
498 NoSuchReducer,
499 #[error("no such scheduled reducer")]
500 ScheduleReducerNotFound,
501 #[error("can't directly call special {0:?} lifecycle reducer")]
502 LifecycleReducer(Lifecycle),
503}
504
505#[derive(thiserror::Error, Debug)]
506pub enum InitDatabaseError {
507 #[error(transparent)]
508 Args(#[from] InvalidReducerArguments),
509 #[error(transparent)]
510 NoSuchModule(#[from] NoSuchModule),
511 #[error(transparent)]
512 Other(anyhow::Error),
513}
514
515#[derive(thiserror::Error, Debug)]
516pub enum ClientConnectedError {
517 #[error(transparent)]
518 ReducerCall(#[from] ReducerCallError),
519 #[error("Failed to insert `st_client` row for module without client_connected reducer: {0}")]
520 DBError(#[from] DBError),
521 #[error("Connection rejected by `client_connected` reducer: {0}")]
522 Rejected(String),
523 #[error("Insufficient energy balance to run `client_connected` reducer")]
524 OutOfEnergy,
525}
526
527impl ModuleHost {
528 pub(super) fn new(module: impl Module, on_panic: impl Fn() + Send + Sync + 'static, core: JobCore) -> Self {
529 let info = module.info();
530 let module = Arc::new(module);
531 let on_panic = Arc::new(on_panic);
532
533 let module_clone = module.clone();
534 let job_tx = core.start(
535 move || AutoReplacingModuleInstance {
536 inst: module_clone.create_instance(),
537 module: module_clone,
538 },
539 |x| x as &mut dyn ModuleInstance,
540 );
541 ModuleHost {
542 info,
543 module,
544 on_panic,
545 job_tx,
546 }
547 }
548
549 #[inline]
550 pub fn info(&self) -> &ModuleInfo {
551 &self.info
552 }
553
554 #[inline]
555 pub fn subscriptions(&self) -> &ModuleSubscriptions {
556 &self.info.subscriptions
557 }
558
559 async fn call<F, R>(&self, reducer: &str, f: F) -> Result<R, NoSuchModule>
560 where
561 F: FnOnce(&mut dyn ModuleInstance) -> R + Send + 'static,
562 R: Send + 'static,
563 {
564 let queue_timer = WORKER_METRICS
566 .reducer_wait_time
567 .with_label_values(&self.info.database_identity, reducer)
568 .start_timer();
569 let queue_length_gauge = WORKER_METRICS
570 .instance_queue_length
571 .with_label_values(&self.info.database_identity);
572 queue_length_gauge.inc();
573 {
574 let queue_length = queue_length_gauge.get();
575 WORKER_METRICS
576 .instance_queue_length_histogram
577 .with_label_values(&self.info.database_identity)
578 .observe(queue_length as f64);
579 }
580 let timer_guard = scopeguard::guard((), move |_| {
582 queue_length_gauge.dec();
585 queue_timer.stop_and_record();
586 });
587
588 scopeguard::defer_on_unwind!({
598 log::warn!("reducer {reducer} panicked");
599 (self.on_panic)();
600 });
601 self.job_tx
602 .run(move |inst| {
603 drop(timer_guard);
604 f(inst)
605 })
606 .await
607 .map_err(|_: JobThreadClosed| NoSuchModule)
608 }
609
610 pub async fn disconnect_client(&self, client_id: ClientActorId) {
611 log::trace!("disconnecting client {}", client_id);
612 let this = self.clone();
613 asyncify(move || this.subscriptions().remove_subscriber(client_id)).await;
614 if let Err(e) = self
616 .call_identity_disconnected(client_id.identity, client_id.connection_id)
617 .await
618 {
619 log::error!("Error from client_disconnected transaction: {e}");
620 }
621 }
622
623 pub async fn call_identity_connected(
637 &self,
638 caller_identity: Identity,
639 caller_connection_id: ConnectionId,
640 ) -> Result<(), ClientConnectedError> {
641 let reducer_lookup = self.info.module_def.lifecycle_reducer(Lifecycle::OnConnect);
642
643 if let Some((reducer_id, reducer_def)) = reducer_lookup {
644 let reducer_outcome = self
649 .call_reducer_inner(
650 caller_identity,
651 Some(caller_connection_id),
652 None,
653 None,
654 None,
655 reducer_id,
656 reducer_def,
657 ReducerArgs::Nullary,
658 )
659 .await?;
660
661 match reducer_outcome.outcome {
662 ReducerOutcome::Committed => Ok(()),
673
674 ReducerOutcome::Failed(message) => Err(ClientConnectedError::Rejected(message)),
677 ReducerOutcome::BudgetExceeded => Err(ClientConnectedError::OutOfEnergy),
678 }
679 } else {
680 let reducer_name = reducer_lookup
686 .as_ref()
687 .map(|(_, def)| &*def.name)
688 .unwrap_or("__identity_connected__");
689
690 let workload = Workload::Reducer(ReducerContext {
691 name: reducer_name.to_owned(),
692 caller_identity,
693 caller_connection_id,
694 timestamp: Timestamp::now(),
695 arg_bsatn: Bytes::new(),
696 });
697
698 let stdb = self.module.replica_ctx().relational_db.clone();
699 asyncify(move || {
700 stdb.with_auto_commit(workload, |mut_tx| {
701 mut_tx
702 .insert_st_client(caller_identity, caller_connection_id)
703 .map_err(DBError::from)
704 })
705 })
706 .await
707 .inspect_err(|e| {
708 log::error!("`call_identity_connected`: fallback transaction to insert into `st_client` failed: {e:#?}")
709 })
710 .map_err(DBError::from)
711 .map_err(Into::into)
712 }
713 }
714
715 pub async fn call_identity_disconnected(
731 &self,
732 caller_identity: Identity,
733 caller_connection_id: ConnectionId,
734 ) -> Result<(), ReducerCallError> {
735 let reducer_lookup = self.info.module_def.lifecycle_reducer(Lifecycle::OnDisconnect);
736
737 let fallback = || async {
739 let reducer_name = reducer_lookup
740 .as_ref()
741 .map(|(_, def)| &*def.name)
742 .unwrap_or("__identity_disconnected__");
743
744 let workload = Workload::Reducer(ReducerContext {
745 name: reducer_name.to_owned(),
746 caller_identity,
747 caller_connection_id,
748 timestamp: Timestamp::now(),
749 arg_bsatn: Bytes::new(),
750 });
751 let stdb = self.module.replica_ctx().relational_db.clone();
752 let database_identity = self.info.database_identity;
753 asyncify(move || {
754 stdb.with_auto_commit(workload, |mut_tx| {
755 mut_tx
756 .delete_st_client(caller_identity, caller_connection_id, database_identity)
757 .map_err(DBError::from)
758 })
759 })
760 .await
761 .map_err(|err| {
762 log::error!(
763 "`call_identity_disconnected`: fallback transaction to delete from `st_client` failed: {err}"
764 );
765 InvalidReducerArguments {
766 err: err.into(),
767 reducer: reducer_name.into(),
768 }
769 .into()
770 })
771 };
772
773 if let Some((reducer_id, reducer_def)) = reducer_lookup {
774 let result = self
778 .call_reducer_inner(
779 caller_identity,
780 Some(caller_connection_id),
781 None,
782 None,
783 None,
784 reducer_id,
785 reducer_def,
786 ReducerArgs::Nullary,
787 )
788 .await;
789
790 match result {
798 Err(e) => {
799 log::error!("call_reducer_inner of client_disconnected failed: {e:#?}");
800 fallback().await
801 }
802 Ok(ReducerCallResult {
803 outcome: ReducerOutcome::Failed(_) | ReducerOutcome::BudgetExceeded,
804 ..
805 }) => fallback().await,
806
807 Ok(ReducerCallResult {
809 outcome: ReducerOutcome::Committed,
810 ..
811 }) => Ok(()),
812 }
813 } else {
814 fallback().await
817 }
818 }
819
820 async fn call_reducer_inner(
821 &self,
822 caller_identity: Identity,
823 caller_connection_id: Option<ConnectionId>,
824 client: Option<Arc<ClientConnectionSender>>,
825 request_id: Option<RequestId>,
826 timer: Option<Instant>,
827 reducer_id: ReducerId,
828 reducer_def: &ReducerDef,
829 args: ReducerArgs,
830 ) -> Result<ReducerCallResult, ReducerCallError> {
831 let reducer_seed = ReducerArgsDeserializeSeed(self.info.module_def.typespace().with_type(reducer_def));
832 let args = args.into_tuple(reducer_seed)?;
833 let caller_connection_id = caller_connection_id.unwrap_or(ConnectionId::ZERO);
834
835 self.call(&reducer_def.name, move |inst| {
836 inst.call_reducer(
837 None,
838 CallReducerParams {
839 timestamp: Timestamp::now(),
840 caller_identity,
841 caller_connection_id,
842 client,
843 request_id,
844 timer,
845 reducer_id,
846 args,
847 },
848 )
849 })
850 .await
851 .map_err(Into::into)
852 }
853
854 pub async fn call_reducer(
855 &self,
856 caller_identity: Identity,
857 caller_connection_id: Option<ConnectionId>,
858 client: Option<Arc<ClientConnectionSender>>,
859 request_id: Option<RequestId>,
860 timer: Option<Instant>,
861 reducer_name: &str,
862 args: ReducerArgs,
863 ) -> Result<ReducerCallResult, ReducerCallError> {
864 let res = async {
865 let (reducer_id, reducer_def) = self
866 .info
867 .module_def
868 .reducer_full(reducer_name)
869 .ok_or(ReducerCallError::NoSuchReducer)?;
870 if let Some(lifecycle) = reducer_def.lifecycle {
871 return Err(ReducerCallError::LifecycleReducer(lifecycle));
872 }
873 self.call_reducer_inner(
874 caller_identity,
875 caller_connection_id,
876 client,
877 request_id,
878 timer,
879 reducer_id,
880 reducer_def,
881 args,
882 )
883 .await
884 }
885 .await;
886
887 let log_message = match &res {
888 Err(ReducerCallError::NoSuchReducer) => Some(format!(
889 "External attempt to call nonexistent reducer \"{}\" failed. Have you run `spacetime generate` recently?",
890 reducer_name
891 )),
892 Err(ReducerCallError::Args(_)) => Some(format!(
893 "External attempt to call reducer \"{}\" failed, invalid arguments.\n\
894 This is likely due to a mismatched client schema, have you run `spacetime generate` recently?",
895 reducer_name,
896 )),
897 _ => None,
898 };
899 if let Some(log_message) = log_message {
900 self.inject_logs(LogLevel::Error, &log_message)
901 }
902
903 res
904 }
905
906 pub async fn call_scheduled_reducer(
910 &self,
911 call_reducer_params: impl FnOnce(&MutTxId) -> anyhow::Result<Option<CallReducerParams>> + Send + 'static,
912 ) -> Result<ReducerCallResult, ReducerCallError> {
913 let db = self.module.replica_ctx().relational_db.clone();
914 const REDUCER: &str = "scheduled_reducer";
916 let module = self.info.clone();
917 self.call(REDUCER, move |inst: &mut dyn ModuleInstance| {
918 let mut tx = db.begin_mut_tx(IsolationLevel::Serializable, Workload::Internal);
919
920 match call_reducer_params(&mut tx) {
921 Ok(Some(params)) => {
922 let reducer_def = module
924 .module_def
925 .get_reducer_by_id(params.reducer_id)
926 .ok_or(ReducerCallError::ScheduleReducerNotFound)?;
927 let reducer = &*reducer_def.name;
928
929 tx.ctx = ExecutionContext::with_workload(
930 tx.ctx.database_identity(),
931 Workload::Reducer(ReducerContext {
932 name: reducer.into(),
933 caller_identity: params.caller_identity,
934 caller_connection_id: params.caller_connection_id,
935 timestamp: Timestamp::now(),
936 arg_bsatn: params.args.get_bsatn().clone(),
937 }),
938 );
939
940 Ok(inst.call_reducer(Some(tx), params))
941 }
942 Ok(None) => Err(ReducerCallError::ScheduleReducerNotFound),
943 Err(err) => Err(ReducerCallError::Args(InvalidReducerArguments {
944 err,
945 reducer: REDUCER.into(),
946 })),
947 }
948 })
949 .await
950 .unwrap_or_else(|e| Err(e.into()))
951 .map_err(Into::into)
952 }
953
954 pub fn subscribe_to_logs(&self) -> anyhow::Result<tokio::sync::broadcast::Receiver<bytes::Bytes>> {
955 Ok(self.info().log_tx.subscribe())
956 }
957
958 pub async fn init_database(&self, program: Program) -> Result<Option<ReducerCallResult>, InitDatabaseError> {
959 let replica_ctx = self.module.replica_ctx().clone();
960 let info = self.info.clone();
961 self.call("<init_database>", move |inst| {
962 init_database(&replica_ctx, &info.module_def, inst, program)
963 })
964 .await?
965 .map_err(InitDatabaseError::Other)
966 }
967
968 pub async fn update_database(
969 &self,
970 program: Program,
971 old_module_info: Arc<ModuleInfo>,
972 ) -> Result<UpdateDatabaseResult, anyhow::Error> {
973 self.call("<update_database>", move |inst| {
974 inst.update_database(program, old_module_info)
975 })
976 .await?
977 .map_err(Into::into)
978 }
979
980 pub async fn exit(&self) {
981 self.module.scheduler().close();
982 self.job_tx.close();
983 self.exited().await;
984 }
985
986 pub async fn exited(&self) {
987 tokio::join!(self.module.scheduler().closed(), self.job_tx.closed());
988 }
989
990 pub fn inject_logs(&self, log_level: LogLevel, message: &str) {
991 self.replica_ctx().logger.write(
992 log_level,
993 &Record {
994 ts: chrono::Utc::now(),
995 target: None,
996 filename: Some("external"),
997 line_number: None,
998 message,
999 },
1000 &(),
1001 )
1002 }
1003
1004 #[tracing::instrument(level = "trace", skip_all)]
1008 pub async fn one_off_query<F: WebsocketFormat>(
1009 &self,
1010 caller_identity: Identity,
1011 query: String,
1012 client: Arc<ClientConnectionSender>,
1013 message_id: Vec<u8>,
1014 timer: Instant,
1015 into_message: impl FnOnce(OneOffQueryResponseMessage<F>) -> SerializableMessage + Send + 'static,
1017 ) -> Result<(), anyhow::Error> {
1018 let replica_ctx = self.replica_ctx();
1019 let db = replica_ctx.relational_db.clone();
1020 let subscriptions = replica_ctx.subscriptions.clone();
1021 let auth = AuthCtx::new(replica_ctx.owner_identity, caller_identity);
1022 log::debug!("One-off query: {query}");
1023 let metrics = asyncify(move || {
1024 db.with_read_only(Workload::Sql, |tx| {
1025 let result: Result<(OneOffTable<F>, ExecutionMetrics), anyhow::Error> = (|| {
1028 let tx = SchemaViewer::new(tx, &auth);
1029
1030 let (
1031 plans,
1035 _,
1036 table_name,
1037 _,
1038 ) = compile_subscription(&query, &tx, &auth)?;
1039
1040 let optimized = plans
1042 .into_iter()
1043 .map(|plan| plan.optimize())
1044 .collect::<Result<Vec<_>, _>>()?;
1045
1046 check_row_limit(
1047 &optimized,
1048 &db,
1049 &tx,
1050 |plan, tx| estimate_rows_scanned(tx, plan),
1052 &auth,
1053 )?;
1054
1055 let optimized = optimized
1056 .into_iter()
1057 .map(PipelinedProject::from)
1059 .collect::<Vec<_>>();
1060
1061 execute_plan::<_, F>(&optimized, &DeltaTx::from(&*tx))
1063 .map(|(rows, _, metrics)| (OneOffTable { table_name, rows }, metrics))
1064 .context("One-off queries are not allowed to modify the database")
1065 })();
1066
1067 let total_host_execution_duration = timer.elapsed().into();
1068 let (message, metrics): (SerializableMessage, Option<ExecutionMetrics>) = match result {
1069 Ok((rows, metrics)) => (
1070 into_message(OneOffQueryResponseMessage {
1071 message_id,
1072 error: None,
1073 results: vec![rows],
1074 total_host_execution_duration,
1075 }),
1076 Some(metrics),
1077 ),
1078 Err(err) => (
1079 into_message(OneOffQueryResponseMessage {
1080 message_id,
1081 error: Some(format!("{}", err)),
1082 results: vec![],
1083 total_host_execution_duration,
1084 }),
1085 None,
1086 ),
1087 };
1088
1089 subscriptions.send_client_message(client, message, tx)?;
1090 Ok::<Option<ExecutionMetrics>, anyhow::Error>(metrics)
1091 })
1092 })
1093 .await?;
1094
1095 if let Some(metrics) = metrics {
1096 replica_ctx
1098 .relational_db
1099 .exec_counters_for(WorkloadType::Sql)
1100 .record(&metrics);
1101 }
1102
1103 Ok(())
1104 }
1105
1106 pub fn clear_table(&self, table_name: &str) -> Result<(), anyhow::Error> {
1110 let db = &*self.replica_ctx().relational_db;
1111
1112 db.with_auto_commit(Workload::Internal, |tx| {
1113 let tables = db.get_all_tables_mut(tx)?;
1114 if let Some(table_id) = tables
1117 .iter()
1118 .find_map(|t| (&*t.table_name == table_name).then_some(t.table_id))
1119 {
1120 db.clear_table(tx, table_id)?;
1121 }
1122 Ok(())
1123 })
1124 }
1125
1126 pub fn downgrade(&self) -> WeakModuleHost {
1127 WeakModuleHost {
1128 info: self.info.clone(),
1129 inner: Arc::downgrade(&self.module),
1130 on_panic: Arc::downgrade(&self.on_panic),
1131 tx: self.job_tx.downgrade(),
1132 }
1133 }
1134
1135 pub fn database_info(&self) -> &Database {
1136 &self.replica_ctx().database
1137 }
1138
1139 pub(crate) fn replica_ctx(&self) -> &ReplicaContext {
1140 self.module.replica_ctx()
1141 }
1142}
1143
1144impl WeakModuleHost {
1145 pub fn upgrade(&self) -> Option<ModuleHost> {
1146 let inner = self.inner.upgrade()?;
1147 let on_panic = self.on_panic.upgrade()?;
1148 let tx = self.tx.upgrade()?;
1149 Some(ModuleHost {
1150 info: self.info.clone(),
1151 module: inner,
1152 on_panic,
1153 job_tx: tx,
1154 })
1155 }
1156}