1use std::collections::BTreeSet;
12use std::panic::{catch_unwind, AssertUnwindSafe};
13use std::sync::atomic::{AtomicU64, Ordering};
14use std::sync::{Arc, Mutex, Weak};
15
/// Operation tag carried by [`DocChange::op`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ChangeOp {
    /// The change is an insertion.
    Insert,
    /// The change is a deletion.
    Delete,
}
24
25#[derive(Debug, Clone)]
29pub struct DocChange {
30 pub collection: String,
31 pub op: ChangeOp,
32 pub doc_id: Vec<u8>,
33}
34
35#[derive(Debug, Clone)]
37pub struct CommitEvent {
38 pub changes: Vec<DocChange>,
39}
40
41impl CommitEvent {
42 #[must_use]
43 pub const fn new(changes: Vec<DocChange>) -> Self {
44 Self { changes }
45 }
46
47 #[must_use]
49 pub fn touched_collections(&self) -> BTreeSet<&str> {
50 self.changes.iter().map(|c| c.collection.as_str()).collect()
51 }
52}
53
54pub trait CommitObserver: Send + Sync {
61 fn on_commit(&self, ev: &CommitEvent);
62}
63
64type Slot = (u64, Weak<dyn CommitObserver>);
65
66struct Inner {
67 slots: Mutex<Vec<Slot>>,
75 next_id: AtomicU64,
76}
77
78#[derive(Clone)]
80pub struct Notifier {
81 inner: Arc<Inner>,
82}
83
84impl Notifier {
85 #[must_use]
86 pub fn new() -> Self {
87 Self {
88 inner: Arc::new(Inner {
89 slots: Mutex::new(Vec::new()),
90 next_id: AtomicU64::new(0),
91 }),
92 }
93 }
94
95 pub fn add_observer(&self, obs: Arc<dyn CommitObserver>) -> ObserverHandle {
100 let id = self.inner.next_id.fetch_add(1, Ordering::Relaxed);
101 if let Ok(mut slots) = self.inner.slots.lock() {
102 slots.push((id, Arc::downgrade(&obs)));
103 }
104 ObserverHandle {
105 inner: Arc::downgrade(&self.inner),
106 id,
107 _strong: obs,
108 }
109 }
110
111 pub fn dispatch(&self, ev: &CommitEvent) {
116 let observers: Vec<Arc<dyn CommitObserver>> = {
117 let Ok(slots) = self.inner.slots.lock() else {
118 return;
119 };
120 slots.iter().filter_map(|(_, w)| w.upgrade()).collect()
121 };
122 for obs in observers {
123 let _ = catch_unwind(AssertUnwindSafe(|| obs.on_commit(ev)));
124 }
125 }
126}
127
128impl Default for Notifier {
129 fn default() -> Self {
130 Self::new()
131 }
132}
133
134pub struct ObserverHandle {
136 inner: Weak<Inner>,
137 id: u64,
138 _strong: Arc<dyn CommitObserver>,
139}
140
141impl Drop for ObserverHandle {
142 fn drop(&mut self) {
143 if let Some(inner) = self.inner.upgrade() {
144 if let Ok(mut slots) = inner.slots.lock() {
145 slots.retain(|(sid, _)| *sid != self.id);
146 }
147 }
148 }
149}
150
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    /// Observer that adds each event's change count (at least 1) to a counter.
    struct Counter(AtomicUsize);
    impl CommitObserver for Counter {
        fn on_commit(&self, ev: &CommitEvent) {
            let weight = ev.changes.len().max(1);
            self.0.fetch_add(weight, Ordering::SeqCst);
        }
    }

    /// Observer that appends its tag to a shared log so call order can be
    /// asserted.
    struct OrderRec(Arc<std::sync::Mutex<Vec<u8>>>, u8);
    impl CommitObserver for OrderRec {
        fn on_commit(&self, _ev: &CommitEvent) {
            let mut log = self.0.lock().unwrap();
            log.push(self.1);
        }
    }

    /// Observer that panics on every event.
    struct Panicker;
    impl CommitObserver for Panicker {
        fn on_commit(&self, _ev: &CommitEvent) {
            panic!("observer must not poison dispatch");
        }
    }

    /// Shorthand for building one `DocChange`.
    fn change(collection: &str, op: ChangeOp, doc_id: &[u8]) -> DocChange {
        DocChange {
            collection: collection.into(),
            op,
            doc_id: doc_id.to_vec(),
        }
    }

    /// An event holding a single insert into `users`.
    fn ev() -> CommitEvent {
        CommitEvent::new(vec![change("users", ChangeOp::Insert, b"u1")])
    }

    #[test]
    fn dispatch_invokes_registered_observers_in_registration_order() {
        let notifier = Notifier::new();
        let log = Arc::new(std::sync::Mutex::new(Vec::new()));
        let _first = notifier.add_observer(Arc::new(OrderRec(log.clone(), 1)));
        let _second = notifier.add_observer(Arc::new(OrderRec(log.clone(), 2)));

        notifier.dispatch(&ev());

        assert_eq!(*log.lock().unwrap(), vec![1, 2]);
    }

    #[test]
    fn observer_handle_drop_unregisters() {
        let notifier = Notifier::new();
        let counter = Arc::new(Counter(AtomicUsize::new(0)));
        let handle = notifier.add_observer(counter.clone());

        // The first dispatch is delivered; the second happens after the
        // handle is gone.
        notifier.dispatch(&ev());
        drop(handle);
        notifier.dispatch(&ev());

        assert_eq!(
            counter.0.load(Ordering::SeqCst),
            1,
            "no delivery after handle drop"
        );
    }

    #[test]
    fn a_panicking_observer_does_not_poison_others_or_the_caller() {
        let notifier = Notifier::new();
        // Registered first, so it runs (and panics) before the counter.
        let _panicker = notifier.add_observer(Arc::new(Panicker));
        let counter = Arc::new(Counter(AtomicUsize::new(0)));
        let _counting = notifier.add_observer(counter.clone());

        notifier.dispatch(&ev());

        assert_eq!(
            counter.0.load(Ordering::SeqCst),
            1,
            "later observer still ran"
        );
    }

    #[test]
    fn touched_collections_dedupes_and_collects() {
        let event = CommitEvent::new(vec![
            change("a", ChangeOp::Insert, b"1"),
            change("a", ChangeOp::Delete, b"2"),
            change("b", ChangeOp::Insert, b"3"),
        ]);

        let names: Vec<&str> = event.touched_collections().into_iter().collect();

        assert_eq!(names, vec!["a", "b"]);
    }
}