Skip to main content

nookdb_core/
notify.rs

//! Post-commit notifier with a stable multi-observer seam.
//!
//! `Database::write` builds a [`CommitEvent`] from the committed
//! transaction's touched documents and dispatches it to every
//! registered [`CommitObserver`] on the commit-`Ok` path only
//! (rollback/panic never dispatch). The reactive `live()` subsystem
//! (`crate::live`) is itself just one registered observer; an external
//! package (e.g. a future Pro `DevTools` subscription debugger) attaches
//! as an additional observer through [`Notifier::add_observer`] WITHOUT
//! modifying this crate — this is the M3 extension seam.
11use std::collections::BTreeSet;
12use std::panic::{catch_unwind, AssertUnwindSafe};
13use std::sync::atomic::{AtomicU64, Ordering};
14use std::sync::{Arc, Mutex, Weak};
15
/// The kind of change a committed transaction applied to one document.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ChangeOp {
    /// The document was inserted.
    Insert,
    /// The document was deleted.
    Delete,
    // NOTE(M3 spec §8): there is deliberately no `Update` variant. It is a
    // documented future extension point; M2 has no update path, so M3
    // leaves it out.
}
24
25/// One committed document change. `doc_id` is the raw key bytes (the
26/// notifier sits at the M1 bytes boundary); M3 reactive correctness
27/// uses only [`CommitEvent::touched_collections`].
28#[derive(Debug, Clone)]
29pub struct DocChange {
30    pub collection: String,
31    pub op: ChangeOp,
32    pub doc_id: Vec<u8>,
33}
34
35/// The structured payload dispatched after a successful commit.
36#[derive(Debug, Clone)]
37pub struct CommitEvent {
38    pub changes: Vec<DocChange>,
39}
40
41impl CommitEvent {
42    #[must_use]
43    pub const fn new(changes: Vec<DocChange>) -> Self {
44        Self { changes }
45    }
46
47    /// The distinct collections touched by this commit, sorted.
48    #[must_use]
49    pub fn touched_collections(&self) -> BTreeSet<&str> {
50        self.changes.iter().map(|c| c.collection.as_str()).collect()
51    }
52}
53
54/// A passive or active observer of committed changes.
55///
56/// Implementations MUST be cheap and non-blocking (dispatch is
57/// synchronous on the committing thread) and MUST NOT rely on being
58/// the only observer or on a particular position beyond registration
59/// order.
60pub trait CommitObserver: Send + Sync {
61    fn on_commit(&self, ev: &CommitEvent);
62}
63
64type Slot = (u64, Weak<dyn CommitObserver>);
65
66struct Inner {
67    // No user/observer code runs while this lock is held (`dispatch`
68    // collects upgraded `Arc`s then drops the guard before calling
69    // `on_commit`); every critical section here (push / collect /
70    // retain) is panic-free. The mutex is therefore effectively
71    // unpoisonable, so the `Ok`-guarded lock sites silently treat a
72    // (theoretically impossible) poisoned lock as a no-op rather than
73    // panicking.
74    slots: Mutex<Vec<Slot>>,
75    next_id: AtomicU64,
76}
77
78/// Ordered, panic-isolated registry of [`CommitObserver`]s.
79#[derive(Clone)]
80pub struct Notifier {
81    inner: Arc<Inner>,
82}
83
84impl Notifier {
85    #[must_use]
86    pub fn new() -> Self {
87        Self {
88            inner: Arc::new(Inner {
89                slots: Mutex::new(Vec::new()),
90                next_id: AtomicU64::new(0),
91            }),
92        }
93    }
94
95    /// Registers `obs`. The returned [`ObserverHandle`] owns the strong
96    /// `Arc` and unregisters on drop (RAII); the registry holds only a
97    /// `Weak`, so the handle's lifetime alone controls delivery and no
98    /// `Notifier → observer → … → Notifier` strong cycle can form.
99    pub fn add_observer(&self, obs: Arc<dyn CommitObserver>) -> ObserverHandle {
100        let id = self.inner.next_id.fetch_add(1, Ordering::Relaxed);
101        if let Ok(mut slots) = self.inner.slots.lock() {
102            slots.push((id, Arc::downgrade(&obs)));
103        }
104        ObserverHandle {
105            inner: Arc::downgrade(&self.inner),
106            id,
107            _strong: obs,
108        }
109    }
110
111    /// Dispatches `ev` to every live observer in registration order.
112    /// Each `on_commit` is `catch_unwind`-isolated: a panicking
113    /// observer poisons neither the caller (the committing thread) nor
114    /// any other observer.
115    pub fn dispatch(&self, ev: &CommitEvent) {
116        let observers: Vec<Arc<dyn CommitObserver>> = {
117            let Ok(slots) = self.inner.slots.lock() else {
118                return;
119            };
120            slots.iter().filter_map(|(_, w)| w.upgrade()).collect()
121        };
122        for obs in observers {
123            let _ = catch_unwind(AssertUnwindSafe(|| obs.on_commit(ev)));
124        }
125    }
126}
127
128impl Default for Notifier {
129    fn default() -> Self {
130        Self::new()
131    }
132}
133
134/// RAII registration handle. Dropping it unregisters the observer.
135pub struct ObserverHandle {
136    inner: Weak<Inner>,
137    id: u64,
138    _strong: Arc<dyn CommitObserver>,
139}
140
141impl Drop for ObserverHandle {
142    fn drop(&mut self) {
143        if let Some(inner) = self.inner.upgrade() {
144            if let Ok(mut slots) = inner.slots.lock() {
145                slots.retain(|(sid, _)| *sid != self.id);
146            }
147        }
148    }
149}
150
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    /// Adds each event's change count (at least one) to a running total.
    struct Counter(AtomicUsize);
    impl CommitObserver for Counter {
        fn on_commit(&self, ev: &CommitEvent) {
            let weight = ev.changes.len().max(1);
            self.0.fetch_add(weight, Ordering::SeqCst);
        }
    }

    /// Appends its tag to a shared log on every event.
    struct OrderRec(Arc<std::sync::Mutex<Vec<u8>>>, u8);
    impl CommitObserver for OrderRec {
        fn on_commit(&self, _ev: &CommitEvent) {
            let mut log = self.0.lock().unwrap();
            log.push(self.1);
        }
    }

    /// Panics on every event.
    struct Panicker;
    impl CommitObserver for Panicker {
        fn on_commit(&self, _ev: &CommitEvent) {
            panic!("observer must not poison dispatch");
        }
    }

    /// Builds one change record from borrowed parts.
    fn change(collection: &str, op: ChangeOp, id: &[u8]) -> DocChange {
        DocChange {
            collection: collection.into(),
            op,
            doc_id: id.to_vec(),
        }
    }

    /// A single-insert event on the `users` collection.
    fn ev() -> CommitEvent {
        CommitEvent::new(vec![change("users", ChangeOp::Insert, b"u1")])
    }

    #[test]
    fn dispatch_invokes_registered_observers_in_registration_order() {
        let notifier = Notifier::new();
        let log = Arc::new(std::sync::Mutex::new(Vec::new()));
        let _first = notifier.add_observer(Arc::new(OrderRec(log.clone(), 1)));
        let _second = notifier.add_observer(Arc::new(OrderRec(log.clone(), 2)));

        notifier.dispatch(&ev());

        let seen = log.lock().unwrap().clone();
        assert_eq!(seen, vec![1, 2]);
    }

    #[test]
    fn observer_handle_drop_unregisters() {
        let notifier = Notifier::new();
        let counter = Arc::new(Counter(AtomicUsize::new(0)));
        let handle = notifier.add_observer(counter.clone());

        notifier.dispatch(&ev());
        drop(handle);
        notifier.dispatch(&ev());

        let delivered = counter.0.load(Ordering::SeqCst);
        assert_eq!(delivered, 1, "no delivery after handle drop");
    }

    #[test]
    fn a_panicking_observer_does_not_poison_others_or_the_caller() {
        let notifier = Notifier::new();
        let _panicker = notifier.add_observer(Arc::new(Panicker));
        let counter = Arc::new(Counter(AtomicUsize::new(0)));
        let _counting = notifier.add_observer(counter.clone());

        // Must return normally even though the first observer panics.
        notifier.dispatch(&ev());

        let delivered = counter.0.load(Ordering::SeqCst);
        assert_eq!(delivered, 1, "later observer still ran");
    }

    #[test]
    fn touched_collections_dedupes_and_collects() {
        let event = CommitEvent::new(vec![
            change("a", ChangeOp::Insert, b"1"),
            change("a", ChangeOp::Delete, b"2"),
            change("b", ChangeOp::Insert, b"3"),
        ]);

        // `BTreeSet` iteration is sorted and free of duplicates.
        let cols: Vec<&str> = event.touched_collections().into_iter().collect();
        assert_eq!(cols, vec!["a", "b"]);
    }
}