1use super::module_host::{EventStatus, ModuleHost, ModuleInfo, NoSuchModule};
2use super::scheduler::SchedulerStarter;
3use super::wasmtime::WasmtimeRuntime;
4use super::{Scheduler, UpdateDatabaseResult};
5use crate::database_logger::DatabaseLogger;
6use crate::db::datastore::traits::Program;
7use crate::db::db_metrics::data_size::DATA_SIZE_METRICS;
8use crate::db::db_metrics::DB_METRICS;
9use crate::db::relational_db::{self, DiskSizeFn, RelationalDB, Txdata};
10use crate::db::{self};
11use crate::energy::{EnergyMonitor, EnergyQuanta, NullEnergyMonitor};
12use crate::messages::control_db::{Database, HostType};
13use crate::module_host_context::ModuleCreationContext;
14use crate::replica_context::ReplicaContext;
15use crate::subscription::module_subscription_actor::ModuleSubscriptions;
16use crate::subscription::module_subscription_manager::{spawn_send_worker, SubscriptionManager};
17use crate::util::asyncify;
18use crate::util::jobs::{JobCore, JobCores};
19use crate::worker_metrics::WORKER_METRICS;
20use anyhow::{anyhow, ensure, Context};
21use async_trait::async_trait;
22use durability::{Durability, EmptyHistory};
23use log::{info, trace, warn};
24use parking_lot::Mutex;
25use spacetimedb_data_structures::map::IntMap;
26use spacetimedb_durability::{self as durability, TxOffset};
27use spacetimedb_lib::{hash_bytes, Identity};
28use spacetimedb_paths::server::{ReplicaDir, ServerDataDir};
29use spacetimedb_paths::FromPathUnchecked;
30use spacetimedb_sats::hash::Hash;
31use spacetimedb_schema::def::ModuleDef;
32use spacetimedb_table::page_pool::PagePool;
33use std::future::Future;
34use std::ops::Deref;
35use std::sync::Arc;
36use std::time::{Duration, Instant};
37use tempfile::TempDir;
38use tokio::sync::{watch, OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock as AsyncRwLock};
39use tokio::task::AbortHandle;
40
41type HostCell = Arc<AsyncRwLock<Option<Host>>>;
47
48type Hosts = Arc<Mutex<IntMap<u64, HostCell>>>;
50
51pub type ExternalDurability = (Arc<dyn Durability<TxData = Txdata>>, DiskSizeFn);
52
53pub type StartSnapshotWatcher = Box<dyn FnOnce(watch::Receiver<TxOffset>)>;
54
55#[async_trait]
56pub trait DurabilityProvider: Send + Sync + 'static {
57 async fn durability(&self, replica_id: u64) -> anyhow::Result<(ExternalDurability, Option<StartSnapshotWatcher>)>;
58}
59
60#[async_trait]
61pub trait ExternalStorage: Send + Sync + 'static {
62 async fn lookup(&self, program_hash: Hash) -> anyhow::Result<Option<Box<[u8]>>>;
63}
64#[async_trait]
65impl<F, Fut> ExternalStorage for F
66where
67 F: Fn(Hash) -> Fut + Send + Sync + 'static,
68 Fut: Future<Output = anyhow::Result<Option<Box<[u8]>>>> + Send,
69{
70 async fn lookup(&self, program_hash: Hash) -> anyhow::Result<Option<Box<[u8]>>> {
71 self(program_hash).await
72 }
73}
74
75pub type ProgramStorage = Arc<dyn ExternalStorage>;
76
77#[derive(Clone)]
84pub struct HostController {
85 hosts: Hosts,
88 pub data_dir: Arc<ServerDataDir>,
90 default_config: db::Config,
93 program_storage: ProgramStorage,
95 energy_monitor: Arc<dyn EnergyMonitor>,
97 durability: Arc<dyn DurabilityProvider>,
99 pub page_pool: PagePool,
101 runtimes: Arc<HostRuntimes>,
103 db_cores: JobCores,
105}
106
107struct HostRuntimes {
108 wasmtime: WasmtimeRuntime,
109}
110
111impl HostRuntimes {
112 fn new(data_dir: Option<&ServerDataDir>) -> Arc<Self> {
113 let wasmtime = WasmtimeRuntime::new(data_dir);
114 Arc::new(Self { wasmtime })
115 }
116}
117
118#[derive(Clone, Debug)]
119pub struct ReducerCallResult {
120 pub outcome: ReducerOutcome,
121 pub energy_used: EnergyQuanta,
122 pub execution_duration: Duration,
123}
124
125impl ReducerCallResult {
126 pub fn is_err(&self) -> bool {
127 self.outcome.is_err()
128 }
129
130 pub fn is_ok(&self) -> bool {
131 !self.is_err()
132 }
133}
134
135impl From<ReducerCallResult> for Result<(), anyhow::Error> {
136 fn from(value: ReducerCallResult) -> Self {
137 value.outcome.into_result()
138 }
139}
140
141#[derive(Clone, Debug)]
142pub enum ReducerOutcome {
143 Committed,
144 Failed(String),
145 BudgetExceeded,
146}
147
148impl ReducerOutcome {
149 pub fn into_result(self) -> anyhow::Result<()> {
150 match self {
151 Self::Committed => Ok(()),
152 Self::Failed(e) => Err(anyhow::anyhow!(e)),
153 Self::BudgetExceeded => Err(anyhow::anyhow!("reducer ran out of energy")),
154 }
155 }
156
157 pub fn is_err(&self) -> bool {
158 !matches!(self, Self::Committed)
159 }
160}
161
162impl From<&EventStatus> for ReducerOutcome {
163 fn from(status: &EventStatus) -> Self {
164 match &status {
165 EventStatus::Committed(_) => ReducerOutcome::Committed,
166 EventStatus::Failed(e) => ReducerOutcome::Failed(e.clone()),
167 EventStatus::OutOfEnergy => ReducerOutcome::BudgetExceeded,
168 }
169 }
170}
171
172impl HostController {
173 pub fn new(
174 data_dir: Arc<ServerDataDir>,
175 default_config: db::Config,
176 program_storage: ProgramStorage,
177 energy_monitor: Arc<impl EnergyMonitor>,
178 durability: Arc<dyn DurabilityProvider>,
179 db_cores: JobCores,
180 ) -> Self {
181 Self {
182 hosts: <_>::default(),
183 default_config,
184 program_storage,
185 energy_monitor,
186 durability,
187 runtimes: HostRuntimes::new(Some(&data_dir)),
188 data_dir,
189 page_pool: PagePool::new(default_config.page_pool_max_size),
190 db_cores,
191 }
192 }
193
194 pub fn set_program_storage(&mut self, ps: ProgramStorage) {
196 self.program_storage = ps;
197 }
198
199 #[tracing::instrument(level = "trace", skip_all)]
218 pub async fn get_or_launch_module_host(&self, database: Database, replica_id: u64) -> anyhow::Result<ModuleHost> {
219 let mut rx = self.watch_maybe_launch_module_host(database, replica_id).await?;
220 let module = rx.borrow_and_update();
221 Ok(module.clone())
222 }
223
224 #[tracing::instrument(level = "trace", skip_all)]
232 pub async fn watch_maybe_launch_module_host(
233 &self,
234 database: Database,
235 replica_id: u64,
236 ) -> anyhow::Result<watch::Receiver<ModuleHost>> {
237 {
239 let guard = self.acquire_read_lock(replica_id).await;
240 if let Some(host) = &*guard {
241 trace!("cached host {}/{}", database.database_identity, replica_id);
242 return Ok(host.module.subscribe());
243 }
244 }
245
246 let mut guard = self.acquire_write_lock(replica_id).await;
250 if let Some(host) = &*guard {
251 trace!(
252 "cached host {}/{} (lock upgrade)",
253 database.database_identity,
254 replica_id
255 );
256 return Ok(host.module.subscribe());
257 }
258
259 trace!("launch host {}/{}", database.database_identity, replica_id);
260
261 let this = self.clone();
264
265 let rx = tokio::spawn(async move {
277 let host = this.try_init_host(database, replica_id).await?;
278
279 let rx = host.module.subscribe();
280 *guard = Some(host);
281
282 Ok::<_, anyhow::Error>(rx)
283 })
284 .await??;
285
286 Ok(rx)
287 }
288
289 pub async fn check_module_validity(&self, database: Database, program: Program) -> anyhow::Result<Arc<ModuleInfo>> {
300 Host::try_init_in_memory_to_check(
301 &self.runtimes,
302 self.page_pool.clone(),
303 database,
304 program,
305 self.db_cores.take(),
311 )
312 .await
313 }
314
315 #[tracing::instrument(level = "trace", skip_all)]
321 pub async fn using_database<F, T>(&self, database: Database, replica_id: u64, f: F) -> anyhow::Result<T>
322 where
323 F: FnOnce(&RelationalDB) -> T + Send + 'static,
324 T: Send + 'static,
325 {
326 trace!("using database {}/{}", database.database_identity, replica_id);
327 let module = self.get_or_launch_module_host(database, replica_id).await?;
328 let on_panic = self.unregister_fn(replica_id);
329 scopeguard::defer_on_unwind!({
330 warn!("database operation panicked");
331 on_panic();
332 });
333 let result = asyncify(move || f(&module.replica_ctx().relational_db)).await;
334 Ok(result)
335 }
336
337 #[tracing::instrument(level = "trace", skip_all, err)]
346 pub async fn update_module_host(
347 &self,
348 database: Database,
349 host_type: HostType,
350 replica_id: u64,
351 program_bytes: Box<[u8]>,
352 ) -> anyhow::Result<UpdateDatabaseResult> {
353 let program = Program {
354 hash: hash_bytes(&program_bytes),
355 bytes: program_bytes,
356 };
357 trace!(
358 "update module host {}/{}: genesis={} update-to={}",
359 database.database_identity,
360 replica_id,
361 database.initial_program,
362 program.hash
363 );
364
365 let mut guard = self.acquire_write_lock(replica_id).await;
366
367 let this = self.clone();
370
371 let update_result = tokio::spawn(async move {
386 let mut host = match guard.take() {
387 None => {
388 trace!("host not running, try_init");
389 this.try_init_host(database, replica_id).await?
390 }
391 Some(host) => {
392 trace!("host found, updating");
393 host
394 }
395 };
396 let update_result = host
397 .update_module(
398 this.runtimes.clone(),
399 host_type,
400 program,
401 this.energy_monitor.clone(),
402 this.unregister_fn(replica_id),
403 this.db_cores.take(),
404 )
405 .await?;
406
407 *guard = Some(host);
408 Ok::<_, anyhow::Error>(update_result)
409 })
410 .await??;
411
412 Ok(update_result)
413 }
414
415 pub async fn init_maybe_update_module_host(
435 &self,
436 database: Database,
437 replica_id: u64,
438 expected_hash: Option<Hash>,
439 ) -> anyhow::Result<watch::Receiver<ModuleHost>> {
440 trace!("custom bootstrap {}/{}", database.database_identity, replica_id);
441
442 let db_addr = database.database_identity;
443 let host_type = database.host_type;
444 let program_hash = database.initial_program;
445
446 let mut guard = self.acquire_write_lock(replica_id).await;
447
448 let this = self.clone();
451
452 let module = tokio::spawn(async move {
467 let mut host = match guard.take() {
468 Some(host) => host,
469 None => this.try_init_host(database, replica_id).await?,
470 };
471 let module = host.module.subscribe();
472
473 let stored_hash = stored_program_hash(host.db())?
479 .with_context(|| format!("[{}] database improperly initialized", db_addr))?;
480 if stored_hash == program_hash {
481 info!("[{}] database up-to-date with {}", db_addr, program_hash);
482 *guard = Some(host);
483 } else {
484 if let Some(expected_hash) = expected_hash {
485 ensure!(
486 expected_hash == stored_hash,
487 "[{}] expected program {} found {}",
488 db_addr,
489 expected_hash,
490 stored_hash
491 );
492 }
493 info!(
494 "[{}] updating database from `{}` to `{}`",
495 db_addr, stored_hash, program_hash
496 );
497 let program = load_program(&this.program_storage, program_hash).await?;
498 let update_result = host
499 .update_module(
500 this.runtimes.clone(),
501 host_type,
502 program,
503 this.energy_monitor.clone(),
504 this.unregister_fn(replica_id),
505 this.db_cores.take(),
506 )
507 .await?;
508 match update_result {
509 UpdateDatabaseResult::NoUpdateNeeded | UpdateDatabaseResult::UpdatePerformed => {
510 *guard = Some(host);
511 }
512 UpdateDatabaseResult::AutoMigrateError(e) => {
513 return Err(anyhow::anyhow!(e));
514 }
515 UpdateDatabaseResult::ErrorExecutingMigration(e) => {
516 return Err(e);
517 }
518 }
519 }
520
521 Ok::<_, anyhow::Error>(module)
522 })
523 .await??;
524
525 Ok(module)
526 }
527
528 #[tracing::instrument(level = "trace", skip_all)]
531 pub async fn exit_module_host(&self, replica_id: u64) -> Result<(), anyhow::Error> {
532 trace!("exit module host {}", replica_id);
533 let lock = self.hosts.lock().remove(&replica_id);
534 if let Some(lock) = lock {
535 if let Some(host) = lock.write_owned().await.take() {
536 let module = host.module.borrow().clone();
537 module.exit().await;
538 let table_names = module.info().module_def.tables().map(|t| t.name.deref());
539 remove_database_gauges(&module.info().database_identity, table_names);
540 }
541 }
542
543 Ok(())
544 }
545
546 #[tracing::instrument(level = "trace", skip_all)]
552 pub async fn get_module_host(&self, replica_id: u64) -> Result<ModuleHost, NoSuchModule> {
553 trace!("get module host {}", replica_id);
554 let guard = self.acquire_read_lock(replica_id).await;
555 guard
556 .as_ref()
557 .map(|Host { module, .. }| module.borrow().clone())
558 .ok_or(NoSuchModule)
559 }
560
561 #[tracing::instrument(level = "trace", skip_all)]
567 pub async fn watch_module_host(&self, replica_id: u64) -> Result<watch::Receiver<ModuleHost>, NoSuchModule> {
568 trace!("watch module host {}", replica_id);
569 let guard = self.acquire_read_lock(replica_id).await;
570 guard
571 .as_ref()
572 .map(|Host { module, .. }| module.subscribe())
573 .ok_or(NoSuchModule)
574 }
575
576 pub async fn has_module_host(&self, replica_id: u64) -> bool {
579 self.acquire_read_lock(replica_id).await.is_some()
580 }
581
582 fn unregister_fn(&self, replica_id: u64) -> impl Fn() + Send + Sync + 'static {
586 let hosts = Arc::downgrade(&self.hosts);
587 move || {
588 if let Some(hosts) = hosts.upgrade() {
589 hosts.lock().remove(&replica_id);
590 }
591 }
592 }
593
594 async fn acquire_write_lock(&self, replica_id: u64) -> OwnedRwLockWriteGuard<Option<Host>> {
595 let lock = self.hosts.lock().entry(replica_id).or_default().clone();
596 lock.write_owned().await
597 }
598
599 async fn acquire_read_lock(&self, replica_id: u64) -> OwnedRwLockReadGuard<Option<Host>> {
600 let lock = self.hosts.lock().entry(replica_id).or_default().clone();
601 lock.read_owned().await
602 }
603
604 async fn try_init_host(&self, database: Database, replica_id: u64) -> anyhow::Result<Host> {
605 Host::try_init(self, database, replica_id).await
606 }
607}
608
609fn stored_program_hash(db: &RelationalDB) -> anyhow::Result<Option<Hash>> {
610 let meta = db.metadata()?;
611 Ok(meta.map(|meta| meta.program_hash))
612}
613
614async fn make_replica_ctx(
615 path: ReplicaDir,
616 database: Database,
617 replica_id: u64,
618 relational_db: Arc<RelationalDB>,
619) -> anyhow::Result<ReplicaContext> {
620 let logger = tokio::task::block_in_place(move || Arc::new(DatabaseLogger::open_today(path.module_logs())));
621 let send_worker_queue = spawn_send_worker(Some(database.database_identity));
622 let subscriptions = Arc::new(parking_lot::RwLock::new(SubscriptionManager::new(
623 send_worker_queue.clone(),
624 )));
625 let downgraded = Arc::downgrade(&subscriptions);
626 let subscriptions = ModuleSubscriptions::new(
627 relational_db.clone(),
628 subscriptions,
629 send_worker_queue,
630 database.owner_identity,
631 );
632
633 tokio::spawn(async move {
637 loop {
638 tokio::time::sleep(Duration::from_secs(10)).await;
639 let Some(subscriptions) = downgraded.upgrade() else {
640 break;
641 };
642 asyncify(move || subscriptions.write().remove_dropped_clients()).await
643 }
644 });
645
646 Ok(ReplicaContext {
647 database,
648 replica_id,
649 logger,
650 subscriptions,
651 relational_db,
652 })
653}
654
655#[allow(clippy::too_many_arguments)]
658async fn make_module_host(
659 runtimes: Arc<HostRuntimes>,
660 host_type: HostType,
661 replica_ctx: Arc<ReplicaContext>,
662 scheduler: Scheduler,
663 program: Program,
664 energy_monitor: Arc<dyn EnergyMonitor>,
665 unregister: impl Fn() + Send + Sync + 'static,
666 core: JobCore,
667) -> anyhow::Result<(Program, ModuleHost)> {
668 asyncify(move || {
674 let module_host = match host_type {
675 HostType::Wasm => {
676 let mcc = ModuleCreationContext {
677 replica_ctx,
678 scheduler,
679 program: &program,
680 energy_monitor,
681 };
682 let start = Instant::now();
683 let actor = runtimes.wasmtime.make_actor(mcc)?;
684 trace!("wasmtime::make_actor blocked for {:?}", start.elapsed());
685 ModuleHost::new(actor, unregister, core)
686 }
687 };
688 Ok((program, module_host))
689 })
690 .await
691}
692
693async fn load_program(storage: &ProgramStorage, hash: Hash) -> anyhow::Result<Program> {
694 let bytes = storage
695 .lookup(hash)
696 .await?
697 .with_context(|| format!("program {} not found", hash))?;
698 Ok(Program { hash, bytes })
699}
700
701struct LaunchedModule {
702 replica_ctx: Arc<ReplicaContext>,
703 module_host: ModuleHost,
704 scheduler: Scheduler,
705 scheduler_starter: SchedulerStarter,
706}
707
708#[allow(clippy::too_many_arguments)]
709async fn launch_module(
710 database: Database,
711 replica_id: u64,
712 program: Program,
713 on_panic: impl Fn() + Send + Sync + 'static,
714 relational_db: Arc<RelationalDB>,
715 energy_monitor: Arc<dyn EnergyMonitor>,
716 replica_dir: ReplicaDir,
717 runtimes: Arc<HostRuntimes>,
718 core: JobCore,
719) -> anyhow::Result<(Program, LaunchedModule)> {
720 let db_identity = database.database_identity;
721 let host_type = database.host_type;
722
723 let replica_ctx = make_replica_ctx(replica_dir, database, replica_id, relational_db)
724 .await
725 .map(Arc::new)?;
726 let (scheduler, scheduler_starter) = Scheduler::open(replica_ctx.relational_db.clone());
727 let (program, module_host) = make_module_host(
728 runtimes.clone(),
729 host_type,
730 replica_ctx.clone(),
731 scheduler.clone(),
732 program,
733 energy_monitor.clone(),
734 on_panic,
735 core,
736 )
737 .await?;
738
739 trace!("launched database {} with program {}", db_identity, program.hash);
740
741 Ok((
742 program,
743 LaunchedModule {
744 replica_ctx,
745 module_host,
746 scheduler,
747 scheduler_starter,
748 },
749 ))
750}
751
752async fn update_module(
762 db: &RelationalDB,
763 module: &ModuleHost,
764 program: Program,
765 old_module_info: Arc<ModuleInfo>,
766) -> anyhow::Result<UpdateDatabaseResult> {
767 let addr = db.database_identity();
768 match stored_program_hash(db)? {
769 None => Err(anyhow!("database `{}` not yet initialized", addr)),
770 Some(stored) => {
771 let res = if stored == program.hash {
772 info!("database `{}` up to date with program `{}`", addr, program.hash);
773 UpdateDatabaseResult::NoUpdateNeeded
774 } else {
775 info!("updating `{}` from {} to {}", addr, stored, program.hash);
776 module.update_database(program, old_module_info).await?
777 };
778
779 Ok(res)
780 }
781 }
782}
783
784struct Host {
786 module: watch::Sender<ModuleHost>,
793 replica_ctx: Arc<ReplicaContext>,
798 scheduler: Scheduler,
800 metrics_task: AbortHandle,
805}
806
807impl Host {
808 #[tracing::instrument(level = "debug", skip_all)]
813 async fn try_init(host_controller: &HostController, database: Database, replica_id: u64) -> anyhow::Result<Self> {
814 let HostController {
815 data_dir,
816 default_config: config,
817 program_storage,
818 energy_monitor,
819 runtimes,
820 durability,
821 page_pool,
822 ..
823 } = host_controller;
824 let on_panic = host_controller.unregister_fn(replica_id);
825 let replica_dir = data_dir.replica(replica_id);
826
827 let (db, connected_clients) = match config.storage {
828 db::Storage::Memory => RelationalDB::open(
829 &replica_dir,
830 database.database_identity,
831 database.owner_identity,
832 EmptyHistory::new(),
833 None,
834 None,
835 page_pool.clone(),
836 )?,
837 db::Storage::Disk => {
838 let snapshot_repo =
839 relational_db::open_snapshot_repo(replica_dir.snapshots(), database.database_identity, replica_id)?;
840 let (history, _) = relational_db::local_durability(replica_dir.commit_log()).await?;
841 let (durability, start_snapshot_watcher) = durability.durability(replica_id).await?;
842
843 let (db, clients) = RelationalDB::open(
844 &replica_dir,
845 database.database_identity,
846 database.owner_identity,
847 history,
848 Some(durability),
849 Some(snapshot_repo),
850 page_pool.clone(),
851 )
852 .map_err(anyhow::Error::from)
855 .inspect_err(|e| {
856 tracing::error!(
857 database = %database.database_identity,
858 replica = replica_id,
859 "Failed to open database: {e:#}"
860 );
861 })?;
862 if let Some(start_snapshot_watcher) = start_snapshot_watcher {
863 let watcher = db.subscribe_to_snapshots().expect("we passed snapshot_repo");
864 start_snapshot_watcher(watcher)
865 }
866 (db, clients)
867 }
868 };
869 let (program, program_needs_init) = match db.program()? {
870 Some(program) => (program, false),
872 None => (load_program(program_storage, database.initial_program).await?, true),
875 };
876
877 let (program, launched) = launch_module(
878 database,
879 replica_id,
880 program,
881 on_panic,
882 Arc::new(db),
883 energy_monitor.clone(),
884 replica_dir,
885 runtimes.clone(),
886 host_controller.db_cores.take(),
887 )
888 .await?;
889
890 if program_needs_init {
891 let call_result = launched.module_host.init_database(program).await?;
892 if let Some(call_result) = call_result {
893 Result::from(call_result)?;
894 }
895 } else {
896 drop(program)
897 }
898
899 let LaunchedModule {
900 replica_ctx,
901 module_host,
902 scheduler,
903 scheduler_starter,
904 } = launched;
905
906 for (identity, connection_id) in connected_clients {
908 module_host
909 .call_identity_disconnected(identity, connection_id)
910 .await
911 .with_context(|| {
912 format!(
913 "Error calling disconnect for {} {} on {}",
914 identity, connection_id, replica_ctx.database_identity
915 )
916 })?;
917 }
918
919 scheduler_starter.start(&module_host)?;
920 let metrics_task = tokio::spawn(metric_reporter(replica_ctx.clone())).abort_handle();
921
922 Ok(Host {
923 module: watch::Sender::new(module_host),
924 replica_ctx,
925 scheduler,
926 metrics_task,
927 })
928 }
929
930 async fn try_init_in_memory_to_check(
941 runtimes: &Arc<HostRuntimes>,
942 page_pool: PagePool,
943 database: Database,
944 program: Program,
945 core: JobCore,
946 ) -> anyhow::Result<Arc<ModuleInfo>> {
947 let phony_replica_dir = TempDir::with_prefix("spacetimedb-publish-in-memory-check")
950 .context("Error creating temporary directory to house temporary database during publish")?;
951
952 let phony_replica_dir = ReplicaDir::from_path_unchecked(phony_replica_dir.path().to_owned());
954
955 let (db, _connected_clients) = RelationalDB::open(
956 &phony_replica_dir,
957 database.database_identity,
958 database.owner_identity,
959 EmptyHistory::new(),
960 None,
961 None,
962 page_pool,
963 )?;
964
965 let (program, launched) = launch_module(
966 database,
967 0,
968 program,
969 || log::error!("launch_module on_panic called for temporary publish in-memory instance"),
973 Arc::new(db),
974 Arc::new(NullEnergyMonitor),
975 phony_replica_dir,
976 runtimes.clone(),
977 core,
978 )
979 .await?;
980
981 let call_result = launched.module_host.init_database(program).await?;
982 if let Some(call_result) = call_result {
983 Result::from(call_result)?;
984 }
985
986 Ok(launched.module_host.info)
987 }
988
989 async fn update_module(
1002 &mut self,
1003 runtimes: Arc<HostRuntimes>,
1004 host_type: HostType,
1005 program: Program,
1006 energy_monitor: Arc<dyn EnergyMonitor>,
1007 on_panic: impl Fn() + Send + Sync + 'static,
1008 core: JobCore,
1009 ) -> anyhow::Result<UpdateDatabaseResult> {
1010 let replica_ctx = &self.replica_ctx;
1011 let (scheduler, scheduler_starter) = Scheduler::open(self.replica_ctx.relational_db.clone());
1012
1013 let (program, module) = make_module_host(
1014 runtimes,
1015 host_type,
1016 replica_ctx.clone(),
1017 scheduler.clone(),
1018 program,
1019 energy_monitor,
1020 on_panic,
1021 core,
1022 )
1023 .await?;
1024
1025 let old_module_info = self.module.borrow().info.clone();
1027
1028 let update_result = update_module(&replica_ctx.relational_db, &module, program, old_module_info).await?;
1029 trace!("update result: {update_result:?}");
1030 if update_result.was_successful() {
1033 self.scheduler = scheduler;
1034 scheduler_starter.start(&module)?;
1035 let old_module = self.module.send_replace(module);
1036 old_module.exit().await;
1037 }
1038
1039 Ok(update_result)
1040 }
1041
1042 fn db(&self) -> &RelationalDB {
1043 &self.replica_ctx.relational_db
1044 }
1045}
1046
1047impl Drop for Host {
1048 fn drop(&mut self) {
1049 self.metrics_task.abort();
1050 }
1051}
1052
/// Pause between two rounds of disk usage reporting in [`metric_reporter`].
const STORAGE_METERING_INTERVAL: Duration = Duration::from_secs(15);
1054
1055async fn metric_reporter(replica_ctx: Arc<ReplicaContext>) {
1057 let message_log_size = DB_METRICS
1059 .message_log_size
1060 .with_label_values(&replica_ctx.database_identity);
1061 let module_log_file_size = DB_METRICS
1062 .module_log_file_size
1063 .with_label_values(&replica_ctx.database_identity);
1064
1065 loop {
1066 let disk_usage = tokio::task::block_in_place(|| replica_ctx.total_disk_usage());
1067 replica_ctx.update_gauges();
1068 if let Some(num_bytes) = disk_usage.durability {
1069 message_log_size.set(num_bytes as i64);
1070 }
1071 if let Some(num_bytes) = disk_usage.logs {
1072 module_log_file_size.set(num_bytes as i64);
1073 }
1074 tokio::time::sleep(STORAGE_METERING_INTERVAL).await;
1075 }
1076}
1077
1078pub async fn extract_schema(program_bytes: Box<[u8]>, host_type: HostType) -> anyhow::Result<ModuleDef> {
1082 let owner_identity = Identity::from_u256(0xdcba_u32.into());
1083 let database_identity = Identity::from_u256(0xabcd_u32.into());
1084 let program = Program::from_bytes(program_bytes);
1085
1086 let database = Database {
1087 id: 0,
1088 database_identity,
1089 owner_identity,
1090 host_type,
1091 initial_program: program.hash,
1092 };
1093
1094 let runtimes = HostRuntimes::new(None);
1095 let page_pool = PagePool::new(None);
1096 let core = JobCore::default();
1097 let module_info = Host::try_init_in_memory_to_check(&runtimes, page_pool, database, program, core).await?;
1098 let module_def = match Arc::try_unwrap(module_info) {
1100 Ok(info) => info.module_def,
1101 Err(info) => info.module_def.clone(),
1102 };
1103
1104 Ok(module_def)
1105}
1106
1107pub fn remove_database_gauges<'a, I>(db: &Identity, table_names: I)
1110where
1111 I: IntoIterator<Item = &'a str>,
1112{
1113 for table_name in table_names {
1115 let _ = DATA_SIZE_METRICS
1116 .data_size_table_num_rows
1117 .remove_label_values(db, table_name);
1118 let _ = DATA_SIZE_METRICS
1119 .data_size_table_bytes_used_by_rows
1120 .remove_label_values(db, table_name);
1121 let _ = DATA_SIZE_METRICS
1122 .data_size_table_num_rows_in_indexes
1123 .remove_label_values(db, table_name);
1124 let _ = DATA_SIZE_METRICS
1125 .data_size_table_bytes_used_by_index_keys
1126 .remove_label_values(db, table_name);
1127 }
1128 let _ = DATA_SIZE_METRICS.data_size_blob_store_num_blobs.remove_label_values(db);
1130 let _ = DATA_SIZE_METRICS
1131 .data_size_blob_store_bytes_used_by_blobs
1132 .remove_label_values(db);
1133 let _ = WORKER_METRICS.wasm_memory_bytes.remove_label_values(db);
1134}