spacetimedb/db/mod.rs
1use std::sync::Arc;
2
3use enum_map::EnumMap;
4use tokio::sync::mpsc;
5
6use crate::subscription::ExecutionCounters;
7use spacetimedb_datastore::execution_context::WorkloadType;
8use spacetimedb_datastore::{locking_tx_datastore::datastore::TxMetrics, traits::TxData};
9
10pub mod relational_db;
11pub mod update;
12
13/// Whether SpacetimeDB is run in memory, or persists objects and
14/// a message log to disk.
15#[derive(Clone, Copy)]
16pub enum Storage {
17 /// The object store is in memory, and no message log is kept.
18 Memory,
19
20 /// The object store is persisted to disk, and a message log is kept.
21 Disk,
22}
23
24/// Internal database config parameters
25#[derive(Clone, Copy)]
26pub struct Config {
27 /// Specifies the object storage model.
28 pub storage: Storage,
29 /// Specifies the page pool max size in bytes.
30 pub page_pool_max_size: Option<usize>,
31}
32
33/// A message that is processed by the [`spawn_metrics_recorder`] actor.
34/// We use a separate task to record metrics to avoid blocking transactions.
35pub struct MetricsMessage {
36 /// The reducer the produced these metrics.
37 reducer: String,
38 /// Metrics from a mutable transaction.
39 metrics_for_writer: Option<TxMetrics>,
40 /// Metrics from a read-only transaction.
41 /// A message may have metrics for both types of transactions,
42 /// because metrics for a reducer and its subscription updates are recorded together.
43 metrics_for_reader: Option<TxMetrics>,
44 /// The row updates for an immutable transaction.
45 /// Needed for insert and delete counters.
46 tx_data: Option<Arc<TxData>>,
47 /// Cached metrics counters for each workload type.
48 counters: Arc<EnumMap<WorkloadType, ExecutionCounters>>,
49}
50
51/// The handle used to send work to the tx metrics recorder.
52#[derive(Clone)]
53pub struct MetricsRecorderQueue {
54 tx: mpsc::UnboundedSender<MetricsMessage>,
55}
56
57impl MetricsRecorderQueue {
58 pub fn send_metrics(
59 &self,
60 reducer: String,
61 metrics_for_writer: Option<TxMetrics>,
62 metrics_for_reader: Option<TxMetrics>,
63 tx_data: Option<Arc<TxData>>,
64 counters: Arc<EnumMap<WorkloadType, ExecutionCounters>>,
65 ) {
66 if let Err(err) = self.tx.send(MetricsMessage {
67 reducer,
68 metrics_for_writer,
69 metrics_for_reader,
70 tx_data,
71 counters,
72 }) {
73 log::warn!("failed to send metrics: {err}");
74 }
75 }
76}
77
78/// Spawns a task for recording transaction metrics.
79/// Returns the handle for pushing metrics to the recorder.
80pub fn spawn_tx_metrics_recorder() -> (MetricsRecorderQueue, tokio::task::AbortHandle) {
81 let (tx, mut rx) = mpsc::unbounded_channel();
82 let abort_handle = tokio::spawn(async move {
83 while let Some(MetricsMessage {
84 reducer,
85 metrics_for_writer,
86 metrics_for_reader,
87 tx_data,
88 counters,
89 }) = rx.recv().await
90 {
91 if let Some(tx_metrics) = metrics_for_writer {
92 tx_metrics.report(
93 // If row updates are present,
94 // they will always belong to the writer transaction.
95 tx_data.as_deref(),
96 &reducer,
97 |wl| &counters[wl],
98 );
99 }
100 if let Some(tx_metrics) = metrics_for_reader {
101 tx_metrics.report(
102 // If row updates are present,
103 // they will never belong to the reader transaction.
104 // Passing row updates here will most likely panic.
105 None,
106 &reducer,
107 |wl| &counters[wl],
108 );
109 }
110 }
111 })
112 .abort_handle();
113 (MetricsRecorderQueue { tx }, abort_handle)
114}