use std::sync::Arc;
use std::sync::Mutex;
use std::sync::Weak;
pub trait Observer: Send + Sync {
fn update(&self);
}
pub trait Observable: Send + Sync {
fn register_observer(&self, observer: Weak<dyn Observer + Send + Sync>);
fn notify_observers(&self);
}
#[derive(Default)]
pub struct ObservableBase {
observers: Mutex<Vec<Weak<dyn Observer + Send + Sync>>>,
}
impl std::fmt::Debug for ObservableBase {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let n = self
.observers
.lock()
.map(|obs| obs.len())
.unwrap_or_default();
f.debug_struct("ObservableBase")
.field("observer_count", &n)
.finish()
}
}
impl Clone for ObservableBase {
fn clone(&self) -> Self {
Self::new()
}
}
impl ObservableBase {
pub fn new() -> Self {
Self {
observers: Mutex::new(Vec::new()),
}
}
pub fn observer_count(&self) -> usize {
let mut obs = self.observers.lock().expect("observable poisoned");
obs.retain(|w| w.strong_count() > 0);
obs.len()
}
}
impl Observable for ObservableBase {
fn register_observer(&self, observer: Weak<dyn Observer + Send + Sync>) {
let mut obs = self.observers.lock().expect("observable poisoned");
obs.retain(|w| w.strong_count() > 0);
obs.push(observer);
}
fn notify_observers(&self) {
let mut alive: Vec<Arc<dyn Observer + Send + Sync>> = Vec::new();
{
let mut obs = self.observers.lock().expect("observable poisoned");
obs.retain(|w| {
if let Some(strong) = w.upgrade() {
alive.push(strong);
true
} else {
false
}
});
}
for obs in alive {
obs.update();
}
}
}
#[cfg(test)]
mod tests {
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use super::*;
struct Counter(AtomicUsize);
impl Observer for Counter {
fn update(&self) {
self.0.fetch_add(1, Ordering::SeqCst);
}
}
#[test]
fn notifications_are_delivered() {
let obs_base = ObservableBase::new();
let counter = Arc::new(Counter(AtomicUsize::new(0)));
obs_base.register_observer(Arc::downgrade(&counter) as Weak<dyn Observer + Send + Sync>);
obs_base.notify_observers();
obs_base.notify_observers();
assert_eq!(counter.0.load(Ordering::SeqCst), 2);
}
#[test]
fn dropped_observers_are_pruned() {
let obs_base = ObservableBase::new();
{
let transient = Arc::new(Counter(AtomicUsize::new(0)));
obs_base.register_observer(Arc::downgrade(&transient) as Weak<dyn Observer + Send + Sync>);
assert_eq!(obs_base.observer_count(), 1);
}
assert_eq!(obs_base.observer_count(), 0);
}
}