1use std::sync::{Arc, Mutex, Weak};
7
/// Default number of event slots for a change-event channel.
///
/// NOTE(review): nothing in this file reads this constant; callers presumably
/// pass it to [`channel`] as the `capacity` argument — confirm at the call sites.
pub const DEFAULT_CHANNEL_CAPACITY: usize = 1024;
10
/// A single change notification for one row of a named table.
///
/// All three variants carry the same payload: the name of the affected table
/// and the index of the affected row.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ChangeEvent {
    /// A row was inserted.
    Insert {
        /// Name of the table the event refers to.
        table_name: String,
        /// Index of the row the event refers to.
        row_index: usize,
    },
    /// A row was updated.
    Update {
        /// Name of the table the event refers to.
        table_name: String,
        /// Index of the row the event refers to.
        row_index: usize,
    },
    /// A row was deleted.
    Delete {
        /// Name of the table the event refers to.
        table_name: String,
        /// Index of the row the event refers to.
        row_index: usize,
    },
}

impl ChangeEvent {
    /// Returns the table name carried by the event, whatever its variant.
    pub fn table_name(&self) -> &str {
        match self {
            Self::Insert { table_name, .. }
            | Self::Update { table_name, .. }
            | Self::Delete { table_name, .. } => table_name,
        }
    }

    /// Returns the row index carried by the event, whatever its variant.
    pub fn row_index(&self) -> usize {
        match self {
            Self::Insert { row_index, .. }
            | Self::Update { row_index, .. }
            | Self::Delete { row_index, .. } => *row_index,
        }
    }
}
60
/// Reasons a receive attempt on a change-event channel yields no event.
///
/// Implements [`std::fmt::Display`] and [`std::error::Error`] so it can be
/// propagated with `?` into boxed or wrapped error types.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RecvError {
    /// Every event sent so far has already been read by this receiver.
    Empty,
    /// The receiver fell behind and this many events were overwritten before
    /// it could read them.
    Lagged(usize),
    /// The channel is closed; no event can be delivered.
    Closed,
}

impl std::fmt::Display for RecvError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            RecvError::Empty => f.write_str("no change events available"),
            RecvError::Lagged(missed) => {
                write!(f, "receiver lagged and missed {} change events", missed)
            }
            RecvError::Closed => f.write_str("change event channel is closed"),
        }
    }
}

impl std::error::Error for RecvError {}
71
72struct ChannelState {
74 buffer: Vec<Option<ChangeEvent>>,
76 write_pos: usize,
78 total_sent: usize,
80 capacity: usize,
82 closed: bool,
84}
85
86impl ChannelState {
87 fn new(capacity: usize) -> Self {
88 Self { buffer: vec![None; capacity], write_pos: 0, total_sent: 0, capacity, closed: false }
89 }
90}
91
92#[derive(Clone)]
96pub struct ChangeEventSender {
97 state: Arc<Mutex<ChannelState>>,
98}
99
100impl ChangeEventSender {
101 pub fn send(&self, event: ChangeEvent) -> usize {
106 let mut state = self.state.lock().unwrap();
107
108 let write_pos = state.write_pos;
110 let capacity = state.capacity;
111 state.buffer[write_pos] = Some(event);
112 state.write_pos = (write_pos + 1) % capacity;
113 state.total_sent += 1;
114
115 Arc::strong_count(&self.state) - 1 }
119
120 pub fn subscribe(&self) -> ChangeEventReceiver {
122 let state = self.state.lock().unwrap();
123 ChangeEventReceiver {
124 state: Arc::downgrade(&self.state),
125 read_pos: state.total_sent, }
127 }
128}
129
130impl std::fmt::Debug for ChangeEventSender {
131 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
132 f.debug_struct("ChangeEventSender").finish_non_exhaustive()
133 }
134}
135
136pub struct ChangeEventReceiver {
138 state: Weak<Mutex<ChannelState>>,
139 read_pos: usize,
141}
142
143impl ChangeEventReceiver {
144 pub fn try_recv(&mut self) -> Result<ChangeEvent, RecvError> {
152 let state_arc = self.state.upgrade().ok_or(RecvError::Closed)?;
153 let state = state_arc.lock().unwrap();
154
155 let oldest_available = state.total_sent.saturating_sub(state.capacity);
157 if self.read_pos < oldest_available {
158 let missed = oldest_available - self.read_pos;
159 self.read_pos = oldest_available;
160 return Err(RecvError::Lagged(missed));
161 }
162
163 if self.read_pos >= state.total_sent {
165 if state.closed {
166 return Err(RecvError::Closed);
167 }
168 return Err(RecvError::Empty);
169 }
170
171 let buffer_idx = self.read_pos % state.capacity;
173 let event = state.buffer[buffer_idx].clone().expect("Buffer slot should be filled");
174
175 self.read_pos += 1;
176 Ok(event)
177 }
178
179 pub fn recv_all(&mut self) -> Vec<ChangeEvent> {
184 let mut events = Vec::new();
185 loop {
186 match self.try_recv() {
187 Ok(event) => events.push(event),
188 Err(RecvError::Lagged(n)) => {
189 log::warn!("Change event receiver lagged, missed {} events", n);
190 }
192 Err(RecvError::Empty) | Err(RecvError::Closed) => break,
193 }
194 }
195 events
196 }
197}
198
199impl Clone for ChangeEventReceiver {
200 fn clone(&self) -> Self {
201 Self { state: self.state.clone(), read_pos: self.read_pos }
202 }
203}
204
205impl std::fmt::Debug for ChangeEventReceiver {
206 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
207 f.debug_struct("ChangeEventReceiver").field("read_pos", &self.read_pos).finish()
208 }
209}
210
211pub fn channel(capacity: usize) -> (ChangeEventSender, ChangeEventReceiver) {
219 let capacity = capacity.max(1); let state = Arc::new(Mutex::new(ChannelState::new(capacity)));
221
222 let sender = ChangeEventSender { state: state.clone() };
223 let receiver = ChangeEventReceiver { state: Arc::downgrade(&state), read_pos: 0 };
224
225 (sender, receiver)
226}
227
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an `Insert` event on the `users` table for the given row.
    fn insert_users(row_index: usize) -> ChangeEvent {
        ChangeEvent::Insert { table_name: "users".to_string(), row_index }
    }

    #[test]
    fn test_send_receive_single_event() {
        let (sender, mut receiver) = channel(16);

        sender.send(insert_users(0));

        assert_eq!(receiver.try_recv(), Ok(insert_users(0)));
        // The single event is consumed exactly once.
        assert_eq!(receiver.try_recv(), Err(RecvError::Empty));
    }

    #[test]
    fn test_multiple_receivers() {
        let (sender, mut rx1) = channel(16);
        let mut rx2 = sender.subscribe();

        sender.send(insert_users(0));

        // Every receiver gets its own copy of the same event.
        assert_eq!(rx1.try_recv(), Ok(insert_users(0)));
        assert_eq!(rx2.try_recv(), Ok(insert_users(0)));
    }

    #[test]
    fn test_empty_when_no_events() {
        let (_sender, mut receiver) = channel(16);
        assert_eq!(receiver.try_recv(), Err(RecvError::Empty));
    }

    #[test]
    fn test_lagged_receiver() {
        let (sender, mut receiver) = channel(4);
        for i in 0..10 {
            sender.send(insert_users(i));
        }

        // Ten events into four slots: the first six were overwritten.
        assert_eq!(receiver.try_recv(), Err(RecvError::Lagged(6)));

        // After the lag report the receiver resumes at the oldest retained event.
        let rows: Vec<usize> = receiver.recv_all().iter().map(ChangeEvent::row_index).collect();
        assert_eq!(rows, vec![6, 7, 8, 9]);
    }

    #[test]
    fn test_closed_channel() {
        let (sender, mut receiver) = channel(16);
        drop(sender);

        assert_eq!(receiver.try_recv(), Err(RecvError::Closed));
    }

    #[test]
    fn test_recv_all() {
        let (sender, mut receiver) = channel(16);

        let sent = vec![
            ChangeEvent::Insert { table_name: "users".to_string(), row_index: 0 },
            ChangeEvent::Update { table_name: "users".to_string(), row_index: 0 },
            ChangeEvent::Delete { table_name: "users".to_string(), row_index: 0 },
        ];
        for event in &sent {
            sender.send(event.clone());
        }

        // Events come back complete and in send order.
        assert_eq!(receiver.recv_all(), sent);
        // A second drain finds nothing left.
        assert!(receiver.recv_all().is_empty());
    }

    #[test]
    fn test_event_accessors() {
        let table = "products".to_string();
        let events = vec![
            ChangeEvent::Insert { table_name: table.clone(), row_index: 42 },
            ChangeEvent::Update { table_name: table.clone(), row_index: 42 },
            ChangeEvent::Delete { table_name: table, row_index: 42 },
        ];
        for event in &events {
            assert_eq!(event.table_name(), "products");
            assert_eq!(event.row_index(), 42);
        }
    }

    #[test]
    fn test_new_subscriber_starts_from_current() {
        let (sender, _rx1) = channel(16);

        sender.send(insert_users(0));
        sender.send(insert_users(1));

        // Events sent before subscribing are not visible to the new receiver.
        let mut rx2 = sender.subscribe();
        assert_eq!(rx2.try_recv(), Err(RecvError::Empty));

        sender.send(insert_users(2));
        assert_eq!(rx2.try_recv(), Ok(insert_users(2)));
    }

    #[test]
    fn test_zero_capacity_is_clamped() {
        // A capacity of zero is raised to one slot instead of panicking on send.
        let (sender, mut receiver) = channel(0);

        sender.send(insert_users(0));
        sender.send(insert_users(1));

        assert_eq!(receiver.try_recv(), Err(RecvError::Lagged(1)));
        assert_eq!(receiver.try_recv(), Ok(insert_users(1)));
        assert_eq!(receiver.try_recv(), Err(RecvError::Empty));
    }

    #[test]
    fn test_cloned_receiver_keeps_position() {
        let (sender, mut original) = channel(16);

        sender.send(insert_users(0));
        sender.send(insert_users(1));
        assert_eq!(original.try_recv(), Ok(insert_users(0)));

        // The clone starts where the original currently is, then advances independently.
        let mut copy = original.clone();
        assert_eq!(copy.try_recv(), Ok(insert_users(1)));
        assert_eq!(copy.try_recv(), Err(RecvError::Empty));
        assert_eq!(original.try_recv(), Ok(insert_users(1)));
    }
}