vibesql_storage/database/change_events_api.rs
1// ============================================================================
2// Change Event Broadcasting API (Reactive Subscriptions)
3// ============================================================================
4//
5// This module provides change event broadcasting methods for the Database struct.
6// Enables reactive subscriptions for data change notifications.
7
8use super::Database;
9use crate::change_events::{ChangeEvent, ChangeEventReceiver};
10
11impl Database {
12 // ============================================================================
13 // Change Event Broadcasting (Reactive Subscriptions)
14 // ============================================================================
15
16 /// Enable change event broadcasting
17 ///
18 /// Creates a broadcast channel for notifying subscribers when data changes.
19 /// Returns a receiver for the channel.
20 ///
21 /// # Arguments
22 /// * `capacity` - Maximum number of events to buffer before old events are overwritten
23 ///
24 /// # Example
25 /// ```text
26 /// let mut db = Database::new();
27 /// let mut rx = db.enable_change_events(1024);
28 ///
29 /// // Insert some data
30 /// db.insert_row("users", row)?;
31 ///
32 /// // Receive change events
33 /// for event in rx.recv_all() {
34 /// println!("Change: {:?}", event);
35 /// }
36 /// ```
37 pub fn enable_change_events(&mut self, capacity: usize) -> ChangeEventReceiver {
38 let (sender, receiver) = crate::change_events::channel(capacity);
39 self.change_sender = Some(sender);
40 receiver
41 }
42
43 /// Subscribe to change events
44 ///
45 /// Returns a new receiver for change events if broadcasting is enabled,
46 /// or None if `enable_change_events()` has not been called.
47 ///
48 /// # Example
49 /// ```text
50 /// // Enable broadcasting
51 /// db.enable_change_events(1024);
52 ///
53 /// // Create additional subscribers
54 /// let rx1 = db.subscribe_changes().unwrap();
55 /// let rx2 = db.subscribe_changes().unwrap();
56 /// ```
57 pub fn subscribe_changes(&self) -> Option<ChangeEventReceiver> {
58 self.change_sender.as_ref().map(|s| s.subscribe())
59 }
60
61 /// Check if change event broadcasting is enabled
62 pub fn change_events_enabled(&self) -> bool {
63 self.change_sender.is_some()
64 }
65
66 /// Broadcast a change event to all subscribers (internal use)
67 pub(super) fn broadcast_change(&self, event: ChangeEvent) {
68 if let Some(sender) = &self.change_sender {
69 let _ = sender.send(event);
70 }
71 }
72
73 /// Notify subscribers of an update event
74 ///
75 /// This should be called by the executor after successfully updating a row.
76 /// The storage layer broadcasts the event to any subscribers.
77 ///
78 /// # Arguments
79 /// * `table_name` - Name of the table that was modified
80 /// * `row_index` - Index of the row that was updated
81 pub fn notify_update(&self, table_name: &str, row_index: usize) {
82 self.broadcast_change(ChangeEvent::Update {
83 table_name: table_name.to_string(),
84 row_index,
85 });
86 }
87
88 /// Notify subscribers of a delete event
89 ///
90 /// This should be called by the executor after successfully deleting rows.
91 /// The storage layer broadcasts the event to any subscribers.
92 ///
93 /// # Arguments
94 /// * `table_name` - Name of the table that was modified
95 /// * `row_indices` - Indices of rows that were deleted (before deletion)
96 pub fn notify_deletes(&self, table_name: &str, row_indices: &[usize]) {
97 for &row_index in row_indices {
98 self.broadcast_change(ChangeEvent::Delete {
99 table_name: table_name.to_string(),
100 row_index,
101 });
102 }
103 }
104}