Skip to main content

vibesql_server/subscription/manager/
events.rs

1//! Change event handling and notification for subscriptions.
2
3use std::sync::Arc;
4
5use tracing::{debug, trace, warn};
6use vibesql_storage::change_events::RecvError;
7use vibesql_storage::Database;
8
9use super::SubscriptionManager;
10use crate::subscription::SubscriptionId;
11
12impl SubscriptionManager {
13    /// Find all subscriptions affected by a change to a given table
14    ///
15    /// This is the core lookup operation for fanout during change handling.
16    /// Uses the table index for O(1) lookup of the subscription ID set.
17    ///
18    /// # Arguments
19    ///
20    /// * `table_name` - The table that changed
21    ///
22    /// # Returns
23    ///
24    /// Vector of subscription IDs that depend on this table
25    pub fn find_affected_subscriptions(&self, table_name: &str) -> Vec<SubscriptionId> {
26        let table = table_name.to_lowercase();
27        self.table_index.get(&table).map(|ids| ids.iter().copied().collect()).unwrap_or_default()
28    }
29
30    /// Handle a change event from the storage layer
31    ///
32    /// Finds all subscriptions affected by the change and checks if their
33    /// results have changed. Sends notifications for changed results.
34    ///
35    /// # Arguments
36    ///
37    /// * `event` - The change event to process (from storage layer)
38    /// * `db` - Database to re-execute queries against
39    pub async fn handle_change(&self, event: vibesql_storage::ChangeEvent, db: &Database) {
40        let table = event.table_name();
41
42        trace!(
43            table = %table,
44            event = ?event,
45            "Processing change event from storage"
46        );
47
48        // Find subscriptions affected by this table
49        let affected_ids = self.find_affected_subscriptions(table);
50
51        if affected_ids.is_empty() {
52            trace!(table = %table, "No subscriptions affected");
53            return;
54        }
55
56        debug!(
57            table = %table,
58            affected_count = affected_ids.len(),
59            "Found affected subscriptions"
60        );
61
62        // Check each affected subscription
63        for id in affected_ids {
64            self.check_and_notify(id, db).await;
65        }
66    }
67
68    /// Check a subscription and notify if results changed
69    ///
70    /// This method re-executes the subscription query, computes the delta
71    /// from the previous result, and sends either a Delta or Full update
72    /// to the subscriber.
73    async fn check_and_notify(&self, id: SubscriptionId, db: &Database) {
74        // Get mutable reference to subscription
75        let mut sub_ref = match self.subscriptions.get_mut(&id) {
76            Some(sub) => sub,
77            None => {
78                trace!(subscription_id = %id, "Subscription not found (may have been removed)");
79                return;
80            }
81        };
82
83        let subscription = sub_ref.value_mut();
84
85        // Try to execute with retry logic
86        self.execute_with_retry(subscription, db, id).await;
87    }
88
89    /// Run the subscription manager event loop
90    ///
91    /// Listens for change events from the storage layer and processes them.
92    /// This method runs indefinitely until the change channel is closed.
93    ///
94    /// # Arguments
95    ///
96    /// * `db` - Database reference for re-executing subscription queries
97    ///
98    /// # Note
99    ///
100    /// This method should be spawned as a tokio task at server startup using `tokio::spawn`.
101    /// It will poll the change receiver and handle events until closed.
102    pub async fn run_event_loop(
103        &self,
104        mut change_rx: vibesql_storage::ChangeEventReceiver,
105        db: Arc<Database>,
106    ) {
107        loop {
108            match change_rx.try_recv() {
109                Ok(event) => {
110                    self.handle_change(event, &db).await;
111                }
112                Err(RecvError::Lagged(n)) => {
113                    warn!(lagged_count = n, "SubscriptionManager lagged behind change events");
114                }
115                Err(RecvError::Closed) => {
116                    debug!("Change event channel closed, stopping subscription manager");
117                    break;
118                }
119                Err(RecvError::Empty) => {
120                    // No events available, yield to other tasks
121                    tokio::task::yield_now().await;
122                }
123            }
124        }
125    }
126}