vibesql_storage/database/
change_events_api.rs

1// ============================================================================
2// Change Event Broadcasting API (Reactive Subscriptions)
3// ============================================================================
4//
5// This module provides change event broadcasting methods for the Database struct.
6// Enables reactive subscriptions for data change notifications.
7
8use super::Database;
9use crate::change_events::{ChangeEvent, ChangeEventReceiver};
10
11impl Database {
12    // ============================================================================
13    // Change Event Broadcasting (Reactive Subscriptions)
14    // ============================================================================
15
16    /// Enable change event broadcasting
17    ///
18    /// Creates a broadcast channel for notifying subscribers when data changes.
19    /// Returns a receiver for the channel.
20    ///
21    /// # Arguments
22    /// * `capacity` - Maximum number of events to buffer before old events are overwritten
23    ///
24    /// # Example
25    /// ```text
26    /// let mut db = Database::new();
27    /// let mut rx = db.enable_change_events(1024);
28    ///
29    /// // Insert some data
30    /// db.insert_row("users", row)?;
31    ///
32    /// // Receive change events
33    /// for event in rx.recv_all() {
34    ///     println!("Change: {:?}", event);
35    /// }
36    /// ```
37    pub fn enable_change_events(&mut self, capacity: usize) -> ChangeEventReceiver {
38        let (sender, receiver) = crate::change_events::channel(capacity);
39        self.change_sender = Some(sender);
40        receiver
41    }
42
43    /// Subscribe to change events
44    ///
45    /// Returns a new receiver for change events if broadcasting is enabled,
46    /// or None if `enable_change_events()` has not been called.
47    ///
48    /// # Example
49    /// ```text
50    /// // Enable broadcasting
51    /// db.enable_change_events(1024);
52    ///
53    /// // Create additional subscribers
54    /// let rx1 = db.subscribe_changes().unwrap();
55    /// let rx2 = db.subscribe_changes().unwrap();
56    /// ```
57    pub fn subscribe_changes(&self) -> Option<ChangeEventReceiver> {
58        self.change_sender.as_ref().map(|s| s.subscribe())
59    }
60
61    /// Check if change event broadcasting is enabled
62    pub fn change_events_enabled(&self) -> bool {
63        self.change_sender.is_some()
64    }
65
66    /// Broadcast a change event to all subscribers (internal use)
67    pub(super) fn broadcast_change(&self, event: ChangeEvent) {
68        if let Some(sender) = &self.change_sender {
69            let _ = sender.send(event);
70        }
71    }
72
73    /// Notify subscribers of an update event
74    ///
75    /// This should be called by the executor after successfully updating a row.
76    /// The storage layer broadcasts the event to any subscribers.
77    ///
78    /// # Arguments
79    /// * `table_name` - Name of the table that was modified
80    /// * `row_index` - Index of the row that was updated
81    pub fn notify_update(&self, table_name: &str, row_index: usize) {
82        self.broadcast_change(ChangeEvent::Update {
83            table_name: table_name.to_string(),
84            row_index,
85        });
86    }
87
88    /// Notify subscribers of a delete event
89    ///
90    /// This should be called by the executor after successfully deleting rows.
91    /// The storage layer broadcasts the event to any subscribers.
92    ///
93    /// # Arguments
94    /// * `table_name` - Name of the table that was modified
95    /// * `row_indices` - Indices of rows that were deleted (before deletion)
96    pub fn notify_deletes(&self, table_name: &str, row_indices: &[usize]) {
97        for &row_index in row_indices {
98            self.broadcast_change(ChangeEvent::Delete {
99                table_name: table_name.to_string(),
100                row_index,
101            });
102        }
103    }
104}