1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
use std::sync::Arc;
use std::{collections::HashMap, sync::Mutex};

use once_cell::sync::OnceCell;

use crate::db::db_metrics::DB_METRICS;
use crate::host::scheduler::Scheduler;

use super::database_instance_context::DatabaseInstanceContext;

/// Per-instance entry stored by [`DatabaseInstanceContextController`].
///
/// Conceptually an entry is a pair: a [`DatabaseInstanceContext`], carrying the
/// metadata and state of a running database instance, and the [`Scheduler`]
/// responsible for that instance's scheduled reducers.
///
/// The pair lives inside a [`OnceCell`] so that concurrent callers of the
/// controller cannot initialize the same physical database instance twice.
/// This is less about safety -- databases already take filesystem locks that
/// guarantee exclusive access -- than about keeping unhelpful errors from
/// bubbling up and keeping potential retries from wasting resources.
///
/// The cell is in turn shared through an [`Arc`], so a caller can clone its
/// handle out of the controller's map and run initialization after the map's
/// lock has been released.
type Context = std::sync::Arc<OnceCell<(std::sync::Arc<DatabaseInstanceContext>, Scheduler)>>;

#[derive(Default)]
pub struct DatabaseInstanceContextController {
    contexts: Mutex<HashMap<u64, Context>>,
}

impl DatabaseInstanceContextController {
    /// Create a controller that does not yet manage any database instance.
    pub fn new() -> Self {
        Self::default()
    }

    /// Get the database instance state if it is already initialized.
    ///
    /// Does not wait for an initialization in progress: returns `None` if
    /// [`Self::get_or_try_init`] has not been called for the given instance
    /// id, has not (yet) completed successfully, or if the state was
    /// explicitly [`Self::remove`]d.
    #[tracing::instrument(skip_all)]
    pub fn get(&self, database_instance_id: u64) -> Option<(Arc<DatabaseInstanceContext>, Scheduler)> {
        let contexts = self.contexts.lock().unwrap();
        contexts
            .get(&database_instance_id)
            .and_then(|cell| cell.get())
            .map(|(dbic, scheduler)| (dbic.clone(), scheduler.clone()))
    }

    /// Get the database instance state, or initialize it with `f` if it is not
    /// present.
    ///
    /// If the instance is not initialized yet, this method will block until
    /// `f` returns. The controller's map lock is released before `f` runs, so
    /// calls to [`Self::get`] and [`Self::update_metrics`] do not block on it;
    /// [`Self::remove`] for the same id, however, waits for `f` to finish.
    /// At most one initializer runs at a time per instance id; concurrent
    /// callers wait for it and, if it succeeds, share its result.
    ///
    /// After this method returns, the instance state becomes managed by the
    /// controller until it is removed by calling [`Self::remove`].
    ///
    /// Note that [`Self::remove`] must be called eventually, even if this
    /// method returns an `Err` result: in this case, [`Self::get`] returns
    /// `None` (as one would expect), but the given `database_instance_id` is
    /// nevertheless known to the controller.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `f` if initialization fails. The cell is
    /// left empty in that case, so a later call may retry.
    #[tracing::instrument(skip_all)]
    pub fn get_or_try_init<F, E>(
        &self,
        database_instance_id: u64,
        f: F,
    ) -> Result<(Arc<DatabaseInstanceContext>, Scheduler), E>
    where
        F: FnOnce() -> Result<(DatabaseInstanceContext, Scheduler), E>,
    {
        // Clone the cell out of the map so the lock is released before `f`
        // (potentially slow) runs.
        let cell = {
            let mut guard = self.contexts.lock().unwrap();
            let cell = guard
                .entry(database_instance_id)
                .or_insert_with(|| Arc::new(OnceCell::new()));
            Arc::clone(cell)
        };
        cell.get_or_try_init(|| {
            let (dbic, scheduler) = f()?;
            Ok((Arc::new(dbic), scheduler))
        })
        .map(|(dbic, scheduler)| (dbic.clone(), scheduler.clone()))
    }

    /// Remove and return the state corresponding to `database_instance_id`.
    ///
    /// Returns `None` if either the state is not known, or was not properly
    /// initialized (i.e. [`Self::get_or_try_init`] returned an error).
    ///
    /// If another thread is concurrently initializing the state via
    /// [`Self::get_or_try_init`], this method blocks until that initialization
    /// finishes, successfully or not. The controller's map lock is not held
    /// while waiting, so other methods are not blocked by it.
    #[tracing::instrument(skip_all)]
    pub fn remove(&self, database_instance_id: u64) -> Option<(Arc<DatabaseInstanceContext>, Scheduler)> {
        // Take the entry out of the map and release the lock immediately, so
        // that waiting for an in-flight initialization below does not stall
        // every other caller of this controller.
        let removed = {
            let mut contexts = self.contexts.lock().unwrap();
            contexts.remove(&database_instance_id)
        };
        let mut arc = removed?;
        loop {
            match Arc::try_unwrap(arc) {
                // Sole owner: nobody else can still be initializing the cell,
                // so its final state is whatever it holds now.
                Ok(cell) => return cell.into_inner(),
                Err(shared) => {
                    // Another thread holds the cell and is (or is about to be)
                    // running the `get_or_try_init` closure. Wait for it
                    // (instead of returning `None`), as callers rely on
                    // calling `scheduler.clear()`.
                    //
                    // Joining the initialization with an always-failing
                    // closure blocks only while another closure is running.
                    // Unlike `OnceCell::wait`, it does not hang forever when
                    // that closure fails and leaves the cell empty.
                    if let Ok((dbic, scheduler)) = shared.get_or_try_init(|| Err(())) {
                        return Some((dbic.clone(), scheduler.clone()));
                    }
                    // Still empty: the other holder has either not started its
                    // closure yet, or its closure failed and it has not yet
                    // dropped its handle. Retry until one of those resolves.
                    arc = shared;
                    std::thread::yield_now();
                }
            }
        }
    }

    /// Refresh the on-disk size gauges in `DB_METRICS` for every initialized
    /// database instance.
    ///
    /// The map lock is held for the whole loop, so other controller methods
    /// block until all sizes have been read.
    #[tracing::instrument(skip_all)]
    pub fn update_metrics(&self) {
        for cell in self.contexts.lock().unwrap().values() {
            // Instances whose initialization is pending or failed are skipped.
            if let Some((db, _)) = cell.get() {
                // Use the previous gauge value if there is an issue getting the file size.
                if let Ok(num_bytes) = db.message_log_size_on_disk() {
                    DB_METRICS
                        .message_log_size
                        .with_label_values(&db.address)
                        .set(num_bytes as i64);
                }
                // Use the previous gauge value if there is an issue getting the file size.
                if let Ok(num_bytes) = db.object_db_size_on_disk() {
                    DB_METRICS
                        .object_db_disk_usage
                        .with_label_values(&db.address)
                        .set(num_bytes as i64);
                }
                // Use the previous gauge value if there is an issue getting the file size.
                if let Ok(num_bytes) = db.log_file_size() {
                    DB_METRICS
                        .module_log_file_size
                        .with_label_values(&db.address)
                        .set(num_bytes as i64);
                }
            }
        }
    }
}