1use super::module_host::{EventStatus, ModuleHost, ModuleInfo, NoSuchModule};
2use super::scheduler::SchedulerStarter;
3use super::wasmtime::WasmtimeRuntime;
4use super::{Scheduler, UpdateDatabaseResult};
5use crate::database_logger::DatabaseLogger;
6use crate::db::relational_db::{self, DiskSizeFn, RelationalDB, Txdata};
7use crate::db::{self, spawn_tx_metrics_recorder};
8use crate::energy::{EnergyMonitor, EnergyQuanta, NullEnergyMonitor};
9use crate::host::module_host::ModuleRuntime as _;
10use crate::host::v8::V8Runtime;
11use crate::messages::control_db::{Database, HostType};
12use crate::module_host_context::ModuleCreationContext;
13use crate::replica_context::ReplicaContext;
14use crate::subscription::module_subscription_actor::ModuleSubscriptions;
15use crate::subscription::module_subscription_manager::{spawn_send_worker, SubscriptionManager};
16use crate::util::asyncify;
17use crate::util::jobs::{JobCore, JobCores};
18use crate::worker_metrics::WORKER_METRICS;
19use anyhow::{anyhow, ensure, Context};
20use async_trait::async_trait;
21use durability::{Durability, EmptyHistory};
22use log::{info, trace, warn};
23use parking_lot::Mutex;
24use spacetimedb_data_structures::map::IntMap;
25use spacetimedb_datastore::db_metrics::data_size::DATA_SIZE_METRICS;
26use spacetimedb_datastore::db_metrics::DB_METRICS;
27use spacetimedb_datastore::traits::Program;
28use spacetimedb_durability::{self as durability, TxOffset};
29use spacetimedb_lib::{hash_bytes, Identity};
30use spacetimedb_paths::server::{ReplicaDir, ServerDataDir};
31use spacetimedb_paths::FromPathUnchecked;
32use spacetimedb_sats::hash::Hash;
33use spacetimedb_schema::def::ModuleDef;
34use spacetimedb_table::page_pool::PagePool;
35use std::future::Future;
36use std::ops::Deref;
37use std::sync::Arc;
38use std::time::{Duration, Instant};
39use tempfile::TempDir;
40use tokio::sync::{watch, OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock as AsyncRwLock};
41use tokio::task::AbortHandle;
42
43type HostCell = Arc<AsyncRwLock<Option<Host>>>;
49
50type Hosts = Arc<Mutex<IntMap<u64, HostCell>>>;
52
53pub type ExternalDurability = (Arc<dyn Durability<TxData = Txdata>>, DiskSizeFn);
54
55pub type StartSnapshotWatcher = Box<dyn FnOnce(watch::Receiver<TxOffset>)>;
56
57#[async_trait]
58pub trait DurabilityProvider: Send + Sync + 'static {
59 async fn durability(&self, replica_id: u64) -> anyhow::Result<(ExternalDurability, Option<StartSnapshotWatcher>)>;
60}
61
62#[async_trait]
63pub trait ExternalStorage: Send + Sync + 'static {
64 async fn lookup(&self, program_hash: Hash) -> anyhow::Result<Option<Box<[u8]>>>;
65}
66#[async_trait]
67impl<F, Fut> ExternalStorage for F
68where
69 F: Fn(Hash) -> Fut + Send + Sync + 'static,
70 Fut: Future<Output = anyhow::Result<Option<Box<[u8]>>>> + Send,
71{
72 async fn lookup(&self, program_hash: Hash) -> anyhow::Result<Option<Box<[u8]>>> {
73 self(program_hash).await
74 }
75}
76
77pub type ProgramStorage = Arc<dyn ExternalStorage>;
78
79#[derive(Clone)]
86pub struct HostController {
87 hosts: Hosts,
90 pub data_dir: Arc<ServerDataDir>,
92 default_config: db::Config,
95 program_storage: ProgramStorage,
97 energy_monitor: Arc<dyn EnergyMonitor>,
99 durability: Arc<dyn DurabilityProvider>,
101 pub page_pool: PagePool,
103 runtimes: Arc<HostRuntimes>,
105 db_cores: JobCores,
107}
108
109struct HostRuntimes {
110 wasmtime: WasmtimeRuntime,
111 v8: V8Runtime,
112}
113
114impl HostRuntimes {
115 fn new(data_dir: Option<&ServerDataDir>) -> Arc<Self> {
116 let wasmtime = WasmtimeRuntime::new(data_dir);
117 let v8 = V8Runtime::default();
118 Arc::new(Self { wasmtime, v8 })
119 }
120}
121
122#[derive(Clone, Debug)]
123pub struct ReducerCallResult {
124 pub outcome: ReducerOutcome,
125 pub energy_used: EnergyQuanta,
126 pub execution_duration: Duration,
127}
128
129impl ReducerCallResult {
130 pub fn is_err(&self) -> bool {
131 self.outcome.is_err()
132 }
133
134 pub fn is_ok(&self) -> bool {
135 !self.is_err()
136 }
137}
138
139impl From<ReducerCallResult> for Result<(), anyhow::Error> {
140 fn from(value: ReducerCallResult) -> Self {
141 value.outcome.into_result()
142 }
143}
144
145#[derive(Clone, Debug)]
146pub enum ReducerOutcome {
147 Committed,
148 Failed(String),
149 BudgetExceeded,
150}
151
152impl ReducerOutcome {
153 pub fn into_result(self) -> anyhow::Result<()> {
154 match self {
155 Self::Committed => Ok(()),
156 Self::Failed(e) => Err(anyhow::anyhow!(e)),
157 Self::BudgetExceeded => Err(anyhow::anyhow!("reducer ran out of energy")),
158 }
159 }
160
161 pub fn is_err(&self) -> bool {
162 !matches!(self, Self::Committed)
163 }
164}
165
166impl From<&EventStatus> for ReducerOutcome {
167 fn from(status: &EventStatus) -> Self {
168 match &status {
169 EventStatus::Committed(_) => ReducerOutcome::Committed,
170 EventStatus::Failed(e) => ReducerOutcome::Failed(e.clone()),
171 EventStatus::OutOfEnergy => ReducerOutcome::BudgetExceeded,
172 }
173 }
174}
175
176impl HostController {
177 pub fn new(
178 data_dir: Arc<ServerDataDir>,
179 default_config: db::Config,
180 program_storage: ProgramStorage,
181 energy_monitor: Arc<impl EnergyMonitor>,
182 durability: Arc<dyn DurabilityProvider>,
183 db_cores: JobCores,
184 ) -> Self {
185 Self {
186 hosts: <_>::default(),
187 default_config,
188 program_storage,
189 energy_monitor,
190 durability,
191 runtimes: HostRuntimes::new(Some(&data_dir)),
192 data_dir,
193 page_pool: PagePool::new(default_config.page_pool_max_size),
194 db_cores,
195 }
196 }
197
198 pub fn set_program_storage(&mut self, ps: ProgramStorage) {
200 self.program_storage = ps;
201 }
202
203 #[tracing::instrument(level = "trace", skip_all)]
222 pub async fn get_or_launch_module_host(&self, database: Database, replica_id: u64) -> anyhow::Result<ModuleHost> {
223 let mut rx = self.watch_maybe_launch_module_host(database, replica_id).await?;
224 let module = rx.borrow_and_update();
225 Ok(module.clone())
226 }
227
228 #[tracing::instrument(level = "trace", skip_all)]
236 pub async fn watch_maybe_launch_module_host(
237 &self,
238 database: Database,
239 replica_id: u64,
240 ) -> anyhow::Result<watch::Receiver<ModuleHost>> {
241 {
243 let guard = self.acquire_read_lock(replica_id).await;
244 if let Some(host) = &*guard {
245 trace!("cached host {}/{}", database.database_identity, replica_id);
246 return Ok(host.module.subscribe());
247 }
248 }
249
250 let mut guard = self.acquire_write_lock(replica_id).await;
254 if let Some(host) = &*guard {
255 trace!(
256 "cached host {}/{} (lock upgrade)",
257 database.database_identity,
258 replica_id
259 );
260 return Ok(host.module.subscribe());
261 }
262
263 trace!("launch host {}/{}", database.database_identity, replica_id);
264
265 let this = self.clone();
268
269 let rx = tokio::spawn(async move {
281 let host = this.try_init_host(database, replica_id).await?;
282
283 let rx = host.module.subscribe();
284 *guard = Some(host);
285
286 Ok::<_, anyhow::Error>(rx)
287 })
288 .await??;
289
290 Ok(rx)
291 }
292
293 pub async fn check_module_validity(&self, database: Database, program: Program) -> anyhow::Result<Arc<ModuleInfo>> {
304 Host::try_init_in_memory_to_check(
305 &self.runtimes,
306 self.page_pool.clone(),
307 database,
308 program,
309 self.db_cores.take(),
315 )
316 .await
317 }
318
319 #[tracing::instrument(level = "trace", skip_all)]
325 pub async fn using_database<F, T>(&self, database: Database, replica_id: u64, f: F) -> anyhow::Result<T>
326 where
327 F: FnOnce(&RelationalDB) -> T + Send + 'static,
328 T: Send + 'static,
329 {
330 trace!("using database {}/{}", database.database_identity, replica_id);
331 let module = self.get_or_launch_module_host(database, replica_id).await?;
332 let on_panic = self.unregister_fn(replica_id);
333 scopeguard::defer_on_unwind!({
334 warn!("database operation panicked");
335 on_panic();
336 });
337
338 let db = module.replica_ctx().relational_db.clone();
339 let result = module.on_module_thread("using_database", move || f(&db)).await?;
340 Ok(result)
341 }
342
343 #[tracing::instrument(level = "trace", skip_all, err)]
352 pub async fn update_module_host(
353 &self,
354 database: Database,
355 host_type: HostType,
356 replica_id: u64,
357 program_bytes: Box<[u8]>,
358 ) -> anyhow::Result<UpdateDatabaseResult> {
359 let program = Program {
360 hash: hash_bytes(&program_bytes),
361 bytes: program_bytes,
362 };
363 trace!(
364 "update module host {}/{}: genesis={} update-to={}",
365 database.database_identity,
366 replica_id,
367 database.initial_program,
368 program.hash
369 );
370
371 let mut guard = self.acquire_write_lock(replica_id).await;
372
373 let this = self.clone();
376
377 let update_result = tokio::spawn(async move {
392 let mut host = match guard.take() {
393 None => {
394 trace!("host not running, try_init");
395 this.try_init_host(database, replica_id).await?
396 }
397 Some(host) => {
398 trace!("host found, updating");
399 host
400 }
401 };
402 let update_result = host
403 .update_module(
404 this.runtimes.clone(),
405 host_type,
406 program,
407 this.energy_monitor.clone(),
408 this.unregister_fn(replica_id),
409 this.db_cores.take(),
410 )
411 .await?;
412
413 *guard = Some(host);
414 Ok::<_, anyhow::Error>(update_result)
415 })
416 .await??;
417
418 Ok(update_result)
419 }
420
421 pub async fn init_maybe_update_module_host(
441 &self,
442 database: Database,
443 replica_id: u64,
444 expected_hash: Option<Hash>,
445 ) -> anyhow::Result<watch::Receiver<ModuleHost>> {
446 trace!("custom bootstrap {}/{}", database.database_identity, replica_id);
447
448 let db_addr = database.database_identity;
449 let host_type = database.host_type;
450 let program_hash = database.initial_program;
451
452 let mut guard = self.acquire_write_lock(replica_id).await;
453
454 let this = self.clone();
457
458 let module = tokio::spawn(async move {
473 let mut host = match guard.take() {
474 Some(host) => host,
475 None => this.try_init_host(database, replica_id).await?,
476 };
477 let module = host.module.subscribe();
478
479 let stored_hash = stored_program_hash(host.db())?
485 .with_context(|| format!("[{db_addr}] database improperly initialized"))?;
486 if stored_hash == program_hash {
487 info!("[{db_addr}] database up-to-date with {program_hash}");
488 *guard = Some(host);
489 } else {
490 if let Some(expected_hash) = expected_hash {
491 ensure!(
492 expected_hash == stored_hash,
493 "[{}] expected program {} found {}",
494 db_addr,
495 expected_hash,
496 stored_hash
497 );
498 }
499 info!("[{db_addr}] updating database from `{stored_hash}` to `{program_hash}`");
500 let program = load_program(&this.program_storage, program_hash).await?;
501 let update_result = host
502 .update_module(
503 this.runtimes.clone(),
504 host_type,
505 program,
506 this.energy_monitor.clone(),
507 this.unregister_fn(replica_id),
508 this.db_cores.take(),
509 )
510 .await?;
511 match update_result {
512 UpdateDatabaseResult::NoUpdateNeeded | UpdateDatabaseResult::UpdatePerformed => {
513 *guard = Some(host);
514 }
515 UpdateDatabaseResult::AutoMigrateError(e) => {
516 return Err(anyhow::anyhow!(e));
517 }
518 UpdateDatabaseResult::ErrorExecutingMigration(e) => {
519 return Err(e);
520 }
521 }
522 }
523
524 Ok::<_, anyhow::Error>(module)
525 })
526 .await??;
527
528 Ok(module)
529 }
530
531 #[tracing::instrument(level = "trace", skip_all)]
534 pub async fn exit_module_host(&self, replica_id: u64) -> Result<(), anyhow::Error> {
535 trace!("exit module host {replica_id}");
536 let lock = self.hosts.lock().remove(&replica_id);
537 if let Some(lock) = lock {
538 if let Some(host) = lock.write_owned().await.take() {
539 let module = host.module.borrow().clone();
540 module.exit().await;
541 let table_names = module.info().module_def.tables().map(|t| t.name.deref());
542 remove_database_gauges(&module.info().database_identity, table_names);
543 }
544 }
545
546 Ok(())
547 }
548
549 #[tracing::instrument(level = "trace", skip_all)]
555 pub async fn get_module_host(&self, replica_id: u64) -> Result<ModuleHost, NoSuchModule> {
556 trace!("get module host {replica_id}");
557 let guard = self.acquire_read_lock(replica_id).await;
558 guard
559 .as_ref()
560 .map(|Host { module, .. }| module.borrow().clone())
561 .ok_or(NoSuchModule)
562 }
563
564 #[tracing::instrument(level = "trace", skip_all)]
570 pub async fn watch_module_host(&self, replica_id: u64) -> Result<watch::Receiver<ModuleHost>, NoSuchModule> {
571 trace!("watch module host {replica_id}");
572 let guard = self.acquire_read_lock(replica_id).await;
573 guard
574 .as_ref()
575 .map(|Host { module, .. }| module.subscribe())
576 .ok_or(NoSuchModule)
577 }
578
579 pub async fn has_module_host(&self, replica_id: u64) -> bool {
582 self.acquire_read_lock(replica_id).await.is_some()
583 }
584
585 fn unregister_fn(&self, replica_id: u64) -> impl Fn() + Send + Sync + 'static {
589 let hosts = Arc::downgrade(&self.hosts);
590 move || {
591 if let Some(hosts) = hosts.upgrade() {
592 hosts.lock().remove(&replica_id);
593 }
594 }
595 }
596
597 async fn acquire_write_lock(&self, replica_id: u64) -> OwnedRwLockWriteGuard<Option<Host>> {
598 let lock = self.hosts.lock().entry(replica_id).or_default().clone();
599 lock.write_owned().await
600 }
601
602 async fn acquire_read_lock(&self, replica_id: u64) -> OwnedRwLockReadGuard<Option<Host>> {
603 let lock = self.hosts.lock().entry(replica_id).or_default().clone();
604 lock.read_owned().await
605 }
606
607 async fn try_init_host(&self, database: Database, replica_id: u64) -> anyhow::Result<Host> {
608 Host::try_init(self, database, replica_id).await
609 }
610}
611
612fn stored_program_hash(db: &RelationalDB) -> anyhow::Result<Option<Hash>> {
613 let meta = db.metadata()?;
614 Ok(meta.map(|meta| meta.program_hash))
615}
616
617async fn make_replica_ctx(
618 path: ReplicaDir,
619 database: Database,
620 replica_id: u64,
621 relational_db: Arc<RelationalDB>,
622) -> anyhow::Result<ReplicaContext> {
623 let logger = tokio::task::block_in_place(move || Arc::new(DatabaseLogger::open_today(path.module_logs())));
624 let send_worker_queue = spawn_send_worker(Some(database.database_identity));
625 let subscriptions = Arc::new(parking_lot::RwLock::new(SubscriptionManager::new(
626 send_worker_queue.clone(),
627 )));
628 let downgraded = Arc::downgrade(&subscriptions);
629 let subscriptions = ModuleSubscriptions::new(
630 relational_db.clone(),
631 subscriptions,
632 send_worker_queue,
633 database.owner_identity,
634 );
635
636 tokio::spawn(async move {
640 loop {
641 tokio::time::sleep(Duration::from_secs(10)).await;
642 let Some(subscriptions) = downgraded.upgrade() else {
643 break;
644 };
645 asyncify(move || subscriptions.write().remove_dropped_clients()).await
647 }
648 });
649
650 Ok(ReplicaContext {
651 database,
652 replica_id,
653 logger,
654 subscriptions,
655 relational_db,
656 })
657}
658
659#[allow(clippy::too_many_arguments)]
662async fn make_module_host(
663 runtimes: Arc<HostRuntimes>,
664 host_type: HostType,
665 replica_ctx: Arc<ReplicaContext>,
666 scheduler: Scheduler,
667 program: Program,
668 energy_monitor: Arc<dyn EnergyMonitor>,
669 unregister: impl Fn() + Send + Sync + 'static,
670 core: JobCore,
671) -> anyhow::Result<(Program, ModuleHost)> {
672 asyncify(move || {
678 let mcc = ModuleCreationContext {
679 replica_ctx,
680 scheduler,
681 program: &program,
682 energy_monitor,
683 };
684
685 let start = Instant::now();
686 let module_host = match host_type {
687 HostType::Wasm => {
688 let actor = runtimes.wasmtime.make_actor(mcc)?;
689 trace!("wasmtime::make_actor blocked for {:?}", start.elapsed());
690 ModuleHost::new(actor, unregister, core)
691 }
692 HostType::Js => {
693 let actor = runtimes.v8.make_actor(mcc)?;
694 trace!("v8::make_actor blocked for {:?}", start.elapsed());
695 ModuleHost::new(actor, unregister, core)
696 }
697 };
698 Ok((program, module_host))
699 })
700 .await
701}
702
703async fn load_program(storage: &ProgramStorage, hash: Hash) -> anyhow::Result<Program> {
704 let bytes = storage
705 .lookup(hash)
706 .await?
707 .with_context(|| format!("program {hash} not found"))?;
708 Ok(Program { hash, bytes })
709}
710
711struct LaunchedModule {
712 replica_ctx: Arc<ReplicaContext>,
713 module_host: ModuleHost,
714 scheduler: Scheduler,
715 scheduler_starter: SchedulerStarter,
716}
717
718#[allow(clippy::too_many_arguments)]
719async fn launch_module(
720 database: Database,
721 replica_id: u64,
722 program: Program,
723 on_panic: impl Fn() + Send + Sync + 'static,
724 relational_db: Arc<RelationalDB>,
725 energy_monitor: Arc<dyn EnergyMonitor>,
726 replica_dir: ReplicaDir,
727 runtimes: Arc<HostRuntimes>,
728 core: JobCore,
729) -> anyhow::Result<(Program, LaunchedModule)> {
730 let db_identity = database.database_identity;
731 let host_type = database.host_type;
732
733 let replica_ctx = make_replica_ctx(replica_dir, database, replica_id, relational_db)
734 .await
735 .map(Arc::new)?;
736 let (scheduler, scheduler_starter) = Scheduler::open(replica_ctx.relational_db.clone());
737 let (program, module_host) = make_module_host(
738 runtimes.clone(),
739 host_type,
740 replica_ctx.clone(),
741 scheduler.clone(),
742 program,
743 energy_monitor.clone(),
744 on_panic,
745 core,
746 )
747 .await?;
748
749 trace!("launched database {} with program {}", db_identity, program.hash);
750
751 Ok((
752 program,
753 LaunchedModule {
754 replica_ctx,
755 module_host,
756 scheduler,
757 scheduler_starter,
758 },
759 ))
760}
761
762async fn update_module(
772 db: &RelationalDB,
773 module: &ModuleHost,
774 program: Program,
775 old_module_info: Arc<ModuleInfo>,
776) -> anyhow::Result<UpdateDatabaseResult> {
777 let addr = db.database_identity();
778 match stored_program_hash(db)? {
779 None => Err(anyhow!("database `{}` not yet initialized", addr)),
780 Some(stored) => {
781 let res = if stored == program.hash {
782 info!("database `{}` up to date with program `{}`", addr, program.hash);
783 UpdateDatabaseResult::NoUpdateNeeded
784 } else {
785 info!("updating `{}` from {} to {}", addr, stored, program.hash);
786 module.update_database(program, old_module_info).await?
787 };
788
789 Ok(res)
790 }
791 }
792}
793
794struct Host {
796 module: watch::Sender<ModuleHost>,
803 replica_ctx: Arc<ReplicaContext>,
808 scheduler: Scheduler,
810 disk_metrics_recorder_task: AbortHandle,
815 tx_metrics_recorder_task: AbortHandle,
818}
819
820impl Host {
821 #[tracing::instrument(level = "debug", skip_all)]
826 async fn try_init(host_controller: &HostController, database: Database, replica_id: u64) -> anyhow::Result<Self> {
827 let HostController {
828 data_dir,
829 default_config: config,
830 program_storage,
831 energy_monitor,
832 runtimes,
833 durability,
834 page_pool,
835 ..
836 } = host_controller;
837 let on_panic = host_controller.unregister_fn(replica_id);
838 let replica_dir = data_dir.replica(replica_id);
839 let (tx_metrics_queue, tx_metrics_recorder_task) = spawn_tx_metrics_recorder();
840
841 let (db, connected_clients) = match config.storage {
842 db::Storage::Memory => RelationalDB::open(
843 &replica_dir,
844 database.database_identity,
845 database.owner_identity,
846 EmptyHistory::new(),
847 None,
848 None,
849 Some(tx_metrics_queue),
850 page_pool.clone(),
851 )?,
852 db::Storage::Disk => {
853 let snapshot_repo =
854 relational_db::open_snapshot_repo(replica_dir.snapshots(), database.database_identity, replica_id)?;
855 let (history, _) = relational_db::local_durability(replica_dir.commit_log()).await?;
856 let (durability, start_snapshot_watcher) = durability.durability(replica_id).await?;
857
858 let (db, clients) = RelationalDB::open(
859 &replica_dir,
860 database.database_identity,
861 database.owner_identity,
862 history,
863 Some(durability),
864 Some(snapshot_repo),
865 Some(tx_metrics_queue),
866 page_pool.clone(),
867 )
868 .map_err(anyhow::Error::from)
871 .inspect_err(|e| {
872 tracing::error!(
873 database = %database.database_identity,
874 replica = replica_id,
875 "Failed to open database: {e:#}"
876 );
877 })?;
878 if let Some(start_snapshot_watcher) = start_snapshot_watcher {
879 let watcher = db.subscribe_to_snapshots().expect("we passed snapshot_repo");
880 start_snapshot_watcher(watcher)
881 }
882 (db, clients)
883 }
884 };
885 let (program, program_needs_init) = match db.program()? {
886 Some(program) => (program, false),
888 None => (load_program(program_storage, database.initial_program).await?, true),
891 };
892
893 let (program, launched) = launch_module(
894 database,
895 replica_id,
896 program,
897 on_panic,
898 Arc::new(db),
899 energy_monitor.clone(),
900 replica_dir,
901 runtimes.clone(),
902 host_controller.db_cores.take(),
903 )
904 .await?;
905
906 if program_needs_init {
907 let call_result = launched.module_host.init_database(program).await?;
908 if let Some(call_result) = call_result {
909 Result::from(call_result)?;
910 }
911 } else {
912 drop(program)
913 }
914
915 let LaunchedModule {
916 replica_ctx,
917 module_host,
918 scheduler,
919 scheduler_starter,
920 } = launched;
921
922 for (identity, connection_id) in connected_clients {
924 module_host
925 .call_identity_disconnected(identity, connection_id)
926 .await
927 .with_context(|| {
928 format!(
929 "Error calling disconnect for {} {} on {}",
930 identity, connection_id, replica_ctx.database_identity
931 )
932 })?;
933 }
934
935 scheduler_starter.start(&module_host)?;
936 let disk_metrics_recorder_task = tokio::spawn(metric_reporter(replica_ctx.clone())).abort_handle();
937
938 Ok(Host {
939 module: watch::Sender::new(module_host),
940 replica_ctx,
941 scheduler,
942 disk_metrics_recorder_task,
943 tx_metrics_recorder_task,
944 })
945 }
946
947 async fn try_init_in_memory_to_check(
958 runtimes: &Arc<HostRuntimes>,
959 page_pool: PagePool,
960 database: Database,
961 program: Program,
962 core: JobCore,
963 ) -> anyhow::Result<Arc<ModuleInfo>> {
964 let phony_replica_dir = TempDir::with_prefix("spacetimedb-publish-in-memory-check")
967 .context("Error creating temporary directory to house temporary database during publish")?;
968
969 let phony_replica_dir = ReplicaDir::from_path_unchecked(phony_replica_dir.path().to_owned());
971
972 let (db, _connected_clients) = RelationalDB::open(
973 &phony_replica_dir,
974 database.database_identity,
975 database.owner_identity,
976 EmptyHistory::new(),
977 None,
978 None,
979 None,
980 page_pool,
981 )?;
982
983 let (program, launched) = launch_module(
984 database,
985 0,
986 program,
987 || log::error!("launch_module on_panic called for temporary publish in-memory instance"),
991 Arc::new(db),
992 Arc::new(NullEnergyMonitor),
993 phony_replica_dir,
994 runtimes.clone(),
995 core,
996 )
997 .await?;
998
999 let call_result = launched.module_host.init_database(program).await?;
1000 if let Some(call_result) = call_result {
1001 Result::from(call_result)?;
1002 }
1003
1004 Ok(launched.module_host.info)
1005 }
1006
1007 async fn update_module(
1020 &mut self,
1021 runtimes: Arc<HostRuntimes>,
1022 host_type: HostType,
1023 program: Program,
1024 energy_monitor: Arc<dyn EnergyMonitor>,
1025 on_panic: impl Fn() + Send + Sync + 'static,
1026 core: JobCore,
1027 ) -> anyhow::Result<UpdateDatabaseResult> {
1028 let replica_ctx = &self.replica_ctx;
1029 let (scheduler, scheduler_starter) = Scheduler::open(self.replica_ctx.relational_db.clone());
1030
1031 let (program, module) = make_module_host(
1032 runtimes,
1033 host_type,
1034 replica_ctx.clone(),
1035 scheduler.clone(),
1036 program,
1037 energy_monitor,
1038 on_panic,
1039 core,
1040 )
1041 .await?;
1042
1043 let old_module_info = self.module.borrow().info.clone();
1045
1046 let update_result = update_module(&replica_ctx.relational_db, &module, program, old_module_info).await?;
1047 trace!("update result: {update_result:?}");
1048 if update_result.was_successful() {
1051 self.scheduler = scheduler;
1052 scheduler_starter.start(&module)?;
1053 let old_module = self.module.send_replace(module);
1054 old_module.exit().await;
1055 }
1056
1057 Ok(update_result)
1058 }
1059
1060 fn db(&self) -> &RelationalDB {
1061 &self.replica_ctx.relational_db
1062 }
1063}
1064
1065impl Drop for Host {
1066 fn drop(&mut self) {
1067 self.disk_metrics_recorder_task.abort();
1068 self.tx_metrics_recorder_task.abort();
1069 }
1070}
1071
1072const STORAGE_METERING_INTERVAL: Duration = Duration::from_secs(15);
1073
1074async fn metric_reporter(replica_ctx: Arc<ReplicaContext>) {
1076 let message_log_size = DB_METRICS
1078 .message_log_size
1079 .with_label_values(&replica_ctx.database_identity);
1080 let module_log_file_size = DB_METRICS
1081 .module_log_file_size
1082 .with_label_values(&replica_ctx.database_identity);
1083
1084 loop {
1085 let ctx = replica_ctx.clone();
1086 let disk_usage_future = tokio::task::spawn_blocking(move || {
1088 ctx.update_gauges();
1089 ctx.total_disk_usage()
1090 });
1091 if let Ok(disk_usage) = disk_usage_future.await {
1092 if let Some(num_bytes) = disk_usage.durability {
1093 message_log_size.set(num_bytes as i64);
1094 }
1095 if let Some(num_bytes) = disk_usage.logs {
1096 module_log_file_size.set(num_bytes as i64);
1097 }
1098 }
1099 tokio::time::sleep(STORAGE_METERING_INTERVAL).await;
1100 }
1101}
1102
1103pub async fn extract_schema(program_bytes: Box<[u8]>, host_type: HostType) -> anyhow::Result<ModuleDef> {
1107 let owner_identity = Identity::from_u256(0xdcba_u32.into());
1108 let database_identity = Identity::from_u256(0xabcd_u32.into());
1109 let program = Program::from_bytes(program_bytes);
1110
1111 let database = Database {
1112 id: 0,
1113 database_identity,
1114 owner_identity,
1115 host_type,
1116 initial_program: program.hash,
1117 };
1118
1119 let runtimes = HostRuntimes::new(None);
1120 let page_pool = PagePool::new(None);
1121 let core = JobCore::default();
1122 let module_info = Host::try_init_in_memory_to_check(&runtimes, page_pool, database, program, core).await?;
1123 let module_def = match Arc::try_unwrap(module_info) {
1125 Ok(info) => info.module_def,
1126 Err(info) => info.module_def.clone(),
1127 };
1128
1129 Ok(module_def)
1130}
1131
1132pub fn remove_database_gauges<'a, I>(db: &Identity, table_names: I)
1135where
1136 I: IntoIterator<Item = &'a str>,
1137{
1138 for table_name in table_names {
1140 let _ = DATA_SIZE_METRICS
1141 .data_size_table_num_rows
1142 .remove_label_values(db, table_name);
1143 let _ = DATA_SIZE_METRICS
1144 .data_size_table_bytes_used_by_rows
1145 .remove_label_values(db, table_name);
1146 let _ = DATA_SIZE_METRICS
1147 .data_size_table_num_rows_in_indexes
1148 .remove_label_values(db, table_name);
1149 let _ = DATA_SIZE_METRICS
1150 .data_size_table_bytes_used_by_index_keys
1151 .remove_label_values(db, table_name);
1152 }
1153 let _ = DATA_SIZE_METRICS.data_size_blob_store_num_blobs.remove_label_values(db);
1155 let _ = DATA_SIZE_METRICS
1156 .data_size_blob_store_bytes_used_by_blobs
1157 .remove_label_values(db);
1158 let _ = WORKER_METRICS.wasm_memory_bytes.remove_label_values(db);
1159}