spacetimedb/host/
host_controller.rs

use super::module_host::{EventStatus, ModuleHost, ModuleInfo, NoSuchModule};
use super::scheduler::SchedulerStarter;
use super::wasmtime::WasmtimeRuntime;
use super::{Scheduler, UpdateDatabaseResult};
use crate::database_logger::DatabaseLogger;
use crate::db::datastore::traits::Program;
use crate::db::db_metrics::data_size::DATA_SIZE_METRICS;
use crate::db::db_metrics::DB_METRICS;
use crate::db::relational_db::{self, DiskSizeFn, RelationalDB, Txdata};
use crate::db::{self};
use crate::energy::{EnergyMonitor, EnergyQuanta, NullEnergyMonitor};
use crate::messages::control_db::{Database, HostType};
use crate::module_host_context::ModuleCreationContext;
use crate::replica_context::ReplicaContext;
use crate::subscription::module_subscription_actor::ModuleSubscriptions;
use crate::subscription::module_subscription_manager::{spawn_send_worker, SubscriptionManager};
use crate::util::asyncify;
use crate::util::jobs::{JobCore, JobCores};
use crate::worker_metrics::WORKER_METRICS;
use anyhow::{anyhow, ensure, Context};
use async_trait::async_trait;
use durability::{Durability, EmptyHistory};
use log::{info, trace, warn};
use parking_lot::Mutex;
use spacetimedb_data_structures::map::IntMap;
use spacetimedb_durability::{self as durability, TxOffset};
use spacetimedb_lib::{hash_bytes, Identity};
use spacetimedb_paths::server::{ReplicaDir, ServerDataDir};
use spacetimedb_paths::FromPathUnchecked;
use spacetimedb_sats::hash::Hash;
use spacetimedb_schema::def::ModuleDef;
use spacetimedb_table::page_pool::PagePool;
use std::future::Future;
use std::ops::Deref;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tempfile::TempDir;
use tokio::sync::{watch, OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock as AsyncRwLock};
use tokio::task::AbortHandle;

// TODO:
//
// - [db::Config] should be per-[Database]

/// A shared mutable cell containing a module host and associated database.
type HostCell = Arc<AsyncRwLock<Option<Host>>>;

/// The registry of all running hosts.
type Hosts = Arc<Mutex<IntMap<u64, HostCell>>>;

pub type ExternalDurability = (Arc<dyn Durability<TxData = Txdata>>, DiskSizeFn);

pub type StartSnapshotWatcher = Box<dyn FnOnce(watch::Receiver<TxOffset>)>;

#[async_trait]
pub trait DurabilityProvider: Send + Sync + 'static {
    async fn durability(&self, replica_id: u64) -> anyhow::Result<(ExternalDurability, Option<StartSnapshotWatcher>)>;
}

#[async_trait]
pub trait ExternalStorage: Send + Sync + 'static {
    async fn lookup(&self, program_hash: Hash) -> anyhow::Result<Option<Box<[u8]>>>;
}
#[async_trait]
impl<F, Fut> ExternalStorage for F
where
    F: Fn(Hash) -> Fut + Send + Sync + 'static,
    Fut: Future<Output = anyhow::Result<Option<Box<[u8]>>>> + Send,
{
    async fn lookup(&self, program_hash: Hash) -> anyhow::Result<Option<Box<[u8]>>> {
        self(program_hash).await
    }
}

pub type ProgramStorage = Arc<dyn ExternalStorage>;
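
// Illustrative sketch, not part of the original source: because of the blanket
// `ExternalStorage` impl for async closures above, a `ProgramStorage` can be
// built from a plain function. This one serves a single hard-coded program and
// answers `None` for every other hash; a real deployment would look programs
// up in a database or object store instead.
#[allow(dead_code)]
fn example_single_program_storage(program_hash: Hash, program_bytes: Box<[u8]>) -> ProgramStorage {
    Arc::new(move |hash: Hash| {
        // Clone the bytes only when the requested hash matches.
        let bytes = (hash == program_hash).then(|| program_bytes.clone());
        async move { Ok::<_, anyhow::Error>(bytes) }
    })
}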

/// A host controller manages the lifecycle of spacetime databases and their
/// associated modules.
///
/// This type is, and must remain, cheap to clone.
/// All of its fields should either be [`Copy`], enclosed in an [`Arc`],
/// or have some other fast [`Clone`] implementation.
#[derive(Clone)]
pub struct HostController {
    /// Map of all hosts managed by this controller,
    /// keyed by database instance id.
    hosts: Hosts,
    /// The root directory for database data.
    pub data_dir: Arc<ServerDataDir>,
    /// The default configuration to use for databases created by this
    /// controller.
    default_config: db::Config,
    /// The [`ProgramStorage`] to query when instantiating a module.
    program_storage: ProgramStorage,
    /// The [`EnergyMonitor`] used by this controller.
    energy_monitor: Arc<dyn EnergyMonitor>,
    /// Provides implementations of [`Durability`] for each replica.
    durability: Arc<dyn DurabilityProvider>,
    /// The page pool shared by all databases; each database gets a clone of this ref-counted pool.
    pub page_pool: PagePool,
    /// The runtimes for running our modules.
    runtimes: Arc<HostRuntimes>,
    /// The CPU cores that are reserved for ModuleHost operations to run on.
    db_cores: JobCores,
}

struct HostRuntimes {
    wasmtime: WasmtimeRuntime,
}

impl HostRuntimes {
    fn new(data_dir: Option<&ServerDataDir>) -> Arc<Self> {
        let wasmtime = WasmtimeRuntime::new(data_dir);
        Arc::new(Self { wasmtime })
    }
}

#[derive(Clone, Debug)]
pub struct ReducerCallResult {
    pub outcome: ReducerOutcome,
    pub energy_used: EnergyQuanta,
    pub execution_duration: Duration,
}

impl ReducerCallResult {
    pub fn is_err(&self) -> bool {
        self.outcome.is_err()
    }

    pub fn is_ok(&self) -> bool {
        !self.is_err()
    }
}

impl From<ReducerCallResult> for Result<(), anyhow::Error> {
    fn from(value: ReducerCallResult) -> Self {
        value.outcome.into_result()
    }
}
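
// Illustrative sketch, not part of the original source: callers that only care
// whether a reducer call committed can lean on the `From` impl above and
// propagate failures through `anyhow`, much like `Host::try_init` does with
// the result of `init_database` further down in this file.
#[allow(dead_code)]
fn example_check_reducer_call(result: ReducerCallResult) -> anyhow::Result<()> {
    // `Committed` maps to `Ok(())`; `Failed` and `BudgetExceeded` become errors.
    Result::from(result)
}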

#[derive(Clone, Debug)]
pub enum ReducerOutcome {
    Committed,
    Failed(String),
    BudgetExceeded,
}

impl ReducerOutcome {
    pub fn into_result(self) -> anyhow::Result<()> {
        match self {
            Self::Committed => Ok(()),
            Self::Failed(e) => Err(anyhow::anyhow!(e)),
            Self::BudgetExceeded => Err(anyhow::anyhow!("reducer ran out of energy")),
        }
    }

    pub fn is_err(&self) -> bool {
        !matches!(self, Self::Committed)
    }
}

impl From<&EventStatus> for ReducerOutcome {
    fn from(status: &EventStatus) -> Self {
        match &status {
            EventStatus::Committed(_) => ReducerOutcome::Committed,
            EventStatus::Failed(e) => ReducerOutcome::Failed(e.clone()),
            EventStatus::OutOfEnergy => ReducerOutcome::BudgetExceeded,
        }
    }
}

impl HostController {
    pub fn new(
        data_dir: Arc<ServerDataDir>,
        default_config: db::Config,
        program_storage: ProgramStorage,
        energy_monitor: Arc<impl EnergyMonitor>,
        durability: Arc<dyn DurabilityProvider>,
        db_cores: JobCores,
    ) -> Self {
        Self {
            hosts: <_>::default(),
            default_config,
            program_storage,
            energy_monitor,
            durability,
            runtimes: HostRuntimes::new(Some(&data_dir)),
            data_dir,
            page_pool: PagePool::new(default_config.page_pool_max_size),
            db_cores,
        }
    }

    /// Replace the [`ProgramStorage`] used by this controller.
    pub fn set_program_storage(&mut self, ps: ProgramStorage) {
        self.program_storage = ps;
    }

    /// Get a [`ModuleHost`] managed by this controller, or launch it from
    /// persistent state.
    ///
    /// If the host is not running, it is started according to the default
    /// [`db::Config`] set for this controller.
    /// The underlying database is restored from existing data at its
    /// canonical filesystem location _iff_ the default config mandates disk
    /// storage.
    ///
    /// The module will be instantiated from the program bytes stored in an
    /// existing database.
    /// If the database is empty, [`Database::initial_program`] will be used
    /// to load the program from the controller's [`ProgramStorage`].
    /// The initialization procedure (schema creation, `__init__` reducer)
    /// will be invoked on the found module, and the database will be marked
    /// as initialized.
    ///
    /// See also: [`Self::get_module_host`]
    #[tracing::instrument(level = "trace", skip_all)]
    pub async fn get_or_launch_module_host(&self, database: Database, replica_id: u64) -> anyhow::Result<ModuleHost> {
        let mut rx = self.watch_maybe_launch_module_host(database, replica_id).await?;
        let module = rx.borrow_and_update();
        Ok(module.clone())
    }

    /// Like [`Self::get_or_launch_module_host`], use a [`ModuleHost`] managed
    /// by this controller, or launch it if it is not running.
    ///
    /// Instead of a [`ModuleHost`], this returns a [`watch::Receiver`] which
    /// gets notified each time the module is updated.
    ///
    /// See also: [`Self::watch_module_host`]
    #[tracing::instrument(level = "trace", skip_all)]
    pub async fn watch_maybe_launch_module_host(
        &self,
        database: Database,
        replica_id: u64,
    ) -> anyhow::Result<watch::Receiver<ModuleHost>> {
        // Try a read lock first.
        {
            let guard = self.acquire_read_lock(replica_id).await;
            if let Some(host) = &*guard {
                trace!("cached host {}/{}", database.database_identity, replica_id);
                return Ok(host.module.subscribe());
            }
        }

        // We didn't find a running module, so take a write lock.
        // Since [`tokio::sync::RwLock`] doesn't support upgrading of read locks,
        // we'll need to check again if a module was added meanwhile.
        let mut guard = self.acquire_write_lock(replica_id).await;
        if let Some(host) = &*guard {
            trace!(
                "cached host {}/{} (lock upgrade)",
                database.database_identity,
                replica_id
            );
            return Ok(host.module.subscribe());
        }

        trace!("launch host {}/{}", database.database_identity, replica_id);

        // `HostController::clone` is fast,
        // as all of its fields are either `Copy` or wrapped in `Arc`.
        let this = self.clone();

        // `try_init_host` is not cancel safe, as it will spawn other async tasks
        // which hold a filesystem lock past when `try_init_host` returns or is cancelled.
        // This means that, if `try_init_host` is cancelled, subsequent calls will fail.
        //
        // This is problematic because Axum will cancel its handler tasks if the client disconnects,
        // and this method is called from Axum handlers, e.g. for the subscribe route.
        // `tokio::spawn` a task to build the `Host` and install it in the `guard`,
        // so that it will run to completion even if the caller goes away.
        //
        // Note that `tokio::spawn` only cancels its tasks when the runtime shuts down,
        // at which point we won't be calling `try_init_host` again anyways.
        let rx = tokio::spawn(async move {
            let host = this.try_init_host(database, replica_id).await?;

            let rx = host.module.subscribe();
            *guard = Some(host);

            Ok::<_, anyhow::Error>(rx)
        })
        .await??;

        Ok(rx)
    }
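
    // Illustrative sketch, not part of the original source: a caller can hold
    // on to the `watch::Receiver` returned above and react every time the
    // module is hot-swapped. The loop ends once the `Host` (and thus the
    // sender) is dropped.
    #[allow(dead_code)]
    async fn example_follow_module_updates(&self, database: Database, replica_id: u64) -> anyhow::Result<()> {
        let mut rx = self.watch_maybe_launch_module_host(database, replica_id).await?;
        // `changed` resolves each time `update_module` replaces the module.
        while rx.changed().await.is_ok() {
            let module = rx.borrow_and_update().clone();
            info!("module for {} was hot-swapped", module.info().database_identity);
        }
        Ok(())
    }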

    /// Construct an in-memory instance of `database` running `program`,
    /// initialize it, then immediately destroy it.
    ///
    /// This is used during an initial, fresh publish operation
    /// in order to check the `program`'s validity as a module,
    /// since some validity checks we'd like to do (e.g. typechecking RLS filters)
    /// require a fully instantiated database.
    ///
    /// This is not necessary during hotswap publishes,
    /// as the automigration planner and executor accomplish the same validity checks.
    pub async fn check_module_validity(&self, database: Database, program: Program) -> anyhow::Result<Arc<ModuleInfo>> {
        Host::try_init_in_memory_to_check(
            &self.runtimes,
            self.page_pool.clone(),
            database,
            program,
            // This takes a db core to check validity, and we will later take
            // another core to actually run the module. Due to the round-robin
            // algorithm that JobCores uses, that will likely just be the same
            // core - there's not a concern that we'll only end up using 1/2
            // of the actual cores.
            self.db_cores.take(),
        )
        .await
    }
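
    // Illustrative sketch, not part of the original source: a fresh publish
    // path might validate a module before creating any persistent state.
    // `database` and `program_bytes` are assumed to come from the publish
    // request.
    #[allow(dead_code)]
    async fn example_validate_before_publish(
        &self,
        database: Database,
        program_bytes: Box<[u8]>,
    ) -> anyhow::Result<()> {
        let program = Program {
            hash: hash_bytes(&program_bytes),
            bytes: program_bytes,
        };
        // Fails if the module cannot be instantiated, e.g. because an RLS
        // filter does not typecheck against the schema.
        self.check_module_validity(database, program).await?;
        Ok(())
    }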

    /// Run a computation on the [`RelationalDB`] of a [`ModuleHost`] managed by
    /// this controller, launching the host if necessary.
    ///
    /// If the computation `F` panics, the host is removed from this controller,
    /// releasing its resources.
    #[tracing::instrument(level = "trace", skip_all)]
    pub async fn using_database<F, T>(&self, database: Database, replica_id: u64, f: F) -> anyhow::Result<T>
    where
        F: FnOnce(&RelationalDB) -> T + Send + 'static,
        T: Send + 'static,
    {
        trace!("using database {}/{}", database.database_identity, replica_id);
        let module = self.get_or_launch_module_host(database, replica_id).await?;
        let on_panic = self.unregister_fn(replica_id);
        scopeguard::defer_on_unwind!({
            warn!("database operation panicked");
            on_panic();
        });
        let result = asyncify(move || f(&module.replica_ctx().relational_db)).await;
        Ok(result)
    }
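
    // Illustrative sketch, not part of the original source: `using_database`
    // runs a blocking closure against the replica's `RelationalDB` on a
    // blocking thread. Reading only the database identity keeps the example
    // free of any assumptions about the datastore's query API.
    #[allow(dead_code)]
    async fn example_read_database_identity(&self, database: Database, replica_id: u64) -> anyhow::Result<Identity> {
        self.using_database(database, replica_id, |db| db.database_identity())
            .await
    }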

    /// Update the [`ModuleHost`] identified by `replica_id` to the given
    /// program.
    ///
    /// The host may not be running, in which case it is spawned (see
    /// [`Self::get_or_launch_module_host`] for details on what this entails).
    ///
    /// If the host was running, and the update fails, the previous version of
    /// the host keeps running.
    #[tracing::instrument(level = "trace", skip_all, err)]
    pub async fn update_module_host(
        &self,
        database: Database,
        host_type: HostType,
        replica_id: u64,
        program_bytes: Box<[u8]>,
    ) -> anyhow::Result<UpdateDatabaseResult> {
        let program = Program {
            hash: hash_bytes(&program_bytes),
            bytes: program_bytes,
        };
        trace!(
            "update module host {}/{}: genesis={} update-to={}",
            database.database_identity,
            replica_id,
            database.initial_program,
            program.hash
        );

        let mut guard = self.acquire_write_lock(replica_id).await;

        // `HostController::clone` is fast,
        // as all of its fields are either `Copy` or wrapped in `Arc`.
        let this = self.clone();

        // `try_init_host` is not cancel safe, as it will spawn other async tasks
        // which hold a filesystem lock past when `try_init_host` returns or is cancelled.
        // This means that, if `try_init_host` is cancelled, subsequent calls will fail.
        //
        // The rest of this future is also not cancel safe, as it will `Option::take` out of the guard
        // at the start of the block and then store back into it at the end.
        //
        // This is problematic because Axum will cancel its handler tasks if the client disconnects,
        // and this method is called from Axum handlers, e.g. for the publish route.
        // `tokio::spawn` a task to update the contents of `guard`,
        // so that it will run to completion even if the caller goes away.
        //
        // Note that `tokio::spawn` only cancels its tasks when the runtime shuts down,
        // at which point we won't be calling `try_init_host` again anyways.
        let update_result = tokio::spawn(async move {
            let mut host = match guard.take() {
                None => {
                    trace!("host not running, try_init");
                    this.try_init_host(database, replica_id).await?
                }
                Some(host) => {
                    trace!("host found, updating");
                    host
                }
            };
            let update_result = host
                .update_module(
                    this.runtimes.clone(),
                    host_type,
                    program,
                    this.energy_monitor.clone(),
                    this.unregister_fn(replica_id),
                    this.db_cores.take(),
                )
                .await?;

            *guard = Some(host);
            Ok::<_, anyhow::Error>(update_result)
        })
        .await??;

        Ok(update_result)
    }

    /// Start the host `replica_id` and conditionally update it.
    ///
    /// If the host was not initialized before, it is initialized with the
    /// program [`Database::initial_program`], which is loaded from the
    /// controller's [`ProgramStorage`].
    ///
    /// If it was already initialized and its stored program hash matches
    /// [`Database::initial_program`], no further action is taken.
    ///
    /// Otherwise, if `expected_hash` is `Some` and does **not** match the
    /// stored hash, an error is returned.
    ///
    /// Otherwise, the host is updated to [`Database::initial_program`], loading
    /// the program data from the controller's [`ProgramStorage`].
    ///
    /// > Note that this ascribes different semantics to [`Database::initial_program`]
    /// > than elsewhere, where the [`Database`] value is provided by the control
    /// > database. The method is mainly useful for bootstrapping the control
    /// > database itself.
    pub async fn init_maybe_update_module_host(
        &self,
        database: Database,
        replica_id: u64,
        expected_hash: Option<Hash>,
    ) -> anyhow::Result<watch::Receiver<ModuleHost>> {
        trace!("custom bootstrap {}/{}", database.database_identity, replica_id);

        let db_addr = database.database_identity;
        let host_type = database.host_type;
        let program_hash = database.initial_program;

        let mut guard = self.acquire_write_lock(replica_id).await;

        // `HostController::clone` is fast,
        // as all of its fields are either `Copy` or wrapped in `Arc`.
        let this = self.clone();

        // `try_init_host` is not cancel safe, as it will spawn other async tasks
        // which hold a filesystem lock past when `try_init_host` returns or is cancelled.
        // This means that, if `try_init_host` is cancelled, subsequent calls will fail.
        //
        // The rest of this future is also not cancel safe, as it will `Option::take` out of the guard
        // at the start of the block and then store back into it at the end.
        //
        // This is problematic because Axum will cancel its handler tasks if the client disconnects,
        // and this method is called from Axum handlers, e.g. for the publish route.
        // `tokio::spawn` a task to update the contents of `guard`,
        // so that it will run to completion even if the caller goes away.
        //
        // Note that `tokio::spawn` only cancels its tasks when the runtime shuts down,
        // at which point we won't be calling `try_init_host` again anyways.
        let module = tokio::spawn(async move {
            let mut host = match guard.take() {
                Some(host) => host,
                None => this.try_init_host(database, replica_id).await?,
            };
            let module = host.module.subscribe();

            // The stored program is now either:
            //
            // - the desired one from [Database], in which case we do nothing
            // - some other program, with `expected_hash` being `Some`: the expected
            //   hash must match the stored one, and then we update to the desired program
            // - some other program, with `expected_hash` being `None`: we update
            //   unconditionally
            let stored_hash = stored_program_hash(host.db())?
                .with_context(|| format!("[{}] database improperly initialized", db_addr))?;
            if stored_hash == program_hash {
                info!("[{}] database up-to-date with {}", db_addr, program_hash);
                *guard = Some(host);
            } else {
                if let Some(expected_hash) = expected_hash {
                    ensure!(
                        expected_hash == stored_hash,
                        "[{}] expected program {} found {}",
                        db_addr,
                        expected_hash,
                        stored_hash
                    );
                }
                info!(
                    "[{}] updating database from `{}` to `{}`",
                    db_addr, stored_hash, program_hash
                );
                let program = load_program(&this.program_storage, program_hash).await?;
                let update_result = host
                    .update_module(
                        this.runtimes.clone(),
                        host_type,
                        program,
                        this.energy_monitor.clone(),
                        this.unregister_fn(replica_id),
                        this.db_cores.take(),
                    )
                    .await?;
                match update_result {
                    UpdateDatabaseResult::NoUpdateNeeded | UpdateDatabaseResult::UpdatePerformed => {
                        *guard = Some(host);
                    }
                    UpdateDatabaseResult::AutoMigrateError(e) => {
                        return Err(anyhow::anyhow!(e));
                    }
                    UpdateDatabaseResult::ErrorExecutingMigration(e) => {
                        return Err(e);
                    }
                }
            }

            Ok::<_, anyhow::Error>(module)
        })
        .await??;

        Ok(module)
    }

    /// Release all resources of the [`ModuleHost`] identified by `replica_id`,
    /// and deregister it from the controller.
    #[tracing::instrument(level = "trace", skip_all)]
    pub async fn exit_module_host(&self, replica_id: u64) -> Result<(), anyhow::Error> {
        trace!("exit module host {}", replica_id);
        let lock = self.hosts.lock().remove(&replica_id);
        if let Some(lock) = lock {
            if let Some(host) = lock.write_owned().await.take() {
                let module = host.module.borrow().clone();
                module.exit().await;
                let table_names = module.info().module_def.tables().map(|t| t.name.deref());
                remove_database_gauges(&module.info().database_identity, table_names);
            }
        }

        Ok(())
    }

    /// Get the [`ModuleHost`] identified by `replica_id` or return an error
    /// if it is not registered with the controller.
    ///
    /// See [`Self::get_or_launch_module_host`] for a variant which launches
    /// the host if it is not running.
    #[tracing::instrument(level = "trace", skip_all)]
    pub async fn get_module_host(&self, replica_id: u64) -> Result<ModuleHost, NoSuchModule> {
        trace!("get module host {}", replica_id);
        let guard = self.acquire_read_lock(replica_id).await;
        guard
            .as_ref()
            .map(|Host { module, .. }| module.borrow().clone())
            .ok_or(NoSuchModule)
    }

    /// Subscribe to updates of the [`ModuleHost`] identified by `replica_id`,
    /// or return an error if it is not registered with the controller.
    ///
    /// See [`Self::watch_maybe_launch_module_host`] for a variant which
    /// launches the host if it is not running.
    #[tracing::instrument(level = "trace", skip_all)]
    pub async fn watch_module_host(&self, replica_id: u64) -> Result<watch::Receiver<ModuleHost>, NoSuchModule> {
        trace!("watch module host {}", replica_id);
        let guard = self.acquire_read_lock(replica_id).await;
        guard
            .as_ref()
            .map(|Host { module, .. }| module.subscribe())
            .ok_or(NoSuchModule)
    }

    /// `true` if the module host `replica_id` is currently registered with
    /// the controller.
    pub async fn has_module_host(&self, replica_id: u64) -> bool {
        self.acquire_read_lock(replica_id).await.is_some()
    }

    /// On-panic callback passed to [`ModuleHost`]s created by this controller.
    ///
    /// Removes the module with the given `replica_id` from this controller.
    fn unregister_fn(&self, replica_id: u64) -> impl Fn() + Send + Sync + 'static {
        let hosts = Arc::downgrade(&self.hosts);
        move || {
            if let Some(hosts) = hosts.upgrade() {
                hosts.lock().remove(&replica_id);
            }
        }
    }

    async fn acquire_write_lock(&self, replica_id: u64) -> OwnedRwLockWriteGuard<Option<Host>> {
        let lock = self.hosts.lock().entry(replica_id).or_default().clone();
        lock.write_owned().await
    }

    async fn acquire_read_lock(&self, replica_id: u64) -> OwnedRwLockReadGuard<Option<Host>> {
        let lock = self.hosts.lock().entry(replica_id).or_default().clone();
        lock.read_owned().await
    }

    async fn try_init_host(&self, database: Database, replica_id: u64) -> anyhow::Result<Host> {
        Host::try_init(self, database, replica_id).await
    }
}

fn stored_program_hash(db: &RelationalDB) -> anyhow::Result<Option<Hash>> {
    let meta = db.metadata()?;
    Ok(meta.map(|meta| meta.program_hash))
}

async fn make_replica_ctx(
    path: ReplicaDir,
    database: Database,
    replica_id: u64,
    relational_db: Arc<RelationalDB>,
) -> anyhow::Result<ReplicaContext> {
    let logger = tokio::task::block_in_place(move || Arc::new(DatabaseLogger::open_today(path.module_logs())));
    let send_worker_queue = spawn_send_worker(Some(database.database_identity));
    let subscriptions = Arc::new(parking_lot::RwLock::new(SubscriptionManager::new(
        send_worker_queue.clone(),
    )));
    let downgraded = Arc::downgrade(&subscriptions);
    let subscriptions = ModuleSubscriptions::new(
        relational_db.clone(),
        subscriptions,
        send_worker_queue,
        database.owner_identity,
    );

    // If an error occurs when evaluating a subscription,
    // we mark each client that was affected,
    // and we remove those clients from the manager async.
    tokio::spawn(async move {
        loop {
            tokio::time::sleep(Duration::from_secs(10)).await;
            let Some(subscriptions) = downgraded.upgrade() else {
                break;
            };
            asyncify(move || subscriptions.write().remove_dropped_clients()).await
        }
    });

    Ok(ReplicaContext {
        database,
        replica_id,
        logger,
        subscriptions,
        relational_db,
    })
}

/// Initialize a module host for the given program.
/// The passed replica_ctx may not be configured for this version of the program's database schema yet.
#[allow(clippy::too_many_arguments)]
async fn make_module_host(
    runtimes: Arc<HostRuntimes>,
    host_type: HostType,
    replica_ctx: Arc<ReplicaContext>,
    scheduler: Scheduler,
    program: Program,
    energy_monitor: Arc<dyn EnergyMonitor>,
    unregister: impl Fn() + Send + Sync + 'static,
    core: JobCore,
) -> anyhow::Result<(Program, ModuleHost)> {
    // `make_actor` is blocking, as it needs to compile the wasm to native code,
    // which may be computationally expensive - sometimes up to 1s for a large module.
    // TODO: change back to using `spawn_rayon` here - asyncify runs on tokio blocking
    //       threads, but those aren't for computation. Also, wasmtime uses rayon
    //       to run compilation in parallel, so it'll need to run stuff in rayon anyway.
    asyncify(move || {
        let module_host = match host_type {
            HostType::Wasm => {
                let mcc = ModuleCreationContext {
                    replica_ctx,
                    scheduler,
                    program: &program,
                    energy_monitor,
                };
                let start = Instant::now();
                let actor = runtimes.wasmtime.make_actor(mcc)?;
                trace!("wasmtime::make_actor blocked for {:?}", start.elapsed());
                ModuleHost::new(actor, unregister, core)
            }
        };
        Ok((program, module_host))
    })
    .await
}

async fn load_program(storage: &ProgramStorage, hash: Hash) -> anyhow::Result<Program> {
    let bytes = storage
        .lookup(hash)
        .await?
        .with_context(|| format!("program {} not found", hash))?;
    Ok(Program { hash, bytes })
}

struct LaunchedModule {
    replica_ctx: Arc<ReplicaContext>,
    module_host: ModuleHost,
    scheduler: Scheduler,
    scheduler_starter: SchedulerStarter,
}

#[allow(clippy::too_many_arguments)]
async fn launch_module(
    database: Database,
    replica_id: u64,
    program: Program,
    on_panic: impl Fn() + Send + Sync + 'static,
    relational_db: Arc<RelationalDB>,
    energy_monitor: Arc<dyn EnergyMonitor>,
    replica_dir: ReplicaDir,
    runtimes: Arc<HostRuntimes>,
    core: JobCore,
) -> anyhow::Result<(Program, LaunchedModule)> {
    let db_identity = database.database_identity;
    let host_type = database.host_type;

    let replica_ctx = make_replica_ctx(replica_dir, database, replica_id, relational_db)
        .await
        .map(Arc::new)?;
    let (scheduler, scheduler_starter) = Scheduler::open(replica_ctx.relational_db.clone());
    let (program, module_host) = make_module_host(
        runtimes.clone(),
        host_type,
        replica_ctx.clone(),
        scheduler.clone(),
        program,
        energy_monitor.clone(),
        on_panic,
        core,
    )
    .await?;

    trace!("launched database {} with program {}", db_identity, program.hash);

    Ok((
        program,
        LaunchedModule {
            replica_ctx,
            module_host,
            scheduler,
            scheduler_starter,
        },
    ))
}

/// Update a module.
///
/// If the `db` is not initialized yet (i.e. its program hash is `None`),
/// return an error.
///
/// Otherwise, if `db.program_hash` matches the given `program_hash`, do
/// nothing and return an empty `UpdateDatabaseResult`.
///
/// Otherwise, invoke `module.update_database` and return the result.
async fn update_module(
    db: &RelationalDB,
    module: &ModuleHost,
    program: Program,
    old_module_info: Arc<ModuleInfo>,
) -> anyhow::Result<UpdateDatabaseResult> {
    let addr = db.database_identity();
    match stored_program_hash(db)? {
        None => Err(anyhow!("database `{}` not yet initialized", addr)),
        Some(stored) => {
            let res = if stored == program.hash {
                info!("database `{}` up to date with program `{}`", addr, program.hash);
                UpdateDatabaseResult::NoUpdateNeeded
            } else {
                info!("updating `{}` from {} to {}", addr, stored, program.hash);
                module.update_database(program, old_module_info).await?
            };

            Ok(res)
        }
    }
}

/// Encapsulates a database, associated module, and auxiliary state.
struct Host {
    /// The [`ModuleHost`], providing the callable reducer API.
    ///
    /// Modules may be updated via [`Host::update_module`].
    /// The module is wrapped in a [`watch::Sender`] to allow for "hot swapping":
    /// clients may subscribe to the channel, so they get the most recent
    /// [`ModuleHost`] version or an error if the [`Host`] was dropped.
    module: watch::Sender<ModuleHost>,
    /// Pointer to the `module`'s [`ReplicaContext`].
    ///
    /// The database stays the same if and when the module is updated via
    /// [`Host::update_module`].
    replica_ctx: Arc<ReplicaContext>,
    /// Scheduler for repeating reducers, operating on the current `module`.
    scheduler: Scheduler,
    /// Handle to the metrics collection task started via [`metric_reporter`].
    ///
    /// The task collects metrics from the `replica_ctx`, and so stays alive as long
    /// as the `replica_ctx` is live. The task is aborted when [`Host`] is dropped.
    metrics_task: AbortHandle,
}

impl Host {
    /// Attempt to instantiate a [`Host`] from persistent storage.
    ///
    /// Note that this does **not** run module initialization routines, but may
    /// create on-disk artifacts if the host / database did not exist.
    #[tracing::instrument(level = "debug", skip_all)]
    async fn try_init(host_controller: &HostController, database: Database, replica_id: u64) -> anyhow::Result<Self> {
        let HostController {
            data_dir,
            default_config: config,
            program_storage,
            energy_monitor,
            runtimes,
            durability,
            page_pool,
            ..
        } = host_controller;
        let on_panic = host_controller.unregister_fn(replica_id);
        let replica_dir = data_dir.replica(replica_id);

        let (db, connected_clients) = match config.storage {
            db::Storage::Memory => RelationalDB::open(
                &replica_dir,
                database.database_identity,
                database.owner_identity,
                EmptyHistory::new(),
                None,
                None,
                page_pool.clone(),
            )?,
            db::Storage::Disk => {
                let snapshot_repo =
                    relational_db::open_snapshot_repo(replica_dir.snapshots(), database.database_identity, replica_id)?;
                let (history, _) = relational_db::local_durability(replica_dir.commit_log()).await?;
                let (durability, start_snapshot_watcher) = durability.durability(replica_id).await?;

                let (db, clients) = RelationalDB::open(
                    &replica_dir,
                    database.database_identity,
                    database.owner_identity,
                    history,
                    Some(durability),
                    Some(snapshot_repo),
                    page_pool.clone(),
                )
                // Make sure we log the source chain of the error
                // as a single line, with the help of `anyhow`.
                .map_err(anyhow::Error::from)
                .inspect_err(|e| {
                    tracing::error!(
                        database = %database.database_identity,
                        replica = replica_id,
                        "Failed to open database: {e:#}"
                    );
                })?;
                if let Some(start_snapshot_watcher) = start_snapshot_watcher {
                    let watcher = db.subscribe_to_snapshots().expect("we passed snapshot_repo");
                    start_snapshot_watcher(watcher)
                }
                (db, clients)
            }
        };
        let (program, program_needs_init) = match db.program()? {
            // Launch module with program from existing database.
            Some(program) => (program, false),
            // Database is empty, load program from external storage and run
            // initialization.
            None => (load_program(program_storage, database.initial_program).await?, true),
        };

        let (program, launched) = launch_module(
            database,
            replica_id,
            program,
            on_panic,
            Arc::new(db),
            energy_monitor.clone(),
            replica_dir,
            runtimes.clone(),
            host_controller.db_cores.take(),
        )
        .await?;

        if program_needs_init {
            let call_result = launched.module_host.init_database(program).await?;
            if let Some(call_result) = call_result {
                Result::from(call_result)?;
            }
        } else {
            drop(program)
        }

        let LaunchedModule {
            replica_ctx,
            module_host,
            scheduler,
            scheduler_starter,
        } = launched;

        // Disconnect dangling clients.
        for (identity, connection_id) in connected_clients {
            module_host
                .call_identity_disconnected(identity, connection_id)
                .await
                .with_context(|| {
                    format!(
                        "Error calling disconnect for {} {} on {}",
                        identity, connection_id, replica_ctx.database_identity
                    )
                })?;
        }

        scheduler_starter.start(&module_host)?;
        let metrics_task = tokio::spawn(metric_reporter(replica_ctx.clone())).abort_handle();

        Ok(Host {
            module: watch::Sender::new(module_host),
            replica_ctx,
            scheduler,
            metrics_task,
        })
    }

    /// Construct an in-memory instance of `database` running `program`,
    /// initialize it, then immediately destroy it.
    ///
    /// This is used during an initial, fresh publish operation
    /// in order to check the `program`'s validity as a module,
    /// since some validity checks we'd like to do (e.g. typechecking RLS filters)
    /// require a fully instantiated database.
    ///
    /// This is not necessary during hotswap publishes,
    /// as the automigration planner and executor accomplish the same validity checks.
    async fn try_init_in_memory_to_check(
        runtimes: &Arc<HostRuntimes>,
        page_pool: PagePool,
        database: Database,
        program: Program,
        core: JobCore,
    ) -> anyhow::Result<Arc<ModuleInfo>> {
        // Even in-memory databases acquire a lockfile.
        // Grab a tempdir to put that lockfile in.
        let phony_replica_dir = TempDir::with_prefix("spacetimedb-publish-in-memory-check")
            .context("Error creating temporary directory to house temporary database during publish")?;

        // Leave the `TempDir` instance in place, so that its destructor will still run.
        let phony_replica_dir = ReplicaDir::from_path_unchecked(phony_replica_dir.path().to_owned());

        let (db, _connected_clients) = RelationalDB::open(
            &phony_replica_dir,
            database.database_identity,
            database.owner_identity,
            EmptyHistory::new(),
            None,
            None,
            page_pool,
        )?;

        let (program, launched) = launch_module(
            database,
            0,
            program,
            // No need to register a callback here:
            // proper publishes use it to unregister a panicked module,
            // but this module is not registered in the first place.
            || log::error!("launch_module on_panic called for temporary publish in-memory instance"),
            Arc::new(db),
            Arc::new(NullEnergyMonitor),
            phony_replica_dir,
            runtimes.clone(),
            core,
        )
        .await?;

        let call_result = launched.module_host.init_database(program).await?;
        if let Some(call_result) = call_result {
            Result::from(call_result)?;
        }

        Ok(launched.module_host.info)
    }

    /// Attempt to replace this [`Host`]'s [`ModuleHost`] with a new one running
    /// the program `program_hash`.
    ///
    /// The associated [`ReplicaContext`] stays the same.
    ///
    /// Executes [`ModuleHost::update_database`] on the newly instantiated
    /// module, updating the database schema and invoking the `__update__`
    /// reducer if it is defined.
    /// If this succeeds, the current module is replaced with the new one,
    /// otherwise it stays the same.
    ///
    /// Either way, the [`UpdateDatabaseResult`] is returned.
    async fn update_module(
        &mut self,
        runtimes: Arc<HostRuntimes>,
        host_type: HostType,
        program: Program,
        energy_monitor: Arc<dyn EnergyMonitor>,
        on_panic: impl Fn() + Send + Sync + 'static,
        core: JobCore,
    ) -> anyhow::Result<UpdateDatabaseResult> {
        let replica_ctx = &self.replica_ctx;
        let (scheduler, scheduler_starter) = Scheduler::open(self.replica_ctx.relational_db.clone());

        let (program, module) = make_module_host(
            runtimes,
            host_type,
            replica_ctx.clone(),
            scheduler.clone(),
            program,
            energy_monitor,
            on_panic,
            core,
        )
        .await?;

        // Get the old module info to diff against when building a migration plan.
        let old_module_info = self.module.borrow().info.clone();

        let update_result = update_module(&replica_ctx.relational_db, &module, program, old_module_info).await?;
        trace!("update result: {update_result:?}");
        // Only replace the module + scheduler if the update succeeded.
        // Otherwise, we want the database to continue running with the old state.
        if update_result.was_successful() {
            self.scheduler = scheduler;
            scheduler_starter.start(&module)?;
            let old_module = self.module.send_replace(module);
            old_module.exit().await;
        }

        Ok(update_result)
    }

    fn db(&self) -> &RelationalDB {
        &self.replica_ctx.relational_db
    }
}

impl Drop for Host {
    fn drop(&mut self) {
        self.metrics_task.abort();
    }
}

const STORAGE_METERING_INTERVAL: Duration = Duration::from_secs(15);

/// Periodically collect gauge stats and update prometheus metrics.
async fn metric_reporter(replica_ctx: Arc<ReplicaContext>) {
    // TODO: Consider adding a metric for heap usage.
    let message_log_size = DB_METRICS
        .message_log_size
        .with_label_values(&replica_ctx.database_identity);
    let module_log_file_size = DB_METRICS
        .module_log_file_size
        .with_label_values(&replica_ctx.database_identity);

    loop {
        let disk_usage = tokio::task::block_in_place(|| replica_ctx.total_disk_usage());
        replica_ctx.update_gauges();
        if let Some(num_bytes) = disk_usage.durability {
            message_log_size.set(num_bytes as i64);
        }
        if let Some(num_bytes) = disk_usage.logs {
            module_log_file_size.set(num_bytes as i64);
        }
        tokio::time::sleep(STORAGE_METERING_INTERVAL).await;
    }
}

/// Extracts the schema from a given module.
///
/// Spins up a dummy host and returns the `ModuleDef` that it extracts.
pub async fn extract_schema(program_bytes: Box<[u8]>, host_type: HostType) -> anyhow::Result<ModuleDef> {
    let owner_identity = Identity::from_u256(0xdcba_u32.into());
    let database_identity = Identity::from_u256(0xabcd_u32.into());
    let program = Program::from_bytes(program_bytes);

    let database = Database {
        id: 0,
        database_identity,
        owner_identity,
        host_type,
        initial_program: program.hash,
    };

    let runtimes = HostRuntimes::new(None);
    let page_pool = PagePool::new(None);
    let core = JobCore::default();
    let module_info = Host::try_init_in_memory_to_check(&runtimes, page_pool, database, program, core).await?;
    // Normally nothing else holds a reference to `module_info` here, so the unwrap
    // succeeds; fall back to cloning the `ModuleDef` if another reference is still alive.
    let module_def = match Arc::try_unwrap(module_info) {
        Ok(info) => info.module_def,
        Err(info) => info.module_def.clone(),
    };

    Ok(module_def)
}
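
// Illustrative sketch, not part of the original source: tooling that only needs
// a module's schema can call `extract_schema` on wasm bytes read from disk,
// without publishing a database. The file path is purely hypothetical, and the
// use of `tokio::fs` assumes tokio's `fs` feature is enabled.
#[allow(dead_code)]
async fn example_extract_schema_from_file(path: &std::path::Path) -> anyhow::Result<ModuleDef> {
    let program_bytes = tokio::fs::read(path).await?.into_boxed_slice();
    extract_schema(program_bytes, HostType::Wasm).await
}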

// Remove all gauges associated with a database.
// This is useful if a database is being deleted.
pub fn remove_database_gauges<'a, I>(db: &Identity, table_names: I)
where
    I: IntoIterator<Item = &'a str>,
{
    // Remove the per-table gauges.
    for table_name in table_names {
        let _ = DATA_SIZE_METRICS
            .data_size_table_num_rows
            .remove_label_values(db, table_name);
        let _ = DATA_SIZE_METRICS
            .data_size_table_bytes_used_by_rows
            .remove_label_values(db, table_name);
        let _ = DATA_SIZE_METRICS
            .data_size_table_num_rows_in_indexes
            .remove_label_values(db, table_name);
        let _ = DATA_SIZE_METRICS
            .data_size_table_bytes_used_by_index_keys
            .remove_label_values(db, table_name);
    }
    // Remove the per-db gauges.
    let _ = DATA_SIZE_METRICS.data_size_blob_store_num_blobs.remove_label_values(db);
    let _ = DATA_SIZE_METRICS
        .data_size_blob_store_bytes_used_by_blobs
        .remove_label_values(db);
    let _ = WORKER_METRICS.wasm_memory_bytes.remove_label_values(db);
}