1use std::sync::{Arc, Mutex};
16
/// A commit notification handed by reference to every [`Doorbell`] subscriber.
///
/// Plain data: the only behavior is the derived `Debug`, `Clone`, and
/// equality implementations.
#[derive(Debug, Clone)]
#[derive(PartialEq, Eq)]
pub struct CommitEvent {
    /// Name of the stream the commit belongs to (this file's tests use values
    /// such as `"conv:x"`).
    pub stream: String,
    /// Identifier of the writer — presumably whoever produced the commit;
    /// confirm against the code that constructs events.
    pub writer: String,
    /// Sequence number of the commit.
    // NOTE(review): whether `seq` is scoped per stream or per writer is not
    // visible from this file.
    pub seq: u64,
    /// 32-byte hash of the committed content; the hash algorithm is not
    /// specified here.
    pub content_hash: [u8; 32],
}
26
27type Subscriber = Box<dyn Fn(&CommitEvent) + Send + Sync>;
28
29#[derive(Clone, Default)]
32pub struct Doorbell {
33 subscribers: Arc<Mutex<Vec<Subscriber>>>,
34}
35
36impl Doorbell {
37 pub fn new() -> Self {
38 Self::default()
39 }
40
41 pub fn subscribe<F>(&self, f: F)
43 where
44 F: Fn(&CommitEvent) + Send + Sync + 'static,
45 {
46 self.subscribers
47 .lock()
48 .expect("doorbell mutex poisoned")
49 .push(Box::new(f));
50 }
51
52 pub fn ring(&self, event: &CommitEvent) {
55 for sub in self
56 .subscribers
57 .lock()
58 .expect("doorbell mutex poisoned")
59 .iter()
60 {
61 sub(event);
62 }
63 }
64}
65
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `CommitEvent` with an all-zero content hash.
    fn commit(stream: &str, writer: &str, seq: u64) -> CommitEvent {
        CommitEvent {
            stream: stream.into(),
            writer: writer.into(),
            seq,
            content_hash: [0u8; 32],
        }
    }

    /// A single subscriber observes every rung event, in ring order.
    #[test]
    fn delivers_events_to_subscribers() {
        let observed: Arc<Mutex<Vec<u64>>> = Arc::new(Mutex::new(Vec::new()));
        let bell = Doorbell::new();
        {
            let sink = Arc::clone(&observed);
            bell.subscribe(move |event| sink.lock().unwrap().push(event.seq));
        }

        for seq in 1..=2 {
            bell.ring(&commit("conv:x", "alice", seq));
        }

        assert_eq!(*observed.lock().unwrap(), vec![1, 2]);
    }

    /// Each of several subscribers is invoked once per ring.
    #[test]
    fn multiple_subscribers_all_fire() {
        let bell = Doorbell::new();
        let fired = Arc::new(Mutex::new(0u32));
        (0..3).for_each(|_| {
            let counter = Arc::clone(&fired);
            bell.subscribe(move |_| {
                *counter.lock().unwrap() += 1;
            });
        });

        bell.ring(&commit("s", "w", 1));

        let total = *fired.lock().unwrap();
        assert_eq!(total, 3);
    }
}