spacetimedb/db/
mod.rs

1use std::sync::Arc;
2
3use enum_map::EnumMap;
4use tokio::sync::mpsc;
5
6use crate::subscription::ExecutionCounters;
7use spacetimedb_datastore::execution_context::WorkloadType;
8use spacetimedb_datastore::{locking_tx_datastore::datastore::TxMetrics, traits::TxData};
9
10pub mod relational_db;
11pub mod update;
12
/// Whether SpacetimeDB is run in memory, or persists objects and
/// a message log to disk.
#[derive(Clone, Copy)]
pub enum Storage {
    /// The object store is in memory, and no message log is kept.
    /// Nothing survives a process restart in this mode.
    Memory,

    /// The object store is persisted to disk, and a message log is kept.
    Disk,
}
23
/// Internal database config parameters.
#[derive(Clone, Copy)]
pub struct Config {
    /// Specifies the object storage model (in-memory or on-disk).
    pub storage: Storage,
    /// Specifies the page pool max size in bytes.
    /// If `None`, presumably a default pool size is used — TODO confirm
    /// against the consumer of this config.
    pub page_pool_max_size: Option<usize>,
}
32
/// A message that is processed by the [`spawn_tx_metrics_recorder`] actor.
/// We use a separate task to record metrics to avoid blocking transactions.
pub struct MetricsMessage {
    /// The reducer that produced these metrics.
    reducer: String,
    /// Metrics from a mutable transaction.
    metrics_for_writer: Option<TxMetrics>,
    /// Metrics from a read-only transaction.
    /// A message may have metrics for both types of transactions,
    /// because metrics for a reducer and its subscription updates are recorded together.
    metrics_for_reader: Option<TxMetrics>,
    /// The row updates for a mutable (writer) transaction.
    /// Needed for insert and delete counters.
    /// Note: these always belong to `metrics_for_writer`; the recorder
    /// never passes them alongside `metrics_for_reader`.
    tx_data: Option<Arc<TxData>>,
    /// Cached metrics counters for each workload type.
    counters: Arc<EnumMap<WorkloadType, ExecutionCounters>>,
}
50
/// The handle used to send work to the tx metrics recorder.
/// Cloning is cheap: clones share the same underlying unbounded channel.
#[derive(Clone)]
pub struct MetricsRecorderQueue {
    // Sender half of the unbounded channel drained by the recorder task.
    tx: mpsc::UnboundedSender<MetricsMessage>,
}
56
57impl MetricsRecorderQueue {
58    pub fn send_metrics(
59        &self,
60        reducer: String,
61        metrics_for_writer: Option<TxMetrics>,
62        metrics_for_reader: Option<TxMetrics>,
63        tx_data: Option<Arc<TxData>>,
64        counters: Arc<EnumMap<WorkloadType, ExecutionCounters>>,
65    ) {
66        if let Err(err) = self.tx.send(MetricsMessage {
67            reducer,
68            metrics_for_writer,
69            metrics_for_reader,
70            tx_data,
71            counters,
72        }) {
73            log::warn!("failed to send metrics: {err}");
74        }
75    }
76}
77
78/// Spawns a task for recording transaction metrics.
79/// Returns the handle for pushing metrics to the recorder.
80pub fn spawn_tx_metrics_recorder() -> (MetricsRecorderQueue, tokio::task::AbortHandle) {
81    let (tx, mut rx) = mpsc::unbounded_channel();
82    let abort_handle = tokio::spawn(async move {
83        while let Some(MetricsMessage {
84            reducer,
85            metrics_for_writer,
86            metrics_for_reader,
87            tx_data,
88            counters,
89        }) = rx.recv().await
90        {
91            if let Some(tx_metrics) = metrics_for_writer {
92                tx_metrics.report(
93                    // If row updates are present,
94                    // they will always belong to the writer transaction.
95                    tx_data.as_deref(),
96                    &reducer,
97                    |wl| &counters[wl],
98                );
99            }
100            if let Some(tx_metrics) = metrics_for_reader {
101                tx_metrics.report(
102                    // If row updates are present,
103                    // they will never belong to the reader transaction.
104                    // Passing row updates here will most likely panic.
105                    None,
106                    &reducer,
107                    |wl| &counters[wl],
108                );
109            }
110        }
111    })
112    .abort_handle();
113    (MetricsRecorderQueue { tx }, abort_handle)
114}