use std::collections::BTreeSet;
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex, Weak};
/// The kind of mutation recorded by a [`DocChange`].
///
/// Derives `Hash` in addition to the comparison traits so the value can be
/// used directly as a key in hash-based collections.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ChangeOp {
    /// The change is an insertion.
    Insert,
    /// The change is a deletion.
    Delete,
}
#[derive(Debug, Clone)]
pub struct DocChange {
pub collection: String,
pub op: ChangeOp,
pub doc_id: Vec<u8>,
}
/// A batch of document changes handed to every [`CommitObserver`] in one call.
#[derive(Clone, Debug)]
pub struct CommitEvent {
    /// The changes carried by this event.
    pub changes: Vec<DocChange>,
}
impl CommitEvent {
#[must_use]
pub const fn new(changes: Vec<DocChange>) -> Self {
Self { changes }
}
#[must_use]
pub fn touched_collections(&self) -> BTreeSet<&str> {
self.changes.iter().map(|c| c.collection.as_str()).collect()
}
}
/// Callback interface for code that wants to be told about commit events.
///
/// The `Send + Sync` supertraits let implementations be shared behind an
/// `Arc` across threads.
pub trait CommitObserver: Sync + Send {
    /// Invoked with the event being dispatched.
    fn on_commit(&self, ev: &CommitEvent);
}
/// One registration entry: the id handed out at registration time paired with
/// a weak reference to the observer, so the entry itself never keeps the
/// observer alive.
type Slot = (u64, Weak<dyn CommitObserver>);
/// State shared by a [`Notifier`], its clones, and the handles it hands out.
struct Inner {
    /// Registered observers, kept in the order they were pushed.
    slots: Mutex<Vec<Slot>>,
    /// Counter from which each registration draws its id.
    next_id: AtomicU64,
}
/// Registry of commit observers.
///
/// Cloning is cheap: every clone shares the same underlying state, so an
/// observer registered through one clone is seen by all of them.
#[derive(Clone)]
pub struct Notifier {
    /// Reference-counted state shared between clones.
    inner: Arc<Inner>,
}
impl Notifier {
    /// Creates a notifier with no registered observers.
    #[must_use]
    pub fn new() -> Self {
        Self {
            inner: Arc::new(Inner {
                slots: Mutex::new(Vec::new()),
                next_id: AtomicU64::new(0),
            }),
        }
    }

    /// Registers `obs` and returns the handle that keeps it registered.
    ///
    /// The notifier itself only stores a weak reference; the returned
    /// [`ObserverHandle`] owns the strong one. Dropping the handle
    /// unregisters the observer, so the handle must be kept for as long as
    /// notifications are wanted.
    ///
    /// A poisoned slot lock is recovered rather than treated as a failure:
    /// the slot list has no invariant a panic could leave half-updated, and
    /// skipping the push would hand back a handle for an observer that was
    /// never registered.
    #[must_use = "dropping the returned handle immediately unregisters the observer"]
    pub fn add_observer(&self, obs: Arc<dyn CommitObserver>) -> ObserverHandle {
        // The id is only used to find this slot again on unregistration, so
        // uniqueness is all that matters and `Relaxed` is sufficient.
        let id = self.inner.next_id.fetch_add(1, Ordering::Relaxed);
        self.inner
            .slots
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .push((id, Arc::downgrade(&obs)));
        ObserverHandle {
            inner: Arc::downgrade(&self.inner),
            id,
            _strong: obs,
        }
    }

    /// Delivers `ev` to every live observer, in slot order, on the calling
    /// thread.
    ///
    /// The observers are snapshotted under the slot lock and the lock is
    /// released before any of them runs, so an observer may call back into
    /// this notifier without deadlocking on that lock. A panic raised by an
    /// observer is caught and discarded: it neither reaches the caller nor
    /// prevents later observers from being invoked. A poisoned slot lock is
    /// recovered so dispatch keeps working.
    pub fn dispatch(&self, ev: &CommitEvent) {
        let observers: Vec<Arc<dyn CommitObserver>> = {
            let slots = self
                .inner
                .slots
                .lock()
                .unwrap_or_else(std::sync::PoisonError::into_inner);
            // Slots whose observer is already gone fail to upgrade and are skipped.
            slots.iter().filter_map(|(_, w)| w.upgrade()).collect()
        };
        for obs in observers {
            // Deliberate best-effort: one misbehaving observer must not
            // break the committer or its peers.
            let _ = catch_unwind(AssertUnwindSafe(|| obs.on_commit(ev)));
        }
    }
}
impl Default for Notifier {
fn default() -> Self {
Self::new()
}
}
/// Registration guard returned by [`Notifier::add_observer`].
///
/// The handle owns the strong reference to the observer; the notifier only
/// holds a weak one. Dropping the handle removes the observer's slot from the
/// notifier (when the notifier still exists) and releases that strong
/// reference. The type is `#[must_use]` because discarding the handle
/// unregisters the observer straight away.
#[must_use = "dropping the handle unregisters the observer"]
pub struct ObserverHandle {
    /// Weak link to the notifier state, so a handle never keeps it alive.
    inner: Weak<Inner>,
    /// Id of the slot this handle removes when dropped.
    id: u64,
    /// Keeps the observer alive for as long as the handle exists.
    _strong: Arc<dyn CommitObserver>,
}
/// Unregisters the observer when its handle goes away.
impl Drop for ObserverHandle {
    /// Removes this handle's slot from the notifier, if the notifier is
    /// still alive.
    ///
    /// A poisoned slot lock is recovered instead of being skipped; otherwise
    /// the dead slot would stay in the list for the rest of the notifier's
    /// life. Recovery cannot panic, so this destructor stays panic-free.
    fn drop(&mut self) {
        // Nothing to clean up once every `Notifier` clone has been dropped.
        let Some(inner) = self.inner.upgrade() else {
            return;
        };
        let id = self.id;
        inner
            .slots
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .retain(|(sid, _)| *sid != id);
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    /// Observer that adds the number of changes in each event (at least one
    /// per event) to a running total.
    struct Counter(AtomicUsize);

    impl CommitObserver for Counter {
        fn on_commit(&self, ev: &CommitEvent) {
            let weight = ev.changes.len().max(1);
            self.0.fetch_add(weight, Ordering::SeqCst);
        }
    }

    /// Observer that appends its own tag to a shared log on every event.
    struct OrderRec(Arc<std::sync::Mutex<Vec<u8>>>, u8);

    impl CommitObserver for OrderRec {
        fn on_commit(&self, _ev: &CommitEvent) {
            let OrderRec(log, tag) = self;
            log.lock().unwrap().push(*tag);
        }
    }

    /// Observer that always panics.
    struct Panicker;

    impl CommitObserver for Panicker {
        fn on_commit(&self, _ev: &CommitEvent) {
            panic!("observer must not poison dispatch");
        }
    }

    /// Builds one `DocChange` from borrowed parts.
    fn change(collection: &str, op: ChangeOp, doc_id: &[u8]) -> DocChange {
        DocChange {
            collection: collection.into(),
            op,
            doc_id: doc_id.to_vec(),
        }
    }

    /// A one-change event used by the dispatch tests.
    fn ev() -> CommitEvent {
        CommitEvent::new(vec![change("users", ChangeOp::Insert, b"u1")])
    }

    #[test]
    fn dispatch_invokes_registered_observers_in_registration_order() {
        let notifier = Notifier::new();
        let log = Arc::new(std::sync::Mutex::new(Vec::new()));
        let _first = notifier.add_observer(Arc::new(OrderRec(Arc::clone(&log), 1)));
        let _second = notifier.add_observer(Arc::new(OrderRec(Arc::clone(&log), 2)));

        notifier.dispatch(&ev());

        let seen = log.lock().unwrap();
        assert_eq!(*seen, vec![1, 2]);
    }

    #[test]
    fn observer_handle_drop_unregisters() {
        let notifier = Notifier::new();
        let counter = Arc::new(Counter(AtomicUsize::new(0)));
        let handle = notifier.add_observer(counter.clone());

        notifier.dispatch(&ev());
        drop(handle);
        notifier.dispatch(&ev());

        let delivered = counter.0.load(Ordering::SeqCst);
        assert_eq!(delivered, 1, "no delivery after handle drop");
    }

    #[test]
    fn a_panicking_observer_does_not_poison_others_or_the_caller() {
        let notifier = Notifier::new();
        let _panicker = notifier.add_observer(Arc::new(Panicker));
        let counter = Arc::new(Counter(AtomicUsize::new(0)));
        let _handle = notifier.add_observer(counter.clone());

        notifier.dispatch(&ev());

        let delivered = counter.0.load(Ordering::SeqCst);
        assert_eq!(delivered, 1, "later observer still ran");
    }

    #[test]
    fn touched_collections_dedupes_and_collects() {
        let event = CommitEvent::new(vec![
            change("a", ChangeOp::Insert, b"1"),
            change("a", ChangeOp::Delete, b"2"),
            change("b", ChangeOp::Insert, b"3"),
        ]);

        let cols: Vec<&str> = event.touched_collections().into_iter().collect();
        assert_eq!(cols, vec!["a", "b"]);
    }
}