spacetimedb/host/module_host.rs

use super::{ArgsTuple, InvalidReducerArguments, ReducerArgs, ReducerCallResult, ReducerId, ReducerOutcome, Scheduler};
use crate::client::messages::{OneOffQueryResponseMessage, SerializableMessage};
use crate::client::{ClientActorId, ClientConnectionSender};
use crate::database_logger::{LogLevel, Record};
use crate::db::datastore::locking_tx_datastore::MutTxId;
use crate::db::datastore::traits::{IsolationLevel, Program, TxData};
use crate::energy::EnergyQuanta;
use crate::error::DBError;
use crate::estimation::estimate_rows_scanned;
use crate::execution_context::{ExecutionContext, ReducerContext, Workload, WorkloadType};
use crate::hash::Hash;
use crate::identity::Identity;
use crate::messages::control_db::Database;
use crate::replica_context::ReplicaContext;
use crate::sql::ast::SchemaViewer;
use crate::sql::parser::RowLevelExpr;
use crate::subscription::execute_plan;
use crate::subscription::module_subscription_actor::ModuleSubscriptions;
use crate::subscription::tx::DeltaTx;
use crate::util::asyncify;
use crate::util::jobs::{JobCore, JobThread, JobThreadClosed, WeakJobThread};
use crate::vm::check_row_limit;
use crate::worker_metrics::WORKER_METRICS;
use anyhow::Context;
use bytes::Bytes;
use derive_more::From;
use indexmap::IndexSet;
use itertools::Itertools;
use prometheus::{Histogram, IntGauge};
use spacetimedb_client_api_messages::websocket::{ByteListLen, Compression, OneOffTable, QueryUpdate, WebsocketFormat};
use spacetimedb_data_structures::error_stream::ErrorStream;
use spacetimedb_data_structures::map::{HashCollectionExt as _, IntMap};
use spacetimedb_execution::pipelined::PipelinedProject;
use spacetimedb_lib::db::raw_def::v9::Lifecycle;
use spacetimedb_lib::identity::{AuthCtx, RequestId};
use spacetimedb_lib::metrics::ExecutionMetrics;
use spacetimedb_lib::ConnectionId;
use spacetimedb_lib::Timestamp;
use spacetimedb_primitives::TableId;
use spacetimedb_query::compile_subscription;
use spacetimedb_sats::ProductValue;
use spacetimedb_schema::auto_migrate::AutoMigrateError;
use spacetimedb_schema::def::deserialize::ReducerArgsDeserializeSeed;
use spacetimedb_schema::def::{ModuleDef, ReducerDef};
use spacetimedb_schema::schema::{Schema, TableSchema};
use spacetimedb_vm::relation::RelValue;
use std::fmt;
use std::sync::{Arc, Weak};
use std::time::{Duration, Instant};

#[derive(Debug, Default, Clone, From)]
pub struct DatabaseUpdate {
    pub tables: Vec<DatabaseTableUpdate>,
}

impl FromIterator<DatabaseTableUpdate> for DatabaseUpdate {
    fn from_iter<T: IntoIterator<Item = DatabaseTableUpdate>>(iter: T) -> Self {
        DatabaseUpdate {
            tables: iter.into_iter().collect(),
        }
    }
}

impl DatabaseUpdate {
    pub fn is_empty(&self) -> bool {
        self.tables.is_empty()
    }

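    /// Builds a `DatabaseUpdate` from the row inserts and deletes recorded
    /// in `tx_data`, grouped into one `DatabaseTableUpdate` per table.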
    pub fn from_writes(tx_data: &TxData) -> Self {
        let mut map: IntMap<TableId, DatabaseTableUpdate> = IntMap::new();
        let new_update = |table_id, table_name: &str| DatabaseTableUpdate {
            table_id,
            table_name: table_name.into(),
            inserts: [].into(),
            deletes: [].into(),
        };
        for (table_id, table_name, rows) in tx_data.inserts_with_table_name() {
            map.entry(*table_id)
                .or_insert_with(|| new_update(*table_id, table_name))
                .inserts = rows.clone();
        }
        for (table_id, table_name, rows) in tx_data.deletes_with_table_name() {
            map.entry(*table_id)
                .or_insert_with(|| new_update(*table_id, table_name))
                .deletes = rows.clone();
        }
        DatabaseUpdate {
            tables: map.into_values().collect(),
        }
    }

    /// The number of rows in the payload.
    pub fn num_rows(&self) -> usize {
        self.tables.iter().map(|t| t.inserts.len() + t.deletes.len()).sum()
    }
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DatabaseTableUpdate {
    pub table_id: TableId,
    pub table_name: Box<str>,
    // Note: `Arc<[ProductValue]>` lets us cheaply reuse
    // the values from `TxData` without cloning the
    // contained `ProductValue`s.
    pub inserts: Arc<[ProductValue]>,
    pub deletes: Arc<[ProductValue]>,
}

#[derive(Debug)]
pub struct DatabaseUpdateRelValue<'a> {
    pub tables: Vec<DatabaseTableUpdateRelValue<'a>>,
}

#[derive(PartialEq, Debug)]
pub struct DatabaseTableUpdateRelValue<'a> {
    pub table_id: TableId,
    pub table_name: Box<str>,
    pub updates: UpdatesRelValue<'a>,
}

#[derive(Default, PartialEq, Debug)]
pub struct UpdatesRelValue<'a> {
    pub deletes: Vec<RelValue<'a>>,
    pub inserts: Vec<RelValue<'a>>,
}

impl UpdatesRelValue<'_> {
    /// Returns whether there are any updates.
    pub fn has_updates(&self) -> bool {
        !(self.deletes.is_empty() && self.inserts.is_empty())
    }

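    /// Encodes the deletes and inserts in the WebSocket format `F`,
    /// returning the encoded update together with its row and byte counts.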
    pub fn encode<F: WebsocketFormat>(&self) -> (F::QueryUpdate, u64, usize) {
        let (deletes, nr_del) = F::encode_list(self.deletes.iter());
        let (inserts, nr_ins) = F::encode_list(self.inserts.iter());
        let num_rows = nr_del + nr_ins;
        let num_bytes = deletes.num_bytes() + inserts.num_bytes();
        let qu = QueryUpdate { deletes, inserts };
        // We don't compress individual table updates.
        // Previously we did, but the benefits, if any, were unclear.
        // Note, each message is still compressed before being sent to clients,
        // but we no longer have to hold a tx lock when doing so.
        let cqu = F::into_query_update(qu, Compression::None);
        (cqu, num_rows, num_bytes)
    }
}

#[derive(Debug, Clone)]
pub enum EventStatus {
    Committed(DatabaseUpdate),
    Failed(String),
    OutOfEnergy,
}

impl EventStatus {
    pub fn database_update(&self) -> Option<&DatabaseUpdate> {
        match self {
            EventStatus::Committed(upd) => Some(upd),
            _ => None,
        }
    }
}

#[derive(Debug, Clone, Default)]
pub struct ModuleFunctionCall {
    pub reducer: String,
    pub reducer_id: ReducerId,
    pub args: ArgsTuple,
}

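/// A record of a single reducer invocation: who called it, what was called,
/// and how it turned out.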
#[derive(Debug, Clone)]
pub struct ModuleEvent {
    pub timestamp: Timestamp,
    pub caller_identity: Identity,
    pub caller_connection_id: Option<ConnectionId>,
    pub function_call: ModuleFunctionCall,
    pub status: EventStatus,
    pub energy_quanta_used: EnergyQuanta,
    pub host_execution_duration: Duration,
    pub request_id: Option<RequestId>,
    pub timer: Option<Instant>,
}

/// Information about a running module.
#[derive(Debug)]
pub struct ModuleInfo {
    /// The definition of the module.
    /// Obtained by loading the module's program from the system tables,
    /// extracting its definition, and validating it.
    pub module_def: ModuleDef,
    /// The identity of the module's owner.
    pub owner_identity: Identity,
    /// The identity of the database.
    pub database_identity: Identity,
    /// The hash of the module.
    pub module_hash: Hash,
    /// Allows subscribing to module logs.
    pub log_tx: tokio::sync::broadcast::Sender<bytes::Bytes>,
    /// Subscriptions to this module.
    pub subscriptions: ModuleSubscriptions,
    /// Metrics handles for this module.
    pub metrics: ModuleMetrics,
}

#[derive(Debug)]
pub struct ModuleMetrics {
    pub connected_clients: IntGauge,
    pub ws_clients_spawned: IntGauge,
    pub ws_clients_aborted: IntGauge,
    pub request_round_trip_subscribe: Histogram,
    pub request_round_trip_unsubscribe: Histogram,
    pub request_round_trip_sql: Histogram,
}

impl ModuleMetrics {
    fn new(db: &Identity) -> Self {
        let connected_clients = WORKER_METRICS.connected_clients.with_label_values(db);
        let ws_clients_spawned = WORKER_METRICS.ws_clients_spawned.with_label_values(db);
        let ws_clients_aborted = WORKER_METRICS.ws_clients_aborted.with_label_values(db);
        let request_round_trip_subscribe =
            WORKER_METRICS
                .request_round_trip
                .with_label_values(&WorkloadType::Subscribe, db, "");
        let request_round_trip_unsubscribe =
            WORKER_METRICS
                .request_round_trip
                .with_label_values(&WorkloadType::Unsubscribe, db, "");
        let request_round_trip_sql = WORKER_METRICS
            .request_round_trip
            .with_label_values(&WorkloadType::Sql, db, "");
        Self {
            connected_clients,
            ws_clients_spawned,
            ws_clients_aborted,
            request_round_trip_subscribe,
            request_round_trip_unsubscribe,
            request_round_trip_sql,
        }
    }
}

impl ModuleInfo {
    /// Create a new `ModuleInfo`.
    /// Reducers are sorted alphabetically by name and assigned IDs.
    pub fn new(
        module_def: ModuleDef,
        owner_identity: Identity,
        database_identity: Identity,
        module_hash: Hash,
        log_tx: tokio::sync::broadcast::Sender<bytes::Bytes>,
        subscriptions: ModuleSubscriptions,
    ) -> Arc<Self> {
        let metrics = ModuleMetrics::new(&database_identity);
        Arc::new(ModuleInfo {
            module_def,
            owner_identity,
            database_identity,
            module_hash,
            log_tx,
            subscriptions,
            metrics,
        })
    }
}

/// A bidirectional map between `Identifiers` (reducer names) and `ReducerId`s.
/// Invariant: the reducer names are in the same order as they were declared in the `ModuleDef`.
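///
/// # Example
///
/// A minimal sketch of the round trip (hypothetical reducer names; marked
/// `ignore` since these types are crate-internal):
///
/// ```ignore
/// let map: ReducersMap = ["init", "send_message"].into_iter().collect();
/// assert_eq!(map.lookup_id("send_message"), Some(ReducerId::from(1)));
/// assert_eq!(map.lookup_name(ReducerId::from(1)), Some("send_message"));
/// ```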
pub struct ReducersMap(IndexSet<Box<str>>);

impl<'a> FromIterator<&'a str> for ReducersMap {
    fn from_iter<T: IntoIterator<Item = &'a str>>(iter: T) -> Self {
        Self(iter.into_iter().map_into().collect())
    }
}

impl fmt::Debug for ReducersMap {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.0.fmt(f)
    }
}

impl ReducersMap {
    /// Lookup the ID for a reducer name.
    pub fn lookup_id(&self, reducer_name: &str) -> Option<ReducerId> {
        self.0.get_index_of(reducer_name).map(ReducerId::from)
    }

    /// Lookup the name for a reducer ID.
    pub fn lookup_name(&self, reducer_id: ReducerId) -> Option<&str> {
        let result = self.0.get_index(reducer_id.0 as _)?;
        Some(&**result)
    }
}

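/// The object-safe part of a module's interface,
/// giving access to its replica context and scheduler.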
pub trait DynModule: Send + Sync + 'static {
    fn replica_ctx(&self) -> &Arc<ReplicaContext>;
    fn scheduler(&self) -> &Scheduler;
}

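/// A loaded module, from which instances can be created to run reducers.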
pub trait Module: DynModule {
    type Instance: ModuleInstance;
    type InitialInstances<'a>: IntoIterator<Item = Self::Instance> + 'a;
    fn initial_instances(&mut self) -> Self::InitialInstances<'_>;
    fn info(&self) -> Arc<ModuleInfo>;
    fn create_instance(&self) -> Self::Instance;
}

pub trait ModuleInstance: Send + 'static {
    fn trapped(&self) -> bool;

    /// Update the database to match the schema of this module instance.
    fn update_database(
        &mut self,
        program: Program,
        old_module_info: Arc<ModuleInfo>,
    ) -> anyhow::Result<UpdateDatabaseResult>;

    fn call_reducer(&mut self, tx: Option<MutTxId>, params: CallReducerParams) -> ReducerCallResult;
}

/// If the module instance's replica_ctx is uninitialized, initialize it.
fn init_database(
    replica_ctx: &ReplicaContext,
    module_def: &ModuleDef,
    inst: &mut dyn ModuleInstance,
    program: Program,
) -> anyhow::Result<Option<ReducerCallResult>> {
    log::debug!("init database");
    let timestamp = Timestamp::now();
    let stdb = &*replica_ctx.relational_db;
    let logger = replica_ctx.logger.system_logger();

    let tx = stdb.begin_mut_tx(IsolationLevel::Serializable, Workload::Internal);
    let auth_ctx = AuthCtx::for_current(replica_ctx.database.owner_identity);
    let (tx, ()) = stdb
        .with_auto_rollback(tx, |tx| {
            let mut table_defs: Vec<_> = module_def.tables().collect();
            table_defs.sort_by(|a, b| a.name.cmp(&b.name));

            for def in table_defs {
                let table_name = &def.name;
                logger.info(&format!("Creating table `{table_name}`"));
                let schema = TableSchema::from_module_def(module_def, def, (), TableId::SENTINEL);
                stdb.create_table(tx, schema)
                    .with_context(|| format!("failed to create table {table_name}"))?;
            }
            // Insert the late-bound row-level security expressions.
            for rls in module_def.row_level_security() {
                logger.info(&format!("Creating row level security `{}`", rls.sql));

                let rls = RowLevelExpr::build_row_level_expr(tx, &auth_ctx, rls)
                    .with_context(|| format!("failed to create row-level security: `{}`", rls.sql))?;
                let table_id = rls.def.table_id;
                let sql = rls.def.sql.clone();
                stdb.create_row_level_security(tx, rls.def)
                    .with_context(|| format!("failed to create row-level security for table `{table_id}`: `{sql}`",))?;
            }

            stdb.set_initialized(tx, replica_ctx.host_type, program)?;

            anyhow::Ok(())
        })
        .inspect_err(|e| log::error!("{e:?}"))?;

    let rcr = match module_def.lifecycle_reducer(Lifecycle::Init) {
        None => {
            if let Some((tx_data, tx_metrics, reducer)) = stdb.commit_tx(tx)? {
                stdb.report(&reducer, &tx_metrics, Some(&tx_data));
            }
            None
        }

        Some((reducer_id, _)) => {
            logger.info("Invoking `init` reducer");
            let caller_identity = replica_ctx.database.owner_identity;
            Some(inst.call_reducer(
                Some(tx),
                CallReducerParams {
                    timestamp,
                    caller_identity,
                    caller_connection_id: ConnectionId::ZERO,
                    client: None,
                    request_id: None,
                    timer: None,
                    reducer_id,
                    args: ArgsTuple::nullary(),
                },
            ))
        }
    };

    logger.info("Database initialized");
    Ok(rcr)
}

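/// Everything needed to dispatch one reducer call to a module instance.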
pub struct CallReducerParams {
    pub timestamp: Timestamp,
    pub caller_identity: Identity,
    pub caller_connection_id: ConnectionId,
    pub client: Option<Arc<ClientConnectionSender>>,
    pub request_id: Option<RequestId>,
    pub timer: Option<Instant>,
    pub reducer_id: ReducerId,
    pub args: ArgsTuple,
}

// TODO: figure out how we want to handle traps. maybe it should just not return to the LendingPool and
//       let the get_instance logic handle it?
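/// A module instance that replaces itself with a freshly created instance
/// whenever a call leaves it trapped.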
struct AutoReplacingModuleInstance<T: Module> {
    inst: T::Instance,
    module: Arc<T>,
}

impl<T: Module> AutoReplacingModuleInstance<T> {
    fn check_trap(&mut self) {
        if self.inst.trapped() {
            self.inst = self.module.create_instance()
        }
    }
}

impl<T: Module> ModuleInstance for AutoReplacingModuleInstance<T> {
    fn trapped(&self) -> bool {
        self.inst.trapped()
    }
    fn update_database(
        &mut self,
        program: Program,
        old_module_info: Arc<ModuleInfo>,
    ) -> anyhow::Result<UpdateDatabaseResult> {
        let ret = self.inst.update_database(program, old_module_info);
        self.check_trap();
        ret
    }
    fn call_reducer(&mut self, tx: Option<MutTxId>, params: CallReducerParams) -> ReducerCallResult {
        let ret = self.inst.call_reducer(tx, params);
        self.check_trap();
        ret
    }
}

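/// A cloneable handle to a running module. All clones share the same
/// underlying module and instance job thread.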
#[derive(Clone)]
pub struct ModuleHost {
    pub info: Arc<ModuleInfo>,
    module: Arc<dyn DynModule>,
    /// Called whenever a reducer call on this host panics.
    on_panic: Arc<dyn Fn() + Send + Sync + 'static>,
    job_tx: JobThread<dyn ModuleInstance>,
}

impl fmt::Debug for ModuleHost {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ModuleHost")
            .field("info", &self.info)
            .field("module", &Arc::as_ptr(&self.module))
            .finish()
    }
}

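/// A non-owning handle to a [`ModuleHost`];
/// see [`ModuleHost::downgrade`] and [`WeakModuleHost::upgrade`].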
pub struct WeakModuleHost {
    info: Arc<ModuleInfo>,
    inner: Weak<dyn DynModule>,
    on_panic: Weak<dyn Fn() + Send + Sync + 'static>,
    tx: WeakJobThread<dyn ModuleInstance>,
}

#[derive(Debug)]
pub enum UpdateDatabaseResult {
    NoUpdateNeeded,
    UpdatePerformed,
    AutoMigrateError(ErrorStream<AutoMigrateError>),
    ErrorExecutingMigration(anyhow::Error),
}

impl UpdateDatabaseResult {
    /// Check if a database update was successful.
    pub fn was_successful(&self) -> bool {
        matches!(
            self,
            UpdateDatabaseResult::UpdatePerformed | UpdateDatabaseResult::NoUpdateNeeded
        )
    }
}

#[derive(thiserror::Error, Debug)]
#[error("no such module")]
pub struct NoSuchModule;

#[derive(thiserror::Error, Debug)]
pub enum ReducerCallError {
    #[error(transparent)]
    Args(#[from] InvalidReducerArguments),
    #[error(transparent)]
    NoSuchModule(#[from] NoSuchModule),
    #[error("no such reducer")]
    NoSuchReducer,
    #[error("no such scheduled reducer")]
    ScheduleReducerNotFound,
    #[error("can't directly call special {0:?} lifecycle reducer")]
    LifecycleReducer(Lifecycle),
}

#[derive(thiserror::Error, Debug)]
pub enum InitDatabaseError {
    #[error(transparent)]
    Args(#[from] InvalidReducerArguments),
    #[error(transparent)]
    NoSuchModule(#[from] NoSuchModule),
    #[error(transparent)]
    Other(anyhow::Error),
}

#[derive(thiserror::Error, Debug)]
pub enum ClientConnectedError {
    #[error(transparent)]
    ReducerCall(#[from] ReducerCallError),
    #[error("Failed to insert `st_client` row for module without client_connected reducer: {0}")]
    DBError(#[from] DBError),
    #[error("Connection rejected by `client_connected` reducer: {0}")]
    Rejected(String),
    #[error("Insufficient energy balance to run `client_connected` reducer")]
    OutOfEnergy,
}

impl ModuleHost {
    pub(super) fn new(module: impl Module, on_panic: impl Fn() + Send + Sync + 'static, core: JobCore) -> Self {
        let info = module.info();
        let module = Arc::new(module);
        let on_panic = Arc::new(on_panic);

        let module_clone = module.clone();
        let job_tx = core.start(
            move || AutoReplacingModuleInstance {
                inst: module_clone.create_instance(),
                module: module_clone,
            },
            |x| x as &mut dyn ModuleInstance,
        );
        ModuleHost {
            info,
            module,
            on_panic,
            job_tx,
        }
    }

    #[inline]
    pub fn info(&self) -> &ModuleInfo {
        &self.info
    }

    #[inline]
    pub fn subscriptions(&self) -> &ModuleSubscriptions {
        &self.info.subscriptions
    }

    async fn call<F, R>(&self, reducer: &str, f: F) -> Result<R, NoSuchModule>
    where
        F: FnOnce(&mut dyn ModuleInstance) -> R + Send + 'static,
        R: Send + 'static,
    {
        // Record the time until our function starts running.
        let queue_timer = WORKER_METRICS
            .reducer_wait_time
            .with_label_values(&self.info.database_identity, reducer)
            .start_timer();
        let queue_length_gauge = WORKER_METRICS
            .instance_queue_length
            .with_label_values(&self.info.database_identity);
        queue_length_gauge.inc();
        {
            let queue_length = queue_length_gauge.get();
            WORKER_METRICS
                .instance_queue_length_histogram
                .with_label_values(&self.info.database_identity)
                .observe(queue_length as f64);
        }
        // Ensure that we always decrement the gauge.
        let timer_guard = scopeguard::guard((), move |_| {
            // Decrement the queue length gauge when we're done.
            // This is done in a defer so that it happens even if the reducer call panics.
            queue_length_gauge.dec();
            queue_timer.stop_and_record();
        });

        // Operations on module instances (e.g. calling reducers) are blocking,
        // partially because the computation can potentially take a long time
        // and partially because interacting with the database requires taking
        // a blocking lock. So, we run `f` inside of `asyncify()`, which runs
        // the provided closure in a tokio blocking task, and bubbles up any
        // panic that may occur.

        // If a reducer call panics, we **must** ensure to call `self.on_panic`
        // so that the module is discarded by the host controller.
        scopeguard::defer_on_unwind!({
            log::warn!("reducer {reducer} panicked");
            (self.on_panic)();
        });
        self.job_tx
            .run(move |inst| {
                drop(timer_guard);
                f(inst)
            })
            .await
            .map_err(|_: JobThreadClosed| NoSuchModule)
    }

    pub async fn disconnect_client(&self, client_id: ClientActorId) {
        log::trace!("disconnecting client {}", client_id);
        let this = self.clone();
        asyncify(move || this.subscriptions().remove_subscriber(client_id)).await;
        // ignore NoSuchModule; if the module's already closed, that's fine
        if let Err(e) = self
            .call_identity_disconnected(client_id.identity, client_id.connection_id)
            .await
        {
            log::error!("Error from client_disconnected transaction: {e}");
        }
    }

    /// Invoke the module's `client_connected` reducer, if it has one,
    /// and insert a new row into `st_client` for `(caller_identity, caller_connection_id)`.
    ///
    /// The host inspects `st_client` when restarting in order to run `client_disconnected` reducers
    /// for clients that were connected at the time when the host went down.
    /// This ensures that every client connection eventually has `client_disconnected` invoked.
    ///
    /// If this method returns `Ok`, then the client connection has been approved,
    /// and the new row has been inserted into `st_client`.
    ///
    /// If this method returns `Err`, then the client connection has either failed or been rejected,
    /// and `st_client` has not been modified.
    /// In this case, the caller should terminate the connection.
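    ///
    /// # Example
    ///
    /// A sketch of the caller's side of this contract (hypothetical
    /// `terminate_connection` helper; `ignore`d since it isn't runnable in isolation):
    ///
    /// ```ignore
    /// if let Err(e) = module_host.call_identity_connected(identity, connection_id).await {
    ///     // The connection failed or was rejected; `st_client` was not modified,
    ///     // so drop the connection rather than proceeding.
    ///     terminate_connection(e);
    /// }
    /// ```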
    pub async fn call_identity_connected(
        &self,
        caller_identity: Identity,
        caller_connection_id: ConnectionId,
    ) -> Result<(), ClientConnectedError> {
        let reducer_lookup = self.info.module_def.lifecycle_reducer(Lifecycle::OnConnect);

        if let Some((reducer_id, reducer_def)) = reducer_lookup {
            // The module defined a lifecycle reducer to handle new connections.
            // Call this reducer.
            // If the call fails (as in, something unexpectedly goes wrong with WASM execution),
            // abort the connection: we can't really recover.
            let reducer_outcome = self
                .call_reducer_inner(
                    caller_identity,
                    Some(caller_connection_id),
                    None,
                    None,
                    None,
                    reducer_id,
                    reducer_def,
                    ReducerArgs::Nullary,
                )
                .await?;

            match reducer_outcome.outcome {
                // If the reducer committed successfully, we're done.
                // `WasmModuleInstance::call_reducer_with_tx` has already ensured
                // that `st_client` is updated appropriately.
                //
                // It's necessary to spread out the responsibility for updating `st_client` in this way
                // because it's important that `call_identity_connected` commit at most one transaction.
                // A naive implementation of this method would just run the reducer first,
                // then insert into `st_client`,
                // but if we crashed in between, we'd be left in an inconsistent state
                // where the reducer had run but `st_client` was not yet updated.
                ReducerOutcome::Committed => Ok(()),

                // If the reducer returned an error or couldn't run due to insufficient energy,
                // abort the connection: the module code has decided it doesn't want this client.
                ReducerOutcome::Failed(message) => Err(ClientConnectedError::Rejected(message)),
                ReducerOutcome::BudgetExceeded => Err(ClientConnectedError::OutOfEnergy),
            }
        } else {
            // The module doesn't define a client_connected reducer.
            // Commit a transaction to update `st_client`
            // and to ensure we always have those events paired in the commitlog.
            //
            // This is necessary to be able to disconnect clients after a server crash.
            let reducer_name = reducer_lookup
                .as_ref()
                .map(|(_, def)| &*def.name)
                .unwrap_or("__identity_connected__");

            let workload = Workload::Reducer(ReducerContext {
                name: reducer_name.to_owned(),
                caller_identity,
                caller_connection_id,
                timestamp: Timestamp::now(),
                arg_bsatn: Bytes::new(),
            });

            let stdb = self.module.replica_ctx().relational_db.clone();
            asyncify(move || {
                stdb.with_auto_commit(workload, |mut_tx| {
                    mut_tx
                        .insert_st_client(caller_identity, caller_connection_id)
                        .map_err(DBError::from)
                })
            })
            .await
            .inspect_err(|e| {
                log::error!("`call_identity_connected`: fallback transaction to insert into `st_client` failed: {e:#?}")
            })
            .map_err(DBError::from)
            .map_err(Into::into)
        }
    }

    /// Invoke the module's `client_disconnected` reducer, if it has one,
    /// and delete the client's row from `st_client`, if any.
    ///
    /// The host inspects `st_client` when restarting in order to run `client_disconnected` reducers
    /// for clients that were connected at the time when the host went down.
    /// This ensures that every client connection eventually has `client_disconnected` invoked.
    ///
    /// Unlike [`Self::call_identity_connected`],
    /// this method swallows errors returned by the `client_disconnected` reducer.
    /// The database can't reject a disconnection - the client's gone, whether the database likes it or not.
    ///
    /// If this method returns an error, the database is likely to wind up in a bad state,
    /// as that means we've somehow failed to delete from `st_client`.
    /// We cannot meaningfully handle this.
    /// Sometimes it just means that the database no longer exists, though, which is fine.
    pub async fn call_identity_disconnected(
        &self,
        caller_identity: Identity,
        caller_connection_id: ConnectionId,
    ) -> Result<(), ReducerCallError> {
        let reducer_lookup = self.info.module_def.lifecycle_reducer(Lifecycle::OnDisconnect);

        // A fallback transaction that deletes the client from `st_client`.
        let fallback = || async {
            let reducer_name = reducer_lookup
                .as_ref()
                .map(|(_, def)| &*def.name)
                .unwrap_or("__identity_disconnected__");

            let workload = Workload::Reducer(ReducerContext {
                name: reducer_name.to_owned(),
                caller_identity,
                caller_connection_id,
                timestamp: Timestamp::now(),
                arg_bsatn: Bytes::new(),
            });
            let stdb = self.module.replica_ctx().relational_db.clone();
            let database_identity = self.info.database_identity;
            asyncify(move || {
                stdb.with_auto_commit(workload, |mut_tx| {
                    mut_tx
                        .delete_st_client(caller_identity, caller_connection_id, database_identity)
                        .map_err(DBError::from)
                })
            })
            .await
            .map_err(|err| {
                log::error!(
                    "`call_identity_disconnected`: fallback transaction to delete from `st_client` failed: {err}"
                );
                InvalidReducerArguments {
                    err: err.into(),
                    reducer: reducer_name.into(),
                }
                .into()
            })
        };

        if let Some((reducer_id, reducer_def)) = reducer_lookup {
            // The module defined a lifecycle reducer to handle disconnects. Call it.
            // If it succeeds, `WasmModuleInstance::call_reducer_with_tx` has already ensured
            // that `st_client` is updated appropriately.
            let result = self
                .call_reducer_inner(
                    caller_identity,
                    Some(caller_connection_id),
                    None,
                    None,
                    None,
                    reducer_id,
                    reducer_def,
                    ReducerArgs::Nullary,
                )
                .await;

            // If it failed, we still need to update `st_client`: the client's not coming back.
            // Commit a separate transaction that just updates `st_client`.
            //
            // It's OK for this to not be atomic with the previous transaction,
            // since that transaction didn't commit. If we crash before committing this one,
            // we'll run the `client_disconnected` reducer again unnecessarily,
            // but the commitlog won't contain two invocations of it, which is what we care about.
            match result {
                Err(e) => {
                    log::error!("call_reducer_inner of client_disconnected failed: {e:#?}");
                    fallback().await
                }
                Ok(ReducerCallResult {
                    outcome: ReducerOutcome::Failed(_) | ReducerOutcome::BudgetExceeded,
                    ..
                }) => fallback().await,

                // If it succeeded, as mentioned above, `st_client` is already updated.
                Ok(ReducerCallResult {
                    outcome: ReducerOutcome::Committed,
                    ..
                }) => Ok(()),
            }
        } else {
            // The module doesn't define a `client_disconnected` reducer.
            // Commit a transaction to update `st_client`.
            fallback().await
        }
    }

    async fn call_reducer_inner(
        &self,
        caller_identity: Identity,
        caller_connection_id: Option<ConnectionId>,
        client: Option<Arc<ClientConnectionSender>>,
        request_id: Option<RequestId>,
        timer: Option<Instant>,
        reducer_id: ReducerId,
        reducer_def: &ReducerDef,
        args: ReducerArgs,
    ) -> Result<ReducerCallResult, ReducerCallError> {
        let reducer_seed = ReducerArgsDeserializeSeed(self.info.module_def.typespace().with_type(reducer_def));
        let args = args.into_tuple(reducer_seed)?;
        let caller_connection_id = caller_connection_id.unwrap_or(ConnectionId::ZERO);

        self.call(&reducer_def.name, move |inst| {
            inst.call_reducer(
                None,
                CallReducerParams {
                    timestamp: Timestamp::now(),
                    caller_identity,
                    caller_connection_id,
                    client,
                    request_id,
                    timer,
                    reducer_id,
                    args,
                },
            )
        })
        .await
        .map_err(Into::into)
    }

    pub async fn call_reducer(
        &self,
        caller_identity: Identity,
        caller_connection_id: Option<ConnectionId>,
        client: Option<Arc<ClientConnectionSender>>,
        request_id: Option<RequestId>,
        timer: Option<Instant>,
        reducer_name: &str,
        args: ReducerArgs,
    ) -> Result<ReducerCallResult, ReducerCallError> {
        let res = async {
            let (reducer_id, reducer_def) = self
                .info
                .module_def
                .reducer_full(reducer_name)
                .ok_or(ReducerCallError::NoSuchReducer)?;
            if let Some(lifecycle) = reducer_def.lifecycle {
                return Err(ReducerCallError::LifecycleReducer(lifecycle));
            }
            self.call_reducer_inner(
                caller_identity,
                caller_connection_id,
                client,
                request_id,
                timer,
                reducer_id,
                reducer_def,
                args,
            )
            .await
        }
        .await;

        let log_message = match &res {
            Err(ReducerCallError::NoSuchReducer) => Some(format!(
                "External attempt to call nonexistent reducer \"{}\" failed. Have you run `spacetime generate` recently?",
                reducer_name
            )),
            Err(ReducerCallError::Args(_)) => Some(format!(
                "External attempt to call reducer \"{}\" failed, invalid arguments.\n\
                 This is likely due to a mismatched client schema, have you run `spacetime generate` recently?",
                reducer_name,
            )),
            _ => None,
        };
        if let Some(log_message) = log_message {
            self.inject_logs(LogLevel::Error, &log_message)
        }

        res
    }

    // Scheduled reducers require a different function here to call their reducer,
    // because their arguments are stored in the database and must be fetched
    // within the same transaction as the reducer call.
    pub async fn call_scheduled_reducer(
        &self,
        call_reducer_params: impl FnOnce(&MutTxId) -> anyhow::Result<Option<CallReducerParams>> + Send + 'static,
    ) -> Result<ReducerCallResult, ReducerCallError> {
        let db = self.module.replica_ctx().relational_db.clone();
        // The scheduled reducer's name hasn't been fetched yet; this placeholder is used only for logging.
        const REDUCER: &str = "scheduled_reducer";
        let module = self.info.clone();
        self.call(REDUCER, move |inst: &mut dyn ModuleInstance| {
            let mut tx = db.begin_mut_tx(IsolationLevel::Serializable, Workload::Internal);

            match call_reducer_params(&mut tx) {
                Ok(Some(params)) => {
                    // It's necessary to patch the context with the actual calling reducer.
                    let reducer_def = module
                        .module_def
                        .get_reducer_by_id(params.reducer_id)
                        .ok_or(ReducerCallError::ScheduleReducerNotFound)?;
                    let reducer = &*reducer_def.name;

                    tx.ctx = ExecutionContext::with_workload(
                        tx.ctx.database_identity(),
                        Workload::Reducer(ReducerContext {
                            name: reducer.into(),
                            caller_identity: params.caller_identity,
                            caller_connection_id: params.caller_connection_id,
                            timestamp: Timestamp::now(),
                            arg_bsatn: params.args.get_bsatn().clone(),
                        }),
                    );

                    Ok(inst.call_reducer(Some(tx), params))
                }
                Ok(None) => Err(ReducerCallError::ScheduleReducerNotFound),
                Err(err) => Err(ReducerCallError::Args(InvalidReducerArguments {
                    err,
                    reducer: REDUCER.into(),
                })),
            }
        })
        .await
        .unwrap_or_else(|e| Err(e.into()))
        .map_err(Into::into)
    }

    pub fn subscribe_to_logs(&self) -> anyhow::Result<tokio::sync::broadcast::Receiver<bytes::Bytes>> {
        Ok(self.info().log_tx.subscribe())
    }

    pub async fn init_database(&self, program: Program) -> Result<Option<ReducerCallResult>, InitDatabaseError> {
        let replica_ctx = self.module.replica_ctx().clone();
        let info = self.info.clone();
        self.call("<init_database>", move |inst| {
            init_database(&replica_ctx, &info.module_def, inst, program)
        })
        .await?
        .map_err(InitDatabaseError::Other)
    }

    pub async fn update_database(
        &self,
        program: Program,
        old_module_info: Arc<ModuleInfo>,
    ) -> Result<UpdateDatabaseResult, anyhow::Error> {
        self.call("<update_database>", move |inst| {
            inst.update_database(program, old_module_info)
        })
        .await?
        .map_err(Into::into)
    }

    pub async fn exit(&self) {
        self.module.scheduler().close();
        self.job_tx.close();
        self.exited().await;
    }

    pub async fn exited(&self) {
        tokio::join!(self.module.scheduler().closed(), self.job_tx.closed());
    }

    pub fn inject_logs(&self, log_level: LogLevel, message: &str) {
        self.replica_ctx().logger.write(
            log_level,
            &Record {
                ts: chrono::Utc::now(),
                target: None,
                filename: Some("external"),
                line_number: None,
                message,
            },
            &(),
        )
    }

    /// Execute a one-off query and send the results to the given client.
    /// This only returns an error if there is a db-level problem.
    /// An error with the query itself will be sent to the client.
    #[tracing::instrument(level = "trace", skip_all)]
    pub async fn one_off_query<F: WebsocketFormat>(
        &self,
        caller_identity: Identity,
        query: String,
        client: Arc<ClientConnectionSender>,
        message_id: Vec<u8>,
        timer: Instant,
        // We take this closure because message conversion is only implemented for the concrete formats (Bsatn and Json).
        into_message: impl FnOnce(OneOffQueryResponseMessage<F>) -> SerializableMessage + Send + 'static,
    ) -> Result<(), anyhow::Error> {
        let replica_ctx = self.replica_ctx();
        let db = replica_ctx.relational_db.clone();
        let subscriptions = replica_ctx.subscriptions.clone();
        let auth = AuthCtx::new(replica_ctx.owner_identity, caller_identity);
        log::debug!("One-off query: {query}");
        let metrics = asyncify(move || {
            db.with_read_only(Workload::Sql, |tx| {
                // We wrap the actual query in a closure so we can use ? to handle errors without making
                // the entire transaction abort with an error.
                let result: Result<(OneOffTable<F>, ExecutionMetrics), anyhow::Error> = (|| {
                    let tx = SchemaViewer::new(tx, &auth);

                    let (
                        // A query may compile down to several plans.
                        // This happens when there are multiple RLS rules per table.
                        // The original query is the union of these plans.
                        plans,
                        _,
                        table_name,
                        _,
                    ) = compile_subscription(&query, &tx, &auth)?;

                    // Optimize each fragment
                    let optimized = plans
                        .into_iter()
                        .map(|plan| plan.optimize())
                        .collect::<Result<Vec<_>, _>>()?;

                    check_row_limit(
                        &optimized,
                        &db,
                        &tx,
                        // Estimate the number of rows this query will scan
                        |plan, tx| estimate_rows_scanned(tx, plan),
                        &auth,
                    )?;

                    let optimized = optimized
                        .into_iter()
                        // Convert into something we can execute
                        .map(PipelinedProject::from)
                        .collect::<Vec<_>>();

                    // Execute the union and return the results
                    execute_plan::<_, F>(&optimized, &DeltaTx::from(&*tx))
                        .map(|(rows, _, metrics)| (OneOffTable { table_name, rows }, metrics))
                        .context("One-off queries are not allowed to modify the database")
                })();

                let total_host_execution_duration = timer.elapsed().into();
                let (message, metrics): (SerializableMessage, Option<ExecutionMetrics>) = match result {
                    Ok((rows, metrics)) => (
                        into_message(OneOffQueryResponseMessage {
                            message_id,
                            error: None,
                            results: vec![rows],
                            total_host_execution_duration,
                        }),
                        Some(metrics),
                    ),
                    Err(err) => (
                        into_message(OneOffQueryResponseMessage {
                            message_id,
                            error: Some(format!("{}", err)),
                            results: vec![],
                            total_host_execution_duration,
                        }),
                        None,
                    ),
                };

                subscriptions.send_client_message(client, message, tx)?;
                Ok::<Option<ExecutionMetrics>, anyhow::Error>(metrics)
            })
        })
        .await?;

        if let Some(metrics) = metrics {
            // Record the metrics for the one-off query
            replica_ctx
                .relational_db
                .exec_counters_for(WorkloadType::Sql)
                .record(&metrics);
        }

        Ok(())
    }

    /// FIXME(jgilles): this is a temporary workaround for deletes not being supported
    /// on tables without primary keys. It is only used in the benchmarks.
    /// Note: this doesn't drop the table, it just clears it!
    pub fn clear_table(&self, table_name: &str) -> Result<(), anyhow::Error> {
        let db = &*self.replica_ctx().relational_db;

        db.with_auto_commit(Workload::Internal, |tx| {
            let tables = db.get_all_tables_mut(tx)?;
            // We currently have unique table names,
            // so we can assume there's only one table to clear.
            if let Some(table_id) = tables
                .iter()
                .find_map(|t| (&*t.table_name == table_name).then_some(t.table_id))
            {
                db.clear_table(tx, table_id)?;
            }
            Ok(())
        })
    }

    pub fn downgrade(&self) -> WeakModuleHost {
        WeakModuleHost {
            info: self.info.clone(),
            inner: Arc::downgrade(&self.module),
            on_panic: Arc::downgrade(&self.on_panic),
            tx: self.job_tx.downgrade(),
        }
    }

    pub fn database_info(&self) -> &Database {
        &self.replica_ctx().database
    }

    pub(crate) fn replica_ctx(&self) -> &ReplicaContext {
        self.module.replica_ctx()
    }
}

impl WeakModuleHost {
    pub fn upgrade(&self) -> Option<ModuleHost> {
        let inner = self.inner.upgrade()?;
        let on_panic = self.on_panic.upgrade()?;
        let tx = self.tx.upgrade()?;
        Some(ModuleHost {
            info: self.info.clone(),
            module: inner,
            on_panic,
            job_tx: tx,
        })
    }
}