// spacetimedb/host/module_host.rs

1use super::{ArgsTuple, InvalidReducerArguments, ReducerArgs, ReducerCallResult, ReducerId, ReducerOutcome, Scheduler};
2use crate::client::messages::{OneOffQueryResponseMessage, SerializableMessage};
3use crate::client::{ClientActorId, ClientConnectionSender};
4use crate::database_logger::{LogLevel, Record};
5use crate::db::relational_db::RelationalDB;
6use crate::energy::EnergyQuanta;
7use crate::error::DBError;
8use crate::estimation::estimate_rows_scanned;
9use crate::hash::Hash;
10use crate::identity::Identity;
11use crate::messages::control_db::Database;
12use crate::module_host_context::ModuleCreationContext;
13use crate::replica_context::ReplicaContext;
14use crate::sql::ast::SchemaViewer;
15use crate::sql::parser::RowLevelExpr;
16use crate::subscription::execute_plan;
17use crate::subscription::module_subscription_actor::ModuleSubscriptions;
18use crate::subscription::tx::DeltaTx;
19use crate::util::jobs::{JobCore, JobThread, JobThreadClosed, WeakJobThread};
20use crate::vm::check_row_limit;
21use crate::worker_metrics::WORKER_METRICS;
22use anyhow::Context;
23use bytes::Bytes;
24use derive_more::From;
25use indexmap::IndexSet;
26use itertools::Itertools;
27use prometheus::{Histogram, IntGauge};
28use spacetimedb_client_api_messages::websocket::{ByteListLen, Compression, OneOffTable, QueryUpdate, WebsocketFormat};
29use spacetimedb_data_structures::error_stream::ErrorStream;
30use spacetimedb_data_structures::map::{HashCollectionExt as _, IntMap};
31use spacetimedb_datastore::execution_context::{ExecutionContext, ReducerContext, Workload, WorkloadType};
32use spacetimedb_datastore::locking_tx_datastore::MutTxId;
33use spacetimedb_datastore::traits::{IsolationLevel, Program, TxData};
34use spacetimedb_execution::pipelined::PipelinedProject;
35use spacetimedb_lib::db::raw_def::v9::Lifecycle;
36use spacetimedb_lib::identity::{AuthCtx, RequestId};
37use spacetimedb_lib::metrics::ExecutionMetrics;
38use spacetimedb_lib::ConnectionId;
39use spacetimedb_lib::Timestamp;
40use spacetimedb_primitives::TableId;
41use spacetimedb_query::compile_subscription;
42use spacetimedb_sats::ProductValue;
43use spacetimedb_schema::auto_migrate::AutoMigrateError;
44use spacetimedb_schema::def::deserialize::ReducerArgsDeserializeSeed;
45use spacetimedb_schema::def::{ModuleDef, ReducerDef, TableDef};
46use spacetimedb_schema::schema::{Schema, TableSchema};
47use spacetimedb_vm::relation::RelValue;
48use std::fmt;
49use std::sync::{Arc, Weak};
50use std::time::{Duration, Instant};
51
/// The set of per-table row changes produced by a committed transaction.
#[derive(Debug, Default, Clone, From)]
pub struct DatabaseUpdate {
    /// One entry per table touched by the transaction.
    pub tables: Vec<DatabaseTableUpdate>,
}
56
57impl FromIterator<DatabaseTableUpdate> for DatabaseUpdate {
58    fn from_iter<T: IntoIterator<Item = DatabaseTableUpdate>>(iter: T) -> Self {
59        DatabaseUpdate {
60            tables: iter.into_iter().collect(),
61        }
62    }
63}
64
65impl DatabaseUpdate {
66    pub fn is_empty(&self) -> bool {
67        if self.tables.len() == 0 {
68            return true;
69        }
70        false
71    }
72
73    pub fn from_writes(tx_data: &TxData) -> Self {
74        let mut map: IntMap<TableId, DatabaseTableUpdate> = IntMap::new();
75        let new_update = |table_id, table_name: &str| DatabaseTableUpdate {
76            table_id,
77            table_name: table_name.into(),
78            inserts: [].into(),
79            deletes: [].into(),
80        };
81        for (table_id, table_name, rows) in tx_data.inserts_with_table_name() {
82            map.entry(*table_id)
83                .or_insert_with(|| new_update(*table_id, table_name))
84                .inserts = rows.clone();
85        }
86        for (table_id, table_name, rows) in tx_data.deletes_with_table_name() {
87            map.entry(*table_id)
88                .or_insert_with(|| new_update(*table_id, table_name))
89                .deletes = rows.clone();
90        }
91        DatabaseUpdate {
92            tables: map.into_values().collect(),
93        }
94    }
95
96    /// The number of rows in the payload
97    pub fn num_rows(&self) -> usize {
98        self.tables.iter().map(|t| t.inserts.len() + t.deletes.len()).sum()
99    }
100}
101
/// The row changes for a single table within a [`DatabaseUpdate`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DatabaseTableUpdate {
    /// The id of the affected table.
    pub table_id: TableId,
    /// The name of the affected table.
    pub table_name: Box<str>,
    // Note: `Arc<[ProductValue]>` allows to cheaply
    // use the values from `TxData` without cloning the
    // contained `ProductValue`s.
    pub inserts: Arc<[ProductValue]>,
    pub deletes: Arc<[ProductValue]>,
}
112
/// Like [`DatabaseUpdate`], but carrying borrowed `RelValue` rows instead of owned products.
#[derive(Debug)]
pub struct DatabaseUpdateRelValue<'a> {
    /// One entry per affected table.
    pub tables: Vec<DatabaseTableUpdateRelValue<'a>>,
}
117
/// Per-table updates in borrowed `RelValue` form.
#[derive(PartialEq, Debug)]
pub struct DatabaseTableUpdateRelValue<'a> {
    /// The id of the affected table.
    pub table_id: TableId,
    /// The name of the affected table.
    pub table_name: Box<str>,
    /// The inserted and deleted rows for this table.
    pub updates: UpdatesRelValue<'a>,
}
124
/// A pair of deleted and inserted rows, in borrowed `RelValue` form.
#[derive(Default, PartialEq, Debug)]
pub struct UpdatesRelValue<'a> {
    /// Rows removed by the transaction.
    pub deletes: Vec<RelValue<'a>>,
    /// Rows added by the transaction.
    pub inserts: Vec<RelValue<'a>>,
}
130
131impl UpdatesRelValue<'_> {
132    /// Returns whether there are any updates.
133    pub fn has_updates(&self) -> bool {
134        !(self.deletes.is_empty() && self.inserts.is_empty())
135    }
136
137    pub fn encode<F: WebsocketFormat>(&self) -> (F::QueryUpdate, u64, usize) {
138        let (deletes, nr_del) = F::encode_list(self.deletes.iter());
139        let (inserts, nr_ins) = F::encode_list(self.inserts.iter());
140        let num_rows = nr_del + nr_ins;
141        let num_bytes = deletes.num_bytes() + inserts.num_bytes();
142        let qu = QueryUpdate { deletes, inserts };
143        // We don't compress individual table updates.
144        // Previously we were, but the benefits, if any, were unclear.
145        // Note, each message is still compressed before being sent to clients,
146        // but we no longer have to hold a tx lock when doing so.
147        let cqu = F::into_query_update(qu, Compression::None);
148        (cqu, num_rows, num_bytes)
149    }
150}
151
/// The outcome of a module event (e.g. a reducer call).
#[derive(Debug, Clone)]
pub enum EventStatus {
    /// The event committed, producing these row updates.
    Committed(DatabaseUpdate),
    /// The event failed with this error message.
    Failed(String),
    /// The event was aborted because the energy budget was exhausted.
    OutOfEnergy,
}
158
159impl EventStatus {
160    pub fn database_update(&self) -> Option<&DatabaseUpdate> {
161        match self {
162            EventStatus::Committed(upd) => Some(upd),
163            _ => None,
164        }
165    }
166}
167
/// Identifies a reducer invocation and its arguments.
#[derive(Debug, Clone, Default)]
pub struct ModuleFunctionCall {
    /// The reducer's name.
    pub reducer: String,
    /// The reducer's id within the module.
    pub reducer_id: ReducerId,
    /// The arguments the reducer was called with.
    pub args: ArgsTuple,
}
174
/// A record of a single module event: who called what, when, and how it ended.
#[derive(Debug, Clone)]
pub struct ModuleEvent {
    /// When the event occurred.
    pub timestamp: Timestamp,
    /// The identity of the caller.
    pub caller_identity: Identity,
    /// The caller's connection id, if the call came over a connection.
    pub caller_connection_id: Option<ConnectionId>,
    /// The reducer call that triggered this event.
    pub function_call: ModuleFunctionCall,
    /// How the event concluded (committed / failed / out of energy).
    pub status: EventStatus,
    /// The energy consumed by the call.
    pub energy_quanta_used: EnergyQuanta,
    /// How long the host spent executing the call.
    pub host_execution_duration: Duration,
    /// The client's request id, if any.
    pub request_id: Option<RequestId>,
    /// Start time used for request round-trip measurement, if any.
    pub timer: Option<Instant>,
}
187
/// Information about a running module.
pub struct ModuleInfo {
    /// The definition of the module.
    /// Loaded by loading the module's program from the system tables, extracting its definition,
    /// and validating.
    pub module_def: ModuleDef,
    /// The identity of the module's owner.
    pub owner_identity: Identity,
    /// The identity of the database.
    pub database_identity: Identity,
    /// The hash of the module.
    pub module_hash: Hash,
    /// Allows subscribing to module logs.
    pub log_tx: tokio::sync::broadcast::Sender<bytes::Bytes>,
    /// Subscriptions to this module.
    pub subscriptions: ModuleSubscriptions,
    /// Metrics handles for this module.
    pub metrics: ModuleMetrics,
}
207
208impl fmt::Debug for ModuleInfo {
209    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
210        f.debug_struct("ModuleInfo")
211            .field("module_def", &self.module_def)
212            .field("owner_identity", &self.owner_identity)
213            .field("database_identity", &self.database_identity)
214            .field("module_hash", &self.module_hash)
215            .finish()
216    }
217}
218
/// Pre-resolved per-database metric handles, looked up once at module start.
#[derive(Debug)]
pub struct ModuleMetrics {
    /// Gauge of currently connected clients.
    pub connected_clients: IntGauge,
    /// Gauge of websocket client tasks spawned.
    pub ws_clients_spawned: IntGauge,
    /// Gauge of websocket client tasks aborted.
    pub ws_clients_aborted: IntGauge,
    /// Round-trip latency histogram for subscribe requests.
    pub request_round_trip_subscribe: Histogram,
    /// Round-trip latency histogram for unsubscribe requests.
    pub request_round_trip_unsubscribe: Histogram,
    /// Round-trip latency histogram for SQL requests.
    pub request_round_trip_sql: Histogram,
}
228
229impl ModuleMetrics {
230    fn new(db: &Identity) -> Self {
231        let connected_clients = WORKER_METRICS.connected_clients.with_label_values(db);
232        let ws_clients_spawned = WORKER_METRICS.ws_clients_spawned.with_label_values(db);
233        let ws_clients_aborted = WORKER_METRICS.ws_clients_aborted.with_label_values(db);
234        let request_round_trip_subscribe =
235            WORKER_METRICS
236                .request_round_trip
237                .with_label_values(&WorkloadType::Subscribe, db, "");
238        let request_round_trip_unsubscribe =
239            WORKER_METRICS
240                .request_round_trip
241                .with_label_values(&WorkloadType::Unsubscribe, db, "");
242        let request_round_trip_sql = WORKER_METRICS
243            .request_round_trip
244            .with_label_values(&WorkloadType::Sql, db, "");
245        Self {
246            connected_clients,
247            ws_clients_spawned,
248            ws_clients_aborted,
249            request_round_trip_subscribe,
250            request_round_trip_unsubscribe,
251            request_round_trip_sql,
252        }
253    }
254}
255
256impl ModuleInfo {
257    /// Create a new `ModuleInfo`.
258    /// Reducers are sorted alphabetically by name and assigned IDs.
259    pub fn new(
260        module_def: ModuleDef,
261        owner_identity: Identity,
262        database_identity: Identity,
263        module_hash: Hash,
264        log_tx: tokio::sync::broadcast::Sender<bytes::Bytes>,
265        subscriptions: ModuleSubscriptions,
266    ) -> Arc<Self> {
267        let metrics = ModuleMetrics::new(&database_identity);
268        Arc::new(ModuleInfo {
269            module_def,
270            owner_identity,
271            database_identity,
272            module_hash,
273            log_tx,
274            subscriptions,
275            metrics,
276        })
277    }
278}
279
/// A bidirectional map between `Identifiers` (reducer names) and `ReducerId`s.
/// Invariant: the reducer names are in the same order as they were declared in the `ModuleDef`.
/// (The `IndexSet` index of a name *is* its `ReducerId`.)
pub struct ReducersMap(IndexSet<Box<str>>);
283
284impl<'a> FromIterator<&'a str> for ReducersMap {
285    fn from_iter<T: IntoIterator<Item = &'a str>>(iter: T) -> Self {
286        Self(iter.into_iter().map_into().collect())
287    }
288}
289
290impl fmt::Debug for ReducersMap {
291    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
292        self.0.fmt(f)
293    }
294}
295
296impl ReducersMap {
297    /// Lookup the ID for a reducer name.
298    pub fn lookup_id(&self, reducer_name: &str) -> Option<ReducerId> {
299        self.0.get_index_of(reducer_name).map(ReducerId::from)
300    }
301
302    /// Lookup the name for a reducer ID.
303    pub fn lookup_name(&self, reducer_id: ReducerId) -> Option<&str> {
304        let result = self.0.get_index(reducer_id.0 as _)?;
305        Some(&**result)
306    }
307}
308
/// A runtime that can create modules.
pub trait ModuleRuntime {
    /// Creates a module based on the context `mcc`.
    ///
    /// Errors if the module cannot be instantiated from the given context.
    fn make_actor(&self, mcc: ModuleCreationContext<'_>) -> anyhow::Result<impl Module>;
}
314
/// The object-safe portion of a module: access to its replica context and scheduler.
pub trait DynModule: Send + Sync + 'static {
    /// The replica context this module runs against.
    fn replica_ctx(&self) -> &Arc<ReplicaContext>;
    /// The scheduler for this module's scheduled reducers.
    fn scheduler(&self) -> &Scheduler;
}
319
/// A loaded module that can produce runnable instances.
pub trait Module: DynModule {
    /// The instance type this module creates.
    type Instance: ModuleInstance;
    /// Instances that already exist when the module is (re)started.
    type InitialInstances<'a>: IntoIterator<Item = Self::Instance> + 'a;
    /// Returns any pre-existing instances for this module.
    fn initial_instances(&mut self) -> Self::InitialInstances<'_>;
    /// Shared information about this module.
    fn info(&self) -> Arc<ModuleInfo>;
    /// Creates a fresh instance of this module.
    fn create_instance(&self) -> Self::Instance;
}
327
/// A single runnable instance of a module.
pub trait ModuleInstance: Send + 'static {
    /// Whether this instance has trapped and should be replaced rather than reused.
    fn trapped(&self) -> bool;

    /// Update the module instance's database to match the schema of the module instance.
    ///
    /// `program` is the new module program; `old_module_info` describes the
    /// previously-running module, for computing the migration.
    fn update_database(
        &mut self,
        program: Program,
        old_module_info: Arc<ModuleInfo>,
    ) -> anyhow::Result<UpdateDatabaseResult>;

    /// Calls the reducer described by `params`, optionally within the
    /// already-open transaction `tx`.
    fn call_reducer(&mut self, tx: Option<MutTxId>, params: CallReducerParams) -> ReducerCallResult;
}
340
341/// Creates the table for `table_def` in `stdb`.
342pub fn create_table_from_def(
343    stdb: &RelationalDB,
344    tx: &mut MutTxId,
345    module_def: &ModuleDef,
346    table_def: &TableDef,
347) -> anyhow::Result<()> {
348    let schema = TableSchema::from_module_def(module_def, table_def, (), TableId::SENTINEL);
349    stdb.create_table(tx, schema)
350        .with_context(|| format!("failed to create table {}", &table_def.name))?;
351    Ok(())
352}
353
/// If the module instance's replica_ctx is uninitialized, initialize it.
///
/// Inside a single serializable transaction: creates every table declared in
/// `module_def`, installs the row-level security expressions, and marks the
/// database as initialized with `program`. If the module defines an `init`
/// lifecycle reducer, that same transaction is handed to the reducer call;
/// otherwise the transaction is committed directly.
///
/// Returns the `init` reducer's call result, or `None` if the module does not
/// define one.
fn init_database(
    replica_ctx: &ReplicaContext,
    module_def: &ModuleDef,
    inst: &mut dyn ModuleInstance,
    program: Program,
) -> anyhow::Result<Option<ReducerCallResult>> {
    log::debug!("init database");
    let timestamp = Timestamp::now();
    let stdb = &*replica_ctx.relational_db;
    let logger = replica_ctx.logger.system_logger();

    let tx = stdb.begin_mut_tx(IsolationLevel::Serializable, Workload::Internal);
    let auth_ctx = AuthCtx::for_current(replica_ctx.database.owner_identity);
    // Any failure below rolls the whole schema-creation transaction back.
    let (tx, ()) = stdb
        .with_auto_rollback(tx, |tx| {
            // Sort by name so tables are created in a deterministic order.
            let mut table_defs: Vec<_> = module_def.tables().collect();
            table_defs.sort_by(|a, b| a.name.cmp(&b.name));

            for def in table_defs {
                logger.info(&format!("Creating table `{}`", &def.name));
                create_table_from_def(stdb, tx, module_def, def)?;
            }
            // Insert the late-bound row-level security expressions.
            for rls in module_def.row_level_security() {
                logger.info(&format!("Creating row level security `{}`", rls.sql));

                let rls = RowLevelExpr::build_row_level_expr(tx, &auth_ctx, rls)
                    .with_context(|| format!("failed to create row-level security: `{}`", rls.sql))?;
                let table_id = rls.def.table_id;
                let sql = rls.def.sql.clone();
                stdb.create_row_level_security(tx, rls.def)
                    .with_context(|| format!("failed to create row-level security for table `{table_id}`: `{sql}`",))?;
            }

            stdb.set_initialized(tx, replica_ctx.host_type, program)?;

            anyhow::Ok(())
        })
        .inspect_err(|e| log::error!("{e:?}"))?;

    let rcr = match module_def.lifecycle_reducer(Lifecycle::Init) {
        None => {
            // No `init` reducer: commit the schema-creation transaction as-is.
            if let Some((tx_data, tx_metrics, reducer)) = stdb.commit_tx(tx)? {
                stdb.report_mut_tx_metrics(reducer, tx_metrics, Some(tx_data));
            }
            None
        }

        Some((reducer_id, _)) => {
            logger.info("Invoking `init` reducer");
            let caller_identity = replica_ctx.database.owner_identity;
            // Pass the still-open transaction so `init` runs in the same
            // transaction as schema creation.
            Some(inst.call_reducer(
                Some(tx),
                CallReducerParams {
                    timestamp,
                    caller_identity,
                    caller_connection_id: ConnectionId::ZERO,
                    client: None,
                    request_id: None,
                    timer: None,
                    reducer_id,
                    args: ArgsTuple::nullary(),
                },
            ))
        }
    };

    logger.info("Database initialized");
    Ok(rcr)
}
425
/// Everything needed to invoke a reducer on a module instance.
pub struct CallReducerParams {
    /// When the call was initiated.
    pub timestamp: Timestamp,
    /// The identity of the caller.
    pub caller_identity: Identity,
    /// The caller's connection id (`ConnectionId::ZERO` for host-initiated calls).
    pub caller_connection_id: ConnectionId,
    /// The calling client's sender handle, if the call came from a client.
    pub client: Option<Arc<ClientConnectionSender>>,
    /// The client's request id, if any.
    pub request_id: Option<RequestId>,
    /// Start time for round-trip measurement, if any.
    pub timer: Option<Instant>,
    /// Which reducer to call.
    pub reducer_id: ReducerId,
    /// The reducer's arguments.
    pub args: ArgsTuple,
}
436
// TODO: figure out how we want to handle traps. maybe it should just not return to the LendingPool and
//       let the get_instance logic handle it?
/// Wraps a module instance and replaces it with a freshly-created one
/// whenever it traps (see `check_trap`).
struct AutoReplacingModuleInstance<T: Module> {
    // The current (possibly trapped) instance.
    inst: T::Instance,
    // The module used to create replacement instances.
    module: Arc<T>,
}
443
444impl<T: Module> AutoReplacingModuleInstance<T> {
445    fn check_trap(&mut self) {
446        if self.inst.trapped() {
447            self.inst = self.module.create_instance()
448        }
449    }
450}
451
452impl<T: Module> ModuleInstance for AutoReplacingModuleInstance<T> {
453    fn trapped(&self) -> bool {
454        self.inst.trapped()
455    }
456    fn update_database(
457        &mut self,
458        program: Program,
459        old_module_info: Arc<ModuleInfo>,
460    ) -> anyhow::Result<UpdateDatabaseResult> {
461        let ret = self.inst.update_database(program, old_module_info);
462        self.check_trap();
463        ret
464    }
465    fn call_reducer(&mut self, tx: Option<MutTxId>, params: CallReducerParams) -> ReducerCallResult {
466        let ret = self.inst.call_reducer(tx, params);
467        self.check_trap();
468        ret
469    }
470}
471
/// A cheaply-cloneable handle to a running module and its job thread.
#[derive(Clone)]
pub struct ModuleHost {
    /// Shared information about the module.
    pub info: Arc<ModuleInfo>,
    /// The underlying module, held as a trait object.
    module: Arc<dyn DynModule>,
    /// Called whenever a reducer call on this host panics.
    on_panic: Arc<dyn Fn() + Send + Sync + 'static>,
    /// Channel to the thread that owns the module instance.
    job_tx: JobThread<dyn ModuleInstance>,
}
480
481impl fmt::Debug for ModuleHost {
482    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
483        f.debug_struct("ModuleHost")
484            .field("info", &self.info)
485            .field("module", &Arc::as_ptr(&self.module))
486            .finish()
487    }
488}
489
/// A non-owning counterpart to [`ModuleHost`]; holds weak references so the
/// module can be dropped while this handle exists.
pub struct WeakModuleHost {
    // `info` stays strongly held; only the module/panic-hook/job-thread are weak.
    info: Arc<ModuleInfo>,
    inner: Weak<dyn DynModule>,
    on_panic: Weak<dyn Fn() + Send + Sync + 'static>,
    tx: WeakJobThread<dyn ModuleInstance>,
}
496
/// The outcome of attempting to update a database to a new module version.
#[derive(Debug)]
pub enum UpdateDatabaseResult {
    /// The schema already matched; nothing was changed.
    NoUpdateNeeded,
    /// The migration was computed and applied.
    UpdatePerformed,
    /// The automatic migration could not be planned.
    AutoMigrateError(ErrorStream<AutoMigrateError>),
    /// The migration was planned but failed while executing.
    ErrorExecutingMigration(anyhow::Error),
}
504impl UpdateDatabaseResult {
505    /// Check if a database update was successful.
506    pub fn was_successful(&self) -> bool {
507        matches!(
508            self,
509            UpdateDatabaseResult::UpdatePerformed | UpdateDatabaseResult::NoUpdateNeeded
510        )
511    }
512}
513
/// Error: the module (or its job thread) is no longer available.
#[derive(thiserror::Error, Debug)]
#[error("no such module")]
pub struct NoSuchModule;
517
/// Errors that can occur when dispatching a reducer call.
#[derive(thiserror::Error, Debug)]
pub enum ReducerCallError {
    /// The supplied arguments did not match the reducer's signature.
    #[error(transparent)]
    Args(#[from] InvalidReducerArguments),
    /// The module is gone.
    #[error(transparent)]
    NoSuchModule(#[from] NoSuchModule),
    /// The named reducer does not exist in the module.
    #[error("no such reducer")]
    NoSuchReducer,
    /// The scheduled reducer does not exist.
    #[error("no such scheduled reducer")]
    ScheduleReducerNotFound,
    /// Lifecycle reducers are invoked by the host, never directly by callers.
    #[error("can't directly call special {0:?} lifecycle reducer")]
    LifecycleReducer(Lifecycle),
}
531
/// Errors that can occur while initializing a database for a module.
#[derive(thiserror::Error, Debug)]
pub enum InitDatabaseError {
    /// The `init` reducer's arguments were invalid.
    #[error(transparent)]
    Args(#[from] InvalidReducerArguments),
    /// The module is gone.
    #[error(transparent)]
    NoSuchModule(#[from] NoSuchModule),
    /// Any other initialization failure.
    #[error(transparent)]
    Other(anyhow::Error),
}
541
/// Errors that can occur while handling a new client connection.
#[derive(thiserror::Error, Debug)]
pub enum ClientConnectedError {
    /// The `client_connected` reducer call itself failed.
    #[error(transparent)]
    ReducerCall(#[from] ReducerCallError),
    /// The fallback `st_client` insert (module has no reducer) failed.
    #[error("Failed to insert `st_client` row for module without client_connected reducer: {0}")]
    DBError(#[from] DBError),
    /// The module's `client_connected` reducer returned an error.
    #[error("Connection rejected by `client_connected` reducer: {0}")]
    Rejected(String),
    /// The reducer could not run for lack of energy.
    #[error("Insufficient energy balance to run `client_connected` reducer")]
    OutOfEnergy,
}
553
554impl ModuleHost {
555    pub(super) fn new(module: impl Module, on_panic: impl Fn() + Send + Sync + 'static, core: JobCore) -> Self {
556        let info = module.info();
557        let module = Arc::new(module);
558        let on_panic = Arc::new(on_panic);
559
560        let module_clone = module.clone();
561        let job_tx = core.start(
562            move || AutoReplacingModuleInstance {
563                inst: module_clone.create_instance(),
564                module: module_clone,
565            },
566            |x| x as &mut dyn ModuleInstance,
567        );
568        ModuleHost {
569            info,
570            module,
571            on_panic,
572            job_tx,
573        }
574    }
575
576    #[inline]
577    pub fn info(&self) -> &ModuleInfo {
578        &self.info
579    }
580
581    #[inline]
582    pub fn subscriptions(&self) -> &ModuleSubscriptions {
583        &self.info.subscriptions
584    }
585
586    /// Run a function on the JobThread for this module.
587    /// This would deadlock if it is called within another call to `on_module_thread`.
588    /// Since this is async, and `f` is sync, deadlocking shouldn't be a problem.
589    pub async fn on_module_thread<F, R>(&self, label: &str, f: F) -> Result<R, anyhow::Error>
590    where
591        F: FnOnce() -> R + Send + 'static,
592        R: Send + 'static,
593    {
594        // Run the provided function on the module instance.
595        // This is a convenience method that ensures the module instance is available
596        // and handles any errors that may occur.
597        self.call(label, |_| f())
598            .await
599            .map_err(|_| anyhow::Error::from(NoSuchModule))
600    }
601
    /// Run a function on the JobThread for this module which has access to the module instance.
    ///
    /// Records queue-wait and queue-length metrics around the dispatch, and
    /// guarantees `self.on_panic` is invoked if `f` panics so the host
    /// controller can discard the module. Returns `Err(NoSuchModule)` if the
    /// job thread has already shut down.
    async fn call<F, R>(&self, label: &str, f: F) -> Result<R, NoSuchModule>
    where
        F: FnOnce(&mut dyn ModuleInstance) -> R + Send + 'static,
        R: Send + 'static,
    {
        // Record the time until our function starts running.
        let queue_timer = WORKER_METRICS
            .reducer_wait_time
            .with_label_values(&self.info.database_identity, label)
            .start_timer();
        let queue_length_gauge = WORKER_METRICS
            .instance_queue_length
            .with_label_values(&self.info.database_identity);
        queue_length_gauge.inc();
        {
            // Sample the queue length (including this entry) into the histogram.
            let queue_length = queue_length_gauge.get();
            WORKER_METRICS
                .instance_queue_length_histogram
                .with_label_values(&self.info.database_identity)
                .observe(queue_length as f64);
        }
        // Ensure that we always decrement the gauge.
        // The guard is moved into the job closure below, so the gauge/timer
        // resolve when the job actually starts (or the closure is dropped).
        let timer_guard = scopeguard::guard((), move |_| {
            // Decrement the queue length gauge when we're done.
            // This is done in a defer so that it happens even if the reducer call panics.
            queue_length_gauge.dec();
            queue_timer.stop_and_record();
        });

        // Operations on module instances (e.g. calling reducers) is blocking,
        // partially because the computation can potentialyl take a long time
        // and partially because interacting with the database requires taking
        // a blocking lock. So, we run `f` inside of `asyncify()`, which runs
        // the provided closure in a tokio blocking task, and bubbles up any
        // panic that may occur.

        // If a reducer call panics, we **must** ensure to call `self.on_panic`
        // so that the module is discarded by the host controller.
        scopeguard::defer_on_unwind!({
            log::warn!("reducer {label} panicked");
            (self.on_panic)();
        });
        self.job_tx
            .run(move |inst| {
                drop(timer_guard);
                f(inst)
            })
            .await
            .map_err(|_: JobThreadClosed| NoSuchModule)
    }
653
654    pub async fn disconnect_client(&self, client_id: ClientActorId) {
655        log::trace!("disconnecting client {client_id}");
656        let this = self.clone();
657        if let Err(e) = self
658            .call("disconnect_client", move |inst| {
659                // Call the `client_disconnected` reducer, if it exists.
660                // This is a no-op if the module doesn't define such a reducer.
661                this.subscriptions().remove_subscriber(client_id);
662                this.call_identity_disconnected_inner(client_id.identity, client_id.connection_id, inst)
663            })
664            .await
665        {
666            log::error!("Error from client_disconnected transaction: {e}");
667        }
668    }
669
    /// Invoke the module's `client_connected` reducer, if it has one,
    /// and insert a new row into `st_client` for `(caller_identity, caller_connection_id)`.
    ///
    /// The host inspects `st_client` when restarting in order to run `client_disconnected` reducers
    /// for clients that were connected at the time when the host went down.
    /// This ensures that every client connection eventually has `client_disconnected` invoked.
    ///
    /// If this method returns `Ok`, then the client connection has been approved,
    /// and the new row has been inserted into `st_client`.
    ///
    /// If this method returns `Err`, then the client connection has either failed or been rejected,
    /// and `st_client` has not been modified.
    /// In this case, the caller should terminate the connection.
    pub async fn call_identity_connected(
        &self,
        caller_identity: Identity,
        caller_connection_id: ConnectionId,
    ) -> Result<(), ClientConnectedError> {
        let me = self.clone();
        self.call("call_identity_connected", move |inst| {
            let reducer_lookup = me.info.module_def.lifecycle_reducer(Lifecycle::OnConnect);

            if let Some((reducer_id, reducer_def)) = reducer_lookup {
                // The module defined a lifecycle reducer to handle new connections.
                // Call this reducer.
                // If the call fails (as in, something unexpectedly goes wrong with WASM execution),
                // abort the connection: we can't really recover.
                let reducer_outcome = me.call_reducer_inner_with_inst(
                    caller_identity,
                    Some(caller_connection_id),
                    None,
                    None,
                    None,
                    reducer_id,
                    reducer_def,
                    ReducerArgs::Nullary,
                    inst,
                )?;

                match reducer_outcome.outcome {
                    // If the reducer committed successfully, we're done.
                    // `WasmModuleInstance::call_reducer_with_tx` has already ensured
                    // that `st_client` is updated appropriately.
                    //
                    // It's necessary to spread out the responsibility for updating `st_client` in this way
                    // because it's important that `call_identity_connected` commit at most one transaction.
                    // A naive implementation of this method would just run the reducer first,
                    // then insert into `st_client`,
                    // but if we crashed in between, we'd be left in an inconsistent state
                    // where the reducer had run but `st_client` was not yet updated.
                    ReducerOutcome::Committed => Ok(()),

                    // If the reducer returned an error or couldn't run due to insufficient energy,
                    // abort the connection: the module code has decided it doesn't want this client.
                    ReducerOutcome::Failed(message) => Err(ClientConnectedError::Rejected(message)),
                    ReducerOutcome::BudgetExceeded => Err(ClientConnectedError::OutOfEnergy),
                }
            } else {
                // The module doesn't define a client_connected reducer.
                // Commit a transaction to update `st_clients`
                // and to ensure we always have those events paired in the commitlog.
                //
                // This is necessary to be able to disconnect clients after a server crash.
                //
                // NOTE(review): `reducer_lookup` is `None` on this branch, so the
                // `map` never applies and the fallback name is always used.
                let reducer_name = reducer_lookup
                    .as_ref()
                    .map(|(_, def)| &*def.name)
                    .unwrap_or("__identity_connected__");

                let workload = Workload::Reducer(ReducerContext {
                    name: reducer_name.to_owned(),
                    caller_identity,
                    caller_connection_id,
                    timestamp: Timestamp::now(),
                    arg_bsatn: Bytes::new(),
                });

                let stdb = me.module.replica_ctx().relational_db.clone();
                stdb.with_auto_commit(workload, |mut_tx| {
                    mut_tx
                        .insert_st_client(caller_identity, caller_connection_id)
                        .map_err(DBError::from)
                })
                .inspect_err(|e| {
                    log::error!(
                        "`call_identity_connected`: fallback transaction to insert into `st_client` failed: {e:#?}"
                    )
                })
                .map_err(Into::into)
            }
        })
        .await
        // Outer `?` surfaces a closed job thread (`NoSuchModule`) via
        // `ReducerCallError`; the inner result is the connection decision.
        .map_err(Into::<ReducerCallError>::into)?
    }
763
764    pub fn call_identity_disconnected_inner(
765        &self,
766        caller_identity: Identity,
767        caller_connection_id: ConnectionId,
768        inst: &mut dyn ModuleInstance,
769    ) -> Result<(), ReducerCallError> {
770        let me = self.clone();
771        let reducer_lookup = me.info.module_def.lifecycle_reducer(Lifecycle::OnDisconnect);
772
773        // A fallback transaction that deletes the client from `st_client`.
774        let fallback = || {
775            let reducer_name = reducer_lookup
776                .as_ref()
777                .map(|(_, def)| &*def.name)
778                .unwrap_or("__identity_disconnected__");
779
780            let workload = Workload::Reducer(ReducerContext {
781                name: reducer_name.to_owned(),
782                caller_identity,
783                caller_connection_id,
784                timestamp: Timestamp::now(),
785                arg_bsatn: Bytes::new(),
786            });
787            let stdb = me.module.replica_ctx().relational_db.clone();
788            let database_identity = me.info.database_identity;
789            stdb.with_auto_commit(workload, |mut_tx| {
790                mut_tx
791                    .delete_st_client(caller_identity, caller_connection_id, database_identity)
792                    .map_err(DBError::from)
793            })
794            .map_err(|err| {
795                log::error!(
796                    "`call_identity_disconnected`: fallback transaction to delete from `st_client` failed: {err}"
797                );
798                InvalidReducerArguments {
799                    err: err.into(),
800                    reducer: reducer_name.into(),
801                }
802                .into()
803            })
804        };
805
806        if let Some((reducer_id, reducer_def)) = reducer_lookup {
807            // The module defined a lifecycle reducer to handle disconnects. Call it.
808            // If it succeeds, `WasmModuleInstance::call_reducer_with_tx` has already ensured
809            // that `st_client` is updated appropriately.
810            let result = me.call_reducer_inner_with_inst(
811                caller_identity,
812                Some(caller_connection_id),
813                None,
814                None,
815                None,
816                reducer_id,
817                reducer_def,
818                ReducerArgs::Nullary,
819                inst,
820            );
821
822            // If it failed, we still need to update `st_client`: the client's not coming back.
823            // Commit a separate transaction that just updates `st_client`.
824            //
825            // It's OK for this to not be atomic with the previous transaction,
826            // since that transaction didn't commit. If we crash before committing this one,
827            // we'll run the `client_disconnected` reducer again unnecessarily,
828            // but the commitlog won't contain two invocations of it, which is what we care about.
829            match result {
830                Err(e) => {
831                    log::error!("call_reducer_inner of client_disconnected failed: {e:#?}");
832                    fallback()
833                }
834                Ok(ReducerCallResult {
835                    outcome: ReducerOutcome::Failed(_) | ReducerOutcome::BudgetExceeded,
836                    ..
837                }) => fallback(),
838
839                // If it succeeded, as mentioned above, `st_client` is already updated.
840                Ok(ReducerCallResult {
841                    outcome: ReducerOutcome::Committed,
842                    ..
843                }) => Ok(()),
844            }
845        } else {
846            // The module doesn't define a `client_disconnected` reducer.
847            // Commit a transaction to update `st_clients`.
848            fallback()
849        }
850    }
851
852    /// Invoke the module's `client_disconnected` reducer, if it has one,
853    /// and delete the client's row from `st_client`, if any.
854    ///
855    /// The host inspects `st_client` when restarting in order to run `client_disconnected` reducers
856    /// for clients that were connected at the time when the host went down.
857    /// This ensures that every client connection eventually has `client_disconnected` invoked.
858    ///
859    /// Unlike [`Self::call_identity_connected`],
860    /// this method swallows errors returned by the `client_disconnected` reducer.
861    /// The database can't reject a disconnection - the client's gone, whether the database likes it or not.
862    ///
863    /// If this method returns an error, the database is likely to wind up in a bad state,
864    /// as that means we've somehow failed to delete from `st_client`.
865    /// We cannot meaningfully handle this.
866    /// Sometimes it just means that the database no longer exists, though, which is fine.
867    pub async fn call_identity_disconnected(
868        &self,
869        caller_identity: Identity,
870        caller_connection_id: ConnectionId,
871    ) -> Result<(), ReducerCallError> {
872        let me = self.clone();
873        self.call("call_identity_disconnected", move |inst| {
874            me.call_identity_disconnected_inner(caller_identity, caller_connection_id, inst)
875        })
876        .await
877        .map_err(Into::<ReducerCallError>::into)?
878    }
879
880    async fn call_reducer_inner(
881        &self,
882        caller_identity: Identity,
883        caller_connection_id: Option<ConnectionId>,
884        client: Option<Arc<ClientConnectionSender>>,
885        request_id: Option<RequestId>,
886        timer: Option<Instant>,
887        reducer_id: ReducerId,
888        reducer_def: &ReducerDef,
889        args: ReducerArgs,
890    ) -> Result<ReducerCallResult, ReducerCallError> {
891        let reducer_seed = ReducerArgsDeserializeSeed(self.info.module_def.typespace().with_type(reducer_def));
892        let args = args.into_tuple(reducer_seed)?;
893        let caller_connection_id = caller_connection_id.unwrap_or(ConnectionId::ZERO);
894
895        self.call(&reducer_def.name, move |inst| {
896            inst.call_reducer(
897                None,
898                CallReducerParams {
899                    timestamp: Timestamp::now(),
900                    caller_identity,
901                    caller_connection_id,
902                    client,
903                    request_id,
904                    timer,
905                    reducer_id,
906                    args,
907                },
908            )
909        })
910        .await
911        .map_err(Into::into)
912    }
913    fn call_reducer_inner_with_inst(
914        &self,
915        caller_identity: Identity,
916        caller_connection_id: Option<ConnectionId>,
917        client: Option<Arc<ClientConnectionSender>>,
918        request_id: Option<RequestId>,
919        timer: Option<Instant>,
920        reducer_id: ReducerId,
921        reducer_def: &ReducerDef,
922        args: ReducerArgs,
923        module_instance: &mut dyn ModuleInstance,
924    ) -> Result<ReducerCallResult, ReducerCallError> {
925        let reducer_seed = ReducerArgsDeserializeSeed(self.info.module_def.typespace().with_type(reducer_def));
926        let args = args.into_tuple(reducer_seed)?;
927        let caller_connection_id = caller_connection_id.unwrap_or(ConnectionId::ZERO);
928
929        Ok(module_instance.call_reducer(
930            None,
931            CallReducerParams {
932                timestamp: Timestamp::now(),
933                caller_identity,
934                caller_connection_id,
935                client,
936                request_id,
937                timer,
938                reducer_id,
939                args,
940            },
941        ))
942    }
943
944    pub async fn call_reducer(
945        &self,
946        caller_identity: Identity,
947        caller_connection_id: Option<ConnectionId>,
948        client: Option<Arc<ClientConnectionSender>>,
949        request_id: Option<RequestId>,
950        timer: Option<Instant>,
951        reducer_name: &str,
952        args: ReducerArgs,
953    ) -> Result<ReducerCallResult, ReducerCallError> {
954        let res = async {
955            let (reducer_id, reducer_def) = self
956                .info
957                .module_def
958                .reducer_full(reducer_name)
959                .ok_or(ReducerCallError::NoSuchReducer)?;
960            if let Some(lifecycle) = reducer_def.lifecycle {
961                return Err(ReducerCallError::LifecycleReducer(lifecycle));
962            }
963            self.call_reducer_inner(
964                caller_identity,
965                caller_connection_id,
966                client,
967                request_id,
968                timer,
969                reducer_id,
970                reducer_def,
971                args,
972            )
973            .await
974        }
975        .await;
976
977        let log_message = match &res {
978            Err(ReducerCallError::NoSuchReducer) => Some(format!(
979                "External attempt to call nonexistent reducer \"{reducer_name}\" failed. Have you run `spacetime generate` recently?"
980            )),
981            Err(ReducerCallError::Args(_)) => Some(format!(
982                "External attempt to call reducer \"{reducer_name}\" failed, invalid arguments.\n\
983                 This is likely due to a mismatched client schema, have you run `spacetime generate` recently?",
984            )),
985            _ => None,
986        };
987        if let Some(log_message) = log_message {
988            self.inject_logs(LogLevel::Error, &log_message)
989        }
990
991        res
992    }
993
    /// Call a scheduled reducer.
    ///
    /// Scheduled reducers require a different entry point than [`Self::call_reducer`]
    /// because their reducer arguments are stored in the database and need to be fetched
    /// within the same transaction as the reducer call.
    ///
    /// `call_reducer_params` fetches those parameters inside the freshly-opened
    /// mutable transaction; returning `Ok(None)` means the scheduled row is gone,
    /// which maps to [`ReducerCallError::ScheduleReducerNotFound`].
    pub async fn call_scheduled_reducer(
        &self,
        call_reducer_params: impl FnOnce(&MutTxId) -> anyhow::Result<Option<CallReducerParams>> + Send + 'static,
    ) -> Result<ReducerCallResult, ReducerCallError> {
        let db = self.module.replica_ctx().relational_db.clone();
        // The actual reducer name isn't known until the params are fetched;
        // this placeholder is only used for logging / error-reporting purposes.
        const REDUCER: &str = "scheduled_reducer";
        let module = self.info.clone();
        self.call(REDUCER, move |inst: &mut dyn ModuleInstance| {
            let mut tx = db.begin_mut_tx(IsolationLevel::Serializable, Workload::Internal);

            match call_reducer_params(&mut tx) {
                Ok(Some(params)) => {
                    // It is necessary to patch the transaction's execution context
                    // with the reducer that is actually being called.
                    let reducer_def = module
                        .module_def
                        .get_reducer_by_id(params.reducer_id)
                        .ok_or(ReducerCallError::ScheduleReducerNotFound)?;
                    let reducer = &*reducer_def.name;

                    tx.ctx = ExecutionContext::with_workload(
                        tx.ctx.database_identity(),
                        Workload::Reducer(ReducerContext {
                            name: reducer.into(),
                            caller_identity: params.caller_identity,
                            caller_connection_id: params.caller_connection_id,
                            timestamp: Timestamp::now(),
                            arg_bsatn: params.args.get_bsatn().clone(),
                        }),
                    );

                    // Hand the already-open transaction to the reducer call so the
                    // parameter fetch and the reducer run share one transaction.
                    Ok(inst.call_reducer(Some(tx), params))
                }
                Ok(None) => Err(ReducerCallError::ScheduleReducerNotFound),
                Err(err) => Err(ReducerCallError::Args(InvalidReducerArguments {
                    err,
                    reducer: REDUCER.into(),
                })),
            }
        })
        .await
        .unwrap_or_else(|e| Err(e.into()))
    }
1040
1041    pub fn subscribe_to_logs(&self) -> anyhow::Result<tokio::sync::broadcast::Receiver<bytes::Bytes>> {
1042        Ok(self.info().log_tx.subscribe())
1043    }
1044
1045    pub async fn init_database(&self, program: Program) -> Result<Option<ReducerCallResult>, InitDatabaseError> {
1046        let replica_ctx = self.module.replica_ctx().clone();
1047        let info = self.info.clone();
1048        self.call("<init_database>", move |inst| {
1049            init_database(&replica_ctx, &info.module_def, inst, program)
1050        })
1051        .await?
1052        .map_err(InitDatabaseError::Other)
1053    }
1054
1055    pub async fn update_database(
1056        &self,
1057        program: Program,
1058        old_module_info: Arc<ModuleInfo>,
1059    ) -> Result<UpdateDatabaseResult, anyhow::Error> {
1060        self.call("<update_database>", move |inst| {
1061            inst.update_database(program, old_module_info)
1062        })
1063        .await?
1064    }
1065
1066    pub async fn exit(&self) {
1067        self.module.scheduler().close();
1068        self.job_tx.close();
1069        self.exited().await;
1070    }
1071
1072    pub async fn exited(&self) {
1073        tokio::join!(self.module.scheduler().closed(), self.job_tx.closed());
1074    }
1075
1076    pub fn inject_logs(&self, log_level: LogLevel, message: &str) {
1077        self.replica_ctx().logger.write(
1078            log_level,
1079            &Record {
1080                ts: chrono::Utc::now(),
1081                target: None,
1082                filename: Some("external"),
1083                line_number: None,
1084                message,
1085            },
1086            &(),
1087        )
1088    }
1089
    /// Execute a one-off query and send the results to the given client.
    ///
    /// This only returns an error if there is a db-level problem.
    /// An error with the query itself (e.g. a compilation failure) is not returned;
    /// it is serialized into the response message and sent to the client instead.
    ///
    /// `message_id` is echoed back so the client can correlate the response,
    /// and `timer` is the start time used to report the total host execution duration.
    #[tracing::instrument(level = "trace", skip_all)]
    pub async fn one_off_query<F: WebsocketFormat>(
        &self,
        caller_identity: Identity,
        query: String,
        client: Arc<ClientConnectionSender>,
        message_id: Vec<u8>,
        timer: Instant,
        // We take this because we only have a way to convert with the concrete types (Bsatn and Json)
        into_message: impl FnOnce(OneOffQueryResponseMessage<F>) -> SerializableMessage + Send + 'static,
    ) -> Result<(), anyhow::Error> {
        let replica_ctx = self.replica_ctx();
        let db = replica_ctx.relational_db.clone();
        let subscriptions = replica_ctx.subscriptions.clone();
        // The query runs as `caller_identity`, against the database owner's auth context.
        let auth = AuthCtx::new(replica_ctx.owner_identity, caller_identity);
        log::debug!("One-off query: {query}");
        // Run the query on the module thread inside a read-only transaction.
        let metrics = self
            .on_module_thread("one_off_query", move || {
                db.with_read_only(Workload::Sql, |tx| {
                    // We wrap the actual query in a closure so we can use ? to handle errors without making
                    // the entire transaction abort with an error.
                    let result: Result<(OneOffTable<F>, ExecutionMetrics), anyhow::Error> = (|| {
                        let tx = SchemaViewer::new(tx, &auth);

                        let (
                            // A query may compile down to several plans.
                            // This happens when there are multiple RLS rules per table.
                            // The original query is the union of these plans.
                            plans,
                            _,
                            table_name,
                            _,
                        ) = compile_subscription(&query, &tx, &auth)?;

                        // Optimize each fragment
                        let optimized = plans
                            .into_iter()
                            .map(|plan| plan.optimize())
                            .collect::<Result<Vec<_>, _>>()?;

                        // Reject the query if its estimated scan size exceeds the row limit.
                        check_row_limit(
                            &optimized,
                            &db,
                            &tx,
                            // Estimate the number of rows this query will scan
                            |plan, tx| estimate_rows_scanned(tx, plan),
                            &auth,
                        )?;

                        let optimized = optimized
                            .into_iter()
                            // Convert into something we can execute
                            .map(PipelinedProject::from)
                            .collect::<Vec<_>>();

                        // Execute the union and return the results
                        execute_plan::<_, F>(&optimized, &DeltaTx::from(&*tx))
                            .map(|(rows, _, metrics)| (OneOffTable { table_name, rows }, metrics))
                            .context("One-off queries are not allowed to modify the database")
                    })();

                    let total_host_execution_duration = timer.elapsed().into();
                    // Both success and failure are reported to the client as a response
                    // message; only a successful run yields metrics worth recording.
                    let (message, metrics): (SerializableMessage, Option<ExecutionMetrics>) = match result {
                        Ok((rows, metrics)) => (
                            into_message(OneOffQueryResponseMessage {
                                message_id,
                                error: None,
                                results: vec![rows],
                                total_host_execution_duration,
                            }),
                            Some(metrics),
                        ),
                        Err(err) => (
                            into_message(OneOffQueryResponseMessage {
                                message_id,
                                error: Some(format!("{err}")),
                                results: vec![],
                                total_host_execution_duration,
                            }),
                            None,
                        ),
                    };

                    subscriptions.send_client_message(client, message, tx)?;
                    Ok::<Option<ExecutionMetrics>, anyhow::Error>(metrics)
                })
            })
            .await??;

        if let Some(metrics) = metrics {
            // Record the metrics for the one-off query
            replica_ctx
                .relational_db
                .exec_counters_for(WorkloadType::Sql)
                .record(&metrics);
        }

        Ok(())
    }
1192
1193    /// FIXME(jgilles): this is a temporary workaround for deleting not currently being supported
1194    /// for tables without primary keys. It is only used in the benchmarks.
1195    /// Note: this doesn't drop the table, it just clears it!
1196    pub fn clear_table(&self, table_name: &str) -> Result<(), anyhow::Error> {
1197        let db = &*self.replica_ctx().relational_db;
1198
1199        db.with_auto_commit(Workload::Internal, |tx| {
1200            let tables = db.get_all_tables_mut(tx)?;
1201            // We currently have unique table names,
1202            // so we can assume there's only one table to clear.
1203            if let Some(table_id) = tables
1204                .iter()
1205                .find_map(|t| (&*t.table_name == table_name).then_some(t.table_id))
1206            {
1207                db.clear_table(tx, table_id)?;
1208            }
1209            Ok(())
1210        })
1211    }
1212
1213    pub fn downgrade(&self) -> WeakModuleHost {
1214        WeakModuleHost {
1215            info: self.info.clone(),
1216            inner: Arc::downgrade(&self.module),
1217            on_panic: Arc::downgrade(&self.on_panic),
1218            tx: self.job_tx.downgrade(),
1219        }
1220    }
1221
1222    pub fn database_info(&self) -> &Database {
1223        &self.replica_ctx().database
1224    }
1225
    /// The [`ReplicaContext`] of the replica this module host is attached to.
    pub(crate) fn replica_ctx(&self) -> &ReplicaContext {
        self.module.replica_ctx()
    }
1229}
1230
1231impl WeakModuleHost {
1232    pub fn upgrade(&self) -> Option<ModuleHost> {
1233        let inner = self.inner.upgrade()?;
1234        let on_panic = self.on_panic.upgrade()?;
1235        let tx = self.tx.upgrade()?;
1236        Some(ModuleHost {
1237            info: self.info.clone(),
1238            module: inner,
1239            on_panic,
1240            job_tx: tx,
1241        })
1242    }
1243}