1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166
use super::database_instance_context::DatabaseInstanceContext;
use crate::database_instance_context::TotalDiskUsage;
use crate::db::db_metrics::DB_METRICS;
use crate::energy::EnergyMonitor;
use crate::host::Scheduler;
use once_cell::sync::OnceCell;
use spacetimedb_data_structures::map::IntMap;
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;
/// The database state managed by [`DatabaseInstanceContextController`].
///
/// Morally, this holds a pair of [`DatabaseInstanceContext`] and [`Scheduler`].
/// The former holds metadata and state of a running database instance, while
/// the latter manages scheduled reducers.
///
/// Operationally, this pair is wrapped in a [`OnceCell`], which ensures that
/// the same physical database instance is not initialized multiple times due
/// to concurrent access to the controller. Note that this is not primarily for
/// safety -- databases acquire filesystem locks which ensure singleton access --
/// but to prevent unhelpful errors from propagating up, and potential retries
/// from consuming resources.
///
/// Lastly, the [`OnceCell`] is wrapped in an [`Arc`] pointer, which allows
/// initialization to proceed without holding the lock on the controller's
/// internal map.
///
/// An empty cell means initialization is either in flight or has failed;
/// a populated cell holds the live instance state until it is removed.
type Context = Arc<OnceCell<(Arc<DatabaseInstanceContext>, Scheduler)>>;
/// Tracks the lifecycle of running database instances and periodically
/// reports their disk usage.
///
/// Cloning is cheap: both fields are [`Arc`]s, so clones share the same
/// underlying map and energy monitor.
#[derive(Clone)]
pub struct DatabaseInstanceContextController {
    // Maps a database instance id to its (lazily initialized) context cell,
    // paired with the last disk-usage reading recorded by the disk monitor
    // (used as a fallback when a fresh reading cannot be obtained).
    contexts: Arc<Mutex<IntMap<u64, (Context, TotalDiskUsage)>>>,
    // Sink for periodic disk-usage reports emitted by `disk_monitor`.
    energy_monitor: Arc<dyn EnergyMonitor>,
}
impl DatabaseInstanceContextController {
    /// Creates a controller with no managed instances.
    ///
    /// `energy_monitor` receives the periodic disk-usage reports produced by
    /// [`Self::start_disk_monitor`].
    pub fn new(energy_monitor: Arc<dyn EnergyMonitor>) -> Self {
        Self {
            contexts: Arc::default(),
            energy_monitor,
        }
    }

    /// Get the database instance state if it is already initialized.
    ///
    /// Returns `None` if [`Self::get_or_try_init`] has not been called for the
    /// given instance id before, or the state was explicitly [`Self::remove`]d.
    #[tracing::instrument(skip_all)]
    pub fn get(&self, database_instance_id: u64) -> Option<(Arc<DatabaseInstanceContext>, Scheduler)> {
        let contexts = self.contexts.lock().unwrap();
        contexts
            .get(&database_instance_id)
            // An empty cell means initialization is in flight or failed;
            // report it the same as an unknown id.
            .and_then(|(cell, _)| cell.get())
            .map(|(dbic, scheduler)| (dbic.clone(), scheduler.clone()))
    }

    /// Get the database instance state, or initialize it if it is not present.
    ///
    /// If the instance is not initialized yet, this method will block until
    /// `F` returns. It will, however, release internal locks so calls to other
    /// methods will not block.
    ///
    /// After this method returns, the instance state becomes managed by the
    /// controller until it is removed by calling [`Self::remove`].
    ///
    /// Note that [`Self::remove`] must be called eventually, even if this
    /// method returns an `Err` result: in this case, [`Self::get`] returns
    /// `None` (as one would expect), but the given `database_instance_id` is
    /// nevertheless known to the controller.
    #[tracing::instrument(skip_all)]
    pub fn get_or_try_init<F, E>(
        &self,
        database_instance_id: u64,
        f: F,
    ) -> Result<(Arc<DatabaseInstanceContext>, Scheduler), E>
    where
        F: FnOnce() -> Result<(DatabaseInstanceContext, Scheduler), E>,
    {
        // Insert (or fetch) the cell while holding the map lock, but run the
        // potentially slow initializer `f` only *after* releasing it, so that
        // concurrent calls for other instance ids are not blocked.
        let cell = {
            let mut guard = self.contexts.lock().unwrap();
            let (cell, _) = guard
                .entry(database_instance_id)
                .or_insert_with(|| (Arc::new(OnceCell::new()), TotalDiskUsage::default()));
            Arc::clone(cell)
        };
        // `OnceCell::get_or_try_init` guarantees `f` runs at most once even
        // under concurrent callers; losers block until the winner finishes.
        cell.get_or_try_init(|| {
            let (dbic, scheduler) = f()?;
            Ok((Arc::new(dbic), scheduler))
        })
        .map(|(dbic, scheduler)| (dbic.clone(), scheduler.clone()))
    }

    /// Remove and return the state corresponding to `database_instance_id`.
    ///
    /// Returns `None` if either the state is not known, or was not properly
    /// initialized (i.e. [`Self::get_or_try_init`] returned an error).
    ///
    /// This method may block if the instance state is currently being
    /// initialized via [`Self::get_or_try_init`].
    #[tracing::instrument(skip_all)]
    pub fn remove(&self, database_instance_id: u64) -> Option<(Arc<DatabaseInstanceContext>, Scheduler)> {
        let mut contexts = self.contexts.lock().unwrap();
        let (arc, _) = contexts.remove(&database_instance_id)?;
        match Arc::try_unwrap(arc) {
            // We held the only reference: no initialization is in flight,
            // so take the value (if any) out of the cell directly.
            Ok(cell) => cell.into_inner(),
            Err(arc) => {
                // If the `Arc`'s refcount is > 1, another thread is currently
                // executing the `get_or_try_init` closure. Wait until it
                // completes (instead of returning `None`), as callers rely on
                // calling `scheduler.clear()`.
                // TODO(noa): this can deadlock if get_or_try_init() errors. maybe use a different datastructure?
                let (dbic, scheduler) = arc.wait();
                Some((dbic.clone(), scheduler.clone()))
            }
        }
    }

    /// Refresh the per-database size gauges in [`DB_METRICS`].
    #[tracing::instrument(skip_all)]
    pub fn update_metrics(&self) {
        // Snapshot the initialized contexts first, so the filesystem probes
        // below do not run while the controller's map lock is held. Previously
        // the lock was held for the whole loop, stalling `get`/`remove`/
        // `get_or_try_init` for the duration of disk I/O on every database.
        let dbs: Vec<_> = self
            .contexts
            .lock()
            .unwrap()
            .values()
            .filter_map(|(cell, _)| cell.get())
            .map(|(db, _)| db.clone())
            .collect();
        for db in dbs {
            // Use the previous gauge value if there is an issue getting the file sizes.
            if let Ok(num_bytes) = db.durability_size_on_disk() {
                DB_METRICS
                    .message_log_size
                    .with_label_values(&db.address)
                    .set(num_bytes as i64);
            }
            if let Ok(num_bytes) = db.log_file_size() {
                DB_METRICS
                    .module_log_file_size
                    .with_label_values(&db.address)
                    .set(num_bytes as i64);
            }
        }
    }

    /// Spawn the background disk-usage metering task.
    ///
    /// Must be called from within a tokio runtime; the task runs until the
    /// runtime shuts down.
    pub fn start_disk_monitor(&self) {
        tokio::spawn(self.clone().disk_monitor());
    }

    /// How often `disk_monitor` samples disk usage.
    const DISK_METERING_INTERVAL: Duration = Duration::from_secs(5);

    /// Periodically records each initialized database's disk usage with the
    /// energy monitor, charging for the elapsed wall-clock time between ticks.
    async fn disk_monitor(self) {
        let mut interval = tokio::time::interval(Self::DISK_METERING_INTERVAL);
        // we don't care about happening precisely every 5 seconds - it just matters that the time between
        // ticks is accurate
        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
        let mut prev_tick = interval.tick().await;
        loop {
            let tick = interval.tick().await;
            // `dt` is the actual elapsed time, not the nominal interval, so
            // delayed ticks still charge for the full elapsed duration.
            let dt = tick - prev_tick;
            // NOTE(review): the map lock is held across `total_disk_usage()`
            // and `record_disk_usage()`; if either blocks on I/O this stalls
            // other controller methods — confirm these are cheap enough.
            // No `.await` occurs while the guard is held, so holding a
            // std Mutex here is otherwise sound.
            for (cell, prev_disk_usage) in self.contexts.lock().unwrap().values_mut() {
                if let Some((db, _)) = cell.get() {
                    // Fall back to the previous reading if a fresh one is
                    // unavailable, rather than under-charging.
                    let disk_usage = db.total_disk_usage().or(*prev_disk_usage);
                    self.energy_monitor
                        .record_disk_usage(&db.database, db.database_instance_id, disk_usage.sum(), dt);
                    *prev_disk_usage = disk_usage;
                }
            }
            prev_tick = tick;
        }
    }
}