spacetimedb/host/
host_controller.rs

1use super::module_host::{EventStatus, ModuleHost, ModuleInfo, NoSuchModule};
2use super::scheduler::SchedulerStarter;
3use super::wasmtime::WasmtimeRuntime;
4use super::{Scheduler, UpdateDatabaseResult};
5use crate::database_logger::DatabaseLogger;
6use crate::db::relational_db::{self, DiskSizeFn, RelationalDB, Txdata};
7use crate::db::{self, spawn_tx_metrics_recorder};
8use crate::energy::{EnergyMonitor, EnergyQuanta, NullEnergyMonitor};
9use crate::host::module_host::ModuleRuntime as _;
10use crate::host::v8::V8Runtime;
11use crate::messages::control_db::{Database, HostType};
12use crate::module_host_context::ModuleCreationContext;
13use crate::replica_context::ReplicaContext;
14use crate::subscription::module_subscription_actor::ModuleSubscriptions;
15use crate::subscription::module_subscription_manager::{spawn_send_worker, SubscriptionManager};
16use crate::util::asyncify;
17use crate::util::jobs::{JobCore, JobCores};
18use crate::worker_metrics::WORKER_METRICS;
19use anyhow::{anyhow, ensure, Context};
20use async_trait::async_trait;
21use durability::{Durability, EmptyHistory};
22use log::{info, trace, warn};
23use parking_lot::Mutex;
24use spacetimedb_data_structures::map::IntMap;
25use spacetimedb_datastore::db_metrics::data_size::DATA_SIZE_METRICS;
26use spacetimedb_datastore::db_metrics::DB_METRICS;
27use spacetimedb_datastore::traits::Program;
28use spacetimedb_durability::{self as durability, TxOffset};
29use spacetimedb_lib::{hash_bytes, Identity};
30use spacetimedb_paths::server::{ReplicaDir, ServerDataDir};
31use spacetimedb_paths::FromPathUnchecked;
32use spacetimedb_sats::hash::Hash;
33use spacetimedb_schema::def::ModuleDef;
34use spacetimedb_table::page_pool::PagePool;
35use std::future::Future;
36use std::ops::Deref;
37use std::sync::Arc;
38use std::time::{Duration, Instant};
39use tempfile::TempDir;
40use tokio::sync::{watch, OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock as AsyncRwLock};
41use tokio::task::AbortHandle;
42
43// TODO:
44//
45// - [db::Config] should be per-[Database]
46
/// A shared mutable cell containing a module host and associated database.
type HostCell = Arc<AsyncRwLock<Option<Host>>>;

/// The registry of all running hosts, keyed by replica id.
type Hosts = Arc<Mutex<IntMap<u64, HostCell>>>;

/// A [`Durability`] implementation for a replica's [`Txdata`],
/// paired with a [`DiskSizeFn`] for it.
pub type ExternalDurability = (Arc<dyn Durability<TxData = Txdata>>, DiskSizeFn);

/// Callback receiving a watch channel of durable [`TxOffset`]s,
/// presumably used to start a snapshot-watching task — see [`DurabilityProvider`].
pub type StartSnapshotWatcher = Box<dyn FnOnce(watch::Receiver<TxOffset>)>;
56
/// Source of [`Durability`] implementations, one per replica.
#[async_trait]
pub trait DurabilityProvider: Send + Sync + 'static {
    /// Obtain the [`ExternalDurability`] for `replica_id`,
    /// along with an optional callback to start a snapshot watcher.
    async fn durability(&self, replica_id: u64) -> anyhow::Result<(ExternalDurability, Option<StartSnapshotWatcher>)>;
}
61
/// Access to an external store of module programs, addressed by hash.
#[async_trait]
pub trait ExternalStorage: Send + Sync + 'static {
    /// Look up the program bytes for `program_hash`,
    /// returning `Ok(None)` if no such program is stored.
    async fn lookup(&self, program_hash: Hash) -> anyhow::Result<Option<Box<[u8]>>>;
}
/// Blanket impl: any async function from [`Hash`] to optional program bytes
/// can serve as an [`ExternalStorage`].
#[async_trait]
impl<F, Fut> ExternalStorage for F
where
    F: Fn(Hash) -> Fut + Send + Sync + 'static,
    Fut: Future<Output = anyhow::Result<Option<Box<[u8]>>>> + Send,
{
    async fn lookup(&self, program_hash: Hash) -> anyhow::Result<Option<Box<[u8]>>> {
        self(program_hash).await
    }
}
76
/// Shared handle to the [`ExternalStorage`] modules are loaded from.
pub type ProgramStorage = Arc<dyn ExternalStorage>;
78
/// A host controller manages the lifecycle of spacetime databases and their
/// associated modules.
///
/// This type is, and must remain, cheap to clone.
/// All of its fields should either be [`Copy`], enclosed in an [`Arc`],
/// or have some other fast [`Clone`] implementation.
#[derive(Clone)]
pub struct HostController {
    /// Map of all hosts managed by this controller,
    /// keyed by database instance id.
    ///
    /// Entries are removed when a host exits ([`Self::exit_module_host`])
    /// or panics (see [`Self::unregister_fn`]).
    hosts: Hosts,
    /// The root directory for database data.
    pub data_dir: Arc<ServerDataDir>,
    /// The default configuration to use for databases created by this
    /// controller.
    default_config: db::Config,
    /// The [`ProgramStorage`] to query when instantiating a module.
    program_storage: ProgramStorage,
    /// The [`EnergyMonitor`] used by this controller.
    energy_monitor: Arc<dyn EnergyMonitor>,
    /// Provides implementations of [`Durability`] for each replica.
    durability: Arc<dyn DurabilityProvider>,
    /// The page pool all databases will use by cloning the ref counted pool.
    pub page_pool: PagePool,
    /// The runtimes for running our modules.
    runtimes: Arc<HostRuntimes>,
    /// The CPU cores that are reserved for ModuleHost operations to run on.
    db_cores: JobCores,
}
108
/// The module runtimes available to this controller, one per [`HostType`].
struct HostRuntimes {
    /// Runtime used for [`HostType::Wasm`] modules.
    wasmtime: WasmtimeRuntime,
    /// Runtime used for [`HostType::Js`] modules.
    v8: V8Runtime,
}
113
114impl HostRuntimes {
115    fn new(data_dir: Option<&ServerDataDir>) -> Arc<Self> {
116        let wasmtime = WasmtimeRuntime::new(data_dir);
117        let v8 = V8Runtime::default();
118        Arc::new(Self { wasmtime, v8 })
119    }
120}
121
/// The result of a single reducer call.
#[derive(Clone, Debug)]
pub struct ReducerCallResult {
    /// Whether the call committed, failed, or exhausted its energy budget.
    pub outcome: ReducerOutcome,
    /// The energy consumed by the call.
    pub energy_used: EnergyQuanta,
    /// How long the call took to execute.
    pub execution_duration: Duration,
}
128
129impl ReducerCallResult {
130    pub fn is_err(&self) -> bool {
131        self.outcome.is_err()
132    }
133
134    pub fn is_ok(&self) -> bool {
135        !self.is_err()
136    }
137}
138
impl From<ReducerCallResult> for Result<(), anyhow::Error> {
    /// Discard energy and timing information, keeping only success or error.
    fn from(value: ReducerCallResult) -> Self {
        value.outcome.into_result()
    }
}
144
/// The high-level outcome of a reducer run.
#[derive(Clone, Debug)]
pub enum ReducerOutcome {
    /// The reducer completed and its effects were committed.
    Committed,
    /// The reducer failed; the payload is the error message.
    Failed(String),
    /// The reducer was stopped because it ran out of energy.
    BudgetExceeded,
}
151
152impl ReducerOutcome {
153    pub fn into_result(self) -> anyhow::Result<()> {
154        match self {
155            Self::Committed => Ok(()),
156            Self::Failed(e) => Err(anyhow::anyhow!(e)),
157            Self::BudgetExceeded => Err(anyhow::anyhow!("reducer ran out of energy")),
158        }
159    }
160
161    pub fn is_err(&self) -> bool {
162        !matches!(self, Self::Committed)
163    }
164}
165
166impl From<&EventStatus> for ReducerOutcome {
167    fn from(status: &EventStatus) -> Self {
168        match &status {
169            EventStatus::Committed(_) => ReducerOutcome::Committed,
170            EventStatus::Failed(e) => ReducerOutcome::Failed(e.clone()),
171            EventStatus::OutOfEnergy => ReducerOutcome::BudgetExceeded,
172        }
173    }
174}
175
impl HostController {
    /// Create a new controller rooted at `data_dir`.
    ///
    /// The module runtimes and the page pool are constructed here;
    /// the host registry starts out empty.
    pub fn new(
        data_dir: Arc<ServerDataDir>,
        default_config: db::Config,
        program_storage: ProgramStorage,
        energy_monitor: Arc<impl EnergyMonitor>,
        durability: Arc<dyn DurabilityProvider>,
        db_cores: JobCores,
    ) -> Self {
        Self {
            hosts: <_>::default(),
            default_config,
            program_storage,
            energy_monitor,
            durability,
            runtimes: HostRuntimes::new(Some(&data_dir)),
            data_dir,
            page_pool: PagePool::new(default_config.page_pool_max_size),
            db_cores,
        }
    }

    /// Replace the [`ProgramStorage`] used by this controller.
    pub fn set_program_storage(&mut self, ps: ProgramStorage) {
        self.program_storage = ps;
    }

    /// Get a [`ModuleHost`] managed by this controller, or launch it from
    /// persistent state.
    ///
    /// If the host is not running, it is started according to the default
    /// [`db::Config`] set for this controller.
    ///   The underlying database is restored from existing data at its
    /// canonical filesystem location _iff_ the default config mandates disk
    /// storage.
    ///
    /// The module will be instantiated from the program bytes stored in an
    /// existing database.
    ///   If the database is empty, the `program_bytes_address` of the given
    /// [`Database`] will be used to load the program from the controller's
    /// [`ProgramStorage`]. The initialization procedure (schema creation,
    /// `__init__` reducer) will be invoked on the found module, and the
    /// database will be marked as initialized.
    ///
    /// See also: [`Self::get_module_host`]
    #[tracing::instrument(level = "trace", skip_all)]
    pub async fn get_or_launch_module_host(&self, database: Database, replica_id: u64) -> anyhow::Result<ModuleHost> {
        let mut rx = self.watch_maybe_launch_module_host(database, replica_id).await?;
        let module = rx.borrow_and_update();
        Ok(module.clone())
    }

    /// Like [`Self::get_or_launch_module_host`], use a [`ModuleHost`] managed
    /// by this controller, or launch it if it is not running.
    ///
    /// Instead of a [`ModuleHost`], this returns a [`watch::Receiver`] which
    /// gets notified each time the module is updated.
    ///
    /// See also: [`Self::watch_module_host`]
    #[tracing::instrument(level = "trace", skip_all)]
    pub async fn watch_maybe_launch_module_host(
        &self,
        database: Database,
        replica_id: u64,
    ) -> anyhow::Result<watch::Receiver<ModuleHost>> {
        // Try a read lock first.
        {
            let guard = self.acquire_read_lock(replica_id).await;
            if let Some(host) = &*guard {
                trace!("cached host {}/{}", database.database_identity, replica_id);
                return Ok(host.module.subscribe());
            }
        }

        // We didn't find a running module, so take a write lock.
        // Since [`tokio::sync::RwLock`] doesn't support upgrading of read locks,
        // we'll need to check again if a module was added meanwhile.
        let mut guard = self.acquire_write_lock(replica_id).await;
        if let Some(host) = &*guard {
            trace!(
                "cached host {}/{} (lock upgrade)",
                database.database_identity,
                replica_id
            );
            return Ok(host.module.subscribe());
        }

        trace!("launch host {}/{}", database.database_identity, replica_id);

        // `HostController::clone` is fast,
        // as all of its fields are either `Copy` or wrapped in `Arc`.
        let this = self.clone();

        // `try_init_host` is not cancel safe, as it will spawn other async tasks
        // which hold a filesystem lock past when `try_init_host` returns or is cancelled.
        // This means that, if `try_init_host` is cancelled, subsequent calls will fail.
        //
        // This is problematic because Axum will cancel its handler tasks if the client disconnects,
        // and this method is called from Axum handlers, e.g. for the subscribe route.
        // `tokio::spawn` a task to build the `Host` and install it in the `guard`,
        // so that it will run to completion even if the caller goes away.
        //
        // Note that `tokio::spawn` only cancels its tasks when the runtime shuts down,
        // at which point we won't be calling `try_init_host` again anyways.
        let rx = tokio::spawn(async move {
            let host = this.try_init_host(database, replica_id).await?;

            let rx = host.module.subscribe();
            // The write guard was moved into this task, so the new `Host` is
            // installed even if the original caller's future was dropped.
            *guard = Some(host);

            Ok::<_, anyhow::Error>(rx)
        })
        .await??;

        Ok(rx)
    }

    /// Construct an in-memory instance of `database` running `program`,
    /// initialize it, then immediately destroy it.
    ///
    /// This is used during an initial, fresh publish operation
    /// in order to check the `program`'s validity as a module,
    /// since some validity checks we'd like to do (e.g. typechecking RLS filters)
    /// require a fully instantiated database.
    ///
    /// This is not necessary during hotswap publishes,
    /// as the automigration planner and executor accomplish the same validity checks.
    pub async fn check_module_validity(&self, database: Database, program: Program) -> anyhow::Result<Arc<ModuleInfo>> {
        Host::try_init_in_memory_to_check(
            &self.runtimes,
            self.page_pool.clone(),
            database,
            program,
            // This takes a db core to check validity, and we will later take
            // another core to actually run the module. Due to the round-robin
            // algorithm that JobCores uses, that will likely just be the same
            // core - there's not a concern that we'll only end up using 1/2
            // of the actual cores.
            self.db_cores.take(),
        )
        .await
    }

    /// Run a computation on the [`RelationalDB`] of a [`ModuleHost`] managed by
    /// this controller, launching the host if necessary.
    ///
    /// If the computation `F` panics, the host is removed from this controller,
    /// releasing its resources.
    #[tracing::instrument(level = "trace", skip_all)]
    pub async fn using_database<F, T>(&self, database: Database, replica_id: u64, f: F) -> anyhow::Result<T>
    where
        F: FnOnce(&RelationalDB) -> T + Send + 'static,
        T: Send + 'static,
    {
        trace!("using database {}/{}", database.database_identity, replica_id);
        let module = self.get_or_launch_module_host(database, replica_id).await?;
        // If `f` panics, unregister the host so its resources are released.
        let on_panic = self.unregister_fn(replica_id);
        scopeguard::defer_on_unwind!({
            warn!("database operation panicked");
            on_panic();
        });

        let db = module.replica_ctx().relational_db.clone();
        let result = module.on_module_thread("using_database", move || f(&db)).await?;
        Ok(result)
    }

    /// Update the [`ModuleHost`] identified by `replica_id` to the given
    /// program.
    ///
    /// The host may not be running, in which case it is spawned (see
    /// [`Self::get_or_launch_module_host`] for details on what this entails).
    ///
    /// If the host was running, and the update fails, the previous version of
    /// the host keeps running.
    #[tracing::instrument(level = "trace", skip_all, err)]
    pub async fn update_module_host(
        &self,
        database: Database,
        host_type: HostType,
        replica_id: u64,
        program_bytes: Box<[u8]>,
    ) -> anyhow::Result<UpdateDatabaseResult> {
        let program = Program {
            hash: hash_bytes(&program_bytes),
            bytes: program_bytes,
        };
        trace!(
            "update module host {}/{}: genesis={} update-to={}",
            database.database_identity,
            replica_id,
            database.initial_program,
            program.hash
        );

        let mut guard = self.acquire_write_lock(replica_id).await;

        // `HostController::clone` is fast,
        // as all of its fields are either `Copy` or wrapped in `Arc`.
        let this = self.clone();

        // `try_init_host` is not cancel safe, as it will spawn other async tasks
        // which hold a filesystem lock past when `try_init_host` returns or is cancelled.
        // This means that, if `try_init_host` is cancelled, subsequent calls will fail.
        //
        // The rest of this future is also not cancel safe, as it will `Option::take` out of the guard
        // at the start of the block and then store back into it at the end.
        //
        // This is problematic because Axum will cancel its handler tasks if the client disconnects,
        // and this method is called from Axum handlers, e.g. for the publish route.
        // `tokio::spawn` a task to update the contents of `guard`,
        // so that it will run to completion even if the caller goes away.
        //
        // Note that `tokio::spawn` only cancels its tasks when the runtime shuts down,
        // at which point we won't be calling `try_init_host` again anyways.
        let update_result = tokio::spawn(async move {
            let mut host = match guard.take() {
                None => {
                    trace!("host not running, try_init");
                    this.try_init_host(database, replica_id).await?
                }
                Some(host) => {
                    trace!("host found, updating");
                    host
                }
            };
            let update_result = host
                .update_module(
                    this.runtimes.clone(),
                    host_type,
                    program,
                    this.energy_monitor.clone(),
                    this.unregister_fn(replica_id),
                    this.db_cores.take(),
                )
                .await?;

            // Store the host back, whether or not the update succeeded:
            // on failure the previous module version keeps running.
            *guard = Some(host);
            Ok::<_, anyhow::Error>(update_result)
        })
        .await??;

        Ok(update_result)
    }

    /// Start the host `replica_id` and conditionally update it.
    ///
    /// If the host was not initialized before, it is initialized with the
    /// program [`Database::initial_program`], which is loaded from the
    /// controller's [`ProgramStorage`].
    ///
    /// If it was already initialized and its stored program hash matches
    /// [`Database::initial_program`], no further action is taken.
    ///
    /// Otherwise, if `expected_hash` is `Some` and does **not** match the
    /// stored hash, an error is returned.
    ///
    /// Otherwise, the host is updated to [`Database::initial_program`], loading
    /// the program data from the controller's [`ProgramStorage`].
    ///
    /// > Note that this ascribes different semantics to [`Database::initial_program`]
    /// > than elsewhere, where the [`Database`] value is provided by the control
    /// > database. The method is mainly useful for bootstrapping the control
    /// > database itself.
    pub async fn init_maybe_update_module_host(
        &self,
        database: Database,
        replica_id: u64,
        expected_hash: Option<Hash>,
    ) -> anyhow::Result<watch::Receiver<ModuleHost>> {
        trace!("custom bootstrap {}/{}", database.database_identity, replica_id);

        let db_addr = database.database_identity;
        let host_type = database.host_type;
        let program_hash = database.initial_program;

        let mut guard = self.acquire_write_lock(replica_id).await;

        // `HostController::clone` is fast,
        // as all of its fields are either `Copy` or wrapped in `Arc`.
        let this = self.clone();

        // `try_init_host` is not cancel safe, as it will spawn other async tasks
        // which hold a filesystem lock past when `try_init_host` returns or is cancelled.
        // This means that, if `try_init_host` is cancelled, subsequent calls will fail.
        //
        // The rest of this future is also not cancel safe, as it will `Option::take` out of the guard
        // at the start of the block and then store back into it at the end.
        //
        // This is problematic because Axum will cancel its handler tasks if the client disconnects,
        // and this method is called from Axum handlers, e.g. for the publish route.
        // `tokio::spawn` a task to update the contents of `guard`,
        // so that it will run to completion even if the caller goes away.
        //
        // Note that `tokio::spawn` only cancels its tasks when the runtime shuts down,
        // at which point we won't be calling `try_init_host` again anyways.
        let module = tokio::spawn(async move {
            let mut host = match guard.take() {
                Some(host) => host,
                None => this.try_init_host(database, replica_id).await?,
            };
            let module = host.module.subscribe();

            // The program is now either:
            //
            // - the desired one from [Database], in which case we do nothing
            // - `Some` expected hash, in which case we update to the desired one
            // - `None` expected hash, in which case we also update
            let stored_hash = stored_program_hash(host.db())?
                .with_context(|| format!("[{db_addr}] database improperly initialized"))?;
            if stored_hash == program_hash {
                info!("[{db_addr}] database up-to-date with {program_hash}");
                *guard = Some(host);
            } else {
                if let Some(expected_hash) = expected_hash {
                    ensure!(
                        expected_hash == stored_hash,
                        "[{}] expected program {} found {}",
                        db_addr,
                        expected_hash,
                        stored_hash
                    );
                }
                info!("[{db_addr}] updating database from `{stored_hash}` to `{program_hash}`");
                let program = load_program(&this.program_storage, program_hash).await?;
                let update_result = host
                    .update_module(
                        this.runtimes.clone(),
                        host_type,
                        program,
                        this.energy_monitor.clone(),
                        this.unregister_fn(replica_id),
                        this.db_cores.take(),
                    )
                    .await?;
                match update_result {
                    UpdateDatabaseResult::NoUpdateNeeded | UpdateDatabaseResult::UpdatePerformed => {
                        *guard = Some(host);
                    }
                    // On a failed migration the host is dropped rather than
                    // stored back, leaving the cell empty.
                    UpdateDatabaseResult::AutoMigrateError(e) => {
                        return Err(anyhow::anyhow!(e));
                    }
                    UpdateDatabaseResult::ErrorExecutingMigration(e) => {
                        return Err(e);
                    }
                }
            }

            Ok::<_, anyhow::Error>(module)
        })
        .await??;

        Ok(module)
    }

    /// Release all resources of the [`ModuleHost`] identified by `replica_id`,
    /// and deregister it from the controller.
    #[tracing::instrument(level = "trace", skip_all)]
    pub async fn exit_module_host(&self, replica_id: u64) -> Result<(), anyhow::Error> {
        trace!("exit module host {replica_id}");
        let lock = self.hosts.lock().remove(&replica_id);
        if let Some(lock) = lock {
            if let Some(host) = lock.write_owned().await.take() {
                let module = host.module.borrow().clone();
                module.exit().await;
                // Drop per-table metric gauges so they don't linger for a
                // database that is no longer running.
                let table_names = module.info().module_def.tables().map(|t| t.name.deref());
                remove_database_gauges(&module.info().database_identity, table_names);
            }
        }

        Ok(())
    }

    /// Get the [`ModuleHost`] identified by `replica_id` or return an error
    /// if it is not registered with the controller.
    ///
    /// See [`Self::get_or_launch_module_host`] for a variant which launches
    /// the host if it is not running.
    #[tracing::instrument(level = "trace", skip_all)]
    pub async fn get_module_host(&self, replica_id: u64) -> Result<ModuleHost, NoSuchModule> {
        trace!("get module host {replica_id}");
        let guard = self.acquire_read_lock(replica_id).await;
        guard
            .as_ref()
            .map(|Host { module, .. }| module.borrow().clone())
            .ok_or(NoSuchModule)
    }

    /// Subscribe to updates of the [`ModuleHost`] identified by `replica_id`,
    /// or return an error if it is not registered with the controller.
    ///
    /// See [`Self::watch_maybe_launch_module_host`] for a variant which
    /// launches the host if it is not running.
    #[tracing::instrument(level = "trace", skip_all)]
    pub async fn watch_module_host(&self, replica_id: u64) -> Result<watch::Receiver<ModuleHost>, NoSuchModule> {
        trace!("watch module host {replica_id}");
        let guard = self.acquire_read_lock(replica_id).await;
        guard
            .as_ref()
            .map(|Host { module, .. }| module.subscribe())
            .ok_or(NoSuchModule)
    }

    /// `true` if the module host `replica_id` is currently registered with
    /// the controller.
    pub async fn has_module_host(&self, replica_id: u64) -> bool {
        self.acquire_read_lock(replica_id).await.is_some()
    }

    /// On-panic callback passed to [`ModuleHost`]s created by this controller.
    ///
    /// Removes the module with the given `replica_id` from this controller.
    fn unregister_fn(&self, replica_id: u64) -> impl Fn() + Send + Sync + 'static {
        // Hold only a weak ref so the callback doesn't keep the registry
        // alive past the controller itself.
        let hosts = Arc::downgrade(&self.hosts);
        move || {
            if let Some(hosts) = hosts.upgrade() {
                hosts.lock().remove(&replica_id);
            }
        }
    }

    /// Acquire the lock cell for `replica_id` in write mode,
    /// registering an empty cell if none exists yet.
    async fn acquire_write_lock(&self, replica_id: u64) -> OwnedRwLockWriteGuard<Option<Host>> {
        let lock = self.hosts.lock().entry(replica_id).or_default().clone();
        lock.write_owned().await
    }

    /// Acquire the lock cell for `replica_id` in read mode,
    /// registering an empty cell if none exists yet.
    async fn acquire_read_lock(&self, replica_id: u64) -> OwnedRwLockReadGuard<Option<Host>> {
        let lock = self.hosts.lock().entry(replica_id).or_default().clone();
        lock.read_owned().await
    }

    /// Initialize a [`Host`] for `database` at `replica_id`.
    ///
    /// NOTE: not cancel safe — see the comments at the call sites.
    async fn try_init_host(&self, database: Database, replica_id: u64) -> anyhow::Result<Host> {
        Host::try_init(self, database, replica_id).await
    }
}
611
612fn stored_program_hash(db: &RelationalDB) -> anyhow::Result<Option<Hash>> {
613    let meta = db.metadata()?;
614    Ok(meta.map(|meta| meta.program_hash))
615}
616
/// Assemble the [`ReplicaContext`] for a replica: its database logger,
/// subscription state, and the handle to the relational database.
///
/// Also spawns a background task which periodically removes dropped clients
/// from the subscription manager; the task exits once the manager is dropped.
async fn make_replica_ctx(
    path: ReplicaDir,
    database: Database,
    replica_id: u64,
    relational_db: Arc<RelationalDB>,
) -> anyhow::Result<ReplicaContext> {
    // Opening the log file hits the filesystem, so don't block the async executor.
    let logger = tokio::task::block_in_place(move || Arc::new(DatabaseLogger::open_today(path.module_logs())));
    let send_worker_queue = spawn_send_worker(Some(database.database_identity));
    let subscriptions = Arc::new(parking_lot::RwLock::new(SubscriptionManager::new(
        send_worker_queue.clone(),
    )));
    // Only a weak ref goes into the cleanup task below, so the task does not
    // keep the subscription manager alive after the replica goes away.
    let downgraded = Arc::downgrade(&subscriptions);
    let subscriptions = ModuleSubscriptions::new(
        relational_db.clone(),
        subscriptions,
        send_worker_queue,
        database.owner_identity,
    );

    // If an error occurs when evaluating a subscription,
    // we mark each client that was affected,
    // and we remove those clients from the manager async.
    tokio::spawn(async move {
        loop {
            tokio::time::sleep(Duration::from_secs(10)).await;
            let Some(subscriptions) = downgraded.upgrade() else {
                break;
            };
            // This should happen on the module thread, but we haven't created the module yet.
            asyncify(move || subscriptions.write().remove_dropped_clients()).await
        }
    });

    Ok(ReplicaContext {
        database,
        replica_id,
        logger,
        subscriptions,
        relational_db,
    })
}
658
/// Initialize a module host for the given program.
/// The passed replica_ctx may not be configured for this version of the program's database schema yet.
///
/// Returns `program` back alongside the newly created [`ModuleHost`].
#[allow(clippy::too_many_arguments)]
async fn make_module_host(
    runtimes: Arc<HostRuntimes>,
    host_type: HostType,
    replica_ctx: Arc<ReplicaContext>,
    scheduler: Scheduler,
    program: Program,
    energy_monitor: Arc<dyn EnergyMonitor>,
    unregister: impl Fn() + Send + Sync + 'static,
    core: JobCore,
) -> anyhow::Result<(Program, ModuleHost)> {
    // `make_actor` is blocking, as it needs to compile the wasm to native code,
    // which may be computationally expensive - sometimes up to 1s for a large module.
    // TODO: change back to using `spawn_rayon` here - asyncify runs on tokio blocking
    //       threads, but those aren't for computation. Also, wasmtime uses rayon
    //       to run compilation in parallel, so it'll need to run stuff in rayon anyway.
    asyncify(move || {
        let mcc = ModuleCreationContext {
            replica_ctx,
            scheduler,
            program: &program,
            energy_monitor,
        };

        let start = Instant::now();
        // Dispatch on the module kind; both arms wrap the runtime-specific
        // actor in a [`ModuleHost`] pinned to the given job `core`.
        let module_host = match host_type {
            HostType::Wasm => {
                let actor = runtimes.wasmtime.make_actor(mcc)?;
                trace!("wasmtime::make_actor blocked for {:?}", start.elapsed());
                ModuleHost::new(actor, unregister, core)
            }
            HostType::Js => {
                let actor = runtimes.v8.make_actor(mcc)?;
                trace!("v8::make_actor blocked for {:?}", start.elapsed());
                ModuleHost::new(actor, unregister, core)
            }
        };
        Ok((program, module_host))
    })
    .await
}
702
703async fn load_program(storage: &ProgramStorage, hash: Hash) -> anyhow::Result<Program> {
704    let bytes = storage
705        .lookup(hash)
706        .await?
707        .with_context(|| format!("program {hash} not found"))?;
708    Ok(Program { hash, bytes })
709}
710
/// Bundle of values produced by [`launch_module`].
struct LaunchedModule {
    /// Context shared by the module and its database.
    replica_ctx: Arc<ReplicaContext>,
    /// The newly created module host.
    module_host: ModuleHost,
    /// Scheduler handle operating on the module.
    scheduler: Scheduler,
    /// Handle used to start the `scheduler` (see [`SchedulerStarter`]).
    scheduler_starter: SchedulerStarter,
}
717
718#[allow(clippy::too_many_arguments)]
719async fn launch_module(
720    database: Database,
721    replica_id: u64,
722    program: Program,
723    on_panic: impl Fn() + Send + Sync + 'static,
724    relational_db: Arc<RelationalDB>,
725    energy_monitor: Arc<dyn EnergyMonitor>,
726    replica_dir: ReplicaDir,
727    runtimes: Arc<HostRuntimes>,
728    core: JobCore,
729) -> anyhow::Result<(Program, LaunchedModule)> {
730    let db_identity = database.database_identity;
731    let host_type = database.host_type;
732
733    let replica_ctx = make_replica_ctx(replica_dir, database, replica_id, relational_db)
734        .await
735        .map(Arc::new)?;
736    let (scheduler, scheduler_starter) = Scheduler::open(replica_ctx.relational_db.clone());
737    let (program, module_host) = make_module_host(
738        runtimes.clone(),
739        host_type,
740        replica_ctx.clone(),
741        scheduler.clone(),
742        program,
743        energy_monitor.clone(),
744        on_panic,
745        core,
746    )
747    .await?;
748
749    trace!("launched database {} with program {}", db_identity, program.hash);
750
751    Ok((
752        program,
753        LaunchedModule {
754            replica_ctx,
755            module_host,
756            scheduler,
757            scheduler_starter,
758        },
759    ))
760}
761
762/// Update a module.
763///
764/// If the `db` is not initialized yet (i.e. its program hash is `None`),
765/// return an error.
766///
767/// Otherwise, if `db.program_hash` matches the given `program_hash`, do
768/// nothing and return an empty `UpdateDatabaseResult`.
769///
770/// Otherwise, invoke `module.update_database` and return the result.
771async fn update_module(
772    db: &RelationalDB,
773    module: &ModuleHost,
774    program: Program,
775    old_module_info: Arc<ModuleInfo>,
776) -> anyhow::Result<UpdateDatabaseResult> {
777    let addr = db.database_identity();
778    match stored_program_hash(db)? {
779        None => Err(anyhow!("database `{}` not yet initialized", addr)),
780        Some(stored) => {
781            let res = if stored == program.hash {
782                info!("database `{}` up to date with program `{}`", addr, program.hash);
783                UpdateDatabaseResult::NoUpdateNeeded
784            } else {
785                info!("updating `{}` from {} to {}", addr, stored, program.hash);
786                module.update_database(program, old_module_info).await?
787            };
788
789            Ok(res)
790        }
791    }
792}
793
/// Encapsulates a database, associated module, and auxiliary state.
struct Host {
    /// The [`ModuleHost`], providing the callable reducer API.
    ///
    /// Modules may be updated via [`Host::update_module`].
    /// The module is wrapped in a [`watch::Sender`] to allow for "hot swapping":
    /// clients may subscribe to the channel, so they get the most recent
    /// [`ModuleHost`] version or an error if the [`Host`] was dropped.
    module: watch::Sender<ModuleHost>,
    /// Pointer to the `module`'s [`ReplicaContext`].
    ///
    /// The database stays the same if and when the module is updated via
    /// [`Host::update_module`].
    replica_ctx: Arc<ReplicaContext>,
    /// Scheduler for repeating reducers, operating on the current `module`.
    ///
    /// Replaced alongside the module on a successful [`Host::update_module`].
    scheduler: Scheduler,
    /// Handle to the metrics collection task started via [`disk_monitor`].
    ///
    /// The task collects metrics from the `replica_ctx`, and so stays alive as long
    /// as the `replica_ctx` is live. The task is aborted when [`Host`] is dropped
    /// (see the [`Drop`] impl below).
    disk_metrics_recorder_task: AbortHandle,
    /// Handle to the task responsible for recording metrics for each transaction.
    /// The task is aborted when [`Host`] is dropped.
    tx_metrics_recorder_task: AbortHandle,
}
819
impl Host {
    /// Attempt to instantiate a [`Host`] from persistent storage.
    ///
    /// If the database is empty, its initial program is loaded from external
    /// storage and `init_database` is run on the freshly launched module;
    /// for an already-initialized database no initialization routines are run.
    /// Either way, on-disk artifacts may be created if the host / database
    /// did not exist.
    #[tracing::instrument(level = "debug", skip_all)]
    async fn try_init(host_controller: &HostController, database: Database, replica_id: u64) -> anyhow::Result<Self> {
        // Destructure the controller so the individual components can be
        // borrowed independently below.
        let HostController {
            data_dir,
            default_config: config,
            program_storage,
            energy_monitor,
            runtimes,
            durability,
            page_pool,
            ..
        } = host_controller;
        let on_panic = host_controller.unregister_fn(replica_id);
        let replica_dir = data_dir.replica(replica_id);
        // Start recording per-transaction metrics right away, so the queue
        // exists before the database is opened.
        let (tx_metrics_queue, tx_metrics_recorder_task) = spawn_tx_metrics_recorder();

        // Open the relational database: either purely in memory, or backed by
        // a local commit log, durability layer and snapshot repository.
        let (db, connected_clients) = match config.storage {
            db::Storage::Memory => RelationalDB::open(
                &replica_dir,
                database.database_identity,
                database.owner_identity,
                EmptyHistory::new(),
                None,
                None,
                Some(tx_metrics_queue),
                page_pool.clone(),
            )?,
            db::Storage::Disk => {
                let snapshot_repo =
                    relational_db::open_snapshot_repo(replica_dir.snapshots(), database.database_identity, replica_id)?;
                let (history, _) = relational_db::local_durability(replica_dir.commit_log()).await?;
                let (durability, start_snapshot_watcher) = durability.durability(replica_id).await?;

                let (db, clients) = RelationalDB::open(
                    &replica_dir,
                    database.database_identity,
                    database.owner_identity,
                    history,
                    Some(durability),
                    Some(snapshot_repo),
                    Some(tx_metrics_queue),
                    page_pool.clone(),
                )
                // Make sure we log the source chain of the error
                // as a single line, with the help of `anyhow`.
                .map_err(anyhow::Error::from)
                .inspect_err(|e| {
                    tracing::error!(
                        database = %database.database_identity,
                        replica = replica_id,
                        "Failed to open database: {e:#}"
                    );
                })?;
                // Hook up the snapshot watcher, if the durability layer
                // provided one.
                if let Some(start_snapshot_watcher) = start_snapshot_watcher {
                    let watcher = db.subscribe_to_snapshots().expect("we passed snapshot_repo");
                    start_snapshot_watcher(watcher)
                }
                (db, clients)
            }
        };
        let (program, program_needs_init) = match db.program()? {
            // Launch module with program from existing database.
            Some(program) => (program, false),
            // Database is empty, load program from external storage and run
            // initialization.
            None => (load_program(program_storage, database.initial_program).await?, true),
        };

        let (program, launched) = launch_module(
            database,
            replica_id,
            program,
            on_panic,
            Arc::new(db),
            energy_monitor.clone(),
            replica_dir,
            runtimes.clone(),
            host_controller.db_cores.take(),
        )
        .await?;

        if program_needs_init {
            // Propagate a failed init call, if there was one.
            let call_result = launched.module_host.init_database(program).await?;
            if let Some(call_result) = call_result {
                Result::from(call_result)?;
            }
        } else {
            // Nothing to initialize; explicitly discard the program copy.
            drop(program)
        }

        let LaunchedModule {
            replica_ctx,
            module_host,
            scheduler,
            scheduler_starter,
        } = launched;

        // Disconnect dangling clients.
        for (identity, connection_id) in connected_clients {
            module_host
                .call_identity_disconnected(identity, connection_id)
                .await
                .with_context(|| {
                    format!(
                        "Error calling disconnect for {} {} on {}",
                        identity, connection_id, replica_ctx.database_identity
                    )
                })?;
        }

        // Only start the scheduler once initialization and client cleanup
        // have completed.
        scheduler_starter.start(&module_host)?;
        let disk_metrics_recorder_task = tokio::spawn(metric_reporter(replica_ctx.clone())).abort_handle();

        Ok(Host {
            module: watch::Sender::new(module_host),
            replica_ctx,
            scheduler,
            disk_metrics_recorder_task,
            tx_metrics_recorder_task,
        })
    }

    /// Construct an in-memory instance of `database` running `program`,
    /// initialize it, then immediately destroy it.
    ///
    /// This is used during an initial, fresh publish operation
    /// in order to check the `program`'s validity as a module,
    /// since some validity checks we'd like to do (e.g. typechecking RLS filters)
    /// require a fully instantiated database.
    ///
    /// This is not necessary during hotswap publishes,
    /// as the automigration planner and executor accomplish the same validity checks.
    ///
    /// Returns the [`ModuleInfo`] of the temporary module on success.
    async fn try_init_in_memory_to_check(
        runtimes: &Arc<HostRuntimes>,
        page_pool: PagePool,
        database: Database,
        program: Program,
        core: JobCore,
    ) -> anyhow::Result<Arc<ModuleInfo>> {
        // Even in-memory databases acquire a lockfile.
        // Grab a tempdir to put that lockfile in.
        let phony_replica_dir = TempDir::with_prefix("spacetimedb-publish-in-memory-check")
            .context("Error creating temporary directory to house temporary database during publish")?;

        // Leave the `TempDir` instance in place, so that its destructor will still run.
        let phony_replica_dir = ReplicaDir::from_path_unchecked(phony_replica_dir.path().to_owned());

        // In-memory only: no history, durability, snapshots or tx metrics.
        let (db, _connected_clients) = RelationalDB::open(
            &phony_replica_dir,
            database.database_identity,
            database.owner_identity,
            EmptyHistory::new(),
            None,
            None,
            None,
            page_pool,
        )?;

        let (program, launched) = launch_module(
            database,
            0,
            program,
            // No need to register a callback here:
            // proper publishes use it to unregister a panicked module,
            // but this module is not registered in the first place.
            || log::error!("launch_module on_panic called for temporary publish in-memory instance"),
            Arc::new(db),
            Arc::new(NullEnergyMonitor),
            phony_replica_dir,
            runtimes.clone(),
            core,
        )
        .await?;

        // Run initialization so validity checks that require a live database
        // are exercised; propagate a failed init call, if there was one.
        let call_result = launched.module_host.init_database(program).await?;
        if let Some(call_result) = call_result {
            Result::from(call_result)?;
        }

        Ok(launched.module_host.info)
    }

    /// Attempt to replace this [`Host`]'s [`ModuleHost`] with a new one running
    /// the program `program_hash`.
    ///
    /// The associated [`ReplicaContext`] stays the same.
    ///
    /// Executes [`ModuleHost::update_database`] on the newly instantiated
    /// module, updating the database schema and invoking the `__update__`
    /// reducer if it is defined.
    /// If this succeeds, the current module is replaced with the new one,
    /// otherwise it stays the same.
    ///
    /// Either way, the [`UpdateDatabaseResult`] is returned.
    async fn update_module(
        &mut self,
        runtimes: Arc<HostRuntimes>,
        host_type: HostType,
        program: Program,
        energy_monitor: Arc<dyn EnergyMonitor>,
        on_panic: impl Fn() + Send + Sync + 'static,
        core: JobCore,
    ) -> anyhow::Result<UpdateDatabaseResult> {
        let replica_ctx = &self.replica_ctx;
        // Fresh scheduler for the new module; only installed on success below.
        let (scheduler, scheduler_starter) = Scheduler::open(self.replica_ctx.relational_db.clone());

        let (program, module) = make_module_host(
            runtimes,
            host_type,
            replica_ctx.clone(),
            scheduler.clone(),
            program,
            energy_monitor,
            on_panic,
            core,
        )
        .await?;

        // Get the old module info to diff against when building a migration plan.
        let old_module_info = self.module.borrow().info.clone();

        let update_result = update_module(&replica_ctx.relational_db, &module, program, old_module_info).await?;
        trace!("update result: {update_result:?}");
        // Only replace the module + scheduler if the update succeeded.
        // Otherwise, we want the database to continue running with the old state.
        if update_result.was_successful() {
            self.scheduler = scheduler;
            scheduler_starter.start(&module)?;
            // Hot-swap: watchers of `self.module` observe the new module;
            // the old one is shut down.
            let old_module = self.module.send_replace(module);
            old_module.exit().await;
        }

        Ok(update_result)
    }

    /// The [`RelationalDB`] of this host's replica.
    fn db(&self) -> &RelationalDB {
        &self.replica_ctx.relational_db
    }
}
1064
1065impl Drop for Host {
1066    fn drop(&mut self) {
1067        self.disk_metrics_recorder_task.abort();
1068        self.tx_metrics_recorder_task.abort();
1069    }
1070}
1071
/// How often [`metric_reporter`] samples disk usage and updates gauges.
const STORAGE_METERING_INTERVAL: Duration = Duration::from_secs(15);
1073
1074/// Periodically collect gauge stats and update prometheus metrics.
1075async fn metric_reporter(replica_ctx: Arc<ReplicaContext>) {
1076    // TODO: Consider adding a metric for heap usage.
1077    let message_log_size = DB_METRICS
1078        .message_log_size
1079        .with_label_values(&replica_ctx.database_identity);
1080    let module_log_file_size = DB_METRICS
1081        .module_log_file_size
1082        .with_label_values(&replica_ctx.database_identity);
1083
1084    loop {
1085        let ctx = replica_ctx.clone();
1086        // We spawn a blocking task here because this grabs blocking locks.
1087        let disk_usage_future = tokio::task::spawn_blocking(move || {
1088            ctx.update_gauges();
1089            ctx.total_disk_usage()
1090        });
1091        if let Ok(disk_usage) = disk_usage_future.await {
1092            if let Some(num_bytes) = disk_usage.durability {
1093                message_log_size.set(num_bytes as i64);
1094            }
1095            if let Some(num_bytes) = disk_usage.logs {
1096                module_log_file_size.set(num_bytes as i64);
1097            }
1098        }
1099        tokio::time::sleep(STORAGE_METERING_INTERVAL).await;
1100    }
1101}
1102
1103/// Extracts the schema from a given module.
1104///
1105/// Spins up a dummy host and returns the `ModuleDef` that it extracts.
1106pub async fn extract_schema(program_bytes: Box<[u8]>, host_type: HostType) -> anyhow::Result<ModuleDef> {
1107    let owner_identity = Identity::from_u256(0xdcba_u32.into());
1108    let database_identity = Identity::from_u256(0xabcd_u32.into());
1109    let program = Program::from_bytes(program_bytes);
1110
1111    let database = Database {
1112        id: 0,
1113        database_identity,
1114        owner_identity,
1115        host_type,
1116        initial_program: program.hash,
1117    };
1118
1119    let runtimes = HostRuntimes::new(None);
1120    let page_pool = PagePool::new(None);
1121    let core = JobCore::default();
1122    let module_info = Host::try_init_in_memory_to_check(&runtimes, page_pool, database, program, core).await?;
1123    // this should always succeed, but sometimes it doesn't
1124    let module_def = match Arc::try_unwrap(module_info) {
1125        Ok(info) => info.module_def,
1126        Err(info) => info.module_def.clone(),
1127    };
1128
1129    Ok(module_def)
1130}
1131
1132// Remove all gauges associated with a database.
1133// This is useful if a database is being deleted.
1134pub fn remove_database_gauges<'a, I>(db: &Identity, table_names: I)
1135where
1136    I: IntoIterator<Item = &'a str>,
1137{
1138    // Remove the per-table gauges.
1139    for table_name in table_names {
1140        let _ = DATA_SIZE_METRICS
1141            .data_size_table_num_rows
1142            .remove_label_values(db, table_name);
1143        let _ = DATA_SIZE_METRICS
1144            .data_size_table_bytes_used_by_rows
1145            .remove_label_values(db, table_name);
1146        let _ = DATA_SIZE_METRICS
1147            .data_size_table_num_rows_in_indexes
1148            .remove_label_values(db, table_name);
1149        let _ = DATA_SIZE_METRICS
1150            .data_size_table_bytes_used_by_index_keys
1151            .remove_label_values(db, table_name);
1152    }
1153    // Remove the per-db gauges.
1154    let _ = DATA_SIZE_METRICS.data_size_blob_store_num_blobs.remove_label_values(db);
1155    let _ = DATA_SIZE_METRICS
1156        .data_size_blob_store_bytes_used_by_blobs
1157        .remove_label_values(db);
1158    let _ = WORKER_METRICS.wasm_memory_bytes.remove_label_values(db);
1159}