1use super::execution_unit::QueryHash;
2use super::module_subscription_manager::{
3 spawn_send_worker, BroadcastError, BroadcastQueue, Plan, SubscriptionGaugeStats, SubscriptionManager,
4};
5use super::query::compile_query_with_hashes;
6use super::tx::DeltaTx;
7use super::{collect_table_update, TableUpdateType};
8use crate::client::messages::{
9 SerializableMessage, SubscriptionData, SubscriptionError, SubscriptionMessage, SubscriptionResult,
10 SubscriptionRows, SubscriptionUpdateMessage, TransactionUpdateMessage,
11};
12use crate::client::{ClientActorId, ClientConnectionSender, Protocol};
13use crate::db::datastore::locking_tx_datastore::tx::TxId;
14use crate::db::db_metrics::DB_METRICS;
15use crate::db::relational_db::{MutTx, RelationalDB, Tx};
16use crate::error::DBError;
17use crate::estimation::estimate_rows_scanned;
18use crate::execution_context::{Workload, WorkloadType};
19use crate::host::module_host::{DatabaseUpdate, EventStatus, ModuleEvent};
20use crate::messages::websocket::Subscribe;
21use crate::subscription::execute_plans;
22use crate::subscription::query::is_subscribe_to_all_tables;
23use crate::util::prometheus_handle::IntGaugeExt;
24use crate::vm::check_row_limit;
25use crate::worker_metrics::WORKER_METRICS;
26use parking_lot::RwLock;
27use prometheus::{Histogram, HistogramTimer, IntCounter, IntGauge};
28use spacetimedb_client_api_messages::websocket::{
29 self as ws, BsatnFormat, FormatSwitch, JsonFormat, SubscribeMulti, SubscribeSingle, TableUpdate, Unsubscribe,
30 UnsubscribeMulti,
31};
32use spacetimedb_execution::pipelined::PipelinedProject;
33use spacetimedb_lib::identity::AuthCtx;
34use spacetimedb_lib::metrics::ExecutionMetrics;
35use spacetimedb_lib::Identity;
36use std::{sync::Arc, time::Instant};
37
38type Subscriptions = Arc<RwLock<SubscriptionManager>>;
39
40#[derive(Debug, Clone)]
41pub struct ModuleSubscriptions {
42 relational_db: Arc<RelationalDB>,
43 subscriptions: Subscriptions,
46 broadcast_queue: BroadcastQueue,
47 owner_identity: Identity,
48 stats: Arc<SubscriptionGauges>,
49}
50
51#[derive(Debug, Clone)]
52pub struct SubscriptionGauges {
53 db_identity: Identity,
54 num_queries: IntGauge,
55 num_connections: IntGauge,
56 num_subscription_sets: IntGauge,
57 num_query_subscriptions: IntGauge,
58 num_legacy_subscriptions: IntGauge,
59}
60
61impl SubscriptionGauges {
62 fn new(db_identity: &Identity) -> Self {
63 let num_queries = WORKER_METRICS.subscription_queries.with_label_values(db_identity);
64 let num_connections = DB_METRICS.subscription_connections.with_label_values(db_identity);
65 let num_subscription_sets = DB_METRICS.subscription_sets.with_label_values(db_identity);
66 let num_query_subscriptions = DB_METRICS.total_query_subscriptions.with_label_values(db_identity);
67 let num_legacy_subscriptions = DB_METRICS.num_legacy_subscriptions.with_label_values(db_identity);
68 Self {
69 db_identity: *db_identity,
70 num_queries,
71 num_connections,
72 num_subscription_sets,
73 num_query_subscriptions,
74 num_legacy_subscriptions,
75 }
76 }
77
78 fn unregister(&self) {
80 let _ = WORKER_METRICS
81 .subscription_queries
82 .remove_label_values(&self.db_identity);
83 let _ = DB_METRICS
84 .subscription_connections
85 .remove_label_values(&self.db_identity);
86 let _ = DB_METRICS.subscription_sets.remove_label_values(&self.db_identity);
87 let _ = DB_METRICS
88 .total_query_subscriptions
89 .remove_label_values(&self.db_identity);
90 let _ = DB_METRICS
91 .num_legacy_subscriptions
92 .remove_label_values(&self.db_identity);
93 }
94
95 fn report(&self, stats: &SubscriptionGaugeStats) {
96 self.num_queries.set(stats.num_queries as i64);
97 self.num_connections.set(stats.num_connections as i64);
98 self.num_subscription_sets.set(stats.num_subscription_sets as i64);
99 self.num_query_subscriptions.set(stats.num_query_subscriptions as i64);
100 self.num_legacy_subscriptions.set(stats.num_legacy_subscriptions as i64);
101 }
102}
103
104pub struct SubscriptionMetrics {
105 pub lock_waiters: IntGauge,
106 pub lock_wait_time: Histogram,
107 pub compilation_time: Histogram,
108 pub num_queries_subscribed: IntCounter,
109 pub num_new_queries_subscribed: IntCounter,
110 pub num_queries_evaluated: IntCounter,
111}
112
113impl SubscriptionMetrics {
114 pub fn new(db: &Identity, workload: &WorkloadType) -> Self {
115 Self {
116 lock_waiters: DB_METRICS.subscription_lock_waiters.with_label_values(db, workload),
117 lock_wait_time: DB_METRICS.subscription_lock_wait_time.with_label_values(db, workload),
118 compilation_time: DB_METRICS.subscription_compile_time.with_label_values(db, workload),
119 num_queries_subscribed: DB_METRICS.num_queries_subscribed.with_label_values(db),
120 num_new_queries_subscribed: DB_METRICS.num_new_queries_subscribed.with_label_values(db),
121 num_queries_evaluated: DB_METRICS.num_queries_evaluated.with_label_values(db, workload),
122 }
123 }
124}
125
126type AssertTxFn = Arc<dyn Fn(&Tx)>;
127type SubscriptionUpdate = FormatSwitch<TableUpdate<BsatnFormat>, TableUpdate<JsonFormat>>;
128type FullSubscriptionUpdate = FormatSwitch<ws::DatabaseUpdate<BsatnFormat>, ws::DatabaseUpdate<JsonFormat>>;
129
130macro_rules! return_on_err {
132 ($expr:expr, $handler:expr, $metrics:expr) => {
133 match $expr {
134 Ok(val) => val,
135 Err(e) => {
136 let _ = $handler(e.to_string().into());
138 return Ok($metrics);
139 }
140 }
141 };
142}
143
144macro_rules! return_on_err_with_sql {
146 ($expr:expr, $sql:expr, $handler:expr) => {
147 match $expr.map_err(|err| DBError::WithSql {
148 sql: $sql.into(),
149 error: Box::new(DBError::Other(err.into())),
150 }) {
151 Ok(val) => val,
152 Err(e) => {
153 let _ = $handler(e.to_string().into());
155 return Ok(None);
156 }
157 }
158 };
159}
160
161impl ModuleSubscriptions {
162 pub fn new(
163 relational_db: Arc<RelationalDB>,
164 subscriptions: Subscriptions,
165 broadcast_queue: BroadcastQueue,
166 owner_identity: Identity,
167 ) -> Self {
168 let db = &relational_db.database_identity();
169 let stats = Arc::new(SubscriptionGauges::new(db));
170
171 Self {
172 relational_db,
173 subscriptions,
174 broadcast_queue,
175 owner_identity,
176 stats,
177 }
178 }
179
180 pub fn for_test_new_runtime(db: Arc<RelationalDB>) -> (ModuleSubscriptions, tokio::runtime::Runtime) {
183 let runtime = tokio::runtime::Runtime::new().unwrap();
184 let _rt = runtime.enter();
185 (Self::for_test_enclosing_runtime(db), runtime)
186 }
187
188 pub fn for_test_enclosing_runtime(db: Arc<RelationalDB>) -> ModuleSubscriptions {
191 let send_worker_queue = spawn_send_worker(None);
192 ModuleSubscriptions::new(
193 db,
194 SubscriptionManager::for_test_without_metrics_arc_rwlock(),
195 send_worker_queue,
196 Identity::ZERO,
197 )
198 }
199
200 pub fn update_gauges(&self) {
202 let num_queries = self.subscriptions.read().calculate_gauge_stats();
203 self.stats.report(&num_queries);
204 }
205
206 pub fn remove_gauges(&self) {
209 self.stats.unregister();
210 }
211
212 fn evaluate_initial_subscription(
214 &self,
215 sender: Arc<ClientConnectionSender>,
216 query: Arc<Plan>,
217 tx: &TxId,
218 auth: &AuthCtx,
219 update_type: TableUpdateType,
220 ) -> Result<(SubscriptionUpdate, ExecutionMetrics), DBError> {
221 check_row_limit(
222 &[&query],
223 &self.relational_db,
224 tx,
225 |plan, tx| {
226 plan.plans_fragments()
227 .map(|plan_fragment| estimate_rows_scanned(tx, plan_fragment.optimized_physical_plan()))
228 .fold(0, |acc, rows_scanned| acc.saturating_add(rows_scanned))
229 },
230 auth,
231 )?;
232
233 let table_id = query.subscribed_table_id();
234 let table_name = query.subscribed_table_name();
235
236 let plans = query
237 .plans_fragments()
238 .map(|fragment| fragment.optimized_physical_plan())
239 .cloned()
240 .map(|plan| plan.optimize())
241 .collect::<Result<Vec<_>, _>>()?
242 .into_iter()
243 .map(PipelinedProject::from)
244 .collect::<Vec<_>>();
245
246 let tx = DeltaTx::from(tx);
247
248 Ok(match sender.config.protocol {
249 Protocol::Binary => collect_table_update(&plans, table_id, table_name.into(), &tx, update_type)
250 .map(|(table_update, metrics)| (FormatSwitch::Bsatn(table_update), metrics)),
251 Protocol::Text => collect_table_update(&plans, table_id, table_name.into(), &tx, update_type)
252 .map(|(table_update, metrics)| (FormatSwitch::Json(table_update), metrics)),
253 }?)
254 }
255
256 fn evaluate_queries(
257 &self,
258 sender: Arc<ClientConnectionSender>,
259 queries: &[Arc<Plan>],
260 tx: &TxId,
261 auth: &AuthCtx,
262 update_type: TableUpdateType,
263 ) -> Result<(FullSubscriptionUpdate, ExecutionMetrics), DBError> {
264 check_row_limit(
265 queries,
266 &self.relational_db,
267 tx,
268 |plan, tx| {
269 plan.plans_fragments()
270 .map(|plan_fragment| estimate_rows_scanned(tx, plan_fragment.optimized_physical_plan()))
271 .fold(0, |acc, rows_scanned| acc.saturating_add(rows_scanned))
272 },
273 auth,
274 )?;
275
276 let tx = DeltaTx::from(tx);
277 match sender.config.protocol {
278 Protocol::Binary => {
279 let (update, metrics) = execute_plans(queries, &tx, update_type)?;
280 Ok((FormatSwitch::Bsatn(update), metrics))
281 }
282 Protocol::Text => {
283 let (update, metrics) = execute_plans(queries, &tx, update_type)?;
284 Ok((FormatSwitch::Json(update), metrics))
285 }
286 }
287 }
288
289 #[tracing::instrument(level = "trace", skip_all)]
291 pub fn add_single_subscription(
292 &self,
293 sender: Arc<ClientConnectionSender>,
294 request: SubscribeSingle,
295 timer: Instant,
296 _assert: Option<AssertTxFn>,
297 ) -> Result<Option<ExecutionMetrics>, DBError> {
298 let send_err_msg = |message| {
300 self.broadcast_queue.send_client_message(
301 sender.clone(),
302 SubscriptionMessage {
303 request_id: Some(request.request_id),
304 query_id: Some(request.query_id),
305 timer: Some(timer),
306 result: SubscriptionResult::Error(SubscriptionError {
307 table_id: None,
308 message,
309 }),
310 },
311 )
312 };
313
314 let sql = request.query;
315 let auth = AuthCtx::new(self.owner_identity, sender.id.identity);
316 let hash = QueryHash::from_string(&sql, auth.caller, false);
317 let hash_with_param = QueryHash::from_string(&sql, auth.caller, true);
318
319 let tx = scopeguard::guard(self.relational_db.begin_tx(Workload::Subscribe), |tx| {
320 let (tx_metrics, reducer) = self.relational_db.release_tx(tx);
321 self.relational_db.report(&reducer, &tx_metrics, None);
322 });
323
324 let existing_query = {
325 let guard = self.subscriptions.read();
326 guard.query(&hash)
327 };
328
329 let query = return_on_err_with_sql!(
330 existing_query.map(Ok).unwrap_or_else(|| compile_query_with_hashes(
331 &auth,
332 &tx,
333 &sql,
334 hash,
335 hash_with_param
336 )
337 .map(Arc::new)),
338 sql,
339 send_err_msg
340 );
341
342 let (table_rows, metrics) = return_on_err_with_sql!(
343 self.evaluate_initial_subscription(sender.clone(), query.clone(), &tx, &auth, TableUpdateType::Subscribe),
344 query.sql(),
345 send_err_msg
346 );
347
348 let mut subscriptions = self.subscriptions.write();
352 subscriptions.add_subscription(sender.clone(), query.clone(), request.query_id)?;
353
354 #[cfg(test)]
355 if let Some(assert) = _assert {
356 assert(&tx);
357 }
358
359 let _ = self.broadcast_queue.send_client_message(
367 sender.clone(),
368 SubscriptionMessage {
369 request_id: Some(request.request_id),
370 query_id: Some(request.query_id),
371 timer: Some(timer),
372 result: SubscriptionResult::Subscribe(SubscriptionRows {
373 table_id: query.subscribed_table_id(),
374 table_name: query.subscribed_table_name().into(),
375 table_rows,
376 }),
377 },
378 );
379 Ok(Some(metrics))
380 }
381
382 pub fn remove_single_subscription(
384 &self,
385 sender: Arc<ClientConnectionSender>,
386 request: Unsubscribe,
387 timer: Instant,
388 ) -> Result<Option<ExecutionMetrics>, DBError> {
389 let send_err_msg = |message| {
391 self.broadcast_queue.send_client_message(
392 sender.clone(),
393 SubscriptionMessage {
394 request_id: Some(request.request_id),
395 query_id: Some(request.query_id),
396 timer: Some(timer),
397 result: SubscriptionResult::Error(SubscriptionError {
398 table_id: None,
399 message,
400 }),
401 },
402 )
403 };
404
405 let mut subscriptions = self.subscriptions.write();
406
407 let queries = return_on_err!(
408 subscriptions.remove_subscription((sender.id.identity, sender.id.connection_id), request.query_id),
409 send_err_msg,
411 None
412 );
413 let [query] = &*queries else {
416 let _ = send_err_msg("Internal error".into());
418 return Ok(None);
419 };
420
421 let tx = scopeguard::guard(self.relational_db.begin_tx(Workload::Unsubscribe), |tx| {
422 let (tx_metrics, reducer) = self.relational_db.release_tx(tx);
423 self.relational_db.report(&reducer, &tx_metrics, None);
424 });
425 let auth = AuthCtx::new(self.owner_identity, sender.id.identity);
426 let (table_rows, metrics) = return_on_err_with_sql!(
427 self.evaluate_initial_subscription(sender.clone(), query.clone(), &tx, &auth, TableUpdateType::Unsubscribe),
428 query.sql(),
429 send_err_msg
430 );
431
432 let _ = self.broadcast_queue.send_client_message(
440 sender.clone(),
441 SubscriptionMessage {
442 request_id: Some(request.request_id),
443 query_id: Some(request.query_id),
444 timer: Some(timer),
445 result: SubscriptionResult::Unsubscribe(SubscriptionRows {
446 table_id: query.subscribed_table_id(),
447 table_name: query.subscribed_table_name().into(),
448 table_rows,
449 }),
450 },
451 );
452 Ok(Some(metrics))
453 }
454
455 #[tracing::instrument(level = "trace", skip_all)]
457 pub fn remove_multi_subscription(
458 &self,
459 sender: Arc<ClientConnectionSender>,
460 request: UnsubscribeMulti,
461 timer: Instant,
462 ) -> Result<Option<ExecutionMetrics>, DBError> {
463 let send_err_msg = |message| {
465 self.broadcast_queue.send_client_message(
466 sender.clone(),
467 SubscriptionMessage {
468 request_id: Some(request.request_id),
469 query_id: Some(request.query_id),
470 timer: Some(timer),
471 result: SubscriptionResult::Error(SubscriptionError {
472 table_id: None,
473 message,
474 }),
475 },
476 )
477 };
478
479 let subscription_metrics = SubscriptionMetrics::new(&self.owner_identity, &WorkloadType::Unsubscribe);
480
481 let tx = scopeguard::guard(self.relational_db.begin_tx(Workload::Unsubscribe), |tx| {
483 let (tx_metrics, reducer) = self.relational_db.release_tx(tx);
484 self.relational_db.report(&reducer, &tx_metrics, None);
485 });
486
487 let removed_queries = {
488 let mut subscriptions = {
489 let _wait_guard = subscription_metrics.lock_waiters.inc_scope();
491 let _wait_timer = subscription_metrics.lock_wait_time.start_timer();
492 self.subscriptions.write()
493 };
494
495 return_on_err!(
496 subscriptions.remove_subscription((sender.id.identity, sender.id.connection_id), request.query_id),
497 send_err_msg,
498 None
499 )
500 };
501
502 let (update, metrics) = return_on_err!(
503 self.evaluate_queries(
504 sender.clone(),
505 &removed_queries,
506 &tx,
507 &AuthCtx::new(self.owner_identity, sender.id.identity),
508 TableUpdateType::Unsubscribe,
509 ),
510 send_err_msg,
511 None
512 );
513
514 subscription_metrics
516 .num_queries_evaluated
517 .inc_by(removed_queries.len() as _);
518
519 let _ = self.broadcast_queue.send_client_message(
527 sender,
528 SubscriptionMessage {
529 request_id: Some(request.request_id),
530 query_id: Some(request.query_id),
531 timer: Some(timer),
532 result: SubscriptionResult::UnsubscribeMulti(SubscriptionData { data: update }),
533 },
534 );
535
536 Ok(Some(metrics))
537 }
538
539 #[allow(clippy::type_complexity)]
552 fn compile_queries(
553 &self,
554 sender: Identity,
555 queries: impl IntoIterator<Item = Box<str>>,
556 num_queries: usize,
557 metrics: &SubscriptionMetrics,
558 ) -> Result<(Vec<Arc<Plan>>, AuthCtx, TxId, HistogramTimer), DBError> {
559 let mut subscribe_to_all_tables = false;
560 let mut plans = Vec::with_capacity(num_queries);
561 let mut query_hashes = Vec::with_capacity(num_queries);
562
563 for sql in queries {
564 if is_subscribe_to_all_tables(&sql) {
565 subscribe_to_all_tables = true;
566 continue;
567 }
568 let hash = QueryHash::from_string(&sql, sender, false);
569 let hash_with_param = QueryHash::from_string(&sql, sender, true);
570 query_hashes.push((sql, hash, hash_with_param));
571 }
572
573 let auth = AuthCtx::new(self.owner_identity, sender);
574
575 let tx = scopeguard::guard(self.relational_db.begin_tx(Workload::Subscribe), |tx| {
577 let (tx_metrics, reducer) = self.relational_db.release_tx(tx);
578 self.relational_db.report(&reducer, &tx_metrics, None);
579 });
580
581 let compile_timer = metrics.compilation_time.start_timer();
582
583 let guard = {
584 let _wait_guard = metrics.lock_waiters.inc_scope();
586 let _wait_timer = metrics.lock_wait_time.start_timer();
587 self.subscriptions.read()
588 };
589
590 if subscribe_to_all_tables {
591 plans.extend(
592 super::subscription::get_all(&self.relational_db, &tx, &auth)?
593 .into_iter()
594 .map(Arc::new),
595 );
596 }
597
598 let mut new_queries = 0;
599
600 for (sql, hash, hash_with_param) in query_hashes {
601 if let Some(unit) = guard.query(&hash) {
602 plans.push(unit);
603 } else if let Some(unit) = guard.query(&hash_with_param) {
604 plans.push(unit);
605 } else {
606 plans.push(Arc::new(
607 compile_query_with_hashes(&auth, &tx, &sql, hash, hash_with_param).map_err(|err| {
608 DBError::WithSql {
609 error: Box::new(DBError::Other(err.into())),
610 sql,
611 }
612 })?,
613 ));
614 new_queries += 1;
615 }
616 }
617
618 metrics.num_new_queries_subscribed.inc_by(new_queries);
620
621 Ok((plans, auth, scopeguard::ScopeGuard::into_inner(tx), compile_timer))
622 }
623
624 pub fn send_client_message(
628 &self,
629 recipient: Arc<ClientConnectionSender>,
630 message: impl Into<SerializableMessage>,
631 _tx_id: &TxId,
632 ) -> Result<(), BroadcastError> {
633 self.broadcast_queue.send_client_message(recipient, message)
634 }
635
636 #[tracing::instrument(level = "trace", skip_all)]
637 pub fn add_multi_subscription(
638 &self,
639 sender: Arc<ClientConnectionSender>,
640 request: SubscribeMulti,
641 timer: Instant,
642 _assert: Option<AssertTxFn>,
643 ) -> Result<Option<ExecutionMetrics>, DBError> {
644 let send_err_msg = |message| {
646 let _ = self.broadcast_queue.send_client_message(
647 sender.clone(),
648 SubscriptionMessage {
649 request_id: Some(request.request_id),
650 query_id: Some(request.query_id),
651 timer: Some(timer),
652 result: SubscriptionResult::Error(SubscriptionError {
653 table_id: None,
654 message,
655 }),
656 },
657 );
658 };
659
660 let num_queries = request.query_strings.len();
661
662 let subscription_metrics = SubscriptionMetrics::new(&self.owner_identity, &WorkloadType::Subscribe);
663
664 subscription_metrics.num_queries_subscribed.inc_by(num_queries as _);
666
667 let (queries, auth, tx, compile_timer) = return_on_err!(
668 self.compile_queries(
669 sender.id.identity,
670 request.query_strings,
671 num_queries,
672 &subscription_metrics
673 ),
674 send_err_msg,
675 None
676 );
677 let tx = scopeguard::guard(tx, |tx| {
678 let (tx_metrics, reducer) = self.relational_db.release_tx(tx);
679 self.relational_db.report(&reducer, &tx_metrics, None);
680 });
681
682 let queries = {
687 let mut subscriptions = {
688 let _wait_guard = subscription_metrics.lock_waiters.inc_scope();
690 let _wait_timer = subscription_metrics.lock_wait_time.start_timer();
691 self.subscriptions.write()
692 };
693
694 subscriptions.add_subscription_multi(sender.clone(), queries, request.query_id)?
695 };
696
697 drop(compile_timer);
699
700 let Ok((update, metrics)) =
701 self.evaluate_queries(sender.clone(), &queries, &tx, &auth, TableUpdateType::Subscribe)
702 else {
703 let mut subscriptions = {
705 let _wait_guard = subscription_metrics.lock_waiters.inc_scope();
707 let _wait_timer = subscription_metrics.lock_wait_time.start_timer();
708 self.subscriptions.write()
709 };
710 {
711 let _compile_timer = subscription_metrics.compilation_time.start_timer();
712 subscriptions.remove_subscription((sender.id.identity, sender.id.connection_id), request.query_id)?;
713 }
714
715 send_err_msg("Internal error evaluating queries".into());
716 return Ok(None);
717 };
718
719 subscription_metrics.num_queries_evaluated.inc_by(queries.len() as _);
721
722 #[cfg(test)]
723 if let Some(assert) = _assert {
724 assert(&tx);
725 }
726
727 let _ = self.broadcast_queue.send_client_message(
736 sender.clone(),
737 SubscriptionMessage {
738 request_id: Some(request.request_id),
739 query_id: Some(request.query_id),
740 timer: Some(timer),
741 result: SubscriptionResult::SubscribeMulti(SubscriptionData { data: update }),
742 },
743 );
744
745 Ok(Some(metrics))
746 }
747
748 #[tracing::instrument(level = "trace", skip_all)]
751 pub fn add_legacy_subscriber(
752 &self,
753 sender: Arc<ClientConnectionSender>,
754 subscription: Subscribe,
755 timer: Instant,
756 _assert: Option<AssertTxFn>,
757 ) -> Result<ExecutionMetrics, DBError> {
758 let num_queries = subscription.query_strings.len();
759 let subscription_metrics = SubscriptionMetrics::new(&self.owner_identity, &WorkloadType::Subscribe);
760
761 subscription_metrics.num_queries_subscribed.inc_by(num_queries as _);
763
764 let (queries, auth, tx, compile_timer) = self.compile_queries(
765 sender.id.identity,
766 subscription.query_strings,
767 num_queries,
768 &subscription_metrics,
769 )?;
770 let tx = scopeguard::guard(tx, |tx| {
771 let (tx_metrics, reducer) = self.relational_db.release_tx(tx);
772 self.relational_db.report(&reducer, &tx_metrics, None);
773 });
774
775 check_row_limit(
776 &queries,
777 &self.relational_db,
778 &tx,
779 |plan, tx| {
780 plan.plans_fragments()
781 .map(|plan_fragment| estimate_rows_scanned(tx, plan_fragment.optimized_physical_plan()))
782 .fold(0, |acc, rows_scanned| acc.saturating_add(rows_scanned))
783 },
784 &auth,
785 )?;
786
787 drop(compile_timer);
789
790 let tx = DeltaTx::from(&*tx);
791 let (database_update, metrics) = match sender.config.protocol {
792 Protocol::Binary => execute_plans(&queries, &tx, TableUpdateType::Subscribe)
793 .map(|(table_update, metrics)| (FormatSwitch::Bsatn(table_update), metrics))?,
794 Protocol::Text => execute_plans(&queries, &tx, TableUpdateType::Subscribe)
795 .map(|(table_update, metrics)| (FormatSwitch::Json(table_update), metrics))?,
796 };
797
798 {
802 let _compile_timer = subscription_metrics.compilation_time.start_timer();
803
804 let mut subscriptions = {
805 let _wait_guard = subscription_metrics.lock_waiters.inc_scope();
807 let _wait_timer = subscription_metrics.lock_wait_time.start_timer();
808 self.subscriptions.write()
809 };
810
811 subscriptions.set_legacy_subscription(sender.clone(), queries.into_iter());
812 }
813
814 #[cfg(test)]
815 if let Some(assert) = _assert {
816 assert(&tx);
817 }
818
819 let _ = self.broadcast_queue.send_client_message(
827 sender,
828 SubscriptionUpdateMessage {
829 database_update,
830 request_id: Some(subscription.request_id),
831 timer: Some(timer),
832 },
833 );
834
835 Ok(metrics)
836 }
837
838 pub fn remove_subscriber(&self, client_id: ClientActorId) {
839 let mut subscriptions = self.subscriptions.write();
840 subscriptions.remove_all_subscriptions(&(client_id.identity, client_id.connection_id));
841 }
842
843 pub fn commit_and_broadcast_event(
848 &self,
849 caller: Option<Arc<ClientConnectionSender>>,
850 mut event: ModuleEvent,
851 tx: MutTx,
852 ) -> Result<Result<(Arc<ModuleEvent>, ExecutionMetrics), WriteConflict>, DBError> {
853 let subscription_metrics = SubscriptionMetrics::new(&self.owner_identity, &WorkloadType::Update);
854
855 let subscriptions = {
858 let _wait_guard = subscription_metrics.lock_waiters.inc_scope();
860 let _wait_timer = subscription_metrics.lock_wait_time.start_timer();
861 self.subscriptions.read()
862 };
863
864 let stdb = &self.relational_db;
865 let (read_tx, tx_data, tx_metrics_mut) = match &mut event.status {
868 EventStatus::Committed(db_update) => {
869 let Some((tx_data, tx_metrics, read_tx)) = stdb.commit_tx_downgrade(tx, Workload::Update)? else {
870 return Ok(Err(WriteConflict));
871 };
872 *db_update = DatabaseUpdate::from_writes(&tx_data);
873 (read_tx, Some(tx_data), tx_metrics)
874 }
875 EventStatus::Failed(_) | EventStatus::OutOfEnergy => {
876 let (tx_metrics, tx) = stdb.rollback_mut_tx_downgrade(tx, Workload::Update);
877 (tx, None, tx_metrics)
878 }
879 };
880
881 let mut read_tx = scopeguard::guard(read_tx, |tx| {
883 let (tx_metrics_read, reducer) = self.relational_db.release_tx(tx);
884 stdb.report_tx_metricses(&reducer, tx_data.as_ref(), Some(&tx_metrics_mut), &tx_metrics_read);
885 });
886 let delta_read_tx = tx_data
888 .as_ref()
889 .map(|tx_data| DeltaTx::new(&read_tx, tx_data, subscriptions.index_ids_for_subscriptions()))
890 .unwrap_or_else(|| DeltaTx::from(&*read_tx));
891
892 let event = Arc::new(event);
893 let mut update_metrics: ExecutionMetrics = ExecutionMetrics::default();
894
895 match &event.status {
896 EventStatus::Committed(_) => {
897 update_metrics = subscriptions.eval_updates_sequential(&delta_read_tx, event.clone(), caller);
898 }
899 EventStatus::Failed(_) => {
900 if let Some(client) = caller {
901 let message = TransactionUpdateMessage {
902 event: Some(event.clone()),
903 database_update: SubscriptionUpdateMessage::default_for_protocol(client.config.protocol, None),
904 };
905
906 let _ = self.broadcast_queue.send_client_message(client, message);
907 } else {
908 log::trace!("Reducer failed but there is no client to send the failure to!")
909 }
910 }
911 EventStatus::OutOfEnergy => {} }
913
914 read_tx.metrics.merge(update_metrics);
916
917 Ok(Ok((event, update_metrics)))
918 }
919}
920
921pub struct WriteConflict;
922
923#[cfg(test)]
924mod tests {
925 use super::{AssertTxFn, ModuleSubscriptions};
926 use crate::client::messages::{
927 SerializableMessage, SubscriptionData, SubscriptionError, SubscriptionMessage, SubscriptionResult,
928 SubscriptionUpdateMessage, TransactionUpdateMessage,
929 };
930 use crate::client::{ClientActorId, ClientConfig, ClientConnectionSender, ClientName, Protocol};
931 use crate::db::datastore::system_tables::{StRowLevelSecurityRow, ST_ROW_LEVEL_SECURITY_ID};
932 use crate::db::relational_db::tests_utils::{
933 begin_mut_tx, begin_tx, insert, with_auto_commit, with_read_only, TestDB,
934 };
935 use crate::db::relational_db::RelationalDB;
936 use crate::error::DBError;
937 use crate::host::module_host::{DatabaseUpdate, EventStatus, ModuleEvent, ModuleFunctionCall};
938 use crate::messages::websocket as ws;
939 use crate::sql::execute::run;
940 use crate::subscription::module_subscription_manager::{spawn_send_worker, SubscriptionManager};
941 use crate::subscription::query::compile_read_only_query;
942 use crate::subscription::TableUpdateType;
943 use hashbrown::HashMap;
944 use itertools::Itertools;
945 use pretty_assertions::assert_matches;
946 use spacetimedb_client_api_messages::energy::EnergyQuanta;
947 use spacetimedb_client_api_messages::websocket::{
948 CompressableQueryUpdate, Compression, FormatSwitch, QueryId, Subscribe, SubscribeMulti, SubscribeSingle,
949 TableUpdate, Unsubscribe, UnsubscribeMulti,
950 };
951 use spacetimedb_execution::dml::MutDatastore;
952 use spacetimedb_lib::bsatn::ToBsatn;
953 use spacetimedb_lib::db::auth::StAccess;
954 use spacetimedb_lib::identity::AuthCtx;
955 use spacetimedb_lib::metrics::ExecutionMetrics;
956 use spacetimedb_lib::{bsatn, ConnectionId, ProductType, ProductValue, Timestamp};
957 use spacetimedb_lib::{error::ResultTest, AlgebraicType, Identity};
958 use spacetimedb_primitives::TableId;
959 use spacetimedb_sats::product;
960 use std::time::Instant;
961 use std::{sync::Arc, time::Duration};
962 use tokio::sync::mpsc::{self, Receiver};
963
964 fn add_subscriber(db: Arc<RelationalDB>, sql: &str, assert: Option<AssertTxFn>) -> Result<(), DBError> {
965 let runtime = tokio::runtime::Runtime::new().unwrap();
967 let _rt = runtime.enter();
968 let owner = Identity::from_byte_array([1; 32]);
969 let client = ClientActorId::for_test(Identity::ZERO);
970 let config = ClientConfig::for_test();
971 let sender = Arc::new(ClientConnectionSender::dummy(client, config));
972 let send_worker_queue = spawn_send_worker(None);
973 let module_subscriptions = ModuleSubscriptions::new(
974 db.clone(),
975 SubscriptionManager::for_test_without_metrics_arc_rwlock(),
976 send_worker_queue,
977 owner,
978 );
979
980 let subscribe = Subscribe {
981 query_strings: [sql.into()].into(),
982 request_id: 0,
983 };
984 module_subscriptions.add_legacy_subscriber(sender, subscribe, Instant::now(), assert)?;
985 Ok(())
986 }
987
988 fn relational_db() -> anyhow::Result<Arc<RelationalDB>> {
990 let TestDB { db, .. } = TestDB::in_memory()?;
991 Ok(Arc::new(db))
992 }
993
994 fn single_subscribe(sql: &str, query_id: u32) -> SubscribeSingle {
996 SubscribeSingle {
997 query: sql.into(),
998 request_id: 0,
999 query_id: QueryId::new(query_id),
1000 }
1001 }
1002
1003 fn multi_subscribe(query_strings: &[&'static str], query_id: u32) -> SubscribeMulti {
1005 SubscribeMulti {
1006 query_strings: query_strings
1007 .iter()
1008 .map(|sql| String::from(*sql).into_boxed_str())
1009 .collect(),
1010 request_id: 0,
1011 query_id: QueryId::new(query_id),
1012 }
1013 }
1014
1015 fn multi_unsubscribe(query_id: u32) -> UnsubscribeMulti {
1017 UnsubscribeMulti {
1018 request_id: 0,
1019 query_id: QueryId::new(query_id),
1020 }
1021 }
1022
1023 fn single_unsubscribe(query_id: u32) -> Unsubscribe {
1025 Unsubscribe {
1026 request_id: 0,
1027 query_id: QueryId::new(query_id),
1028 }
1029 }
1030
1031 fn module_event() -> ModuleEvent {
1033 ModuleEvent {
1034 timestamp: Timestamp::now(),
1035 caller_identity: Identity::ZERO,
1036 caller_connection_id: None,
1037 function_call: ModuleFunctionCall::default(),
1038 status: EventStatus::Committed(DatabaseUpdate::default()),
1039 energy_quanta_used: EnergyQuanta { quanta: 0 },
1040 host_execution_duration: Duration::from_millis(0),
1041 request_id: None,
1042 timer: None,
1043 }
1044 }
1045
1046 fn identity_from_u8(v: u8) -> Identity {
1048 Identity::from_byte_array([v; 32])
1049 }
1050
1051 fn connection_id_from_u8(v: u8) -> ConnectionId {
1053 ConnectionId::from_be_byte_array([v; 16])
1054 }
1055
1056 fn client_id_from_u8(v: u8) -> ClientActorId {
1059 ClientActorId {
1060 identity: identity_from_u8(v),
1061 connection_id: connection_id_from_u8(v),
1062 name: ClientName(v as u64),
1063 }
1064 }
1065
1066 fn client_connection_with_compression(
1068 client_id: ClientActorId,
1069 compression: Compression,
1070 ) -> (Arc<ClientConnectionSender>, Receiver<SerializableMessage>) {
1071 let (sender, rx) = ClientConnectionSender::dummy_with_channel(
1072 client_id,
1073 ClientConfig {
1074 protocol: Protocol::Binary,
1075 compression,
1076 tx_update_full: true,
1077 },
1078 );
1079 (Arc::new(sender), rx)
1080 }
1081
1082 fn client_connection(client_id: ClientActorId) -> (Arc<ClientConnectionSender>, Receiver<SerializableMessage>) {
1084 client_connection_with_compression(client_id, Compression::None)
1085 }
1086
1087 fn insert_rls_rules(
1089 db: &RelationalDB,
1090 table_ids: impl IntoIterator<Item = TableId>,
1091 rules: impl IntoIterator<Item = &'static str>,
1092 ) -> anyhow::Result<()> {
1093 with_auto_commit(db, |tx| {
1094 for (table_id, sql) in table_ids.into_iter().zip(rules) {
1095 db.insert(
1096 tx,
1097 ST_ROW_LEVEL_SECURITY_ID,
1098 &ProductValue::from(StRowLevelSecurityRow {
1099 table_id,
1100 sql: sql.into(),
1101 })
1102 .to_bsatn_vec()?,
1103 )?;
1104 }
1105 Ok(())
1106 })
1107 }
1108
1109 fn subscribe_single(
1111 subs: &ModuleSubscriptions,
1112 sql: &'static str,
1113 sender: Arc<ClientConnectionSender>,
1114 counter: &mut u32,
1115 ) -> anyhow::Result<()> {
1116 *counter += 1;
1117 subs.add_single_subscription(sender, single_subscribe(sql, *counter), Instant::now(), None)?;
1118 Ok(())
1119 }
1120
1121 fn subscribe_multi(
1123 subs: &ModuleSubscriptions,
1124 queries: &[&'static str],
1125 sender: Arc<ClientConnectionSender>,
1126 counter: &mut u32,
1127 ) -> anyhow::Result<ExecutionMetrics> {
1128 *counter += 1;
1129 let metrics = subs
1130 .add_multi_subscription(sender, multi_subscribe(queries, *counter), Instant::now(), None)
1131 .map(|metrics| metrics.unwrap_or_default())?;
1132 Ok(metrics)
1133 }
1134
1135 fn unsubscribe_single(
1137 subs: &ModuleSubscriptions,
1138 sender: Arc<ClientConnectionSender>,
1139 query_id: u32,
1140 ) -> anyhow::Result<()> {
1141 subs.remove_single_subscription(sender, single_unsubscribe(query_id), Instant::now())?;
1142 Ok(())
1143 }
1144
1145 fn unsubscribe_multi(
1147 subs: &ModuleSubscriptions,
1148 sender: Arc<ClientConnectionSender>,
1149 query_id: u32,
1150 ) -> anyhow::Result<()> {
1151 subs.remove_multi_subscription(sender, multi_unsubscribe(query_id), Instant::now())?;
1152 Ok(())
1153 }
1154
1155 async fn assert_tx_update_for_table(
1157 rx: &mut Receiver<SerializableMessage>,
1158 table_id: TableId,
1159 schema: &ProductType,
1160 inserts: impl IntoIterator<Item = ProductValue>,
1161 deletes: impl IntoIterator<Item = ProductValue>,
1162 ) {
1163 match rx.recv().await {
1164 Some(SerializableMessage::TxUpdate(TransactionUpdateMessage {
1165 database_update:
1166 SubscriptionUpdateMessage {
1167 database_update: FormatSwitch::Bsatn(ws::DatabaseUpdate { mut tables }),
1168 ..
1169 },
1170 ..
1171 })) => {
1172 assert_eq!(tables.len(), 1);
1174
1175 let table_update = tables.pop().unwrap();
1176
1177 assert_ne!(table_update.num_rows, 0);
1179
1180 assert_eq!(table_update.table_id, table_id);
1182
1183 let mut rows_received: HashMap<ProductValue, i32> = HashMap::new();
1184
1185 for uncompressed in table_update.updates {
1186 let CompressableQueryUpdate::Uncompressed(table_update) = uncompressed else {
1187 panic!("expected an uncompressed table update")
1188 };
1189
1190 for row in table_update
1191 .inserts
1192 .into_iter()
1193 .map(|bytes| ProductValue::decode(schema, &mut &*bytes).unwrap())
1194 {
1195 *rows_received.entry(row).or_insert(0) += 1;
1196 }
1197
1198 for row in table_update
1199 .deletes
1200 .into_iter()
1201 .map(|bytes| ProductValue::decode(schema, &mut &*bytes).unwrap())
1202 {
1203 *rows_received.entry(row).or_insert(0) -= 1;
1204 }
1205 }
1206
1207 assert_eq!(
1208 rows_received
1209 .iter()
1210 .filter(|(_, n)| n > &&0)
1211 .map(|(row, _)| row)
1212 .cloned()
1213 .sorted()
1214 .collect::<Vec<_>>(),
1215 inserts.into_iter().sorted().collect::<Vec<_>>()
1216 );
1217 assert_eq!(
1218 rows_received
1219 .iter()
1220 .filter(|(_, n)| n < &&0)
1221 .map(|(row, _)| row)
1222 .cloned()
1223 .sorted()
1224 .collect::<Vec<_>>(),
1225 deletes.into_iter().sorted().collect::<Vec<_>>()
1226 );
1227 }
1228 Some(msg) => panic!("expected a TxUpdate, but got {:#?}", msg),
1229 None => panic!("The receiver closed due to an error"),
1230 }
1231 }
1232
1233 fn commit_tx(
1235 db: &RelationalDB,
1236 subs: &ModuleSubscriptions,
1237 deletes: impl IntoIterator<Item = (TableId, ProductValue)>,
1238 inserts: impl IntoIterator<Item = (TableId, ProductValue)>,
1239 ) -> anyhow::Result<ExecutionMetrics> {
1240 let mut tx = begin_mut_tx(db);
1241 for (table_id, row) in deletes {
1242 tx.delete_product_value(table_id, &row)?;
1243 }
1244 for (table_id, row) in inserts {
1245 db.insert(&mut tx, table_id, &bsatn::to_vec(&row)?)?;
1246 }
1247
1248 let Ok(Ok((_, metrics))) = subs.commit_and_broadcast_event(None, module_event(), tx) else {
1249 panic!("Encountered an error in `commit_and_broadcast_event`");
1250 };
1251 Ok(metrics)
1252 }
1253
1254 #[test]
1255 fn test_subscribe_metrics() -> anyhow::Result<()> {
1256 let client_id = client_id_from_u8(1);
1257 let (sender, _) = client_connection(client_id);
1258
1259 let db = relational_db()?;
1260 let (subs, _runtime) = ModuleSubscriptions::for_test_new_runtime(db.clone());
1261
1262 let table_id = db.create_table_for_test("t", &[("id", AlgebraicType::U64)], &[0.into()])?;
1264 with_auto_commit(&db, |tx| -> anyhow::Result<_> {
1265 db.insert(tx, table_id, &bsatn::to_vec(&product![1_u64])?)?;
1266 Ok(())
1267 })?;
1268
1269 let auth = AuthCtx::for_testing();
1270 let sql = "select * from t where id = 1";
1271 let tx = begin_tx(&db);
1272 let plan = compile_read_only_query(&auth, &tx, sql)?;
1273 let plan = Arc::new(plan);
1274
1275 let (_, metrics) = subs.evaluate_queries(sender, &[plan], &tx, &auth, TableUpdateType::Subscribe)?;
1276
1277 assert_eq!(metrics.index_seeks, 1);
1279 assert_eq!(metrics.bytes_scanned, 8);
1281 assert_eq!(metrics.bytes_written, 0);
1283 assert_eq!(metrics.bytes_sent_to_clients, 8);
1286
1287 assert!(metrics.rows_scanned > 0);
1290 Ok(())
1291 }
1292
1293 fn check_subscription_err(sql: &str, result: Option<SerializableMessage>) {
1294 if let Some(SerializableMessage::Subscription(SubscriptionMessage {
1295 result: SubscriptionResult::Error(SubscriptionError { message, .. }),
1296 ..
1297 })) = result
1298 {
1299 assert!(
1300 message.contains(sql),
1301 "Expected error message to contain the SQL query: {sql}, but got: {message}",
1302 );
1303 return;
1304 }
1305 panic!("Expected a subscription error message, but got: {:?}", result);
1306 }
1307
1308 #[tokio::test]
1310 async fn subscribe_single_error() -> anyhow::Result<()> {
1311 let client_id = client_id_from_u8(1);
1312 let (tx, mut rx) = client_connection(client_id);
1313
1314 let db = relational_db()?;
1315 let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone());
1316
1317 db.create_table_for_test("t", &[("x", AlgebraicType::U8)], &[])?;
1318
1319 let sql = "select r.* from t";
1321 subscribe_single(&subs, sql, tx, &mut 0)?;
1322
1323 check_subscription_err(sql, rx.recv().await);
1324
1325 Ok(())
1326 }
1327
1328 #[tokio::test]
1330 async fn subscribe_multi_error() -> anyhow::Result<()> {
1331 let client_id = client_id_from_u8(1);
1332 let (tx, mut rx) = client_connection(client_id);
1333
1334 let db = relational_db()?;
1335 let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone());
1336
1337 db.create_table_for_test("t", &[("x", AlgebraicType::U8)], &[])?;
1338
1339 let sql = "select r.* from t";
1341 subscribe_multi(&subs, &[sql], tx, &mut 0)?;
1342
1343 check_subscription_err(sql, rx.recv().await);
1344
1345 Ok(())
1346 }
1347
1348 #[tokio::test]
1350 async fn unsubscribe_single_error() -> anyhow::Result<()> {
1351 let client_id = client_id_from_u8(1);
1352 let (tx, mut rx) = client_connection(client_id);
1353
1354 let db = relational_db()?;
1355 let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone());
1356
1357 let table_id = db.create_table_for_test("t", &[("id", AlgebraicType::U8)], &[0.into()])?;
1359 let index_id = with_read_only(&db, |tx| {
1360 db.schema_for_table(&*tx, table_id).map(|schema| {
1361 schema
1362 .indexes
1363 .first()
1364 .map(|index_schema| index_schema.index_id)
1365 .unwrap()
1366 })
1367 })?;
1368
1369 let mut query_id = 0;
1370
1371 let sql = "select * from t where id = 1";
1373 subscribe_single(&subs, sql, tx.clone(), &mut query_id)?;
1374
1375 assert!(matches!(
1377 rx.recv().await,
1378 Some(SerializableMessage::Subscription(SubscriptionMessage {
1379 result: SubscriptionResult::Subscribe(..),
1380 ..
1381 }))
1382 ));
1383
1384 with_auto_commit(&db, |tx| db.drop_index(tx, index_id))?;
1386
1387 unsubscribe_single(&subs, tx, query_id)?;
1389
1390 check_subscription_err(sql, rx.recv().await);
1396
1397 Ok(())
1398 }
1399
1400 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1404 async fn unsubscribe_multi_error() -> anyhow::Result<()> {
1405 let client_id = client_id_from_u8(1);
1406 let (tx, mut rx) = client_connection(client_id);
1407
1408 let db = relational_db()?;
1409 let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone());
1410
1411 let table_id = db.create_table_for_test("t", &[("id", AlgebraicType::U8)], &[0.into()])?;
1413 let index_id = with_read_only(&db, |tx| {
1414 db.schema_for_table(&*tx, table_id).map(|schema| {
1415 schema
1416 .indexes
1417 .first()
1418 .map(|index_schema| index_schema.index_id)
1419 .unwrap()
1420 })
1421 })?;
1422
1423 commit_tx(&db, &subs, [], [(table_id, product![0_u8])])?;
1424
1425 let mut query_id = 0;
1426
1427 let sql = "select * from t where id = 1";
1429 subscribe_multi(&subs, &[sql], tx.clone(), &mut query_id)?;
1430
1431 assert!(matches!(
1433 rx.recv().await,
1434 Some(SerializableMessage::Subscription(SubscriptionMessage {
1435 result: SubscriptionResult::SubscribeMulti(..),
1436 ..
1437 }))
1438 ));
1439
1440 with_auto_commit(&db, |tx| db.drop_index(tx, index_id))?;
1442
1443 unsubscribe_multi(&subs, tx, query_id)?;
1445
1446 check_subscription_err(sql, rx.recv().await);
1452
1453 Ok(())
1454 }
1455
1456 #[tokio::test]
1458 async fn tx_update_error() -> anyhow::Result<()> {
1459 let client_id = client_id_from_u8(1);
1460 let (tx, mut rx) = client_connection(client_id);
1461
1462 let db = relational_db()?;
1463 let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone());
1464
1465 let t_id = db.create_table_for_test("t", &[("id", AlgebraicType::U8)], &[0.into()])?;
1467 let s_id = db.create_table_for_test("s", &[("id", AlgebraicType::U8)], &[0.into()])?;
1468 let index_id = with_read_only(&db, |tx| {
1469 db.schema_for_table(&*tx, s_id).map(|schema| {
1470 schema
1471 .indexes
1472 .first()
1473 .map(|index_schema| index_schema.index_id)
1474 .unwrap()
1475 })
1476 })?;
1477 let sql = "select t.* from t join s on t.id = s.id";
1478 subscribe_single(&subs, sql, tx, &mut 0)?;
1479
1480 assert!(matches!(
1482 rx.recv().await,
1483 Some(SerializableMessage::Subscription(SubscriptionMessage {
1484 result: SubscriptionResult::Subscribe(..),
1485 ..
1486 }))
1487 ));
1488
1489 with_auto_commit(&db, |tx| db.drop_index(tx, index_id))?;
1491
1492 let mut tx = begin_mut_tx(&db);
1494 db.insert(&mut tx, t_id, &bsatn::to_vec(&product![2_u8])?)?;
1495
1496 assert!(matches!(
1497 subs.commit_and_broadcast_event(None, module_event(), tx),
1498 Ok(Ok(_))
1499 ));
1500
1501 check_subscription_err(sql, rx.recv().await);
1507
1508 Ok(())
1509 }
1510
1511 #[tokio::test]
1513 async fn test_parameterized_subscription() -> anyhow::Result<()> {
1514 let id_for_a = identity_from_u8(1);
1516 let id_for_b = identity_from_u8(2);
1517
1518 let client_id_for_a = client_id_from_u8(1);
1519 let client_id_for_b = client_id_from_u8(2);
1520
1521 let (tx_for_a, mut rx_for_a) = client_connection(client_id_for_a);
1523 let (tx_for_b, mut rx_for_b) = client_connection(client_id_for_b);
1524
1525 let db = relational_db()?;
1526 let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone());
1527
1528 let schema = [("identity", AlgebraicType::identity())];
1529
1530 let table_id = db.create_table_for_test("t", &schema, &[])?;
1531
1532 let mut query_ids = 0;
1533
1534 subscribe_multi(
1537 &subs,
1538 &["select * from t where identity = :sender"],
1539 tx_for_a,
1540 &mut query_ids,
1541 )?;
1542 subscribe_multi(
1543 &subs,
1544 &["select * from t where identity = :sender"],
1545 tx_for_b,
1546 &mut query_ids,
1547 )?;
1548
1549 assert!(matches!(
1551 rx_for_a.recv().await,
1552 Some(SerializableMessage::Subscription(_))
1553 ));
1554 assert!(matches!(
1555 rx_for_b.recv().await,
1556 Some(SerializableMessage::Subscription(_))
1557 ));
1558
1559 let mut tx = begin_mut_tx(&db);
1561 db.insert(&mut tx, table_id, &bsatn::to_vec(&product![id_for_a])?)?;
1562 db.insert(&mut tx, table_id, &bsatn::to_vec(&product![id_for_b])?)?;
1563
1564 assert!(matches!(
1565 subs.commit_and_broadcast_event(None, module_event(), tx),
1566 Ok(Ok(_))
1567 ));
1568
1569 let schema = ProductType::from([AlgebraicType::identity()]);
1570
1571 assert_tx_update_for_table(&mut rx_for_a, table_id, &schema, [product![id_for_a]], []).await;
1573 assert_tx_update_for_table(&mut rx_for_b, table_id, &schema, [product![id_for_b]], []).await;
1574 Ok(())
1575 }
1576
1577 #[tokio::test]
1579 async fn test_rls_subscription() -> anyhow::Result<()> {
1580 let id_for_a = identity_from_u8(1);
1582 let id_for_b = identity_from_u8(2);
1583
1584 let client_id_for_a = client_id_from_u8(1);
1585 let client_id_for_b = client_id_from_u8(2);
1586
1587 let (tx_for_a, mut rx_for_a) = client_connection(client_id_for_a);
1589 let (tx_for_b, mut rx_for_b) = client_connection(client_id_for_b);
1590
1591 let db = relational_db()?;
1592 let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone());
1593
1594 let schema = [("id", AlgebraicType::identity())];
1595
1596 let u_id = db.create_table_for_test("u", &schema, &[0.into()])?;
1597 let v_id = db.create_table_for_test("v", &schema, &[0.into()])?;
1598 let w_id = db.create_table_for_test("w", &schema, &[0.into()])?;
1599
1600 insert_rls_rules(
1601 &db,
1602 [u_id, v_id, w_id, w_id],
1603 [
1604 "select * from u where id = :sender",
1605 "select * from v where id = :sender",
1606 "select w.* from u join w on u.id = w.id",
1607 "select w.* from v join w on v.id = w.id",
1608 ],
1609 )?;
1610
1611 let mut query_ids = 0;
1612
1613 subscribe_multi(&subs, &["select * from w"], tx_for_a, &mut query_ids)?;
1617 subscribe_multi(&subs, &["select * from w"], tx_for_b, &mut query_ids)?;
1618
1619 assert!(matches!(
1621 rx_for_a.recv().await,
1622 Some(SerializableMessage::Subscription(_))
1623 ));
1624 assert!(matches!(
1625 rx_for_b.recv().await,
1626 Some(SerializableMessage::Subscription(_))
1627 ));
1628
1629 let mut tx = begin_mut_tx(&db);
1633 db.insert(&mut tx, u_id, &bsatn::to_vec(&product![id_for_a])?)?;
1634 db.insert(&mut tx, v_id, &bsatn::to_vec(&product![id_for_b])?)?;
1635 db.insert(&mut tx, w_id, &bsatn::to_vec(&product![id_for_a])?)?;
1636 db.insert(&mut tx, w_id, &bsatn::to_vec(&product![id_for_b])?)?;
1637
1638 assert!(matches!(
1639 subs.commit_and_broadcast_event(None, module_event(), tx),
1640 Ok(Ok(_))
1641 ));
1642
1643 let schema = ProductType::from([AlgebraicType::identity()]);
1644
1645 assert_tx_update_for_table(&mut rx_for_a, w_id, &schema, [product![id_for_a]], []).await;
1647 assert_tx_update_for_table(&mut rx_for_b, w_id, &schema, [product![id_for_b]], []).await;
1648 Ok(())
1649 }
1650
1651 #[tokio::test]
1653 async fn test_rls_for_owner() -> anyhow::Result<()> {
1654 let (tx_for_a, mut rx_for_a) = client_connection(client_id_from_u8(0));
1656 let (tx_for_b, mut rx_for_b) = client_connection(client_id_from_u8(1));
1657
1658 let db = relational_db()?;
1659 let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone());
1660
1661 let table_id = db.create_table_for_test("t", &[("id", AlgebraicType::identity())], &[0.into()])?;
1663
1664 insert_rls_rules(&db, [table_id], ["select * from t where id = :sender"])?;
1666
1667 let mut query_ids = 0;
1668
1669 subscribe_multi(&subs, &["select * from t"], tx_for_a, &mut query_ids)?;
1671 subscribe_multi(&subs, &["select * from t"], tx_for_b, &mut query_ids)?;
1672
1673 assert_matches!(
1675 rx_for_a.recv().await,
1676 Some(SerializableMessage::Subscription(SubscriptionMessage {
1677 result: SubscriptionResult::SubscribeMulti(_),
1678 ..
1679 }))
1680 );
1681 assert_matches!(
1682 rx_for_b.recv().await,
1683 Some(SerializableMessage::Subscription(SubscriptionMessage {
1684 result: SubscriptionResult::SubscribeMulti(_),
1685 ..
1686 }))
1687 );
1688
1689 let schema = ProductType::from([AlgebraicType::identity()]);
1690
1691 let id_for_b = identity_from_u8(1);
1692 let id_for_c = identity_from_u8(2);
1693
1694 commit_tx(
1695 &db,
1696 &subs,
1697 [],
1698 [
1699 (table_id, product![id_for_b]),
1701 (table_id, product![id_for_c]),
1702 ],
1703 )?;
1704
1705 assert_tx_update_for_table(
1706 &mut rx_for_a,
1707 table_id,
1708 &schema,
1709 [product![id_for_b], product![id_for_c]],
1711 [],
1712 )
1713 .await;
1714
1715 assert_tx_update_for_table(
1716 &mut rx_for_b,
1717 table_id,
1718 &schema,
1719 [product![id_for_b]],
1721 [],
1722 )
1723 .await;
1724
1725 Ok(())
1726 }
1727
1728 #[tokio::test]
1730 async fn test_no_empty_updates() -> anyhow::Result<()> {
1731 let (tx, mut rx) = client_connection(client_id_from_u8(1));
1733
1734 let db = relational_db()?;
1735 let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone());
1736
1737 let schema = [("x", AlgebraicType::U8)];
1738
1739 let t_id = db.create_table_for_test("t", &schema, &[])?;
1740
1741 subscribe_multi(&subs, &["select * from t where x = 0"], tx, &mut 0)?;
1743
1744 assert!(matches!(rx.recv().await, Some(SerializableMessage::Subscription(_))));
1746
1747 let mut tx = begin_mut_tx(&db);
1749 db.insert(&mut tx, t_id, &bsatn::to_vec(&product![1_u8])?)?;
1750
1751 assert!(matches!(
1752 subs.commit_and_broadcast_event(None, module_event(), tx),
1753 Ok(Ok(_))
1754 ));
1755
1756 let mut tx = begin_mut_tx(&db);
1758 db.insert(&mut tx, t_id, &bsatn::to_vec(&product![0_u8])?)?;
1759
1760 assert!(matches!(
1761 subs.commit_and_broadcast_event(None, module_event(), tx),
1762 Ok(Ok(_))
1763 ));
1764
1765 let schema = ProductType::from([AlgebraicType::U8]);
1766
1767 assert_tx_update_for_table(&mut rx, t_id, &schema, [product![0_u8]], []).await;
1770 Ok(())
1771 }
1772
1773 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1779 async fn test_no_compression_for_subscribe() -> anyhow::Result<()> {
1780 let (tx, mut rx) = client_connection_with_compression(client_id_from_u8(1), Compression::Brotli);
1782
1783 let db = relational_db()?;
1784 let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone());
1785
1786 let table_id = db.create_table_for_test("t", &[("x", AlgebraicType::U64)], &[])?;
1787
1788 let mut inserts = vec![];
1789
1790 for i in 0..16_000u64 {
1791 inserts.push((table_id, product![i]));
1792 }
1793
1794 commit_tx(&db, &subs, [], inserts)?;
1797
1798 subscribe_multi(&subs, &["select * from t"], tx, &mut 0)?;
1800
1801 match rx.recv().await {
1803 Some(SerializableMessage::Subscription(SubscriptionMessage {
1804 result:
1805 SubscriptionResult::SubscribeMulti(SubscriptionData {
1806 data: FormatSwitch::Bsatn(ws::DatabaseUpdate { tables }),
1807 }),
1808 ..
1809 })) => {
1810 assert!(tables.iter().all(|TableUpdate { updates, .. }| updates
1811 .iter()
1812 .all(|query_update| matches!(query_update, CompressableQueryUpdate::Uncompressed(_)))));
1813 }
1814 Some(_) => panic!("unexpected message from subscription"),
1815 None => panic!("channel unexpectedly closed"),
1816 };
1817
1818 Ok(())
1819 }
1820
1821 #[tokio::test]
1823 async fn test_updates_for_dml() -> anyhow::Result<()> {
1824 let (tx, mut rx) = client_connection(client_id_from_u8(1));
1826
1827 let db = relational_db()?;
1828 let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone());
1829 let schema = [("x", AlgebraicType::U8), ("y", AlgebraicType::U8)];
1830 let t_id = db.create_table_for_test("t", &schema, &[])?;
1831
1832 subscribe_multi(&subs, &["select * from t"], tx, &mut 0)?;
1834
1835 assert_matches!(rx.recv().await, Some(SerializableMessage::Subscription(_)));
1837
1838 let schema = ProductType::from([AlgebraicType::U8, AlgebraicType::U8]);
1839
1840 let auth = AuthCtx::new(identity_from_u8(0), identity_from_u8(0));
1842
1843 run(
1844 &db,
1845 "INSERT INTO t (x, y) VALUES (0, 1)",
1846 auth,
1847 Some(&subs),
1848 &mut vec![],
1849 )?;
1850
1851 assert_tx_update_for_table(&mut rx, t_id, &schema, [product![0_u8, 1_u8]], []).await;
1853
1854 run(&db, "UPDATE t SET y=2 WHERE x=0", auth, Some(&subs), &mut vec![])?;
1855
1856 assert_tx_update_for_table(&mut rx, t_id, &schema, [product![0_u8, 2_u8]], [product![0_u8, 1_u8]]).await;
1858
1859 run(&db, "DELETE FROM t WHERE x=0", auth, Some(&subs), &mut vec![])?;
1860
1861 assert_tx_update_for_table(&mut rx, t_id, &schema, [], [product![0_u8, 2_u8]]).await;
1863 Ok(())
1864 }
1865
1866 #[tokio::test]
1870 async fn test_no_compression_for_update() -> anyhow::Result<()> {
1871 let (tx, mut rx) = client_connection_with_compression(client_id_from_u8(1), Compression::Brotli);
1873
1874 let db = relational_db()?;
1875 let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone());
1876
1877 let table_id = db.create_table_for_test("t", &[("x", AlgebraicType::U64)], &[])?;
1878
1879 let mut inserts = vec![];
1880
1881 for i in 0..16_000u64 {
1882 inserts.push((table_id, product![i]));
1883 }
1884
1885 subscribe_multi(&subs, &["select * from t"], tx, &mut 0)?;
1887
1888 assert!(matches!(rx.recv().await, Some(SerializableMessage::Subscription(_))));
1890
1891 commit_tx(&db, &subs, [], inserts)?;
1894
1895 match rx.recv().await {
1897 Some(SerializableMessage::TxUpdate(TransactionUpdateMessage {
1898 database_update:
1899 SubscriptionUpdateMessage {
1900 database_update: FormatSwitch::Bsatn(ws::DatabaseUpdate { tables }),
1901 ..
1902 },
1903 ..
1904 })) => {
1905 assert!(tables.iter().all(|TableUpdate { updates, .. }| updates
1906 .iter()
1907 .all(|query_update| matches!(query_update, CompressableQueryUpdate::Uncompressed(_)))));
1908 }
1909 Some(_) => panic!("unexpected message from subscription"),
1910 None => panic!("channel unexpectedly closed"),
1911 };
1912
1913 Ok(())
1914 }
1915
1916 #[tokio::test]
1919 async fn test_update_for_join() -> anyhow::Result<()> {
1920 async fn test_subscription_updates(queries: &[&'static str]) -> anyhow::Result<()> {
1921 let (sender, mut rx) = client_connection(client_id_from_u8(1));
1923
1924 let db = relational_db()?;
1925 let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone());
1926
1927 let p_schema = [("id", AlgebraicType::U64), ("signed_in", AlgebraicType::Bool)];
1928 let l_schema = [
1929 ("id", AlgebraicType::U64),
1930 ("x", AlgebraicType::U64),
1931 ("z", AlgebraicType::U64),
1932 ];
1933
1934 let p_id = db.create_table_for_test("p", &p_schema, &[0.into()])?;
1935 let l_id = db.create_table_for_test("l", &l_schema, &[0.into()])?;
1936
1937 subscribe_multi(&subs, queries, sender, &mut 0)?;
1938
1939 assert!(matches!(rx.recv().await, Some(SerializableMessage::Subscription(_))));
1940
1941 commit_tx(
1943 &db,
1944 &subs,
1945 [],
1946 [
1947 (p_id, product![1_u64, true]),
1948 (p_id, product![2_u64, true]),
1949 (l_id, product![1_u64, 2_u64, 2_u64]),
1950 (l_id, product![2_u64, 3_u64, 3_u64]),
1951 ],
1952 )?;
1953
1954 let schema = ProductType::from(p_schema);
1955
1956 assert_tx_update_for_table(
1958 &mut rx,
1959 p_id,
1960 &schema,
1961 [product![1_u64, true], product![2_u64, true]],
1962 [],
1963 )
1964 .await;
1965
1966 commit_tx(
1968 &db,
1969 &subs,
1970 [(p_id, product![2_u64, true])],
1971 [(p_id, product![2_u64, false])],
1972 )?;
1973
1974 assert_tx_update_for_table(
1976 &mut rx,
1977 p_id,
1978 &schema,
1979 [product![2_u64, false]],
1980 [product![2_u64, true]],
1981 )
1982 .await;
1983
1984 commit_tx(
1986 &db,
1987 &subs,
1988 [(p_id, product![2_u64, false])],
1989 [(p_id, product![2_u64, true])],
1990 )?;
1991
1992 assert_tx_update_for_table(
1994 &mut rx,
1995 p_id,
1996 &schema,
1997 [product![2_u64, true]],
1998 [product![2_u64, false]],
1999 )
2000 .await;
2001
2002 Ok(())
2003 }
2004
2005 test_subscription_updates(&[
2006 "select * from p where id = 1",
2007 "select p.* from p join l on p.id = l.id where l.x > 0 and l.x < 5 and l.z > 0 and l.z < 5",
2008 ])
2009 .await?;
2010 test_subscription_updates(&[
2011 "select * from p where id = 1",
2012 "select p.* from p join l on p.id = l.id where 0 < l.x and l.x < 5 and 0 < l.z and l.z < 5",
2013 ])
2014 .await?;
2015 test_subscription_updates(&[
2016 "select * from p where id = 1",
2017 "select p.* from p join l on p.id = l.id where l.x > 0 and l.x < 5 and l.x > 0 and l.z < 5 and l.id != 1",
2018 ])
2019 .await?;
2020 test_subscription_updates(&[
2021 "select * from p where id = 1",
2022 "select p.* from p join l on p.id = l.id where 0 < l.x and l.x < 5 and 0 < l.z and l.z < 5 and l.id != 1",
2023 ])
2024 .await?;
2025
2026 Ok(())
2027 }
2028
2029 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
2033 async fn test_query_pruning() -> anyhow::Result<()> {
2034 let (tx_for_a, mut rx_for_a) = client_connection(client_id_from_u8(1));
2036 let (tx_for_b, mut rx_for_b) = client_connection(client_id_from_u8(2));
2037
2038 let db = relational_db()?;
2039 let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone());
2040
2041 let u_id = db.create_table_for_test(
2042 "u",
2043 &[
2044 ("i", AlgebraicType::U64),
2045 ("a", AlgebraicType::U64),
2046 ("b", AlgebraicType::U64),
2047 ],
2048 &[0.into()],
2049 )?;
2050 let v_id = db.create_table_for_test(
2051 "v",
2052 &[
2053 ("i", AlgebraicType::U64),
2054 ("x", AlgebraicType::U64),
2055 ("y", AlgebraicType::U64),
2056 ],
2057 &[0.into(), 1.into()],
2058 )?;
2059
2060 commit_tx(
2061 &db,
2062 &subs,
2063 [],
2064 [
2065 (u_id, product![0u64, 1u64, 1u64]),
2066 (u_id, product![1u64, 2u64, 2u64]),
2067 (u_id, product![2u64, 3u64, 3u64]),
2068 (v_id, product![0u64, 4u64, 4u64]),
2069 (v_id, product![1u64, 5u64, 5u64]),
2070 ],
2071 )?;
2072
2073 let mut query_ids = 0;
2074
2075 subscribe_multi(
2077 &subs,
2078 &[
2079 "select u.* from u join v on u.i = v.i where v.x = 4",
2080 "select u.* from u join v on u.i = v.i where v.x = 6",
2081 ],
2082 tx_for_a,
2083 &mut query_ids,
2084 )?;
2085
2086 subscribe_multi(
2088 &subs,
2089 &[
2090 "select u.* from u join v on u.i = v.i where v.x = 5",
2091 "select u.* from u join v on u.i = v.i where v.x = 7",
2092 ],
2093 tx_for_b,
2094 &mut query_ids,
2095 )?;
2096
2097 assert!(matches!(
2099 rx_for_a.recv().await,
2100 Some(SerializableMessage::Subscription(SubscriptionMessage {
2101 result: SubscriptionResult::SubscribeMulti(_),
2102 ..
2103 }))
2104 ));
2105 assert!(matches!(
2106 rx_for_b.recv().await,
2107 Some(SerializableMessage::Subscription(SubscriptionMessage {
2108 result: SubscriptionResult::SubscribeMulti(_),
2109 ..
2110 }))
2111 ));
2112
2113 let metrics = commit_tx(
2115 &db,
2116 &subs,
2117 [(v_id, product![1u64, 5u64, 5u64])],
2118 [(v_id, product![1u64, 5u64, 6u64])],
2119 )?;
2120
2121 assert_eq!(metrics.delta_queries_evaluated, 1);
2123 assert_eq!(metrics.delta_queries_matched, 0);
2124
2125 let metrics = commit_tx(&db, &subs, [], [(v_id, product![2u64, 6u64, 6u64])])?;
2127
2128 assert_tx_update_for_table(
2129 &mut rx_for_a,
2130 u_id,
2131 &ProductType::from([AlgebraicType::U64, AlgebraicType::U64, AlgebraicType::U64]),
2132 [product![2u64, 3u64, 3u64]],
2133 [],
2134 )
2135 .await;
2136
2137 assert_eq!(metrics.delta_queries_evaluated, 1);
2139 assert_eq!(metrics.delta_queries_matched, 1);
2140
2141 let metrics = commit_tx(
2143 &db,
2144 &subs,
2145 [(u_id, product![1u64, 2u64, 2u64])],
2146 [(u_id, product![1u64, 2u64, 3u64])],
2147 )?;
2148
2149 assert_tx_update_for_table(
2150 &mut rx_for_b,
2151 u_id,
2152 &ProductType::from([AlgebraicType::U64, AlgebraicType::U64, AlgebraicType::U64]),
2153 [product![1u64, 2u64, 3u64]],
2154 [product![1u64, 2u64, 2u64]],
2155 )
2156 .await;
2157
2158 assert_eq!(metrics.delta_queries_evaluated, 4);
2160 assert_eq!(metrics.delta_queries_matched, 1);
2161
2162 let metrics = commit_tx(&db, &subs, [], [(u_id, product![3u64, 0u64, 0u64])])?;
2164
2165 assert_eq!(metrics.delta_queries_evaluated, 4);
2167 assert_eq!(metrics.delta_queries_matched, 0);
2168
2169 Ok(())
2170 }
2171
2172 #[tokio::test]
2174 async fn test_join_pruning() -> anyhow::Result<()> {
2175 let (tx, mut rx) = client_connection(client_id_from_u8(1));
2176
2177 let db = relational_db()?;
2178 let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone());
2179
2180 let u_id = db.create_table_for_test_with_the_works(
2181 "u",
2182 &[
2183 ("i", AlgebraicType::U64),
2184 ("a", AlgebraicType::U64),
2185 ("b", AlgebraicType::U64),
2186 ],
2187 &[0.into()],
2188 &[0.into()],
2189 StAccess::Public,
2190 )?;
2191 let v_id = db.create_table_for_test_with_the_works(
2192 "v",
2193 &[
2194 ("i", AlgebraicType::U64),
2195 ("x", AlgebraicType::U64),
2196 ("y", AlgebraicType::U64),
2197 ],
2198 &[0.into(), 1.into()],
2199 &[0.into()],
2200 StAccess::Public,
2201 )?;
2202
2203 let schema = ProductType::from([AlgebraicType::U64, AlgebraicType::U64, AlgebraicType::U64]);
2204
2205 commit_tx(
2206 &db,
2207 &subs,
2208 [],
2209 [
2210 (v_id, product![1u64, 1u64, 1u64]),
2211 (v_id, product![2u64, 2u64, 2u64]),
2212 (v_id, product![3u64, 3u64, 3u64]),
2213 (v_id, product![4u64, 4u64, 4u64]),
2214 (v_id, product![5u64, 5u64, 5u64]),
2215 ],
2216 )?;
2217
2218 let mut query_ids = 0;
2219
2220 subscribe_multi(
2221 &subs,
2222 &[
2223 "select u.* from u join v on u.i = v.i where v.x = 1",
2224 "select u.* from u join v on u.i = v.i where v.x = 2",
2225 "select u.* from u join v on u.i = v.i where v.x = 3",
2226 "select u.* from u join v on u.i = v.i where v.x = 4",
2227 "select u.* from u join v on u.i = v.i where v.x = 5",
2228 ],
2229 tx,
2230 &mut query_ids,
2231 )?;
2232
2233 assert_matches!(
2234 rx.recv().await,
2235 Some(SerializableMessage::Subscription(SubscriptionMessage {
2236 result: SubscriptionResult::SubscribeMulti(_),
2237 ..
2238 }))
2239 );
2240
2241 let metrics = commit_tx(&db, &subs, [], [(u_id, product![1u64, 2u64, 3u64])])?;
2243
2244 assert_tx_update_for_table(&mut rx, u_id, &schema, [product![1u64, 2u64, 3u64]], []).await;
2245
2246 assert_eq!(metrics.delta_queries_evaluated, 1);
2248 assert_eq!(metrics.delta_queries_matched, 1);
2249
2250 let metrics = commit_tx(
2252 &db,
2253 &subs,
2254 [(v_id, product![1u64, 1u64, 1u64])],
2255 [(v_id, product![1u64, 1u64, 2u64])],
2256 )?;
2257
2258 assert_eq!(metrics.delta_queries_evaluated, 1);
2260 assert_eq!(metrics.delta_queries_matched, 0);
2261
2262 let metrics = commit_tx(
2264 &db,
2265 &subs,
2266 [(v_id, product![1u64, 1u64, 2u64])],
2267 [(v_id, product![1u64, 2u64, 2u64])],
2268 )?;
2269
2270 assert_tx_update_for_table(&mut rx, u_id, &schema, [], []).await;
2272
2273 assert_eq!(metrics.delta_queries_evaluated, 2);
2275 assert_eq!(metrics.delta_queries_matched, 2);
2276
2277 let metrics = commit_tx(
2280 &db,
2281 &subs,
2282 [(v_id, product![3u64, 3u64, 3u64])],
2283 [(v_id, product![3u64, 4u64, 3u64]), (u_id, product![3u64, 4u64, 5u64])],
2284 )?;
2285
2286 assert_tx_update_for_table(&mut rx, u_id, &schema, [product![3u64, 4u64, 5u64]], []).await;
2287
2288 assert_eq!(metrics.delta_queries_evaluated, 2);
2290 assert_eq!(metrics.delta_queries_matched, 2);
2291
2292 let metrics = commit_tx(
2294 &db,
2295 &subs,
2296 [(v_id, product![3u64, 4u64, 3u64])],
2297 [(v_id, product![3u64, 0u64, 3u64])],
2298 )?;
2299
2300 assert_tx_update_for_table(&mut rx, u_id, &schema, [], [product![3u64, 4u64, 5u64]]).await;
2301
2302 assert_eq!(metrics.delta_queries_evaluated, 1);
2304 assert_eq!(metrics.delta_queries_matched, 1);
2305
2306 let metrics = commit_tx(
2309 &db,
2310 &subs,
2311 [(v_id, product![5u64, 5u64, 5u64])],
2312 [(v_id, product![5u64, 6u64, 6u64]), (u_id, product![5u64, 6u64, 7u64])],
2313 )?;
2314
2315 assert_tx_update_for_table(&mut rx, u_id, &schema, [], []).await;
2317
2318 assert_eq!(metrics.delta_queries_evaluated, 1);
2320 assert_eq!(metrics.delta_queries_matched, 1);
2321
2322 Ok(())
2323 }
2324
2325 #[tokio::test]
2327 async fn test_unsubscribe() -> anyhow::Result<()> {
2328 let (tx_for_a, mut rx_for_a) = client_connection(client_id_from_u8(1));
2330 let (tx_for_b, mut rx_for_b) = client_connection(client_id_from_u8(2));
2331
2332 let db = relational_db()?;
2333 let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone());
2334
2335 let u_id = db.create_table_for_test(
2336 "u",
2337 &[
2338 ("i", AlgebraicType::U64),
2339 ("a", AlgebraicType::U64),
2340 ("b", AlgebraicType::U64),
2341 ],
2342 &[0.into()],
2343 )?;
2344 let v_id = db.create_table_for_test(
2345 "v",
2346 &[
2347 ("i", AlgebraicType::U64),
2348 ("x", AlgebraicType::U64),
2349 ("y", AlgebraicType::U64),
2350 ],
2351 &[0.into(), 1.into()],
2352 )?;
2353
2354 commit_tx(&db, &subs, [], [(v_id, product![1u64, 1u64, 1u64])])?;
2355
2356 let mut query_ids = 0;
2357
2358 subscribe_multi(
2359 &subs,
2360 &["select u.* from u join v on u.i = v.i where v.x = 1"],
2361 tx_for_a,
2362 &mut query_ids,
2363 )?;
2364 subscribe_multi(
2365 &subs,
2366 &["select u.* from u join v on u.i = v.i where v.x = 1"],
2367 tx_for_b.clone(),
2368 &mut query_ids,
2369 )?;
2370
2371 assert_matches!(
2373 rx_for_a.recv().await,
2374 Some(SerializableMessage::Subscription(SubscriptionMessage {
2375 result: SubscriptionResult::SubscribeMulti(_),
2376 ..
2377 }))
2378 );
2379 assert_matches!(
2380 rx_for_b.recv().await,
2381 Some(SerializableMessage::Subscription(SubscriptionMessage {
2382 result: SubscriptionResult::SubscribeMulti(_),
2383 ..
2384 }))
2385 );
2386
2387 unsubscribe_multi(&subs, tx_for_b, query_ids)?;
2388
2389 assert_matches!(
2390 rx_for_b.recv().await,
2391 Some(SerializableMessage::Subscription(SubscriptionMessage {
2392 result: SubscriptionResult::UnsubscribeMulti(_),
2393 ..
2394 }))
2395 );
2396
2397 let metrics = commit_tx(&db, &subs, [], [(u_id, product![1u64, 0u64, 0u64])])?;
2399
2400 assert_tx_update_for_table(
2401 &mut rx_for_a,
2402 u_id,
2403 &ProductType::from([AlgebraicType::U64, AlgebraicType::U64, AlgebraicType::U64]),
2404 [product![1u64, 0u64, 0u64]],
2405 [],
2406 )
2407 .await;
2408
2409 assert_eq!(metrics.delta_queries_evaluated, 1);
2411 assert_eq!(metrics.delta_queries_matched, 1);
2412
2413 let metrics = commit_tx(
2415 &db,
2416 &subs,
2417 [(v_id, product![1u64, 1u64, 1u64])],
2418 [(v_id, product![1u64, 2u64, 2u64])],
2419 )?;
2420
2421 assert_eq!(metrics.delta_queries_evaluated, 1);
2423 assert_eq!(metrics.delta_queries_matched, 1);
2424
2425 Ok(())
2426 }
2427
2428 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
2432 async fn test_query_pruning_for_empty_tables() -> anyhow::Result<()> {
2433 let (tx, mut rx) = client_connection(client_id_from_u8(1));
2435
2436 let db = relational_db()?;
2437 let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone());
2438
2439 let schema = &[("id", AlgebraicType::U64), ("a", AlgebraicType::U64)];
2440 let indices = &[0.into()];
2441 db.create_table_for_test("t", schema, indices)?;
2443 let s_id = db.create_table_for_test("s", schema, indices)?;
2444
2445 commit_tx(&db, &subs, [], [(s_id, product![0u64, 0u64])])?;
2447
2448 let metrics = subscribe_multi(
2450 &subs,
2451 &[
2452 "select t.* from t where a = 0",
2453 "select t.* from t join s on t.id = s.id where s.a = 0",
2454 "select s.* from t join s on t.id = s.id where t.a = 0",
2455 ],
2456 tx,
2457 &mut 0,
2458 )?;
2459
2460 assert_matches!(
2461 rx.recv().await,
2462 Some(SerializableMessage::Subscription(SubscriptionMessage {
2463 result: SubscriptionResult::SubscribeMulti(_),
2464 ..
2465 }))
2466 );
2467
2468 assert_eq!(metrics.rows_scanned, 0);
2469 assert_eq!(metrics.index_seeks, 0);
2470
2471 Ok(())
2472 }
2473
2474 #[test]
2476 fn test_tx_subscription_ordering() -> ResultTest<()> {
2477 let test_db = TestDB::durable()?;
2478
2479 let runtime = test_db.runtime().cloned().unwrap();
2480 let db = Arc::new(test_db.db.clone());
2481
2482 let table_id = db.create_table_for_test("T", &[("a", AlgebraicType::U8)], &[])?;
2484 with_auto_commit(&db, |tx| insert(&db, tx, table_id, &product!(1_u8)).map(drop))?;
2485
2486 let (send, mut recv) = mpsc::unbounded_channel();
2487
2488 let db2 = db.clone();
2490 let query_handle = runtime.spawn_blocking(move || {
2491 add_subscriber(
2492 db.clone(),
2493 "select * from T",
2494 Some(Arc::new(move |tx: &_| {
2495 let _ = send.send(());
2497 std::thread::sleep(Duration::from_secs(1));
2499 assert_eq!(1, db.iter(tx, table_id).unwrap().count());
2503 })),
2504 )
2505 });
2506
2507 let write_handle = runtime.spawn(async move {
2509 let _ = recv.recv().await;
2510 with_auto_commit(&db2, |tx| insert(&db2, tx, table_id, &product!(2_u8)).map(drop))
2511 });
2512
2513 runtime.block_on(write_handle)??;
2514 runtime.block_on(query_handle)??;
2515
2516 test_db.close()?;
2517
2518 Ok(())
2519 }
2520
2521 #[test]
2522 fn subs_cannot_access_private_tables() -> ResultTest<()> {
2523 let test_db = TestDB::durable()?;
2524 let db = Arc::new(test_db.db.clone());
2525
2526 let indexes = &[0.into()];
2528 let cols = &[("a", AlgebraicType::U8)];
2529 let _ = db.create_table_for_test("public", cols, indexes)?;
2530
2531 let _ = db.create_table_for_test_with_access("private", cols, indexes, StAccess::Private)?;
2533
2534 let subscribe = |sql| add_subscriber(db.clone(), sql, None);
2536 assert!(subscribe("SELECT * FROM public").is_ok());
2537
2538 for sql in [
2542 "SELECT * FROM private",
2543 "SELECT * FROM private WHERE false",
2545 "SELECT private.* FROM private",
2546 "SELECT public.* FROM public JOIN private ON public.a = private.a WHERE private.a = 1",
2547 "SELECT private.* FROM private JOIN public ON private.a = public.a WHERE public.a = 1",
2548 ] {
2549 assert!(subscribe(sql).is_err(),);
2550 }
2551
2552 Ok(())
2553 }
2554}