vibesql_storage/
change_events.rs

1//! Change event broadcasting for reactive subscriptions
2//!
3//! This module provides a broadcast channel for notifying subscribers when data changes.
4//! It is designed to be lightweight and WASM-compatible.
5
6use std::sync::{Arc, Mutex, Weak};
7
/// Default number of events a change event channel buffers (1024).
///
/// Presumably meant to be passed as the `capacity` argument of [`channel`]; nothing in this
/// module reads it directly.
pub const DEFAULT_CHANNEL_CAPACITY: usize = 1 << 10;
10
/// A single data mutation, as published to external subscribers.
///
/// Events carry only the table name and the row index, never the row contents. This is a
/// deliberate performance choice: cloning a full row on every mutation would be expensive.
/// A subscriber that needs the row data can re-query it using the row index.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ChangeEvent {
    /// A new row was added to a table.
    Insert {
        /// Table the row was inserted into.
        table_name: String,
        /// Index of the newly inserted row.
        row_index: usize,
    },
    /// An existing row was modified.
    Update {
        /// Table containing the modified row.
        table_name: String,
        /// Index of the modified row.
        row_index: usize,
    },
    /// A row was removed from a table.
    Delete {
        /// Table the row was removed from.
        table_name: String,
        /// Index the row had before it was removed.
        row_index: usize,
    },
}

impl ChangeEvent {
    /// Returns the name of the table this event refers to, whatever the variant.
    pub fn table_name(&self) -> &str {
        match self {
            ChangeEvent::Insert { table_name, .. }
            | ChangeEvent::Update { table_name, .. }
            | ChangeEvent::Delete { table_name, .. } => table_name.as_str(),
        }
    }

    /// Returns the index of the row this event refers to, whatever the variant.
    pub fn row_index(&self) -> usize {
        match self {
            ChangeEvent::Insert { row_index, .. }
            | ChangeEvent::Update { row_index, .. }
            | ChangeEvent::Delete { row_index, .. } => *row_index,
        }
    }
}
60
/// Error type for receive operations.
///
/// Implements [`std::fmt::Display`] and [`std::error::Error`], so it can be propagated with
/// `?` into `Box<dyn std::error::Error>` and similar error containers.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RecvError {
    /// No events available (non-blocking)
    Empty,
    /// Receiver has lagged behind and missed some events; the payload is how many were missed
    Lagged(usize),
    /// Channel has been closed (sender dropped)
    Closed,
}

impl std::fmt::Display for RecvError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            RecvError::Empty => f.write_str("no change events available"),
            RecvError::Lagged(missed) => {
                write!(f, "receiver lagged behind and missed {} change events", missed)
            }
            RecvError::Closed => f.write_str("change event channel is closed"),
        }
    }
}

impl std::error::Error for RecvError {}
71
72/// Shared state for the broadcast channel
73struct ChannelState {
74    /// Ring buffer of events
75    buffer: Vec<Option<ChangeEvent>>,
76    /// Current write position in the ring buffer
77    write_pos: usize,
78    /// Total number of events ever sent (for lag detection)
79    total_sent: usize,
80    /// Channel capacity
81    capacity: usize,
82    /// Whether the sender has been dropped
83    closed: bool,
84}
85
86impl ChannelState {
87    fn new(capacity: usize) -> Self {
88        Self { buffer: vec![None; capacity], write_pos: 0, total_sent: 0, capacity, closed: false }
89    }
90}
91
92/// Sender half of the change event broadcast channel
93///
94/// This can be cloned to create additional senders, all sharing the same channel.
95#[derive(Clone)]
96pub struct ChangeEventSender {
97    state: Arc<Mutex<ChannelState>>,
98}
99
100impl ChangeEventSender {
101    /// Send an event to all subscribers
102    ///
103    /// Returns the number of active receivers. If there are no receivers,
104    /// the event is still buffered for future subscribers.
105    pub fn send(&self, event: ChangeEvent) -> usize {
106        let mut state = self.state.lock().unwrap();
107
108        // Store the event in the ring buffer
109        let write_pos = state.write_pos;
110        let capacity = state.capacity;
111        state.buffer[write_pos] = Some(event);
112        state.write_pos = (write_pos + 1) % capacity;
113        state.total_sent += 1;
114
115        // Count active receivers by counting weak references
116        // This is an approximation since we don't track receivers directly
117        Arc::strong_count(&self.state) - 1 // -1 for the sender's own reference
118    }
119
120    /// Create a new receiver subscribed to this channel
121    pub fn subscribe(&self) -> ChangeEventReceiver {
122        let state = self.state.lock().unwrap();
123        ChangeEventReceiver {
124            state: Arc::downgrade(&self.state),
125            read_pos: state.total_sent, // Start from current position (no backlog)
126        }
127    }
128}
129
130impl std::fmt::Debug for ChangeEventSender {
131    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
132        f.debug_struct("ChangeEventSender").finish_non_exhaustive()
133    }
134}
135
136/// Receiver half of the change event broadcast channel
137pub struct ChangeEventReceiver {
138    state: Weak<Mutex<ChannelState>>,
139    /// Position of next event to read (in terms of total_sent)
140    read_pos: usize,
141}
142
143impl ChangeEventReceiver {
144    /// Try to receive the next event without blocking
145    ///
146    /// Returns:
147    /// - `Ok(event)` if an event is available
148    /// - `Err(RecvError::Empty)` if no events are available
149    /// - `Err(RecvError::Lagged(n))` if n events were missed due to buffer overflow
150    /// - `Err(RecvError::Closed)` if the sender has been dropped
151    pub fn try_recv(&mut self) -> Result<ChangeEvent, RecvError> {
152        let state_arc = self.state.upgrade().ok_or(RecvError::Closed)?;
153        let state = state_arc.lock().unwrap();
154
155        // Check if we've lagged behind
156        let oldest_available = state.total_sent.saturating_sub(state.capacity);
157        if self.read_pos < oldest_available {
158            let missed = oldest_available - self.read_pos;
159            self.read_pos = oldest_available;
160            return Err(RecvError::Lagged(missed));
161        }
162
163        // Check if there are new events
164        if self.read_pos >= state.total_sent {
165            if state.closed {
166                return Err(RecvError::Closed);
167            }
168            return Err(RecvError::Empty);
169        }
170
171        // Calculate buffer index
172        let buffer_idx = self.read_pos % state.capacity;
173        let event = state.buffer[buffer_idx].clone().expect("Buffer slot should be filled");
174
175        self.read_pos += 1;
176        Ok(event)
177    }
178
179    /// Receive all available events
180    ///
181    /// Returns a vector of all events that have been published since the last read.
182    /// If the receiver has lagged, logs a warning and returns events from the oldest available.
183    pub fn recv_all(&mut self) -> Vec<ChangeEvent> {
184        let mut events = Vec::new();
185        loop {
186            match self.try_recv() {
187                Ok(event) => events.push(event),
188                Err(RecvError::Lagged(n)) => {
189                    log::warn!("Change event receiver lagged, missed {} events", n);
190                    // Continue reading from the oldest available position
191                }
192                Err(RecvError::Empty) | Err(RecvError::Closed) => break,
193            }
194        }
195        events
196    }
197}
198
199impl Clone for ChangeEventReceiver {
200    fn clone(&self) -> Self {
201        Self { state: self.state.clone(), read_pos: self.read_pos }
202    }
203}
204
205impl std::fmt::Debug for ChangeEventReceiver {
206    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
207        f.debug_struct("ChangeEventReceiver").field("read_pos", &self.read_pos).finish()
208    }
209}
210
211/// Create a new broadcast channel for change events
212///
213/// # Arguments
214/// * `capacity` - Maximum number of events to buffer before old events are overwritten
215///
216/// # Returns
217/// A tuple of (sender, receiver)
218pub fn channel(capacity: usize) -> (ChangeEventSender, ChangeEventReceiver) {
219    let capacity = capacity.max(1); // Ensure at least capacity of 1
220    let state = Arc::new(Mutex::new(ChannelState::new(capacity)));
221
222    let sender = ChangeEventSender { state: state.clone() };
223    let receiver = ChangeEventReceiver { state: Arc::downgrade(&state), read_pos: 0 };
224
225    (sender, receiver)
226}
227
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an `Insert` event on the `users` table for the given row.
    fn users_insert(row_index: usize) -> ChangeEvent {
        ChangeEvent::Insert { table_name: "users".to_string(), row_index }
    }

    /// A sent event comes back unchanged from the receiver.
    #[test]
    fn test_send_receive_single_event() {
        let (tx, mut rx) = channel(16);

        tx.send(users_insert(0));

        let received = rx.try_recv().unwrap();
        let is_users_insert = matches!(
            received,
            ChangeEvent::Insert { table_name, row_index: 0 } if table_name == "users"
        );
        assert!(is_users_insert);
    }

    /// Every receiver gets its own copy of an event.
    #[test]
    fn test_multiple_receivers() {
        let (tx, mut first) = channel(16);
        let mut second = tx.subscribe();

        tx.send(users_insert(0));

        assert!(first.try_recv().is_ok());
        assert!(second.try_recv().is_ok());
    }

    /// A fresh channel has nothing to read.
    #[test]
    fn test_empty_when_no_events() {
        let (_tx, mut rx) = channel(16);
        assert_eq!(rx.try_recv(), Err(RecvError::Empty));
    }

    /// Overrunning a small buffer is reported as lag, after which reading resumes.
    #[test]
    fn test_lagged_receiver() {
        // Deliberately small buffer so ten sends overrun it.
        let (tx, mut rx) = channel(4);

        for row in 0..10 {
            tx.send(users_insert(row));
        }

        // The first read reports the lag ...
        let first_read = rx.try_recv();
        assert!(matches!(first_read, Err(RecvError::Lagged(_))));

        // ... and the next one yields an event again.
        assert!(rx.try_recv().is_ok());
    }

    /// Dropping the only sender closes the channel.
    #[test]
    fn test_closed_channel() {
        let (tx, mut rx) = channel(16);
        drop(tx);

        assert_eq!(rx.try_recv(), Err(RecvError::Closed));
    }

    /// `recv_all` returns all pending events in send order.
    #[test]
    fn test_recv_all() {
        let (tx, mut rx) = channel(16);
        let table = || "users".to_string();

        tx.send(ChangeEvent::Insert { table_name: table(), row_index: 0 });
        tx.send(ChangeEvent::Update { table_name: table(), row_index: 0 });
        tx.send(ChangeEvent::Delete { table_name: table(), row_index: 0 });

        let drained = rx.recv_all();
        assert_eq!(drained.len(), 3);
        assert!(matches!(
            drained.as_slice(),
            [ChangeEvent::Insert { .. }, ChangeEvent::Update { .. }, ChangeEvent::Delete { .. }]
        ));
    }

    /// The accessors expose the fields of an event.
    #[test]
    fn test_event_accessors() {
        let product_insert =
            ChangeEvent::Insert { table_name: "products".to_string(), row_index: 42 };
        assert_eq!(product_insert.table_name(), "products");
        assert_eq!(product_insert.row_index(), 42);
    }

    /// A late subscriber skips the backlog but sees everything sent afterwards.
    #[test]
    fn test_new_subscriber_starts_from_current() {
        let (tx, _early) = channel(16);

        // Events sent before the second subscriber exists.
        tx.send(users_insert(0));
        tx.send(users_insert(1));

        // The late subscriber must not see them.
        let mut late = tx.subscribe();
        assert_eq!(late.try_recv(), Err(RecvError::Empty));

        // It does see what is sent from now on.
        tx.send(users_insert(2));
        let received = late.try_recv().unwrap();
        assert_eq!(received.row_index(), 2);
    }
}