vibesql_server/subscription/manager/events.rs
1//! Change event handling and notification for subscriptions.
2
3use std::sync::Arc;
4
5use tracing::{debug, trace, warn};
6use vibesql_storage::change_events::RecvError;
7use vibesql_storage::Database;
8
9use super::SubscriptionManager;
10use crate::subscription::SubscriptionId;
11
12impl SubscriptionManager {
13 /// Find all subscriptions affected by a change to a given table
14 ///
15 /// This is the core lookup operation for fanout during change handling.
16 /// Uses the table index for O(1) lookup of the subscription ID set.
17 ///
18 /// # Arguments
19 ///
20 /// * `table_name` - The table that changed
21 ///
22 /// # Returns
23 ///
24 /// Vector of subscription IDs that depend on this table
25 pub fn find_affected_subscriptions(&self, table_name: &str) -> Vec<SubscriptionId> {
26 let table = table_name.to_lowercase();
27 self.table_index.get(&table).map(|ids| ids.iter().copied().collect()).unwrap_or_default()
28 }
29
30 /// Handle a change event from the storage layer
31 ///
32 /// Finds all subscriptions affected by the change and checks if their
33 /// results have changed. Sends notifications for changed results.
34 ///
35 /// # Arguments
36 ///
37 /// * `event` - The change event to process (from storage layer)
38 /// * `db` - Database to re-execute queries against
39 pub async fn handle_change(&self, event: vibesql_storage::ChangeEvent, db: &Database) {
40 let table = event.table_name();
41
42 trace!(
43 table = %table,
44 event = ?event,
45 "Processing change event from storage"
46 );
47
48 // Find subscriptions affected by this table
49 let affected_ids = self.find_affected_subscriptions(table);
50
51 if affected_ids.is_empty() {
52 trace!(table = %table, "No subscriptions affected");
53 return;
54 }
55
56 debug!(
57 table = %table,
58 affected_count = affected_ids.len(),
59 "Found affected subscriptions"
60 );
61
62 // Check each affected subscription
63 for id in affected_ids {
64 self.check_and_notify(id, db).await;
65 }
66 }
67
68 /// Check a subscription and notify if results changed
69 ///
70 /// This method re-executes the subscription query, computes the delta
71 /// from the previous result, and sends either a Delta or Full update
72 /// to the subscriber.
73 async fn check_and_notify(&self, id: SubscriptionId, db: &Database) {
74 // Get mutable reference to subscription
75 let mut sub_ref = match self.subscriptions.get_mut(&id) {
76 Some(sub) => sub,
77 None => {
78 trace!(subscription_id = %id, "Subscription not found (may have been removed)");
79 return;
80 }
81 };
82
83 let subscription = sub_ref.value_mut();
84
85 // Try to execute with retry logic
86 self.execute_with_retry(subscription, db, id).await;
87 }
88
89 /// Run the subscription manager event loop
90 ///
91 /// Listens for change events from the storage layer and processes them.
92 /// This method runs indefinitely until the change channel is closed.
93 ///
94 /// # Arguments
95 ///
96 /// * `db` - Database reference for re-executing subscription queries
97 ///
98 /// # Note
99 ///
100 /// This method should be spawned as a tokio task at server startup using `tokio::spawn`.
101 /// It will poll the change receiver and handle events until closed.
102 pub async fn run_event_loop(
103 &self,
104 mut change_rx: vibesql_storage::ChangeEventReceiver,
105 db: Arc<Database>,
106 ) {
107 loop {
108 match change_rx.try_recv() {
109 Ok(event) => {
110 self.handle_change(event, &db).await;
111 }
112 Err(RecvError::Lagged(n)) => {
113 warn!(lagged_count = n, "SubscriptionManager lagged behind change events");
114 }
115 Err(RecvError::Closed) => {
116 debug!("Change event channel closed, stopping subscription manager");
117 break;
118 }
119 Err(RecvError::Empty) => {
120 // No events available, yield to other tasks
121 tokio::task::yield_now().await;
122 }
123 }
124 }
125 }
126}