// spacetimedb/subscription/mod.rs

1use std::sync::Arc;
2
3use anyhow::Result;
4use module_subscription_manager::Plan;
5use prometheus::IntCounter;
6use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
7use spacetimedb_client_api_messages::websocket::{
8    ByteListLen, Compression, DatabaseUpdate, QueryUpdate, SingleQueryUpdate, TableUpdate, WebsocketFormat,
9};
10use spacetimedb_execution::{pipelined::PipelinedProject, Datastore, DeltaStore};
11use spacetimedb_lib::{metrics::ExecutionMetrics, Identity};
12use spacetimedb_primitives::TableId;
13
14use crate::{error::DBError, worker_metrics::WORKER_METRICS};
15use spacetimedb_datastore::{
16    db_metrics::DB_METRICS, execution_context::WorkloadType, locking_tx_datastore::datastore::MetricsRecorder,
17};
18
19pub mod delta;
20pub mod execution_unit;
21pub mod module_subscription_actor;
22pub mod module_subscription_manager;
23pub mod query;
24#[allow(clippy::module_inception)] // it's right this isn't ideal :/
25pub mod subscription;
26pub mod tx;
27
/// Prometheus counters pre-resolved to a single (workload, database) label set,
/// so that publishing [`ExecutionMetrics`] via [`ExecutionCounters::record`]
/// needs no label lookup on the hot path.
#[derive(Debug)]
pub struct ExecutionCounters {
    // Datastore-level counters, sourced from `DB_METRICS` in `new`.
    rdb_num_index_seeks: IntCounter,
    rdb_num_rows_scanned: IntCounter,
    rdb_num_bytes_scanned: IntCounter,
    rdb_num_bytes_written: IntCounter,
    // Worker-level counter, sourced from `WORKER_METRICS` in `new`.
    bytes_sent_to_clients: IntCounter,
    // Subscription/delta-evaluation counters, labeled by database only.
    delta_queries_matched: IntCounter,
    delta_queries_evaluated: IntCounter,
    duplicate_rows_evaluated: IntCounter,
    duplicate_rows_sent: IntCounter,
}
40
41impl ExecutionCounters {
42    pub fn new(workload: &WorkloadType, db: &Identity) -> Self {
43        Self {
44            rdb_num_index_seeks: DB_METRICS.rdb_num_index_seeks.with_label_values(workload, db),
45            rdb_num_rows_scanned: DB_METRICS.rdb_num_rows_scanned.with_label_values(workload, db),
46            rdb_num_bytes_scanned: DB_METRICS.rdb_num_bytes_scanned.with_label_values(workload, db),
47            rdb_num_bytes_written: DB_METRICS.rdb_num_bytes_written.with_label_values(workload, db),
48            bytes_sent_to_clients: WORKER_METRICS.bytes_sent_to_clients.with_label_values(workload, db),
49            delta_queries_matched: DB_METRICS.delta_queries_matched.with_label_values(db),
50            delta_queries_evaluated: DB_METRICS.delta_queries_evaluated.with_label_values(db),
51            duplicate_rows_evaluated: DB_METRICS.duplicate_rows_evaluated.with_label_values(db),
52            duplicate_rows_sent: DB_METRICS.duplicate_rows_sent.with_label_values(db),
53        }
54    }
55
56    /// Update the global system metrics with transaction-level execution metrics.
57    pub(crate) fn record(&self, metrics: &ExecutionMetrics) {
58        if metrics.index_seeks > 0 {
59            self.rdb_num_index_seeks.inc_by(metrics.index_seeks as u64);
60        }
61        if metrics.rows_scanned > 0 {
62            self.rdb_num_rows_scanned.inc_by(metrics.rows_scanned as u64);
63        }
64        if metrics.bytes_scanned > 0 {
65            self.rdb_num_bytes_scanned.inc_by(metrics.bytes_scanned as u64);
66        }
67        if metrics.bytes_written > 0 {
68            self.rdb_num_bytes_written.inc_by(metrics.bytes_written as u64);
69        }
70        if metrics.bytes_sent_to_clients > 0 {
71            self.bytes_sent_to_clients.inc_by(metrics.bytes_sent_to_clients as u64);
72        }
73        if metrics.delta_queries_matched > 0 {
74            self.delta_queries_matched.inc_by(metrics.delta_queries_matched);
75        }
76        if metrics.delta_queries_evaluated > 0 {
77            self.delta_queries_evaluated.inc_by(metrics.delta_queries_evaluated);
78        }
79        if metrics.duplicate_rows_evaluated > 0 {
80            self.duplicate_rows_evaluated.inc_by(metrics.duplicate_rows_evaluated);
81        }
82        if metrics.duplicate_rows_sent > 0 {
83            self.duplicate_rows_sent.inc_by(metrics.duplicate_rows_sent);
84        }
85    }
86}
87
impl MetricsRecorder for ExecutionCounters {
    fn record(&self, metrics: &ExecutionMetrics) {
        // Delegates to the inherent `ExecutionCounters::record` above.
        // This is not infinite recursion: Rust resolves inherent methods
        // before trait methods, so `self.record(..)` here picks the
        // inherent impl, not this trait method.
        self.record(metrics);
    }
}
93
94/// Execute a subscription query
95pub fn execute_plan<Tx, F>(plan_fragments: &[PipelinedProject], tx: &Tx) -> Result<(F::List, u64, ExecutionMetrics)>
96where
97    Tx: Datastore + DeltaStore,
98    F: WebsocketFormat,
99{
100    let mut rows = vec![];
101    let mut metrics = ExecutionMetrics::default();
102
103    for fragment in plan_fragments {
104        fragment.execute(tx, &mut metrics, &mut |row| {
105            rows.push(row);
106            Ok(())
107        })?;
108    }
109
110    let (list, n) = F::encode_list(rows.into_iter());
111    metrics.bytes_scanned += list.num_bytes();
112    metrics.bytes_sent_to_clients += list.num_bytes();
113    Ok((list, n, metrics))
114}
115
/// When collecting a table update are we inserting or deleting rows?
/// For unsubscribe operations, we need to delete rows.
#[derive(Debug, Clone, Copy)]
pub enum TableUpdateType {
    /// Rows produced by the query are reported to the client as inserts.
    Subscribe,
    /// Rows produced by the query are reported to the client as deletes.
    Unsubscribe,
}
123
124/// Execute a subscription query and collect the results in a [TableUpdate]
125pub fn collect_table_update<Tx, F>(
126    plan_fragments: &[PipelinedProject],
127    table_id: TableId,
128    table_name: Box<str>,
129    tx: &Tx,
130    update_type: TableUpdateType,
131) -> Result<(TableUpdate<F>, ExecutionMetrics)>
132where
133    Tx: Datastore + DeltaStore,
134    F: WebsocketFormat,
135{
136    execute_plan::<Tx, F>(plan_fragments, tx).map(|(rows, num_rows, metrics)| {
137        let empty = F::List::default();
138        let qu = match update_type {
139            TableUpdateType::Subscribe => QueryUpdate {
140                deletes: empty,
141                inserts: rows,
142            },
143            TableUpdateType::Unsubscribe => QueryUpdate {
144                deletes: rows,
145                inserts: empty,
146            },
147        };
148        // We will compress the outer server message,
149        // after we release the tx lock.
150        // There's no need to compress the inner table update too.
151        let update = F::into_query_update(qu, Compression::None);
152        (
153            TableUpdate::new(table_id, table_name, SingleQueryUpdate { update, num_rows }),
154            metrics,
155        )
156    })
157}
158
159/// Execute a collection of subscription queries in parallel
160pub fn execute_plans<Tx, F>(
161    plans: &[Arc<Plan>],
162    tx: &Tx,
163    update_type: TableUpdateType,
164) -> Result<(DatabaseUpdate<F>, ExecutionMetrics), DBError>
165where
166    Tx: Datastore + DeltaStore + Sync,
167    F: WebsocketFormat,
168{
169    plans
170        .par_iter()
171        .flat_map_iter(|plan| plan.plans_fragments().map(|fragment| (plan.sql(), fragment)))
172        .filter(|(_, plan)| {
173            // Since subscriptions only support selects and inner joins,
174            // we filter out any plans that read from an empty table.
175            plan.table_ids()
176                .all(|table_id| tx.table(table_id).is_some_and(|t| t.row_count > 0))
177        })
178        .map(|(sql, plan)| (sql, plan, plan.subscribed_table_id(), plan.subscribed_table_name()))
179        .map(|(sql, plan, table_id, table_name)| {
180            plan.optimized_physical_plan()
181                .clone()
182                .optimize()
183                .map(|plan| (sql, PipelinedProject::from(plan)))
184                .and_then(|(_, plan)| collect_table_update(&[plan], table_id, (&**table_name).into(), tx, update_type))
185                .map_err(|err| DBError::WithSql {
186                    sql: sql.into(),
187                    error: Box::new(DBError::Other(err)),
188                })
189        })
190        .collect::<Result<Vec<_>, _>>()
191        .map(|table_updates_with_metrics| {
192            let n = table_updates_with_metrics.len();
193            let mut tables = Vec::with_capacity(n);
194            let mut aggregated_metrics = ExecutionMetrics::default();
195            for (update, metrics) in table_updates_with_metrics {
196                tables.push(update);
197                aggregated_metrics.merge(metrics);
198            }
199            (DatabaseUpdate { tables }, aggregated_metrics)
200        })
201}