spacetimedb/subscription/module_subscription_actor.rs

1use super::execution_unit::QueryHash;
2use super::module_subscription_manager::{
3    spawn_send_worker, BroadcastError, BroadcastQueue, Plan, SubscriptionGaugeStats, SubscriptionManager,
4};
5use super::query::compile_query_with_hashes;
6use super::tx::DeltaTx;
7use super::{collect_table_update, TableUpdateType};
8use crate::client::messages::{
9    SerializableMessage, SubscriptionData, SubscriptionError, SubscriptionMessage, SubscriptionResult,
10    SubscriptionRows, SubscriptionUpdateMessage, TransactionUpdateMessage,
11};
12use crate::client::{ClientActorId, ClientConnectionSender, Protocol};
13use crate::db::datastore::locking_tx_datastore::tx::TxId;
14use crate::db::db_metrics::DB_METRICS;
15use crate::db::relational_db::{MutTx, RelationalDB, Tx};
16use crate::error::DBError;
17use crate::estimation::estimate_rows_scanned;
18use crate::execution_context::{Workload, WorkloadType};
19use crate::host::module_host::{DatabaseUpdate, EventStatus, ModuleEvent};
20use crate::messages::websocket::Subscribe;
21use crate::subscription::execute_plans;
22use crate::subscription::query::is_subscribe_to_all_tables;
23use crate::util::prometheus_handle::IntGaugeExt;
24use crate::vm::check_row_limit;
25use crate::worker_metrics::WORKER_METRICS;
26use parking_lot::RwLock;
27use prometheus::{Histogram, HistogramTimer, IntCounter, IntGauge};
28use spacetimedb_client_api_messages::websocket::{
29    self as ws, BsatnFormat, FormatSwitch, JsonFormat, SubscribeMulti, SubscribeSingle, TableUpdate, Unsubscribe,
30    UnsubscribeMulti,
31};
32use spacetimedb_execution::pipelined::PipelinedProject;
33use spacetimedb_lib::identity::AuthCtx;
34use spacetimedb_lib::metrics::ExecutionMetrics;
35use spacetimedb_lib::Identity;
36use std::{sync::Arc, time::Instant};
37
38type Subscriptions = Arc<RwLock<SubscriptionManager>>;
39
40#[derive(Debug, Clone)]
41pub struct ModuleSubscriptions {
42    relational_db: Arc<RelationalDB>,
43    /// If taking a lock (tx) on the db at the same time, ALWAYS lock the db first.
44    /// You will deadlock otherwise.
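    ///
    /// A minimal sketch of the required ordering (hypothetical caller with access to
    /// both fields; not part of this module):
    ///
    /// ```ignore
    /// // Take the db lock (a tx) first...
    /// let tx = module_subscriptions.relational_db.begin_tx(Workload::Subscribe);
    /// // ...and only then the subscriptions lock.
    /// let guard = module_subscriptions.subscriptions.read();
    /// ```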
45    subscriptions: Subscriptions,
46    broadcast_queue: BroadcastQueue,
47    owner_identity: Identity,
48    stats: Arc<SubscriptionGauges>,
49}
50
51#[derive(Debug, Clone)]
52pub struct SubscriptionGauges {
53    db_identity: Identity,
54    num_queries: IntGauge,
55    num_connections: IntGauge,
56    num_subscription_sets: IntGauge,
57    num_query_subscriptions: IntGauge,
58    num_legacy_subscriptions: IntGauge,
59}
60
61impl SubscriptionGauges {
62    fn new(db_identity: &Identity) -> Self {
63        let num_queries = WORKER_METRICS.subscription_queries.with_label_values(db_identity);
64        let num_connections = DB_METRICS.subscription_connections.with_label_values(db_identity);
65        let num_subscription_sets = DB_METRICS.subscription_sets.with_label_values(db_identity);
66        let num_query_subscriptions = DB_METRICS.total_query_subscriptions.with_label_values(db_identity);
67        let num_legacy_subscriptions = DB_METRICS.num_legacy_subscriptions.with_label_values(db_identity);
68        Self {
69            db_identity: *db_identity,
70            num_queries,
71            num_connections,
72            num_subscription_sets,
73            num_query_subscriptions,
74            num_legacy_subscriptions,
75        }
76    }
77
78    // Clear the subscription gauges for this database.
79    fn unregister(&self) {
80        let _ = WORKER_METRICS
81            .subscription_queries
82            .remove_label_values(&self.db_identity);
83        let _ = DB_METRICS
84            .subscription_connections
85            .remove_label_values(&self.db_identity);
86        let _ = DB_METRICS.subscription_sets.remove_label_values(&self.db_identity);
87        let _ = DB_METRICS
88            .total_query_subscriptions
89            .remove_label_values(&self.db_identity);
90        let _ = DB_METRICS
91            .num_legacy_subscriptions
92            .remove_label_values(&self.db_identity);
93    }
94
95    fn report(&self, stats: &SubscriptionGaugeStats) {
96        self.num_queries.set(stats.num_queries as i64);
97        self.num_connections.set(stats.num_connections as i64);
98        self.num_subscription_sets.set(stats.num_subscription_sets as i64);
99        self.num_query_subscriptions.set(stats.num_query_subscriptions as i64);
100        self.num_legacy_subscriptions.set(stats.num_legacy_subscriptions as i64);
101    }
102}
103
104pub struct SubscriptionMetrics {
105    pub lock_waiters: IntGauge,
106    pub lock_wait_time: Histogram,
107    pub compilation_time: Histogram,
108    pub num_queries_subscribed: IntCounter,
109    pub num_new_queries_subscribed: IntCounter,
110    pub num_queries_evaluated: IntCounter,
111}
112
113impl SubscriptionMetrics {
114    pub fn new(db: &Identity, workload: &WorkloadType) -> Self {
115        Self {
116            lock_waiters: DB_METRICS.subscription_lock_waiters.with_label_values(db, workload),
117            lock_wait_time: DB_METRICS.subscription_lock_wait_time.with_label_values(db, workload),
118            compilation_time: DB_METRICS.subscription_compile_time.with_label_values(db, workload),
119            num_queries_subscribed: DB_METRICS.num_queries_subscribed.with_label_values(db),
120            num_new_queries_subscribed: DB_METRICS.num_new_queries_subscribed.with_label_values(db),
121            num_queries_evaluated: DB_METRICS.num_queries_evaluated.with_label_values(db, workload),
122        }
123    }
124}
125
126type AssertTxFn = Arc<dyn Fn(&Tx)>;
127type SubscriptionUpdate = FormatSwitch<TableUpdate<BsatnFormat>, TableUpdate<JsonFormat>>;
128type FullSubscriptionUpdate = FormatSwitch<ws::DatabaseUpdate<BsatnFormat>, ws::DatabaseUpdate<JsonFormat>>;
129
130/// A utility for sending an error message to a client and returning early
131macro_rules! return_on_err {
132    ($expr:expr, $handler:expr, $metrics:expr) => {
133        match $expr {
134            Ok(val) => val,
135            Err(e) => {
136                // TODO: Handle errors sending messages.
137                let _ = $handler(e.to_string().into());
138                return Ok($metrics);
139            }
140        }
141    };
142}
143
144/// A utility for attaching the offending SQL to an error, sending it to the client, and returning early
145macro_rules! return_on_err_with_sql {
146    ($expr:expr, $sql:expr, $handler:expr) => {
147        match $expr.map_err(|err| DBError::WithSql {
148            sql: $sql.into(),
149            error: Box::new(DBError::Other(err.into())),
150        }) {
151            Ok(val) => val,
152            Err(e) => {
153                // TODO: Handle errors sending messages.
154                let _ = $handler(e.to_string().into());
155                return Ok(None);
156            }
157        }
158    };
159}
160
161impl ModuleSubscriptions {
162    pub fn new(
163        relational_db: Arc<RelationalDB>,
164        subscriptions: Subscriptions,
165        broadcast_queue: BroadcastQueue,
166        owner_identity: Identity,
167    ) -> Self {
168        let db = &relational_db.database_identity();
169        let stats = Arc::new(SubscriptionGauges::new(db));
170
171        Self {
172            relational_db,
173            subscriptions,
174            broadcast_queue,
175            owner_identity,
176            stats,
177        }
178    }
179
180    /// Construct a new [`ModuleSubscriptions`] for use in testing,
181    /// creating a new [`tokio::runtime::Runtime`] to run its send worker.
182    pub fn for_test_new_runtime(db: Arc<RelationalDB>) -> (ModuleSubscriptions, tokio::runtime::Runtime) {
183        let runtime = tokio::runtime::Runtime::new().unwrap();
184        let _rt = runtime.enter();
185        (Self::for_test_enclosing_runtime(db), runtime)
186    }
187
188    /// Construct a new [`ModuleSubscriptions`] for use in testing,
189    /// running its send worker on the dynamically enclosing [`tokio::runtime::Runtime`].
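    ///
    /// A minimal usage sketch (test-only), assuming an in-memory `Arc<RelationalDB>` named `db`
    /// and an already-entered Tokio runtime:
    ///
    /// ```ignore
    /// let runtime = tokio::runtime::Runtime::new().unwrap();
    /// let _guard = runtime.enter();
    /// let subs = ModuleSubscriptions::for_test_enclosing_runtime(db);
    /// ```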
190    pub fn for_test_enclosing_runtime(db: Arc<RelationalDB>) -> ModuleSubscriptions {
191        let send_worker_queue = spawn_send_worker(None);
192        ModuleSubscriptions::new(
193            db,
194            SubscriptionManager::for_test_without_metrics_arc_rwlock(),
195            send_worker_queue,
196            Identity::ZERO,
197        )
198    }
199
200    // Recompute gauges to update metrics.
201    pub fn update_gauges(&self) {
202        let gauge_stats = self.subscriptions.read().calculate_gauge_stats();
203        self.stats.report(&gauge_stats);
204    }
205
206    // Remove the subscription gauges for this database.
207    // TODO: This should be called when the database is shut down.
208    pub fn remove_gauges(&self) {
209        self.stats.unregister();
210    }
211
212    /// Run auth and row limit checks for a new subscriber, then compute the initial query results.
213    fn evaluate_initial_subscription(
214        &self,
215        sender: Arc<ClientConnectionSender>,
216        query: Arc<Plan>,
217        tx: &TxId,
218        auth: &AuthCtx,
219        update_type: TableUpdateType,
220    ) -> Result<(SubscriptionUpdate, ExecutionMetrics), DBError> {
221        check_row_limit(
222            &[&query],
223            &self.relational_db,
224            tx,
225            |plan, tx| {
226                plan.plans_fragments()
227                    .map(|plan_fragment| estimate_rows_scanned(tx, plan_fragment.optimized_physical_plan()))
228                    .fold(0, |acc, rows_scanned| acc.saturating_add(rows_scanned))
229            },
230            auth,
231        )?;
232
233        let table_id = query.subscribed_table_id();
234        let table_name = query.subscribed_table_name();
235
236        let plans = query
237            .plans_fragments()
238            .map(|fragment| fragment.optimized_physical_plan())
239            .cloned()
240            .map(|plan| plan.optimize())
241            .collect::<Result<Vec<_>, _>>()?
242            .into_iter()
243            .map(PipelinedProject::from)
244            .collect::<Vec<_>>();
245
246        let tx = DeltaTx::from(tx);
247
248        Ok(match sender.config.protocol {
249            Protocol::Binary => collect_table_update(&plans, table_id, table_name.into(), &tx, update_type)
250                .map(|(table_update, metrics)| (FormatSwitch::Bsatn(table_update), metrics)),
251            Protocol::Text => collect_table_update(&plans, table_id, table_name.into(), &tx, update_type)
252                .map(|(table_update, metrics)| (FormatSwitch::Json(table_update), metrics)),
253        }?)
254    }
255
256    fn evaluate_queries(
257        &self,
258        sender: Arc<ClientConnectionSender>,
259        queries: &[Arc<Plan>],
260        tx: &TxId,
261        auth: &AuthCtx,
262        update_type: TableUpdateType,
263    ) -> Result<(FullSubscriptionUpdate, ExecutionMetrics), DBError> {
264        check_row_limit(
265            queries,
266            &self.relational_db,
267            tx,
268            |plan, tx| {
269                plan.plans_fragments()
270                    .map(|plan_fragment| estimate_rows_scanned(tx, plan_fragment.optimized_physical_plan()))
271                    .fold(0, |acc, rows_scanned| acc.saturating_add(rows_scanned))
272            },
273            auth,
274        )?;
275
276        let tx = DeltaTx::from(tx);
277        match sender.config.protocol {
278            Protocol::Binary => {
279                let (update, metrics) = execute_plans(queries, &tx, update_type)?;
280                Ok((FormatSwitch::Bsatn(update), metrics))
281            }
282            Protocol::Text => {
283                let (update, metrics) = execute_plans(queries, &tx, update_type)?;
284                Ok((FormatSwitch::Json(update), metrics))
285            }
286        }
287    }
288
289    /// Add a subscription to a single query.
290    #[tracing::instrument(level = "trace", skip_all)]
291    pub fn add_single_subscription(
292        &self,
293        sender: Arc<ClientConnectionSender>,
294        request: SubscribeSingle,
295        timer: Instant,
296        _assert: Option<AssertTxFn>,
297    ) -> Result<Option<ExecutionMetrics>, DBError> {
298        // Send an error message to the client
299        let send_err_msg = |message| {
300            self.broadcast_queue.send_client_message(
301                sender.clone(),
302                SubscriptionMessage {
303                    request_id: Some(request.request_id),
304                    query_id: Some(request.query_id),
305                    timer: Some(timer),
306                    result: SubscriptionResult::Error(SubscriptionError {
307                        table_id: None,
308                        message,
309                    }),
310                },
311            )
312        };
313
314        let sql = request.query;
315        let auth = AuthCtx::new(self.owner_identity, sender.id.identity);
316        let hash = QueryHash::from_string(&sql, auth.caller, false);
317        let hash_with_param = QueryHash::from_string(&sql, auth.caller, true);
318
319        let tx = scopeguard::guard(self.relational_db.begin_tx(Workload::Subscribe), |tx| {
320            let (tx_metrics, reducer) = self.relational_db.release_tx(tx);
321            self.relational_db.report(&reducer, &tx_metrics, None);
322        });
323
324        let existing_query = {
325            let guard = self.subscriptions.read();
326            guard.query(&hash)
327        };
328
329        let query = return_on_err_with_sql!(
330            existing_query.map(Ok).unwrap_or_else(|| compile_query_with_hashes(
331                &auth,
332                &tx,
333                &sql,
334                hash,
335                hash_with_param
336            )
337            .map(Arc::new)),
338            sql,
339            send_err_msg
340        );
341
342        let (table_rows, metrics) = return_on_err_with_sql!(
343            self.evaluate_initial_subscription(sender.clone(), query.clone(), &tx, &auth, TableUpdateType::Subscribe),
344            query.sql(),
345            send_err_msg
346        );
347
348        // We acquire the subscription lock only after evaluation, allowing `add_subscription` to run concurrently.
349        // This also means `broadcast_event` may be scheduled before the code below runs,
350        // but that should not pose an issue.
351        let mut subscriptions = self.subscriptions.write();
352        subscriptions.add_subscription(sender.clone(), query.clone(), request.query_id)?;
353
354        #[cfg(test)]
355        if let Some(assert) = _assert {
356            assert(&tx);
357        }
358
359        // Note: to make sure transaction updates are consistent, we need to put this in the broadcast
360        // queue while we are still holding a read-lock on the database.
361
362        // That will avoid race conditions because reducers first grab a write lock on the db, then
363        // grab a read lock on the subscriptions.
364
365        // Holding a write lock on `self.subscriptions` would also be sufficient.
366        let _ = self.broadcast_queue.send_client_message(
367            sender.clone(),
368            SubscriptionMessage {
369                request_id: Some(request.request_id),
370                query_id: Some(request.query_id),
371                timer: Some(timer),
372                result: SubscriptionResult::Subscribe(SubscriptionRows {
373                    table_id: query.subscribed_table_id(),
374                    table_name: query.subscribed_table_name().into(),
375                    table_rows,
376                }),
377            },
378        );
379        Ok(Some(metrics))
380    }
381
382    /// Remove a subscription for a single query.
383    pub fn remove_single_subscription(
384        &self,
385        sender: Arc<ClientConnectionSender>,
386        request: Unsubscribe,
387        timer: Instant,
388    ) -> Result<Option<ExecutionMetrics>, DBError> {
389        // Send an error message to the client
390        let send_err_msg = |message| {
391            self.broadcast_queue.send_client_message(
392                sender.clone(),
393                SubscriptionMessage {
394                    request_id: Some(request.request_id),
395                    query_id: Some(request.query_id),
396                    timer: Some(timer),
397                    result: SubscriptionResult::Error(SubscriptionError {
398                        table_id: None,
399                        message,
400                    }),
401                },
402            )
403        };
404
405        let mut subscriptions = self.subscriptions.write();
406
407        let queries = return_on_err!(
408            subscriptions.remove_subscription((sender.id.identity, sender.id.connection_id), request.query_id),
409            // Apparently we ignore errors sending messages.
410            send_err_msg,
411            None
412        );
413        // This is technically a bug, since this could be empty if the client has another duplicate subscription.
414        // This whole function should be removed soon, so I don't think we need to fix it.
415        let [query] = &*queries else {
416            // Apparently we ignore errors sending messages.
417            let _ = send_err_msg("Internal error".into());
418            return Ok(None);
419        };
420
421        let tx = scopeguard::guard(self.relational_db.begin_tx(Workload::Unsubscribe), |tx| {
422            let (tx_metrics, reducer) = self.relational_db.release_tx(tx);
423            self.relational_db.report(&reducer, &tx_metrics, None);
424        });
425        let auth = AuthCtx::new(self.owner_identity, sender.id.identity);
426        let (table_rows, metrics) = return_on_err_with_sql!(
427            self.evaluate_initial_subscription(sender.clone(), query.clone(), &tx, &auth, TableUpdateType::Unsubscribe),
428            query.sql(),
429            send_err_msg
430        );
431
432        // Note: to make sure transaction updates are consistent, we need to put this in the broadcast
433        // queue while we are still holding a read-lock on the database.
434
435        // That will avoid race conditions because reducers first grab a write lock on the db, then
436        // grab a read lock on the subscriptions.
437
438        // Holding a write lock on `self.subscriptions` would also be sufficient.
439        let _ = self.broadcast_queue.send_client_message(
440            sender.clone(),
441            SubscriptionMessage {
442                request_id: Some(request.request_id),
443                query_id: Some(request.query_id),
444                timer: Some(timer),
445                result: SubscriptionResult::Unsubscribe(SubscriptionRows {
446                    table_id: query.subscribed_table_id(),
447                    table_name: query.subscribed_table_name().into(),
448                    table_rows,
449                }),
450            },
451        );
452        Ok(Some(metrics))
453    }
454
455    /// Remove a client's subscription for a set of queries.
456    #[tracing::instrument(level = "trace", skip_all)]
457    pub fn remove_multi_subscription(
458        &self,
459        sender: Arc<ClientConnectionSender>,
460        request: UnsubscribeMulti,
461        timer: Instant,
462    ) -> Result<Option<ExecutionMetrics>, DBError> {
463        // Send an error message to the client
464        let send_err_msg = |message| {
465            self.broadcast_queue.send_client_message(
466                sender.clone(),
467                SubscriptionMessage {
468                    request_id: Some(request.request_id),
469                    query_id: Some(request.query_id),
470                    timer: Some(timer),
471                    result: SubscriptionResult::Error(SubscriptionError {
472                        table_id: None,
473                        message,
474                    }),
475                },
476            )
477        };
478
479        let subscription_metrics = SubscriptionMetrics::new(&self.owner_identity, &WorkloadType::Unsubscribe);
480
481        // Always lock the db before the subscription lock to avoid deadlocks.
482        let tx = scopeguard::guard(self.relational_db.begin_tx(Workload::Unsubscribe), |tx| {
483            let (tx_metrics, reducer) = self.relational_db.release_tx(tx);
484            self.relational_db.report(&reducer, &tx_metrics, None);
485        });
486
487        let removed_queries = {
488            let mut subscriptions = {
489                // How contended is the lock?
490                let _wait_guard = subscription_metrics.lock_waiters.inc_scope();
491                let _wait_timer = subscription_metrics.lock_wait_time.start_timer();
492                self.subscriptions.write()
493            };
494
495            return_on_err!(
496                subscriptions.remove_subscription((sender.id.identity, sender.id.connection_id), request.query_id),
497                send_err_msg,
498                None
499            )
500        };
501
502        let (update, metrics) = return_on_err!(
503            self.evaluate_queries(
504                sender.clone(),
505                &removed_queries,
506                &tx,
507                &AuthCtx::new(self.owner_identity, sender.id.identity),
508                TableUpdateType::Unsubscribe,
509            ),
510            send_err_msg,
511            None
512        );
513
514        // How many queries did we evaluate?
515        subscription_metrics
516            .num_queries_evaluated
517            .inc_by(removed_queries.len() as _);
518
519        // Note: to make sure transaction updates are consistent, we need to put this in the broadcast
520        // queue while we are still holding a read-lock on the database.
521
522        // That will avoid race conditions because reducers first grab a write lock on the db, then
523        // grab a read lock on the subscriptions.
524
525        // Holding a write lock on `self.subscriptions` would also be sufficient.
526        let _ = self.broadcast_queue.send_client_message(
527            sender,
528            SubscriptionMessage {
529                request_id: Some(request.request_id),
530                query_id: Some(request.query_id),
531                timer: Some(timer),
532                result: SubscriptionResult::UnsubscribeMulti(SubscriptionData { data: update }),
533            },
534        );
535
536        Ok(Some(metrics))
537    }
538
539    /// Compiles the queries in a [Subscribe] or [SubscribeMulti] message.
540    ///
541    /// Note, we hash queries to avoid recompilation,
542    /// but we need to know if a query is parameterized in order to hash it correctly.
543    /// This requires that we type check, which in turn requires that we start a tx.
544    ///
545    /// Unfortunately parsing with sqlparser is quite expensive,
546    /// so we'd like to avoid that cost while holding the tx lock,
547    /// especially since all we're trying to do is generate a hash.
548    ///
549    /// Instead we generate the two hashes outside of the tx lock.
550    /// If either one is currently tracked, we can avoid recompilation.
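    ///
    /// A minimal sketch of the hashing step (mirroring the body below), assuming a caller
    /// identity `sender` and a query string `sql`:
    ///
    /// ```ignore
    /// // Both hashes are computed from the raw SQL, outside of the tx lock.
    /// let hash = QueryHash::from_string(&sql, sender, false);
    /// let hash_with_param = QueryHash::from_string(&sql, sender, true);
    /// // If either hash is already tracked by the SubscriptionManager,
    /// // the cached plan is reused and compilation is skipped.
    /// ```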
551    #[allow(clippy::type_complexity)]
552    fn compile_queries(
553        &self,
554        sender: Identity,
555        queries: impl IntoIterator<Item = Box<str>>,
556        num_queries: usize,
557        metrics: &SubscriptionMetrics,
558    ) -> Result<(Vec<Arc<Plan>>, AuthCtx, TxId, HistogramTimer), DBError> {
559        let mut subscribe_to_all_tables = false;
560        let mut plans = Vec::with_capacity(num_queries);
561        let mut query_hashes = Vec::with_capacity(num_queries);
562
563        for sql in queries {
564            if is_subscribe_to_all_tables(&sql) {
565                subscribe_to_all_tables = true;
566                continue;
567            }
568            let hash = QueryHash::from_string(&sql, sender, false);
569            let hash_with_param = QueryHash::from_string(&sql, sender, true);
570            query_hashes.push((sql, hash, hash_with_param));
571        }
572
573        let auth = AuthCtx::new(self.owner_identity, sender);
574
575        // We always get the db lock before the subscription lock to avoid deadlocks.
576        let tx = scopeguard::guard(self.relational_db.begin_tx(Workload::Subscribe), |tx| {
577            let (tx_metrics, reducer) = self.relational_db.release_tx(tx);
578            self.relational_db.report(&reducer, &tx_metrics, None);
579        });
580
581        let compile_timer = metrics.compilation_time.start_timer();
582
583        let guard = {
584            // How contended is the lock?
585            let _wait_guard = metrics.lock_waiters.inc_scope();
586            let _wait_timer = metrics.lock_wait_time.start_timer();
587            self.subscriptions.read()
588        };
589
590        if subscribe_to_all_tables {
591            plans.extend(
592                super::subscription::get_all(&self.relational_db, &tx, &auth)?
593                    .into_iter()
594                    .map(Arc::new),
595            );
596        }
597
598        let mut new_queries = 0;
599
600        for (sql, hash, hash_with_param) in query_hashes {
601            if let Some(unit) = guard.query(&hash) {
602                plans.push(unit);
603            } else if let Some(unit) = guard.query(&hash_with_param) {
604                plans.push(unit);
605            } else {
606                plans.push(Arc::new(
607                    compile_query_with_hashes(&auth, &tx, &sql, hash, hash_with_param).map_err(|err| {
608                        DBError::WithSql {
609                            error: Box::new(DBError::Other(err.into())),
610                            sql,
611                        }
612                    })?,
613                ));
614                new_queries += 1;
615            }
616        }
617
618        // How many queries in this subscription are not cached?
619        metrics.num_new_queries_subscribed.inc_by(new_queries);
620
621        Ok((plans, auth, scopeguard::ScopeGuard::into_inner(tx), compile_timer))
622    }
623
624    /// Send a message to a client connection.
625    /// This will eventually be sent by the send-worker.
626    /// This takes a `TxId`, because this should be called while still holding a lock on the database.
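    ///
    /// A minimal usage sketch, assuming a caller that already holds a read tx `tx`:
    ///
    /// ```ignore
    /// subs.send_client_message(recipient.clone(), message, &tx)?;
    /// ```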
627    pub fn send_client_message(
628        &self,
629        recipient: Arc<ClientConnectionSender>,
630        message: impl Into<SerializableMessage>,
631        _tx_id: &TxId,
632    ) -> Result<(), BroadcastError> {
633        self.broadcast_queue.send_client_message(recipient, message)
634    }
635
636    #[tracing::instrument(level = "trace", skip_all)]
637    pub fn add_multi_subscription(
638        &self,
639        sender: Arc<ClientConnectionSender>,
640        request: SubscribeMulti,
641        timer: Instant,
642        _assert: Option<AssertTxFn>,
643    ) -> Result<Option<ExecutionMetrics>, DBError> {
644        // Send an error message to the client
645        let send_err_msg = |message| {
646            let _ = self.broadcast_queue.send_client_message(
647                sender.clone(),
648                SubscriptionMessage {
649                    request_id: Some(request.request_id),
650                    query_id: Some(request.query_id),
651                    timer: Some(timer),
652                    result: SubscriptionResult::Error(SubscriptionError {
653                        table_id: None,
654                        message,
655                    }),
656                },
657            );
658        };
659
660        let num_queries = request.query_strings.len();
661
662        let subscription_metrics = SubscriptionMetrics::new(&self.owner_identity, &WorkloadType::Subscribe);
663
664        // How many queries make up this subscription?
665        subscription_metrics.num_queries_subscribed.inc_by(num_queries as _);
666
667        let (queries, auth, tx, compile_timer) = return_on_err!(
668            self.compile_queries(
669                sender.id.identity,
670                request.query_strings,
671                num_queries,
672                &subscription_metrics
673            ),
674            send_err_msg,
675            None
676        );
677        let tx = scopeguard::guard(tx, |tx| {
678            let (tx_metrics, reducer) = self.relational_db.release_tx(tx);
679            self.relational_db.report(&reducer, &tx_metrics, None);
680        });
681
682        // We minimize locking so that other clients can add subscriptions concurrently.
683        // We are protected from race conditions with broadcasts, because we have the db lock,
684        // and `commit_and_broadcast_event` grabs a read lock on `subscriptions` while it still has a
685        // write lock on the db.
686        let queries = {
687            let mut subscriptions = {
688                // How contended is the lock?
689                let _wait_guard = subscription_metrics.lock_waiters.inc_scope();
690                let _wait_timer = subscription_metrics.lock_wait_time.start_timer();
691                self.subscriptions.write()
692            };
693
694            subscriptions.add_subscription_multi(sender.clone(), queries, request.query_id)?
695        };
696
697        // Record how long it took to compile the subscription
698        drop(compile_timer);
699
700        let Ok((update, metrics)) =
701            self.evaluate_queries(sender.clone(), &queries, &tx, &auth, TableUpdateType::Subscribe)
702        else {
703            // If query evaluation fails, we need to remove the subscription.
704            let mut subscriptions = {
705                // How contended is the lock?
706                let _wait_guard = subscription_metrics.lock_waiters.inc_scope();
707                let _wait_timer = subscription_metrics.lock_wait_time.start_timer();
708                self.subscriptions.write()
709            };
710            {
711                let _compile_timer = subscription_metrics.compilation_time.start_timer();
712                subscriptions.remove_subscription((sender.id.identity, sender.id.connection_id), request.query_id)?;
713            }
714
715            send_err_msg("Internal error evaluating queries".into());
716            return Ok(None);
717        };
718
719        // How many queries did we actually evaluate?
720        subscription_metrics.num_queries_evaluated.inc_by(queries.len() as _);
721
722        #[cfg(test)]
723        if let Some(assert) = _assert {
724            assert(&tx);
725        }
726
727        // Note: to make sure transaction updates are consistent, we need to put this in the broadcast
728        // queue while we are still holding a read-lock on the database.
729
730        // That will avoid race conditions because reducers first grab a write lock on the db, then
731        // grab a read lock on the subscriptions.
732
733        // Holding a write lock on `self.subscriptions` would also be sufficient.
734
735        let _ = self.broadcast_queue.send_client_message(
736            sender.clone(),
737            SubscriptionMessage {
738                request_id: Some(request.request_id),
739                query_id: Some(request.query_id),
740                timer: Some(timer),
741                result: SubscriptionResult::SubscribeMulti(SubscriptionData { data: update }),
742            },
743        );
744
745        Ok(Some(metrics))
746    }
747
748    /// Add a subscriber to the module. NOTE: this function is blocking.
749    /// This is used for the legacy subscription API which uses a set of queries.
750    #[tracing::instrument(level = "trace", skip_all)]
751    pub fn add_legacy_subscriber(
752        &self,
753        sender: Arc<ClientConnectionSender>,
754        subscription: Subscribe,
755        timer: Instant,
756        _assert: Option<AssertTxFn>,
757    ) -> Result<ExecutionMetrics, DBError> {
758        let num_queries = subscription.query_strings.len();
759        let subscription_metrics = SubscriptionMetrics::new(&self.owner_identity, &WorkloadType::Subscribe);
760
761        // How many queries make up this subscription?
762        subscription_metrics.num_queries_subscribed.inc_by(num_queries as _);
763
764        let (queries, auth, tx, compile_timer) = self.compile_queries(
765            sender.id.identity,
766            subscription.query_strings,
767            num_queries,
768            &subscription_metrics,
769        )?;
770        let tx = scopeguard::guard(tx, |tx| {
771            let (tx_metrics, reducer) = self.relational_db.release_tx(tx);
772            self.relational_db.report(&reducer, &tx_metrics, None);
773        });
774
775        check_row_limit(
776            &queries,
777            &self.relational_db,
778            &tx,
779            |plan, tx| {
780                plan.plans_fragments()
781                    .map(|plan_fragment| estimate_rows_scanned(tx, plan_fragment.optimized_physical_plan()))
782                    .fold(0, |acc, rows_scanned| acc.saturating_add(rows_scanned))
783            },
784            &auth,
785        )?;
786
787        // Record how long it took to compile the subscription
788        drop(compile_timer);
789
790        let tx = DeltaTx::from(&*tx);
791        let (database_update, metrics) = match sender.config.protocol {
792            Protocol::Binary => execute_plans(&queries, &tx, TableUpdateType::Subscribe)
793                .map(|(table_update, metrics)| (FormatSwitch::Bsatn(table_update), metrics))?,
794            Protocol::Text => execute_plans(&queries, &tx, TableUpdateType::Subscribe)
795                .map(|(table_update, metrics)| (FormatSwitch::Json(table_update), metrics))?,
796        };
797
798        // We acquire the subscription lock only after evaluation, allowing `add_subscription` to run concurrently.
799        // This also means `broadcast_event` may be scheduled before the code below runs,
800        // but that should not pose an issue.
801        {
802            let _compile_timer = subscription_metrics.compilation_time.start_timer();
803
804            let mut subscriptions = {
805                // How contended is the lock?
806                let _wait_guard = subscription_metrics.lock_waiters.inc_scope();
807                let _wait_timer = subscription_metrics.lock_wait_time.start_timer();
808                self.subscriptions.write()
809            };
810
811            subscriptions.set_legacy_subscription(sender.clone(), queries.into_iter());
812        }
813
814        #[cfg(test)]
815        if let Some(assert) = _assert {
816            assert(&tx);
817        }
818
819        // Note: to make sure transaction updates are consistent, we need to put this in the broadcast
820        // queue while we are still holding a read-lock on the database.
821
822        // That will avoid race conditions because reducers first grab a write lock on the db, then
823        // grab a read lock on the subscriptions.
824
825        // Holding a write lock on `self.subscriptions` would also be sufficient.
826        let _ = self.broadcast_queue.send_client_message(
827            sender,
828            SubscriptionUpdateMessage {
829                database_update,
830                request_id: Some(subscription.request_id),
831                timer: Some(timer),
832            },
833        );
834
835        Ok(metrics)
836    }
837
838    pub fn remove_subscriber(&self, client_id: ClientActorId) {
839        let mut subscriptions = self.subscriptions.write();
840        subscriptions.remove_all_subscriptions(&(client_id.identity, client_id.connection_id));
841    }
842
843    /// Commit a transaction and broadcast its ModuleEvent to all interested subscribers.
844    ///
845    /// The returned [`ExecutionMetrics`] are reported in this method via `report_tx_metrics`.
846    /// They are returned for testing purposes but should not be reported separately.
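    ///
    /// A minimal usage sketch, assuming a mutable tx `tx` carrying the reducer's writes and a
    /// `ModuleEvent` named `event` (see the test helper `commit_tx` below):
    ///
    /// ```ignore
    /// match subs.commit_and_broadcast_event(None, event, tx)? {
    ///     Ok((event, metrics)) => { /* committed; updates queued for subscribers */ }
    ///     Err(WriteConflict) => { /* commit failed due to a write conflict */ }
    /// }
    /// ```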
847    pub fn commit_and_broadcast_event(
848        &self,
849        caller: Option<Arc<ClientConnectionSender>>,
850        mut event: ModuleEvent,
851        tx: MutTx,
852    ) -> Result<Result<(Arc<ModuleEvent>, ExecutionMetrics), WriteConflict>, DBError> {
853        let subscription_metrics = SubscriptionMetrics::new(&self.owner_identity, &WorkloadType::Update);
854
855        // Take a read lock on `subscriptions` before committing the tx;
856        // otherwise a subscriber could receive duplicate updates.
857        let subscriptions = {
858            // How contended is the lock?
859            let _wait_guard = subscription_metrics.lock_waiters.inc_scope();
860            let _wait_timer = subscription_metrics.lock_wait_time.start_timer();
861            self.subscriptions.read()
862        };
863
864        let stdb = &self.relational_db;
865        // Downgrade mutable tx.
866        // We'll later ensure tx is released/cleaned up once out of scope.
867        let (read_tx, tx_data, tx_metrics_mut) = match &mut event.status {
868            EventStatus::Committed(db_update) => {
869                let Some((tx_data, tx_metrics, read_tx)) = stdb.commit_tx_downgrade(tx, Workload::Update)? else {
870                    return Ok(Err(WriteConflict));
871                };
872                *db_update = DatabaseUpdate::from_writes(&tx_data);
873                (read_tx, Some(tx_data), tx_metrics)
874            }
875            EventStatus::Failed(_) | EventStatus::OutOfEnergy => {
876                let (tx_metrics, tx) = stdb.rollback_mut_tx_downgrade(tx, Workload::Update);
877                (tx, None, tx_metrics)
878            }
879        };
880
881        // When we're done with this method, release the tx and report metrics.
882        let mut read_tx = scopeguard::guard(read_tx, |tx| {
883            let (tx_metrics_read, reducer) = self.relational_db.release_tx(tx);
884            stdb.report_tx_metricses(&reducer, tx_data.as_ref(), Some(&tx_metrics_mut), &tx_metrics_read);
885        });
886        // Create the delta transaction we'll use to eval updates against.
887        let delta_read_tx = tx_data
888            .as_ref()
889            .map(|tx_data| DeltaTx::new(&read_tx, tx_data, subscriptions.index_ids_for_subscriptions()))
890            .unwrap_or_else(|| DeltaTx::from(&*read_tx));
891
892        let event = Arc::new(event);
893        let mut update_metrics: ExecutionMetrics = ExecutionMetrics::default();
894
895        match &event.status {
896            EventStatus::Committed(_) => {
897                update_metrics = subscriptions.eval_updates_sequential(&delta_read_tx, event.clone(), caller);
898            }
899            EventStatus::Failed(_) => {
900                if let Some(client) = caller {
901                    let message = TransactionUpdateMessage {
902                        event: Some(event.clone()),
903                        database_update: SubscriptionUpdateMessage::default_for_protocol(client.config.protocol, None),
904                    };
905
906                    let _ = self.broadcast_queue.send_client_message(client, message);
907                } else {
908                    log::trace!("Reducer failed but there is no client to send the failure to!")
909                }
910            }
911            EventStatus::OutOfEnergy => {} // ?
912        }
913
914        // Merge in the subscription evaluation metrics.
915        read_tx.metrics.merge(update_metrics);
916
917        Ok(Ok((event, update_metrics)))
918    }
919}
920
921pub struct WriteConflict;
922
923#[cfg(test)]
924mod tests {
925    use super::{AssertTxFn, ModuleSubscriptions};
926    use crate::client::messages::{
927        SerializableMessage, SubscriptionData, SubscriptionError, SubscriptionMessage, SubscriptionResult,
928        SubscriptionUpdateMessage, TransactionUpdateMessage,
929    };
930    use crate::client::{ClientActorId, ClientConfig, ClientConnectionSender, ClientName, Protocol};
931    use crate::db::datastore::system_tables::{StRowLevelSecurityRow, ST_ROW_LEVEL_SECURITY_ID};
932    use crate::db::relational_db::tests_utils::{
933        begin_mut_tx, begin_tx, insert, with_auto_commit, with_read_only, TestDB,
934    };
935    use crate::db::relational_db::RelationalDB;
936    use crate::error::DBError;
937    use crate::host::module_host::{DatabaseUpdate, EventStatus, ModuleEvent, ModuleFunctionCall};
938    use crate::messages::websocket as ws;
939    use crate::sql::execute::run;
940    use crate::subscription::module_subscription_manager::{spawn_send_worker, SubscriptionManager};
941    use crate::subscription::query::compile_read_only_query;
942    use crate::subscription::TableUpdateType;
943    use hashbrown::HashMap;
944    use itertools::Itertools;
945    use pretty_assertions::assert_matches;
946    use spacetimedb_client_api_messages::energy::EnergyQuanta;
947    use spacetimedb_client_api_messages::websocket::{
948        CompressableQueryUpdate, Compression, FormatSwitch, QueryId, Subscribe, SubscribeMulti, SubscribeSingle,
949        TableUpdate, Unsubscribe, UnsubscribeMulti,
950    };
951    use spacetimedb_execution::dml::MutDatastore;
952    use spacetimedb_lib::bsatn::ToBsatn;
953    use spacetimedb_lib::db::auth::StAccess;
954    use spacetimedb_lib::identity::AuthCtx;
955    use spacetimedb_lib::metrics::ExecutionMetrics;
956    use spacetimedb_lib::{bsatn, ConnectionId, ProductType, ProductValue, Timestamp};
957    use spacetimedb_lib::{error::ResultTest, AlgebraicType, Identity};
958    use spacetimedb_primitives::TableId;
959    use spacetimedb_sats::product;
960    use std::time::Instant;
961    use std::{sync::Arc, time::Duration};
962    use tokio::sync::mpsc::{self, Receiver};
963
964    fn add_subscriber(db: Arc<RelationalDB>, sql: &str, assert: Option<AssertTxFn>) -> Result<(), DBError> {
965        // Create and enter a Tokio runtime to run the `ModuleSubscriptions`' background workers in parallel.
966        let runtime = tokio::runtime::Runtime::new().unwrap();
967        let _rt = runtime.enter();
968        let owner = Identity::from_byte_array([1; 32]);
969        let client = ClientActorId::for_test(Identity::ZERO);
970        let config = ClientConfig::for_test();
971        let sender = Arc::new(ClientConnectionSender::dummy(client, config));
972        let send_worker_queue = spawn_send_worker(None);
973        let module_subscriptions = ModuleSubscriptions::new(
974            db.clone(),
975            SubscriptionManager::for_test_without_metrics_arc_rwlock(),
976            send_worker_queue,
977            owner,
978        );
979
980        let subscribe = Subscribe {
981            query_strings: [sql.into()].into(),
982            request_id: 0,
983        };
984        module_subscriptions.add_legacy_subscriber(sender, subscribe, Instant::now(), assert)?;
985        Ok(())
986    }
987
988    /// An in-memory `RelationalDB` for testing
989    fn relational_db() -> anyhow::Result<Arc<RelationalDB>> {
990        let TestDB { db, .. } = TestDB::in_memory()?;
991        Ok(Arc::new(db))
992    }
993
994    /// A [SubscribeSingle] message for testing
995    fn single_subscribe(sql: &str, query_id: u32) -> SubscribeSingle {
996        SubscribeSingle {
997            query: sql.into(),
998            request_id: 0,
999            query_id: QueryId::new(query_id),
1000        }
1001    }
1002
1003    /// A [SubscribeMulti] message for testing
1004    fn multi_subscribe(query_strings: &[&'static str], query_id: u32) -> SubscribeMulti {
1005        SubscribeMulti {
1006            query_strings: query_strings
1007                .iter()
1008                .map(|sql| String::from(*sql).into_boxed_str())
1009                .collect(),
1010            request_id: 0,
1011            query_id: QueryId::new(query_id),
1012        }
1013    }
1014
1015    /// An [UnsubscribeMulti] message for testing
1016    fn multi_unsubscribe(query_id: u32) -> UnsubscribeMulti {
1017        UnsubscribeMulti {
1018            request_id: 0,
1019            query_id: QueryId::new(query_id),
1020        }
1021    }
1022
1023    /// An [Unsubscribe] message for testing
1024    fn single_unsubscribe(query_id: u32) -> Unsubscribe {
1025        Unsubscribe {
1026            request_id: 0,
1027            query_id: QueryId::new(query_id),
1028        }
1029    }
1030
1031    /// A dummy [ModuleEvent] for testing
1032    fn module_event() -> ModuleEvent {
1033        ModuleEvent {
1034            timestamp: Timestamp::now(),
1035            caller_identity: Identity::ZERO,
1036            caller_connection_id: None,
1037            function_call: ModuleFunctionCall::default(),
1038            status: EventStatus::Committed(DatabaseUpdate::default()),
1039            energy_quanta_used: EnergyQuanta { quanta: 0 },
1040            host_execution_duration: Duration::from_millis(0),
1041            request_id: None,
1042            timer: None,
1043        }
1044    }
1045
1046    /// Create an [Identity] from a [u8]
1047    fn identity_from_u8(v: u8) -> Identity {
1048        Identity::from_byte_array([v; 32])
1049    }
1050
1051    /// Create a [ConnectionId] from a [u8]
1052    fn connection_id_from_u8(v: u8) -> ConnectionId {
1053        ConnectionId::from_be_byte_array([v; 16])
1054    }
1055
1056    /// Create a [ClientActorId] from a [u8].
1057    /// Calls [identity_from_u8] internally with the passed value.
1058    fn client_id_from_u8(v: u8) -> ClientActorId {
1059        ClientActorId {
1060            identity: identity_from_u8(v),
1061            connection_id: connection_id_from_u8(v),
1062            name: ClientName(v as u64),
1063        }
1064    }
1065
1066    /// Instantiate a client connection with compression
1067    fn client_connection_with_compression(
1068        client_id: ClientActorId,
1069        compression: Compression,
1070    ) -> (Arc<ClientConnectionSender>, Receiver<SerializableMessage>) {
1071        let (sender, rx) = ClientConnectionSender::dummy_with_channel(
1072            client_id,
1073            ClientConfig {
1074                protocol: Protocol::Binary,
1075                compression,
1076                tx_update_full: true,
1077            },
1078        );
1079        (Arc::new(sender), rx)
1080    }
1081
1082    /// Instantiate a client connection
1083    fn client_connection(client_id: ClientActorId) -> (Arc<ClientConnectionSender>, Receiver<SerializableMessage>) {
1084        client_connection_with_compression(client_id, Compression::None)
1085    }
1086
1087    /// Insert rules into the RLS system table
1088    fn insert_rls_rules(
1089        db: &RelationalDB,
1090        table_ids: impl IntoIterator<Item = TableId>,
1091        rules: impl IntoIterator<Item = &'static str>,
1092    ) -> anyhow::Result<()> {
1093        with_auto_commit(db, |tx| {
1094            for (table_id, sql) in table_ids.into_iter().zip(rules) {
1095                db.insert(
1096                    tx,
1097                    ST_ROW_LEVEL_SECURITY_ID,
1098                    &ProductValue::from(StRowLevelSecurityRow {
1099                        table_id,
1100                        sql: sql.into(),
1101                    })
1102                    .to_bsatn_vec()?,
1103                )?;
1104            }
1105            Ok(())
1106        })
1107    }
1108
1109    /// Subscribe to a query as a client
1110    fn subscribe_single(
1111        subs: &ModuleSubscriptions,
1112        sql: &'static str,
1113        sender: Arc<ClientConnectionSender>,
1114        counter: &mut u32,
1115    ) -> anyhow::Result<()> {
1116        *counter += 1;
1117        subs.add_single_subscription(sender, single_subscribe(sql, *counter), Instant::now(), None)?;
1118        Ok(())
1119    }
1120
1121    /// Subscribe to a set of queries as a client
1122    fn subscribe_multi(
1123        subs: &ModuleSubscriptions,
1124        queries: &[&'static str],
1125        sender: Arc<ClientConnectionSender>,
1126        counter: &mut u32,
1127    ) -> anyhow::Result<ExecutionMetrics> {
1128        *counter += 1;
1129        let metrics = subs
1130            .add_multi_subscription(sender, multi_subscribe(queries, *counter), Instant::now(), None)
1131            .map(|metrics| metrics.unwrap_or_default())?;
1132        Ok(metrics)
1133    }
1134
1135    /// Unsubscribe from a single query
1136    fn unsubscribe_single(
1137        subs: &ModuleSubscriptions,
1138        sender: Arc<ClientConnectionSender>,
1139        query_id: u32,
1140    ) -> anyhow::Result<()> {
1141        subs.remove_single_subscription(sender, single_unsubscribe(query_id), Instant::now())?;
1142        Ok(())
1143    }
1144
1145    /// Unsubscribe from a set of queries
1146    fn unsubscribe_multi(
1147        subs: &ModuleSubscriptions,
1148        sender: Arc<ClientConnectionSender>,
1149        query_id: u32,
1150    ) -> anyhow::Result<()> {
1151        subs.remove_multi_subscription(sender, multi_unsubscribe(query_id), Instant::now())?;
1152        Ok(())
1153    }
1154
1155    /// Pull a message from the receiver and assert that it is a `TxUpdate` with the expected rows
1156    async fn assert_tx_update_for_table(
1157        rx: &mut Receiver<SerializableMessage>,
1158        table_id: TableId,
1159        schema: &ProductType,
1160        inserts: impl IntoIterator<Item = ProductValue>,
1161        deletes: impl IntoIterator<Item = ProductValue>,
1162    ) {
1163        match rx.recv().await {
1164            Some(SerializableMessage::TxUpdate(TransactionUpdateMessage {
1165                database_update:
1166                    SubscriptionUpdateMessage {
1167                        database_update: FormatSwitch::Bsatn(ws::DatabaseUpdate { mut tables }),
1168                        ..
1169                    },
1170                ..
1171            })) => {
1172                // Assume an update for only one table
1173                assert_eq!(tables.len(), 1);
1174
1175                let table_update = tables.pop().unwrap();
1176
1177                // We should not be sending empty updates to clients
1178                assert_ne!(table_update.num_rows, 0);
1179
1180                // It should be the table we expect
1181                assert_eq!(table_update.table_id, table_id);
1182
1183                let mut rows_received: HashMap<ProductValue, i32> = HashMap::new();
1184
1185                for uncompressed in table_update.updates {
1186                    let CompressableQueryUpdate::Uncompressed(table_update) = uncompressed else {
1187                        panic!("expected an uncompressed table update")
1188                    };
1189
1190                    for row in table_update
1191                        .inserts
1192                        .into_iter()
1193                        .map(|bytes| ProductValue::decode(schema, &mut &*bytes).unwrap())
1194                    {
1195                        *rows_received.entry(row).or_insert(0) += 1;
1196                    }
1197
1198                    for row in table_update
1199                        .deletes
1200                        .into_iter()
1201                        .map(|bytes| ProductValue::decode(schema, &mut &*bytes).unwrap())
1202                    {
1203                        *rows_received.entry(row).or_insert(0) -= 1;
1204                    }
1205                }
1206
1207                assert_eq!(
1208                    rows_received
1209                        .iter()
1210                        .filter(|(_, n)| n > &&0)
1211                        .map(|(row, _)| row)
1212                        .cloned()
1213                        .sorted()
1214                        .collect::<Vec<_>>(),
1215                    inserts.into_iter().sorted().collect::<Vec<_>>()
1216                );
1217                assert_eq!(
1218                    rows_received
1219                        .iter()
1220                        .filter(|(_, n)| n < &&0)
1221                        .map(|(row, _)| row)
1222                        .cloned()
1223                        .sorted()
1224                        .collect::<Vec<_>>(),
1225                    deletes.into_iter().sorted().collect::<Vec<_>>()
1226                );
1227            }
1228            Some(msg) => panic!("expected a TxUpdate, but got {:#?}", msg),
1229            None => panic!("The receiver closed due to an error"),
1230        }
1231    }
1232
1233    /// Commit a set of row updates and broadcast to subscribers
1234    fn commit_tx(
1235        db: &RelationalDB,
1236        subs: &ModuleSubscriptions,
1237        deletes: impl IntoIterator<Item = (TableId, ProductValue)>,
1238        inserts: impl IntoIterator<Item = (TableId, ProductValue)>,
1239    ) -> anyhow::Result<ExecutionMetrics> {
1240        let mut tx = begin_mut_tx(db);
1241        for (table_id, row) in deletes {
1242            tx.delete_product_value(table_id, &row)?;
1243        }
1244        for (table_id, row) in inserts {
1245            db.insert(&mut tx, table_id, &bsatn::to_vec(&row)?)?;
1246        }
1247
1248        let Ok(Ok((_, metrics))) = subs.commit_and_broadcast_event(None, module_event(), tx) else {
1249            panic!("Encountered an error in `commit_and_broadcast_event`");
1250        };
1251        Ok(metrics)
1252    }
1253
1254    #[test]
1255    fn test_subscribe_metrics() -> anyhow::Result<()> {
1256        let client_id = client_id_from_u8(1);
1257        let (sender, _) = client_connection(client_id);
1258
1259        let db = relational_db()?;
1260        let (subs, _runtime) = ModuleSubscriptions::for_test_new_runtime(db.clone());
1261
1262        // Create a table `t` with index on `id`
1263        let table_id = db.create_table_for_test("t", &[("id", AlgebraicType::U64)], &[0.into()])?;
1264        with_auto_commit(&db, |tx| -> anyhow::Result<_> {
1265            db.insert(tx, table_id, &bsatn::to_vec(&product![1_u64])?)?;
1266            Ok(())
1267        })?;
1268
1269        let auth = AuthCtx::for_testing();
1270        let sql = "select * from t where id = 1";
1271        let tx = begin_tx(&db);
1272        let plan = compile_read_only_query(&auth, &tx, sql)?;
1273        let plan = Arc::new(plan);
1274
1275        let (_, metrics) = subs.evaluate_queries(sender, &[plan], &tx, &auth, TableUpdateType::Subscribe)?;
1276
1277        // We only probe the index once
1278        assert_eq!(metrics.index_seeks, 1);
1279        // We scan a single u64 when serializing the result
1280        assert_eq!(metrics.bytes_scanned, 8);
1281        // Subscriptions are read-only
1282        assert_eq!(metrics.bytes_written, 0);
1283        // Bytes scanned and bytes sent will always be the same for an initial subscription,
1284        // because a subscription is initiated by a single client.
1285        assert_eq!(metrics.bytes_sent_to_clients, 8);
1286
1287        // Note, rows scanned may be greater than one.
1288        // It depends on the number of operators used to answer the query.
1289        assert!(metrics.rows_scanned > 0);
1290        Ok(())
1291    }
1292
1293    fn check_subscription_err(sql: &str, result: Option<SerializableMessage>) {
1294        if let Some(SerializableMessage::Subscription(SubscriptionMessage {
1295            result: SubscriptionResult::Error(SubscriptionError { message, .. }),
1296            ..
1297        })) = result
1298        {
1299            assert!(
1300                message.contains(sql),
1301                "Expected error message to contain the SQL query: {sql}, but got: {message}",
1302            );
1303            return;
1304        }
1305        panic!("Expected a subscription error message, but got: {:?}", result);
1306    }
1307
1308    /// Test that clients receive error messages on subscribe
1309    #[tokio::test]
1310    async fn subscribe_single_error() -> anyhow::Result<()> {
1311        let client_id = client_id_from_u8(1);
1312        let (tx, mut rx) = client_connection(client_id);
1313
1314        let db = relational_db()?;
1315        let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone());
1316
1317        db.create_table_for_test("t", &[("x", AlgebraicType::U8)], &[])?;
1318
1319        // Subscribe to an invalid query (r is not in scope)
1320        let sql = "select r.* from t";
1321        subscribe_single(&subs, sql, tx, &mut 0)?;
1322
1323        check_subscription_err(sql, rx.recv().await);
1324
1325        Ok(())
1326    }
1327
1328    /// Test that clients receive error messages on subscribe
1329    #[tokio::test]
1330    async fn subscribe_multi_error() -> anyhow::Result<()> {
1331        let client_id = client_id_from_u8(1);
1332        let (tx, mut rx) = client_connection(client_id);
1333
1334        let db = relational_db()?;
1335        let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone());
1336
1337        db.create_table_for_test("t", &[("x", AlgebraicType::U8)], &[])?;
1338
1339        // Subscribe to an invalid query (r is not in scope)
1340        let sql = "select r.* from t";
1341        subscribe_multi(&subs, &[sql], tx, &mut 0)?;
1342
1343        check_subscription_err(sql, rx.recv().await);
1344
1345        Ok(())
1346    }
1347
1348    /// Test that clients receive error messages on unsubscribe
1349    #[tokio::test]
1350    async fn unsubscribe_single_error() -> anyhow::Result<()> {
1351        let client_id = client_id_from_u8(1);
1352        let (tx, mut rx) = client_connection(client_id);
1353
1354        let db = relational_db()?;
1355        let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone());
1356
1357        // Create a table `t` with an index on `id`
1358        let table_id = db.create_table_for_test("t", &[("id", AlgebraicType::U8)], &[0.into()])?;
1359        let index_id = with_read_only(&db, |tx| {
1360            db.schema_for_table(&*tx, table_id).map(|schema| {
1361                schema
1362                    .indexes
1363                    .first()
1364                    .map(|index_schema| index_schema.index_id)
1365                    .unwrap()
1366            })
1367        })?;
1368
1369        let mut query_id = 0;
1370
1371        // Subscribe to `t`
1372        let sql = "select * from t where id = 1";
1373        subscribe_single(&subs, sql, tx.clone(), &mut query_id)?;
1374
1375        // The initial subscription should succeed
1376        assert!(matches!(
1377            rx.recv().await,
1378            Some(SerializableMessage::Subscription(SubscriptionMessage {
1379                result: SubscriptionResult::Subscribe(..),
1380                ..
1381            }))
1382        ));
1383
1384        // Remove the index from `id`
1385        with_auto_commit(&db, |tx| db.drop_index(tx, index_id))?;
1386
1387        // Unsubscribe from `t`
1388        unsubscribe_single(&subs, tx, query_id)?;
1389
1390        // Why does the unsubscribe fail?
1391        // This relies on some knowledge of the underlying implementation:
1392        // we do not recompile queries on unsubscribe.
1393        // Instead we execute the cached plan, which in this case is an index scan.
1394        // The index no longer exists, and therefore the unsubscribe fails.
1395        check_subscription_err(sql, rx.recv().await);
1396
1397        Ok(())
1398    }
1399
1400    /// Test that clients receive error messages on unsubscribe
1401    ///
1402    /// Needs a multi-threaded tokio runtime so that the module subscription worker can run in parallel.
1403    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1404    async fn unsubscribe_multi_error() -> anyhow::Result<()> {
1405        let client_id = client_id_from_u8(1);
1406        let (tx, mut rx) = client_connection(client_id);
1407
1408        let db = relational_db()?;
1409        let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone());
1410
1411        // Create a table `t` with an index on `id`
1412        let table_id = db.create_table_for_test("t", &[("id", AlgebraicType::U8)], &[0.into()])?;
1413        let index_id = with_read_only(&db, |tx| {
1414            db.schema_for_table(&*tx, table_id).map(|schema| {
1415                schema
1416                    .indexes
1417                    .first()
1418                    .map(|index_schema| index_schema.index_id)
1419                    .unwrap()
1420            })
1421        })?;
1422
1423        commit_tx(&db, &subs, [], [(table_id, product![0_u8])])?;
1424
1425        let mut query_id = 0;
1426
1427        // Subscribe to `t`
1428        let sql = "select * from t where id = 1";
1429        subscribe_multi(&subs, &[sql], tx.clone(), &mut query_id)?;
1430
1431        // The initial subscription should succeed
1432        assert!(matches!(
1433            rx.recv().await,
1434            Some(SerializableMessage::Subscription(SubscriptionMessage {
1435                result: SubscriptionResult::SubscribeMulti(..),
1436                ..
1437            }))
1438        ));
1439
1440        // Remove the index from `id`
1441        with_auto_commit(&db, |tx| db.drop_index(tx, index_id))?;
1442
1443        // Unsubscribe from `t`
1444        unsubscribe_multi(&subs, tx, query_id)?;
1445
1446        // Why does the unsubscribe fail?
1447        // This relies on some knowledge of the underlying implementation:
1448        // we do not recompile queries on unsubscribe.
1449        // Instead we execute the cached plan, which in this case is an index scan.
1450        // The index no longer exists, and therefore the unsubscribe fails.
1451        check_subscription_err(sql, rx.recv().await);
1452
1453        Ok(())
1454    }
1455
1456    /// Test that clients receive error messages on tx updates
1457    #[tokio::test]
1458    async fn tx_update_error() -> anyhow::Result<()> {
1459        let client_id = client_id_from_u8(1);
1460        let (tx, mut rx) = client_connection(client_id);
1461
1462        let db = relational_db()?;
1463        let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone());
1464
1465        // Create two tables `t` and `s` with indexes on their `id` columns
1466        let t_id = db.create_table_for_test("t", &[("id", AlgebraicType::U8)], &[0.into()])?;
1467        let s_id = db.create_table_for_test("s", &[("id", AlgebraicType::U8)], &[0.into()])?;
1468        let index_id = with_read_only(&db, |tx| {
1469            db.schema_for_table(&*tx, s_id).map(|schema| {
1470                schema
1471                    .indexes
1472                    .first()
1473                    .map(|index_schema| index_schema.index_id)
1474                    .unwrap()
1475            })
1476        })?;
1477        let sql = "select t.* from t join s on t.id = s.id";
1478        subscribe_single(&subs, sql, tx, &mut 0)?;
1479
1480        // The initial subscription should succeed
1481        assert!(matches!(
1482            rx.recv().await,
1483            Some(SerializableMessage::Subscription(SubscriptionMessage {
1484                result: SubscriptionResult::Subscribe(..),
1485                ..
1486            }))
1487        ));
1488
1489        // Remove the index from `s`
1490        with_auto_commit(&db, |tx| db.drop_index(tx, index_id))?;
1491
1492        // Start a new transaction and insert a new row into `t`
1493        let mut tx = begin_mut_tx(&db);
1494        db.insert(&mut tx, t_id, &bsatn::to_vec(&product![2_u8])?)?;
1495
1496        assert!(matches!(
1497            subs.commit_and_broadcast_event(None, module_event(), tx),
1498            Ok(Ok(_))
1499        ));
1500
1501        // Why does the update fail?
1502        // This relies on some knowledge of the underlying implementation:
1503        // plans are cached on the initial subscribe,
1504        // so we execute a cached plan, which in this case is an index join.
1505        // We've removed the index on `s`, and therefore the update fails.
1506        check_subscription_err(sql, rx.recv().await);
1507
1508        Ok(())
1509    }
1510
1511    /// Test that two clients can subscribe to a parameterized query and get the correct rows.
1512    #[tokio::test]
1513    async fn test_parameterized_subscription() -> anyhow::Result<()> {
1514        // Create identities for two different clients
1515        let id_for_a = identity_from_u8(1);
1516        let id_for_b = identity_from_u8(2);
1517
1518        let client_id_for_a = client_id_from_u8(1);
1519        let client_id_for_b = client_id_from_u8(2);
1520
1521        // Establish a connection for each client
1522        let (tx_for_a, mut rx_for_a) = client_connection(client_id_for_a);
1523        let (tx_for_b, mut rx_for_b) = client_connection(client_id_for_b);
1524
1525        let db = relational_db()?;
1526        let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone());
1527
1528        let schema = [("identity", AlgebraicType::identity())];
1529
1530        let table_id = db.create_table_for_test("t", &schema, &[])?;
1531
1532        let mut query_ids = 0;
1533
1534        // Have each client subscribe to the same parameterized query.
1535        // Each client should receive different rows.
1536        subscribe_multi(
1537            &subs,
1538            &["select * from t where identity = :sender"],
1539            tx_for_a,
1540            &mut query_ids,
1541        )?;
1542        subscribe_multi(
1543            &subs,
1544            &["select * from t where identity = :sender"],
1545            tx_for_b,
1546            &mut query_ids,
1547        )?;
1548
1549        // Wait for both subscriptions
1550        assert!(matches!(
1551            rx_for_a.recv().await,
1552            Some(SerializableMessage::Subscription(_))
1553        ));
1554        assert!(matches!(
1555            rx_for_b.recv().await,
1556            Some(SerializableMessage::Subscription(_))
1557        ));
1558
1559        // Insert two identities - one for each caller - into the table
1560        let mut tx = begin_mut_tx(&db);
1561        db.insert(&mut tx, table_id, &bsatn::to_vec(&product![id_for_a])?)?;
1562        db.insert(&mut tx, table_id, &bsatn::to_vec(&product![id_for_b])?)?;
1563
1564        assert!(matches!(
1565            subs.commit_and_broadcast_event(None, module_event(), tx),
1566            Ok(Ok(_))
1567        ));
1568
1569        let schema = ProductType::from([AlgebraicType::identity()]);
1570
1571        // Both clients should only receive their identities and not the other's.
1572        assert_tx_update_for_table(&mut rx_for_a, table_id, &schema, [product![id_for_a]], []).await;
1573        assert_tx_update_for_table(&mut rx_for_b, table_id, &schema, [product![id_for_b]], []).await;
1574        Ok(())
1575    }
1576
1577    /// Test that two clients can subscribe to a table with RLS rules and get the correct rows
1578    #[tokio::test]
1579    async fn test_rls_subscription() -> anyhow::Result<()> {
1580        // Create identities for two different clients
1581        let id_for_a = identity_from_u8(1);
1582        let id_for_b = identity_from_u8(2);
1583
1584        let client_id_for_a = client_id_from_u8(1);
1585        let client_id_for_b = client_id_from_u8(2);
1586
1587        // Establish a connection for each client
1588        let (tx_for_a, mut rx_for_a) = client_connection(client_id_for_a);
1589        let (tx_for_b, mut rx_for_b) = client_connection(client_id_for_b);
1590
1591        let db = relational_db()?;
1592        let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone());
1593
1594        let schema = [("id", AlgebraicType::identity())];
1595
1596        let u_id = db.create_table_for_test("u", &schema, &[0.into()])?;
1597        let v_id = db.create_table_for_test("v", &schema, &[0.into()])?;
1598        let w_id = db.create_table_for_test("w", &schema, &[0.into()])?;
1599
1600        insert_rls_rules(
1601            &db,
1602            [u_id, v_id, w_id, w_id],
1603            [
1604                "select * from u where id = :sender",
1605                "select * from v where id = :sender",
1606                "select w.* from u join w on u.id = w.id",
1607                "select w.* from v join w on v.id = w.id",
1608            ],
1609        )?;
1610
1611        let mut query_ids = 0;
1612
1613        // Have each client subscribe to `w`.
1614        // Because `w` is gated using parameterized RLS rules,
1615        // each client should receive different rows.
1616        subscribe_multi(&subs, &["select * from w"], tx_for_a, &mut query_ids)?;
1617        subscribe_multi(&subs, &["select * from w"], tx_for_b, &mut query_ids)?;
1618
1619        // Wait for both subscriptions
1620        assert!(matches!(
1621            rx_for_a.recv().await,
1622            Some(SerializableMessage::Subscription(_))
1623        ));
1624        assert!(matches!(
1625            rx_for_b.recv().await,
1626            Some(SerializableMessage::Subscription(_))
1627        ));
1628
1629        // Insert a row into `u` for client "a".
1630        // Insert a row into `v` for client "b".
1631        // Insert a row into `w` for both.
1632        let mut tx = begin_mut_tx(&db);
1633        db.insert(&mut tx, u_id, &bsatn::to_vec(&product![id_for_a])?)?;
1634        db.insert(&mut tx, v_id, &bsatn::to_vec(&product![id_for_b])?)?;
1635        db.insert(&mut tx, w_id, &bsatn::to_vec(&product![id_for_a])?)?;
1636        db.insert(&mut tx, w_id, &bsatn::to_vec(&product![id_for_b])?)?;
1637
1638        assert!(matches!(
1639            subs.commit_and_broadcast_event(None, module_event(), tx),
1640            Ok(Ok(_))
1641        ));
1642
1643        let schema = ProductType::from([AlgebraicType::identity()]);
1644
1645        // Both clients should only receive their identities and not the other's.
1646        assert_tx_update_for_table(&mut rx_for_a, w_id, &schema, [product![id_for_a]], []).await;
1647        assert_tx_update_for_table(&mut rx_for_b, w_id, &schema, [product![id_for_b]], []).await;
1648        Ok(())
1649    }
1650
1651    /// Test that a client and the database owner can subscribe to the same query
1652    #[tokio::test]
1653    async fn test_rls_for_owner() -> anyhow::Result<()> {
1654        // Establish a connection for owner and client
1655        let (tx_for_a, mut rx_for_a) = client_connection(client_id_from_u8(0));
1656        let (tx_for_b, mut rx_for_b) = client_connection(client_id_from_u8(1));
1657
1658        let db = relational_db()?;
1659        let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone());
1660
1661        // Create table `t`
1662        let table_id = db.create_table_for_test("t", &[("id", AlgebraicType::identity())], &[0.into()])?;
1663
1664        // Restrict access to `t`
1665        insert_rls_rules(&db, [table_id], ["select * from t where id = :sender"])?;
1666
1667        let mut query_ids = 0;
1668
1669        // Have owner and client subscribe to `t`
1670        subscribe_multi(&subs, &["select * from t"], tx_for_a, &mut query_ids)?;
1671        subscribe_multi(&subs, &["select * from t"], tx_for_b, &mut query_ids)?;
1672
1673        // Wait for both subscriptions
1674        assert_matches!(
1675            rx_for_a.recv().await,
1676            Some(SerializableMessage::Subscription(SubscriptionMessage {
1677                result: SubscriptionResult::SubscribeMulti(_),
1678                ..
1679            }))
1680        );
1681        assert_matches!(
1682            rx_for_b.recv().await,
1683            Some(SerializableMessage::Subscription(SubscriptionMessage {
1684                result: SubscriptionResult::SubscribeMulti(_),
1685                ..
1686            }))
1687        );
1688
1689        let schema = ProductType::from([AlgebraicType::identity()]);
1690
1691        let id_for_b = identity_from_u8(1);
1692        let id_for_c = identity_from_u8(2);
1693
1694        commit_tx(
1695            &db,
1696            &subs,
1697            [],
1698            [
1699                // Insert an identity for client `b` plus a random identity
1700                (table_id, product![id_for_b]),
1701                (table_id, product![id_for_c]),
1702            ],
1703        )?;
1704
1705        assert_tx_update_for_table(
1706            &mut rx_for_a,
1707            table_id,
1708            &schema,
1709            // The owner should receive both identities
1710            [product![id_for_b], product![id_for_c]],
1711            [],
1712        )
1713        .await;
1714
1715        assert_tx_update_for_table(
1716            &mut rx_for_b,
1717            table_id,
1718            &schema,
1719            // Client `b` should only receive its identity
1720            [product![id_for_b]],
1721            [],
1722        )
1723        .await;
1724
1725        Ok(())
1726    }
1727
1728    /// Test that we do not send empty updates to clients
1729    #[tokio::test]
1730    async fn test_no_empty_updates() -> anyhow::Result<()> {
1731        // Establish a client connection
1732        let (tx, mut rx) = client_connection(client_id_from_u8(1));
1733
1734        let db = relational_db()?;
1735        let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone());
1736
1737        let schema = [("x", AlgebraicType::U8)];
1738
1739        let t_id = db.create_table_for_test("t", &schema, &[])?;
1740
1741        // Subscribe to rows of `t` where `x` is 0
1742        subscribe_multi(&subs, &["select * from t where x = 0"], tx, &mut 0)?;
1743
1744        // Wait to receive the initial subscription message
1745        assert!(matches!(rx.recv().await, Some(SerializableMessage::Subscription(_))));
1746
1747        // Insert a row that does not match the query
1748        let mut tx = begin_mut_tx(&db);
1749        db.insert(&mut tx, t_id, &bsatn::to_vec(&product![1_u8])?)?;
1750
1751        assert!(matches!(
1752            subs.commit_and_broadcast_event(None, module_event(), tx),
1753            Ok(Ok(_))
1754        ));
1755
1756        // Insert a row that does match the query
1757        let mut tx = begin_mut_tx(&db);
1758        db.insert(&mut tx, t_id, &bsatn::to_vec(&product![0_u8])?)?;
1759
1760        assert!(matches!(
1761            subs.commit_and_broadcast_event(None, module_event(), tx),
1762            Ok(Ok(_))
1763        ));
1764
1765        let schema = ProductType::from([AlgebraicType::U8]);
1766
1767        // If the server sends empty updates, this assertion will fail,
1768        // because we will receive one for the first transaction.
1769        assert_tx_update_for_table(&mut rx, t_id, &schema, [product![0_u8]], []).await;
1770        Ok(())
1771    }
1772
1773    /// Test that we do not compress within a [SubscriptionMessage].
1774    /// The message itself is compressed before being sent over the wire,
1775    /// but we don't care about that for this test.
1776    ///
1777    /// Needs a multi-threaded tokio runtime so that the module subscription worker can run in parallel.
1778    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1779    async fn test_no_compression_for_subscribe() -> anyhow::Result<()> {
1780        // Establish a client connection with compression
1781        let (tx, mut rx) = client_connection_with_compression(client_id_from_u8(1), Compression::Brotli);
1782
1783        let db = relational_db()?;
1784        let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone());
1785
1786        let table_id = db.create_table_for_test("t", &[("x", AlgebraicType::U64)], &[])?;
1787
1788        let mut inserts = vec![];
1789
1790        for i in 0..16_000u64 {
1791            inserts.push((table_id, product![i]));
1792        }
1793
1794        // Insert a lot of rows into `t`.
1795        // We want to insert enough to cross any threshold there might be for compression.
1796        commit_tx(&db, &subs, [], inserts)?;
1797
1798        // Subscribe to the entire table
1799        subscribe_multi(&subs, &["select * from t"], tx, &mut 0)?;
1800
1801        // Assert that the table updates within this message are all uncompressed
1802        match rx.recv().await {
1803            Some(SerializableMessage::Subscription(SubscriptionMessage {
1804                result:
1805                    SubscriptionResult::SubscribeMulti(SubscriptionData {
1806                        data: FormatSwitch::Bsatn(ws::DatabaseUpdate { tables }),
1807                    }),
1808                ..
1809            })) => {
1810                assert!(tables.iter().all(|TableUpdate { updates, .. }| updates
1811                    .iter()
1812                    .all(|query_update| matches!(query_update, CompressableQueryUpdate::Uncompressed(_)))));
1813            }
1814            Some(_) => panic!("unexpected message from subscription"),
1815            None => panic!("channel unexpectedly closed"),
1816        };
1817
1818        Ok(())
1819    }
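
    // A hedged helper sketch capturing the check that both compression tests perform inline:
    // every query update inside every table update is stored uncompressed. The generic parameter
    // `BsatnFormat` is assumed to be the right instantiation here, since these tests destructure
    // a `FormatSwitch::Bsatn` database update. Illustrative only; not called by any test.
    #[allow(dead_code)]
    fn all_updates_uncompressed(tables: &[TableUpdate<BsatnFormat>]) -> bool {
        tables.iter().all(|TableUpdate { updates, .. }| {
            updates
                .iter()
                .all(|query_update| matches!(query_update, CompressableQueryUpdate::Uncompressed(_)))
        })
    }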
1820
1821    /// Test that we receive subscription updates for DML
1822    #[tokio::test]
1823    async fn test_updates_for_dml() -> anyhow::Result<()> {
1824        // Establish a client connection
1825        let (tx, mut rx) = client_connection(client_id_from_u8(1));
1826
1827        let db = relational_db()?;
1828        let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone());
1829        let schema = [("x", AlgebraicType::U8), ("y", AlgebraicType::U8)];
1830        let t_id = db.create_table_for_test("t", &schema, &[])?;
1831
1832        // Subscribe to `t`
1833        subscribe_multi(&subs, &["select * from t"], tx, &mut 0)?;
1834
1835        // Wait to receive the initial subscription message
1836        assert_matches!(rx.recv().await, Some(SerializableMessage::Subscription(_)));
1837
1838        let schema = ProductType::from([AlgebraicType::U8, AlgebraicType::U8]);
1839
1840        // Only the owner can invoke DML commands
1841        let auth = AuthCtx::new(identity_from_u8(0), identity_from_u8(0));
1842
1843        run(
1844            &db,
1845            "INSERT INTO t (x, y) VALUES (0, 1)",
1846            auth,
1847            Some(&subs),
1848            &mut vec![],
1849        )?;
1850
1851        // Client should receive insert
1852        assert_tx_update_for_table(&mut rx, t_id, &schema, [product![0_u8, 1_u8]], []).await;
1853
1854        run(&db, "UPDATE t SET y=2 WHERE x=0", auth, Some(&subs), &mut vec![])?;
1855
1856        // Client should receive update
1857        assert_tx_update_for_table(&mut rx, t_id, &schema, [product![0_u8, 2_u8]], [product![0_u8, 1_u8]]).await;
1858
1859        run(&db, "DELETE FROM t WHERE x=0", auth, Some(&subs), &mut vec![])?;
1860
1861        // Client should receive delete
1862        assert_tx_update_for_table(&mut rx, t_id, &schema, [], [product![0_u8, 2_u8]]).await;
1863        Ok(())
1864    }
1865
1866    /// Test that we do not compress within a [TransactionUpdateMessage].
1867    /// The message itself is compressed before being sent over the wire,
1868    /// but we don't care about that for this test.
1869    #[tokio::test]
1870    async fn test_no_compression_for_update() -> anyhow::Result<()> {
1871        // Establish a client connection with compression
1872        let (tx, mut rx) = client_connection_with_compression(client_id_from_u8(1), Compression::Brotli);
1873
1874        let db = relational_db()?;
1875        let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone());
1876
1877        let table_id = db.create_table_for_test("t", &[("x", AlgebraicType::U64)], &[])?;
1878
1879        let mut inserts = vec![];
1880
1881        for i in 0..16_000u64 {
1882            inserts.push((table_id, product![i]));
1883        }
1884
1885        // Subscribe to the entire table
1886        subscribe_multi(&subs, &["select * from t"], tx, &mut 0)?;
1887
1888        // Wait to receive the initial subscription message
1889        assert!(matches!(rx.recv().await, Some(SerializableMessage::Subscription(_))));
1890
1891        // Insert a lot of rows into `t`.
1892        // We want to insert enough to cross any threshold there might be for compression.
1893        commit_tx(&db, &subs, [], inserts)?;
1894
1895        // Assert that the table updates within this message are all uncompressed
1896        match rx.recv().await {
1897            Some(SerializableMessage::TxUpdate(TransactionUpdateMessage {
1898                database_update:
1899                    SubscriptionUpdateMessage {
1900                        database_update: FormatSwitch::Bsatn(ws::DatabaseUpdate { tables }),
1901                        ..
1902                    },
1903                ..
1904            })) => {
1905                assert!(tables.iter().all(|TableUpdate { updates, .. }| updates
1906                    .iter()
1907                    .all(|query_update| matches!(query_update, CompressableQueryUpdate::Uncompressed(_)))));
1908            }
1909            Some(_) => panic!("unexpected message from subscription"),
1910            None => panic!("channel unexpectedly closed"),
1911        };
1912
1913        Ok(())
1914    }
1915
1916    /// In this test we subscribe to a join query, update the lhs table,
1917    /// and assert that the server sends the correct delta to the client.
1918    #[tokio::test]
1919    async fn test_update_for_join() -> anyhow::Result<()> {
1920        async fn test_subscription_updates(queries: &[&'static str]) -> anyhow::Result<()> {
1921            // Establish a client connection
1922            let (sender, mut rx) = client_connection(client_id_from_u8(1));
1923
1924            let db = relational_db()?;
1925            let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone());
1926
1927            let p_schema = [("id", AlgebraicType::U64), ("signed_in", AlgebraicType::Bool)];
1928            let l_schema = [
1929                ("id", AlgebraicType::U64),
1930                ("x", AlgebraicType::U64),
1931                ("z", AlgebraicType::U64),
1932            ];
1933
1934            let p_id = db.create_table_for_test("p", &p_schema, &[0.into()])?;
1935            let l_id = db.create_table_for_test("l", &l_schema, &[0.into()])?;
1936
1937            subscribe_multi(&subs, queries, sender, &mut 0)?;
1938
1939            assert!(matches!(rx.recv().await, Some(SerializableMessage::Subscription(_))));
1940
1941            // Insert two matching player rows
1942            commit_tx(
1943                &db,
1944                &subs,
1945                [],
1946                [
1947                    (p_id, product![1_u64, true]),
1948                    (p_id, product![2_u64, true]),
1949                    (l_id, product![1_u64, 2_u64, 2_u64]),
1950                    (l_id, product![2_u64, 3_u64, 3_u64]),
1951                ],
1952            )?;
1953
1954            let schema = ProductType::from(p_schema);
1955
1956            // We should receive both matching player rows
1957            assert_tx_update_for_table(
1958                &mut rx,
1959                p_id,
1960                &schema,
1961                [product![1_u64, true], product![2_u64, true]],
1962                [],
1963            )
1964            .await;
1965
1966            // Update one of the matching player rows
1967            commit_tx(
1968                &db,
1969                &subs,
1970                [(p_id, product![2_u64, true])],
1971                [(p_id, product![2_u64, false])],
1972            )?;
1973
1974            // We should receive an update for it because it is still matching
1975            assert_tx_update_for_table(
1976                &mut rx,
1977                p_id,
1978                &schema,
1979                [product![2_u64, false]],
1980                [product![2_u64, true]],
1981            )
1982            .await;
1983
1984            // Update the same matching player row
1985            commit_tx(
1986                &db,
1987                &subs,
1988                [(p_id, product![2_u64, false])],
1989                [(p_id, product![2_u64, true])],
1990            )?;
1991
1992            // We should receive an update for it because it is still matching
1993            assert_tx_update_for_table(
1994                &mut rx,
1995                p_id,
1996                &schema,
1997                [product![2_u64, true]],
1998                [product![2_u64, false]],
1999            )
2000            .await;
2001
2002            Ok(())
2003        }
2004
2005        test_subscription_updates(&[
2006            "select * from p where id = 1",
2007            "select p.* from p join l on p.id = l.id where l.x > 0 and l.x < 5 and l.z > 0 and l.z < 5",
2008        ])
2009        .await?;
2010        test_subscription_updates(&[
2011            "select * from p where id = 1",
2012            "select p.* from p join l on p.id = l.id where 0 < l.x and l.x < 5 and 0 < l.z and l.z < 5",
2013        ])
2014        .await?;
2015        test_subscription_updates(&[
2016            "select * from p where id = 1",
2017            "select p.* from p join l on p.id = l.id where l.x > 0 and l.x < 5 and l.x > 0 and l.z < 5 and l.id != 1",
2018        ])
2019        .await?;
2020        test_subscription_updates(&[
2021            "select * from p where id = 1",
2022            "select p.* from p join l on p.id = l.id where 0 < l.x and l.x < 5 and 0 < l.z and l.z < 5 and l.id != 1",
2023        ])
2024        .await?;
2025
2026        Ok(())
2027    }
2028
2029    /// Test that we do not evaluate queries that we know will not match table update rows
2030    ///
2031    /// Needs a multi-threaded tokio runtime so that the module subscription worker can run in parallel.
2032    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
2033    async fn test_query_pruning() -> anyhow::Result<()> {
2034        // Establish a connection for each client
2035        let (tx_for_a, mut rx_for_a) = client_connection(client_id_from_u8(1));
2036        let (tx_for_b, mut rx_for_b) = client_connection(client_id_from_u8(2));
2037
2038        let db = relational_db()?;
2039        let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone());
2040
2041        let u_id = db.create_table_for_test(
2042            "u",
2043            &[
2044                ("i", AlgebraicType::U64),
2045                ("a", AlgebraicType::U64),
2046                ("b", AlgebraicType::U64),
2047            ],
2048            &[0.into()],
2049        )?;
2050        let v_id = db.create_table_for_test(
2051            "v",
2052            &[
2053                ("i", AlgebraicType::U64),
2054                ("x", AlgebraicType::U64),
2055                ("y", AlgebraicType::U64),
2056            ],
2057            &[0.into(), 1.into()],
2058        )?;
2059
2060        commit_tx(
2061            &db,
2062            &subs,
2063            [],
2064            [
2065                (u_id, product![0u64, 1u64, 1u64]),
2066                (u_id, product![1u64, 2u64, 2u64]),
2067                (u_id, product![2u64, 3u64, 3u64]),
2068                (v_id, product![0u64, 4u64, 4u64]),
2069                (v_id, product![1u64, 5u64, 5u64]),
2070            ],
2071        )?;
2072
2073        let mut query_ids = 0;
2074
2075        // Returns (i: 0, a: 1, b: 1)
2076        subscribe_multi(
2077            &subs,
2078            &[
2079                "select u.* from u join v on u.i = v.i where v.x = 4",
2080                "select u.* from u join v on u.i = v.i where v.x = 6",
2081            ],
2082            tx_for_a,
2083            &mut query_ids,
2084        )?;
2085
2086        // Returns (i: 1, a: 2, b: 2)
2087        subscribe_multi(
2088            &subs,
2089            &[
2090                "select u.* from u join v on u.i = v.i where v.x = 5",
2091                "select u.* from u join v on u.i = v.i where v.x = 7",
2092            ],
2093            tx_for_b,
2094            &mut query_ids,
2095        )?;
2096
2097        // Wait for both subscriptions
2098        assert!(matches!(
2099            rx_for_a.recv().await,
2100            Some(SerializableMessage::Subscription(SubscriptionMessage {
2101                result: SubscriptionResult::SubscribeMulti(_),
2102                ..
2103            }))
2104        ));
2105        assert!(matches!(
2106            rx_for_b.recv().await,
2107            Some(SerializableMessage::Subscription(SubscriptionMessage {
2108                result: SubscriptionResult::SubscribeMulti(_),
2109                ..
2110            }))
2111        ));
2112
2113        // Modify a single row in `v`
2114        let metrics = commit_tx(
2115            &db,
2116            &subs,
2117            [(v_id, product![1u64, 5u64, 5u64])],
2118            [(v_id, product![1u64, 5u64, 6u64])],
2119        )?;
2120
2121        // We should only have evaluated a single query
2122        assert_eq!(metrics.delta_queries_evaluated, 1);
2123        assert_eq!(metrics.delta_queries_matched, 0);
2124
2125        // Insert a new row into `v`
2126        let metrics = commit_tx(&db, &subs, [], [(v_id, product![2u64, 6u64, 6u64])])?;
2127
2128        assert_tx_update_for_table(
2129            &mut rx_for_a,
2130            u_id,
2131            &ProductType::from([AlgebraicType::U64, AlgebraicType::U64, AlgebraicType::U64]),
2132            [product![2u64, 3u64, 3u64]],
2133            [],
2134        )
2135        .await;
2136
2137        // We should only have evaluated a single query
2138        assert_eq!(metrics.delta_queries_evaluated, 1);
2139        assert_eq!(metrics.delta_queries_matched, 1);
2140
2141        // Modify a matching row in `u`
2142        let metrics = commit_tx(
2143            &db,
2144            &subs,
2145            [(u_id, product![1u64, 2u64, 2u64])],
2146            [(u_id, product![1u64, 2u64, 3u64])],
2147        )?;
2148
2149        assert_tx_update_for_table(
2150            &mut rx_for_b,
2151            u_id,
2152            &ProductType::from([AlgebraicType::U64, AlgebraicType::U64, AlgebraicType::U64]),
2153            [product![1u64, 2u64, 3u64]],
2154            [product![1u64, 2u64, 2u64]],
2155        )
2156        .await;
2157
2158        // We should have evaluated all of the queries
2159        assert_eq!(metrics.delta_queries_evaluated, 4);
2160        assert_eq!(metrics.delta_queries_matched, 1);
2161
2162        // Insert a non-matching row in `u`
2163        let metrics = commit_tx(&db, &subs, [], [(u_id, product![3u64, 0u64, 0u64])])?;
2164
2165        // We should have evaluated all of the queries
2166        assert_eq!(metrics.delta_queries_evaluated, 4);
2167        assert_eq!(metrics.delta_queries_matched, 0);
2168
2169        Ok(())
2170    }
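
    // An illustrative sketch (not referenced elsewhere) of the pruning assertions used above,
    // assuming the `delta_queries_evaluated` / `delta_queries_matched` counters live on the
    // `ExecutionMetrics` returned by `commit_tx`. The casts keep the sketch agnostic to the
    // counters' integer width.
    #[allow(dead_code)]
    fn assert_delta_queries(metrics: &ExecutionMetrics, evaluated: u64, matched: u64) {
        // How many subscription queries were evaluated against this transaction's delta.
        assert_eq!(metrics.delta_queries_evaluated as u64, evaluated);
        // How many of those queries actually matched rows in the delta.
        assert_eq!(metrics.delta_queries_matched as u64, matched);
    }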
2171
2172    /// Test that we do not evaluate queries that we know will not match row updates
2173    #[tokio::test]
2174    async fn test_join_pruning() -> anyhow::Result<()> {
2175        let (tx, mut rx) = client_connection(client_id_from_u8(1));
2176
2177        let db = relational_db()?;
2178        let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone());
2179
2180        let u_id = db.create_table_for_test_with_the_works(
2181            "u",
2182            &[
2183                ("i", AlgebraicType::U64),
2184                ("a", AlgebraicType::U64),
2185                ("b", AlgebraicType::U64),
2186            ],
2187            &[0.into()],
2188            &[0.into()],
2189            StAccess::Public,
2190        )?;
2191        let v_id = db.create_table_for_test_with_the_works(
2192            "v",
2193            &[
2194                ("i", AlgebraicType::U64),
2195                ("x", AlgebraicType::U64),
2196                ("y", AlgebraicType::U64),
2197            ],
2198            &[0.into(), 1.into()],
2199            &[0.into()],
2200            StAccess::Public,
2201        )?;
2202
2203        let schema = ProductType::from([AlgebraicType::U64, AlgebraicType::U64, AlgebraicType::U64]);
2204
2205        commit_tx(
2206            &db,
2207            &subs,
2208            [],
2209            [
2210                (v_id, product![1u64, 1u64, 1u64]),
2211                (v_id, product![2u64, 2u64, 2u64]),
2212                (v_id, product![3u64, 3u64, 3u64]),
2213                (v_id, product![4u64, 4u64, 4u64]),
2214                (v_id, product![5u64, 5u64, 5u64]),
2215            ],
2216        )?;
2217
2218        let mut query_ids = 0;
2219
2220        subscribe_multi(
2221            &subs,
2222            &[
2223                "select u.* from u join v on u.i = v.i where v.x = 1",
2224                "select u.* from u join v on u.i = v.i where v.x = 2",
2225                "select u.* from u join v on u.i = v.i where v.x = 3",
2226                "select u.* from u join v on u.i = v.i where v.x = 4",
2227                "select u.* from u join v on u.i = v.i where v.x = 5",
2228            ],
2229            tx,
2230            &mut query_ids,
2231        )?;
2232
2233        assert_matches!(
2234            rx.recv().await,
2235            Some(SerializableMessage::Subscription(SubscriptionMessage {
2236                result: SubscriptionResult::SubscribeMulti(_),
2237                ..
2238            }))
2239        );
2240
2241        // Insert a new row into `u` that joins with `x = 1`
2242        let metrics = commit_tx(&db, &subs, [], [(u_id, product![1u64, 2u64, 3u64])])?;
2243
2244        assert_tx_update_for_table(&mut rx, u_id, &schema, [product![1u64, 2u64, 3u64]], []).await;
2245
2246        // We should only have evaluated a single query
2247        assert_eq!(metrics.delta_queries_evaluated, 1);
2248        assert_eq!(metrics.delta_queries_matched, 1);
2249
2250        // UPDATE v SET y = 2 WHERE i = 1
2251        let metrics = commit_tx(
2252            &db,
2253            &subs,
2254            [(v_id, product![1u64, 1u64, 1u64])],
2255            [(v_id, product![1u64, 1u64, 2u64])],
2256        )?;
2257
2258        // We should only have evaluated a single query
2259        assert_eq!(metrics.delta_queries_evaluated, 1);
2260        assert_eq!(metrics.delta_queries_matched, 0);
2261
2262        // UPDATE v SET x = 2 WHERE i = 1
2263        let metrics = commit_tx(
2264            &db,
2265            &subs,
2266            [(v_id, product![1u64, 1u64, 2u64])],
2267            [(v_id, product![1u64, 2u64, 2u64])],
2268        )?;
2269
2270        // Results in a no-op
2271        assert_tx_update_for_table(&mut rx, u_id, &schema, [], []).await;
2272
2273        // We should have evaluated queries for `x = 1` and `x = 2`
2274        assert_eq!(metrics.delta_queries_evaluated, 2);
2275        assert_eq!(metrics.delta_queries_matched, 2);
2276
2277        // Insert a new row into `u` that joins with `x = 3`
2278        // UPDATE v SET x = 4 WHERE i = 3
2279        let metrics = commit_tx(
2280            &db,
2281            &subs,
2282            [(v_id, product![3u64, 3u64, 3u64])],
2283            [(v_id, product![3u64, 4u64, 3u64]), (u_id, product![3u64, 4u64, 5u64])],
2284        )?;
2285
2286        assert_tx_update_for_table(&mut rx, u_id, &schema, [product![3u64, 4u64, 5u64]], []).await;
2287
2288        // We should have evaluated queries for `x = 3` and `x = 4`
2289        assert_eq!(metrics.delta_queries_evaluated, 2);
2290        assert_eq!(metrics.delta_queries_matched, 2);
2291
2292        // UPDATE v SET x = 0 WHERE i = 3
2293        let metrics = commit_tx(
2294            &db,
2295            &subs,
2296            [(v_id, product![3u64, 4u64, 3u64])],
2297            [(v_id, product![3u64, 0u64, 3u64])],
2298        )?;
2299
2300        assert_tx_update_for_table(&mut rx, u_id, &schema, [], [product![3u64, 4u64, 5u64]]).await;
2301
2302        // We should only have evaluated the query for `x = 4`
2303        assert_eq!(metrics.delta_queries_evaluated, 1);
2304        assert_eq!(metrics.delta_queries_matched, 1);
2305
2306        // Insert a new row into `u` that joins with `x = 5`
2307        // UPDATE v SET x = 6 WHERE i = 5
2308        let metrics = commit_tx(
2309            &db,
2310            &subs,
2311            [(v_id, product![5u64, 5u64, 5u64])],
2312            [(v_id, product![5u64, 6u64, 6u64]), (u_id, product![5u64, 6u64, 7u64])],
2313        )?;
2314
2315        // Results in a no-op
2316        assert_tx_update_for_table(&mut rx, u_id, &schema, [], []).await;
2317
2318        // We should only have evaluated the query for `x = 5`
2319        assert_eq!(metrics.delta_queries_evaluated, 1);
2320        assert_eq!(metrics.delta_queries_matched, 1);
2321
2322        Ok(())
2323    }
2324
2325    /// Test that one client unsubscribing does not affect another
2326    #[tokio::test]
2327    async fn test_unsubscribe() -> anyhow::Result<()> {
2328        // Establish a connection for each client
2329        let (tx_for_a, mut rx_for_a) = client_connection(client_id_from_u8(1));
2330        let (tx_for_b, mut rx_for_b) = client_connection(client_id_from_u8(2));
2331
2332        let db = relational_db()?;
2333        let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone());
2334
2335        let u_id = db.create_table_for_test(
2336            "u",
2337            &[
2338                ("i", AlgebraicType::U64),
2339                ("a", AlgebraicType::U64),
2340                ("b", AlgebraicType::U64),
2341            ],
2342            &[0.into()],
2343        )?;
2344        let v_id = db.create_table_for_test(
2345            "v",
2346            &[
2347                ("i", AlgebraicType::U64),
2348                ("x", AlgebraicType::U64),
2349                ("y", AlgebraicType::U64),
2350            ],
2351            &[0.into(), 1.into()],
2352        )?;
2353
2354        commit_tx(&db, &subs, [], [(v_id, product![1u64, 1u64, 1u64])])?;
2355
2356        let mut query_ids = 0;
2357
2358        subscribe_multi(
2359            &subs,
2360            &["select u.* from u join v on u.i = v.i where v.x = 1"],
2361            tx_for_a,
2362            &mut query_ids,
2363        )?;
2364        subscribe_multi(
2365            &subs,
2366            &["select u.* from u join v on u.i = v.i where v.x = 1"],
2367            tx_for_b.clone(),
2368            &mut query_ids,
2369        )?;
2370
2371        // Wait for both subscriptions
2372        assert_matches!(
2373            rx_for_a.recv().await,
2374            Some(SerializableMessage::Subscription(SubscriptionMessage {
2375                result: SubscriptionResult::SubscribeMulti(_),
2376                ..
2377            }))
2378        );
2379        assert_matches!(
2380            rx_for_b.recv().await,
2381            Some(SerializableMessage::Subscription(SubscriptionMessage {
2382                result: SubscriptionResult::SubscribeMulti(_),
2383                ..
2384            }))
2385        );
2386
2387        unsubscribe_multi(&subs, tx_for_b, query_ids)?;
2388
2389        assert_matches!(
2390            rx_for_b.recv().await,
2391            Some(SerializableMessage::Subscription(SubscriptionMessage {
2392                result: SubscriptionResult::UnsubscribeMulti(_),
2393                ..
2394            }))
2395        );
2396
2397        // Insert a new row into `u`
2398        let metrics = commit_tx(&db, &subs, [], [(u_id, product![1u64, 0u64, 0u64])])?;
2399
2400        assert_tx_update_for_table(
2401            &mut rx_for_a,
2402            u_id,
2403            &ProductType::from([AlgebraicType::U64, AlgebraicType::U64, AlgebraicType::U64]),
2404            [product![1u64, 0u64, 0u64]],
2405            [],
2406        )
2407        .await;
2408
2409        // We should only have evaluated a single query
2410        assert_eq!(metrics.delta_queries_evaluated, 1);
2411        assert_eq!(metrics.delta_queries_matched, 1);
2412
2413        // Modify a matching row in `v`
2414        let metrics = commit_tx(
2415            &db,
2416            &subs,
2417            [(v_id, product![1u64, 1u64, 1u64])],
2418            [(v_id, product![1u64, 2u64, 2u64])],
2419        )?;
2420
2421        // We should only have evaluated a single query
2422        assert_eq!(metrics.delta_queries_evaluated, 1);
2423        assert_eq!(metrics.delta_queries_matched, 1);
2424
2425        Ok(())
2426    }
2427
2428    /// Test that we do not evaluate queries that return trivially empty results
2429    ///
2430    /// Needs a multi-threaded tokio runtime so that the module subscription worker can run in parallel.
2431    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
2432    async fn test_query_pruning_for_empty_tables() -> anyhow::Result<()> {
2433        // Establish a client connection
2434        let (tx, mut rx) = client_connection(client_id_from_u8(1));
2435
2436        let db = relational_db()?;
2437        let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone());
2438
2439        let schema = &[("id", AlgebraicType::U64), ("a", AlgebraicType::U64)];
2440        let indices = &[0.into()];
2441        // Create tables `t` and `s` with columns `(id: u64, a: u64)`.
2442        db.create_table_for_test("t", schema, indices)?;
2443        let s_id = db.create_table_for_test("s", schema, indices)?;
2444
2445        // Insert one row into `s`, but leave `t` empty.
2446        commit_tx(&db, &subs, [], [(s_id, product![0u64, 0u64])])?;
2447
2448        // Subscribe to queries that return empty results
2449        let metrics = subscribe_multi(
2450            &subs,
2451            &[
2452                "select t.* from t where a = 0",
2453                "select t.* from t join s on t.id = s.id where s.a = 0",
2454                "select s.* from t join s on t.id = s.id where t.a = 0",
2455            ],
2456            tx,
2457            &mut 0,
2458        )?;
2459
2460        assert_matches!(
2461            rx.recv().await,
2462            Some(SerializableMessage::Subscription(SubscriptionMessage {
2463                result: SubscriptionResult::SubscribeMulti(_),
2464                ..
2465            }))
2466        );
2467
2468        assert_eq!(metrics.rows_scanned, 0);
2469        assert_eq!(metrics.index_seeks, 0);
2470
2471        Ok(())
2472    }
2473
2474    /// Asserts that a subscription holds a tx handle for the entire length of its evaluation.
2475    #[test]
2476    fn test_tx_subscription_ordering() -> ResultTest<()> {
2477        let test_db = TestDB::durable()?;
2478
2479        let runtime = test_db.runtime().cloned().unwrap();
2480        let db = Arc::new(test_db.db.clone());
2481
2482        // Create table with one row
2483        let table_id = db.create_table_for_test("T", &[("a", AlgebraicType::U8)], &[])?;
2484        with_auto_commit(&db, |tx| insert(&db, tx, table_id, &product!(1_u8)).map(drop))?;
2485
2486        let (send, mut recv) = mpsc::unbounded_channel();
2487
2488        // Subscribing to T should return a single row.
2489        let db2 = db.clone();
2490        let query_handle = runtime.spawn_blocking(move || {
2491            add_subscriber(
2492                db.clone(),
2493                "select * from T",
2494                Some(Arc::new(move |tx: &_| {
2495                    // Wake up writer thread after starting the reader tx
2496                    let _ = send.send(());
2497                    // Then go to sleep
2498                    std::thread::sleep(Duration::from_secs(1));
2499                    // Assuming subscription evaluation holds a lock on the db,
2500                    // any mutations to T will necessarily occur after,
2501                    // and therefore we should only see a single row returned.
2502                    assert_eq!(1, db.iter(tx, table_id).unwrap().count());
2503                })),
2504            )
2505        });
2506
2507        // Write a second row to T concurrently with the reader thread
2508        let write_handle = runtime.spawn(async move {
2509            let _ = recv.recv().await;
2510            with_auto_commit(&db2, |tx| insert(&db2, tx, table_id, &product!(2_u8)).map(drop))
2511        });
2512
2513        runtime.block_on(write_handle)??;
2514        runtime.block_on(query_handle)??;
2515
2516        test_db.close()?;
2517
2518        Ok(())
2519    }
2520
2521    #[test]
2522    fn subs_cannot_access_private_tables() -> ResultTest<()> {
2523        let test_db = TestDB::durable()?;
2524        let db = Arc::new(test_db.db.clone());
2525
2526        // Create a public table.
2527        let indexes = &[0.into()];
2528        let cols = &[("a", AlgebraicType::U8)];
2529        let _ = db.create_table_for_test("public", cols, indexes)?;
2530
2531        // Create a private table.
2532        let _ = db.create_table_for_test_with_access("private", cols, indexes, StAccess::Private)?;
2533
2534        // We can subscribe to a public table.
2535        let subscribe = |sql| add_subscriber(db.clone(), sql, None);
2536        assert!(subscribe("SELECT * FROM public").is_ok());
2537
2538        // We cannot subscribe when a private table is mentioned,
2539        // not even in a join whose projection doesn't mention the private table,
2540        // since the mere fact of joining can leak information from it.
2541        for sql in [
2542            "SELECT * FROM private",
2543            // Even if the query will return no rows, we still reject it.
2544            "SELECT * FROM private WHERE false",
2545            "SELECT private.* FROM private",
2546            "SELECT public.* FROM public JOIN private ON public.a = private.a WHERE private.a = 1",
2547            "SELECT private.* FROM private JOIN public ON private.a = public.a WHERE public.a = 1",
2548        ] {
2549            assert!(subscribe(sql).is_err());
2550        }
2551
2552        Ok(())
2553    }
2554}