spacetimedb/subscription/
mod.rs1use std::sync::Arc;
2
3use anyhow::Result;
4use module_subscription_manager::Plan;
5use prometheus::IntCounter;
6use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
7use spacetimedb_client_api_messages::websocket::{
8 ByteListLen, Compression, DatabaseUpdate, QueryUpdate, SingleQueryUpdate, TableUpdate, WebsocketFormat,
9};
10use spacetimedb_execution::{pipelined::PipelinedProject, Datastore, DeltaStore};
11use spacetimedb_lib::{metrics::ExecutionMetrics, Identity};
12use spacetimedb_primitives::TableId;
13
14use crate::{error::DBError, worker_metrics::WORKER_METRICS};
15use spacetimedb_datastore::{
16 db_metrics::DB_METRICS, execution_context::WorkloadType, locking_tx_datastore::datastore::MetricsRecorder,
17};
18
19pub mod delta;
20pub mod execution_unit;
21pub mod module_subscription_actor;
22pub mod module_subscription_manager;
23pub mod query;
24#[allow(clippy::module_inception)] pub mod subscription;
26pub mod tx;
27
28#[derive(Debug)]
29pub struct ExecutionCounters {
30 rdb_num_index_seeks: IntCounter,
31 rdb_num_rows_scanned: IntCounter,
32 rdb_num_bytes_scanned: IntCounter,
33 rdb_num_bytes_written: IntCounter,
34 bytes_sent_to_clients: IntCounter,
35 delta_queries_matched: IntCounter,
36 delta_queries_evaluated: IntCounter,
37 duplicate_rows_evaluated: IntCounter,
38 duplicate_rows_sent: IntCounter,
39}
40
41impl ExecutionCounters {
42 pub fn new(workload: &WorkloadType, db: &Identity) -> Self {
43 Self {
44 rdb_num_index_seeks: DB_METRICS.rdb_num_index_seeks.with_label_values(workload, db),
45 rdb_num_rows_scanned: DB_METRICS.rdb_num_rows_scanned.with_label_values(workload, db),
46 rdb_num_bytes_scanned: DB_METRICS.rdb_num_bytes_scanned.with_label_values(workload, db),
47 rdb_num_bytes_written: DB_METRICS.rdb_num_bytes_written.with_label_values(workload, db),
48 bytes_sent_to_clients: WORKER_METRICS.bytes_sent_to_clients.with_label_values(workload, db),
49 delta_queries_matched: DB_METRICS.delta_queries_matched.with_label_values(db),
50 delta_queries_evaluated: DB_METRICS.delta_queries_evaluated.with_label_values(db),
51 duplicate_rows_evaluated: DB_METRICS.duplicate_rows_evaluated.with_label_values(db),
52 duplicate_rows_sent: DB_METRICS.duplicate_rows_sent.with_label_values(db),
53 }
54 }
55
56 pub(crate) fn record(&self, metrics: &ExecutionMetrics) {
58 if metrics.index_seeks > 0 {
59 self.rdb_num_index_seeks.inc_by(metrics.index_seeks as u64);
60 }
61 if metrics.rows_scanned > 0 {
62 self.rdb_num_rows_scanned.inc_by(metrics.rows_scanned as u64);
63 }
64 if metrics.bytes_scanned > 0 {
65 self.rdb_num_bytes_scanned.inc_by(metrics.bytes_scanned as u64);
66 }
67 if metrics.bytes_written > 0 {
68 self.rdb_num_bytes_written.inc_by(metrics.bytes_written as u64);
69 }
70 if metrics.bytes_sent_to_clients > 0 {
71 self.bytes_sent_to_clients.inc_by(metrics.bytes_sent_to_clients as u64);
72 }
73 if metrics.delta_queries_matched > 0 {
74 self.delta_queries_matched.inc_by(metrics.delta_queries_matched);
75 }
76 if metrics.delta_queries_evaluated > 0 {
77 self.delta_queries_evaluated.inc_by(metrics.delta_queries_evaluated);
78 }
79 if metrics.duplicate_rows_evaluated > 0 {
80 self.duplicate_rows_evaluated.inc_by(metrics.duplicate_rows_evaluated);
81 }
82 if metrics.duplicate_rows_sent > 0 {
83 self.duplicate_rows_sent.inc_by(metrics.duplicate_rows_sent);
84 }
85 }
86}
87
88impl MetricsRecorder for ExecutionCounters {
89 fn record(&self, metrics: &ExecutionMetrics) {
90 self.record(metrics);
91 }
92}
93
94pub fn execute_plan<Tx, F>(plan_fragments: &[PipelinedProject], tx: &Tx) -> Result<(F::List, u64, ExecutionMetrics)>
96where
97 Tx: Datastore + DeltaStore,
98 F: WebsocketFormat,
99{
100 let mut rows = vec![];
101 let mut metrics = ExecutionMetrics::default();
102
103 for fragment in plan_fragments {
104 fragment.execute(tx, &mut metrics, &mut |row| {
105 rows.push(row);
106 Ok(())
107 })?;
108 }
109
110 let (list, n) = F::encode_list(rows.into_iter());
111 metrics.bytes_scanned += list.num_bytes();
112 metrics.bytes_sent_to_clients += list.num_bytes();
113 Ok((list, n, metrics))
114}
115
116#[derive(Debug, Clone, Copy)]
119pub enum TableUpdateType {
120 Subscribe,
121 Unsubscribe,
122}
123
124pub fn collect_table_update<Tx, F>(
126 plan_fragments: &[PipelinedProject],
127 table_id: TableId,
128 table_name: Box<str>,
129 tx: &Tx,
130 update_type: TableUpdateType,
131) -> Result<(TableUpdate<F>, ExecutionMetrics)>
132where
133 Tx: Datastore + DeltaStore,
134 F: WebsocketFormat,
135{
136 execute_plan::<Tx, F>(plan_fragments, tx).map(|(rows, num_rows, metrics)| {
137 let empty = F::List::default();
138 let qu = match update_type {
139 TableUpdateType::Subscribe => QueryUpdate {
140 deletes: empty,
141 inserts: rows,
142 },
143 TableUpdateType::Unsubscribe => QueryUpdate {
144 deletes: rows,
145 inserts: empty,
146 },
147 };
148 let update = F::into_query_update(qu, Compression::None);
152 (
153 TableUpdate::new(table_id, table_name, SingleQueryUpdate { update, num_rows }),
154 metrics,
155 )
156 })
157}
158
159pub fn execute_plans<Tx, F>(
161 plans: &[Arc<Plan>],
162 tx: &Tx,
163 update_type: TableUpdateType,
164) -> Result<(DatabaseUpdate<F>, ExecutionMetrics), DBError>
165where
166 Tx: Datastore + DeltaStore + Sync,
167 F: WebsocketFormat,
168{
169 plans
170 .par_iter()
171 .flat_map_iter(|plan| plan.plans_fragments().map(|fragment| (plan.sql(), fragment)))
172 .filter(|(_, plan)| {
173 plan.table_ids()
176 .all(|table_id| tx.table(table_id).is_some_and(|t| t.row_count > 0))
177 })
178 .map(|(sql, plan)| (sql, plan, plan.subscribed_table_id(), plan.subscribed_table_name()))
179 .map(|(sql, plan, table_id, table_name)| {
180 plan.optimized_physical_plan()
181 .clone()
182 .optimize()
183 .map(|plan| (sql, PipelinedProject::from(plan)))
184 .and_then(|(_, plan)| collect_table_update(&[plan], table_id, (&**table_name).into(), tx, update_type))
185 .map_err(|err| DBError::WithSql {
186 sql: sql.into(),
187 error: Box::new(DBError::Other(err)),
188 })
189 })
190 .collect::<Result<Vec<_>, _>>()
191 .map(|table_updates_with_metrics| {
192 let n = table_updates_with_metrics.len();
193 let mut tables = Vec::with_capacity(n);
194 let mut aggregated_metrics = ExecutionMetrics::default();
195 for (update, metrics) in table_updates_with_metrics {
196 tables.push(update);
197 aggregated_metrics.merge(metrics);
198 }
199 (DatabaseUpdate { tables }, aggregated_metrics)
200 })
201}