use std::sync::{Arc, Weak};
/// A receiver of state-change notifications from a single kind of subject.
///
/// The state and error types are not chosen by the observer itself; they are
/// taken from the associated [`Observable`] subject.
pub trait Observer {
    /// The subject type whose `State` and `Error` this observer works with.
    type Subject: Observable;

    /// Handles a notification carrying the subject's current `state`.
    ///
    /// # Errors
    ///
    /// Returns the subject's `Error` type when the observer cannot process
    /// the notification.
    fn update(
        &self,
        state: &<Self::Subject as Observable>::State,
    ) -> Result<(), <Self::Subject as Observable>::Error>;
}
/// A subject that observers can be attached to and detached from.
pub trait Observable {
    /// The value handed to observers in [`Observer::update`].
    type State;

    /// The error an observer may report from [`Observer::update`].
    type Error;

    /// Attaches `observer` to this subject.
    ///
    /// What "attached" means is up to the implementation; the trait only
    /// fixes the signature.
    fn attach(&mut self, observer: Arc<dyn Observer<Subject = Self>>);

    /// Detaches `observer` from this subject.
    ///
    /// The observer is passed as an `Arc` so the implementation can identify
    /// it (for example by pointer identity).
    fn detach(&mut self, observer: Arc<dyn Observer<Subject = Self>>);
}
/// Selects how a notification round reacts to an observer returning an error.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum NotifyStrategy {
    /// Abort the round at the first error and hand that error to the caller.
    StopOnError,
    /// Discard observer errors and keep notifying the remaining observers.
    IgnoreError,
}
/// Bookkeeping helper that stores the observers of a subject of type `T`.
///
/// Observers are kept as [`Weak`] pointers, so the registry on its own never
/// keeps an observer alive.
pub struct ObserverRegistry<T>
where
    T: Observable,
{
    /// Registered observers, in the order they were pushed.
    observers: Vec<Weak<dyn Observer<Subject = T>>>,
}
impl<T> ObserverRegistry<T>
where
T: Observable,
{
pub fn new() -> Self {
Self {
observers: Vec::new(),
}
}
pub fn with_capacity(capacity: usize) -> Self {
Self {
observers: Vec::with_capacity(capacity),
}
}
pub fn attach(&mut self, observer: Arc<dyn Observer<Subject = T>>) {
let weak = Arc::downgrade(&observer);
if !self.observers.iter().any(|item| weak.ptr_eq(item)) {
self.observers.push(weak);
}
}
pub fn detach(&mut self, observer: Arc<dyn Observer<Subject = T>>) {
let weak = Arc::downgrade(&observer);
self.observers.retain(|item| !weak.ptr_eq(item));
}
pub fn notify(
&self,
state: &<T as Observable>::State,
strategy: NotifyStrategy,
) -> Result<(), <T as Observable>::Error> {
self.observers
.iter()
.flat_map(Weak::upgrade)
.try_for_each(|observer| match observer.update(state) {
ok @ Ok(_) => ok,
err @ Err(_) => match strategy {
NotifyStrategy::StopOnError => err,
NotifyStrategy::IgnoreError => Ok(()),
},
})
}
}
/// The default registry is the empty registry.
impl<T> Default for ObserverRegistry<T>
where
    T: Observable,
{
    /// Equivalent to [`ObserverRegistry::new`].
    fn default() -> Self {
        Self::new()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicI32, AtomicUsize, Ordering};

    /// Subject used by the tests: an `i32` value plus the registry under test.
    struct TestObservable {
        registry: ObserverRegistry<Self>,
        value: i32,
        /// Strategy passed to `ObserverRegistry::notify` on every update.
        strategy: NotifyStrategy,
    }

    impl TestObservable {
        /// Creates a subject that stops notifying at the first observer error.
        fn new(initial_value: i32) -> Self {
            Self::with_strategy(initial_value, NotifyStrategy::StopOnError)
        }

        /// Creates a subject that notifies with the given `strategy`.
        fn with_strategy(initial_value: i32, strategy: NotifyStrategy) -> Self {
            Self {
                registry: ObserverRegistry::new(),
                value: initial_value,
                strategy,
            }
        }

        /// Stores `new_value` and notifies the registered observers.
        fn update_value(&mut self, new_value: i32) -> Result<(), String> {
            self.value = new_value;
            self.registry.notify(&self.value, self.strategy)
        }

        fn get_value(&self) -> i32 {
            self.value
        }
    }

    impl Observable for TestObservable {
        type State = i32;
        type Error = String;

        fn attach(&mut self, observer: Arc<dyn Observer<Subject = Self>>) {
            self.registry.attach(observer);
        }

        fn detach(&mut self, observer: Arc<dyn Observer<Subject = Self>>) {
            self.registry.detach(observer);
        }
    }

    /// Observer that never fails; records the last state and the call count.
    struct TestObserver {
        _name: String,
        // Stored as `i32` so negative states are recorded without a lossy cast.
        last_value: AtomicI32,
        call_count: AtomicUsize,
    }

    impl TestObserver {
        fn new(name: &str) -> Self {
            Self {
                _name: name.to_string(),
                last_value: AtomicI32::new(0),
                call_count: AtomicUsize::new(0),
            }
        }

        fn get_last_value(&self) -> i32 {
            self.last_value.load(Ordering::SeqCst)
        }

        fn get_call_count(&self) -> usize {
            self.call_count.load(Ordering::SeqCst)
        }
    }

    impl Observer for TestObserver {
        type Subject = TestObservable;

        fn update(&self, value: &i32) -> Result<(), String> {
            self.call_count.fetch_add(1, Ordering::SeqCst);
            self.last_value.store(*value, Ordering::SeqCst);
            Ok(())
        }
    }

    /// Observer that succeeds until its `fail_after`-th call, then errors on
    /// that call and every later one.
    struct FailingObserver {
        fail_after: usize,
        call_count: AtomicUsize,
    }

    impl FailingObserver {
        fn new(fail_after: usize) -> Self {
            Self {
                fail_after,
                call_count: AtomicUsize::new(0),
            }
        }

        fn get_call_count(&self) -> usize {
            self.call_count.load(Ordering::SeqCst)
        }
    }

    impl Observer for FailingObserver {
        type Subject = TestObservable;

        fn update(&self, _value: &i32) -> Result<(), String> {
            let count = self.call_count.fetch_add(1, Ordering::SeqCst) + 1;
            if count >= self.fail_after {
                Err(format!("Failed after {} calls", count))
            } else {
                Ok(())
            }
        }
    }

    #[test]
    fn test_attach_and_notify() {
        let mut observable = TestObservable::new(0);
        let observer = Arc::new(TestObserver::new("test"));
        observable.attach(observer.clone());
        assert!(observable.update_value(42).is_ok());
        assert_eq!(observable.get_value(), 42);
        assert_eq!(observer.get_last_value(), 42);
    }

    #[test]
    fn test_negative_state_is_delivered_unchanged() {
        let mut observable = TestObservable::new(0);
        let observer = Arc::new(TestObserver::new("test"));
        observable.attach(observer.clone());
        assert!(observable.update_value(-5).is_ok());
        assert_eq!(observer.get_last_value(), -5);
    }

    #[test]
    fn test_detach() {
        let mut observable = TestObservable::new(0);
        let observer = Arc::new(TestObserver::new("test"));
        observable.attach(observer.clone());
        assert!(observable.update_value(10).is_ok());
        assert_eq!(observer.get_last_value(), 10);

        observable.detach(observer.clone());
        assert!(observable.update_value(20).is_ok());
        // A detached observer must not see the second update.
        assert_eq!(observer.get_last_value(), 10);
        assert_eq!(observer.get_call_count(), 1);
    }

    #[test]
    fn test_multiple_observers() {
        let mut observable = TestObservable::new(0);
        let observer1 = Arc::new(TestObserver::new("observer1"));
        let observer2 = Arc::new(TestObserver::new("observer2"));
        observable.attach(observer1.clone());
        observable.attach(observer2.clone());
        assert!(observable.update_value(100).is_ok());
        assert_eq!(observer1.get_last_value(), 100);
        assert_eq!(observer2.get_last_value(), 100);
    }

    #[test]
    fn test_notify_strategy_stop_on_error() {
        let mut observable = TestObservable::new(0);
        // Fails from its second call onwards.
        let failing_observer = Arc::new(FailingObserver::new(2));
        let normal_observer = Arc::new(TestObserver::new("normal"));
        observable.attach(failing_observer.clone());
        observable.attach(normal_observer.clone());

        assert!(observable.update_value(1).is_ok());
        assert_eq!(failing_observer.get_call_count(), 1);
        assert_eq!(normal_observer.get_last_value(), 1);

        assert!(observable.update_value(2).is_err());
        assert_eq!(failing_observer.get_call_count(), 2);
        // The round stopped at the failing observer, so this one was skipped.
        assert_eq!(normal_observer.get_last_value(), 1);
    }

    #[test]
    fn test_notify_strategy_ignore_error() {
        let mut observable = TestObservable::with_strategy(0, NotifyStrategy::IgnoreError);
        // Fails from its second call onwards.
        let failing_observer = Arc::new(FailingObserver::new(2));
        let normal_observer = Arc::new(TestObserver::new("normal"));
        observable.attach(failing_observer.clone());
        observable.attach(normal_observer.clone());

        assert!(observable.update_value(1).is_ok());
        assert_eq!(failing_observer.get_call_count(), 1);
        assert_eq!(normal_observer.get_last_value(), 1);

        assert!(observable.update_value(2).is_ok());
        assert_eq!(failing_observer.get_call_count(), 2);
        // The error was ignored, so the later observer was still notified.
        assert_eq!(normal_observer.get_last_value(), 2);
    }

    #[test]
    fn test_observer_weak_references() {
        let mut observable = TestObservable::new(0);
        let dropped_observer = {
            let observer = Arc::new(TestObserver::new("temp"));
            observable.attach(observer.clone());
            assert!(observable.update_value(50).is_ok());
            assert_eq!(observer.get_last_value(), 50);
            Arc::downgrade(&observer)
        };
        // The registry holds no strong reference, so the observer is gone.
        assert!(dropped_observer.upgrade().is_none());
        assert!(observable.update_value(60).is_ok());
    }

    #[test]
    fn test_duplicate_attach() {
        let mut observable = TestObservable::new(0);
        let observer = Arc::new(TestObserver::new("test"));
        observable.attach(observer.clone());
        observable.attach(observer.clone());
        observable.attach(observer.clone());
        assert!(observable.update_value(99).is_ok());
        assert_eq!(observer.get_last_value(), 99);
        // Attached three times, but notified once per update.
        assert_eq!(observer.get_call_count(), 1);
    }

    #[test]
    fn test_detach_non_existent() {
        let mut observable = TestObservable::new(0);
        let observer = Arc::new(TestObserver::new("test"));
        let another_observer = Arc::new(TestObserver::new("another"));
        observable.attach(observer.clone());
        observable.detach(another_observer.clone());
        assert!(observable.update_value(33).is_ok());
        assert_eq!(observer.get_last_value(), 33);
    }

    #[test]
    fn test_notify_with_no_observers() {
        let mut observable = TestObservable::new(0);
        assert!(observable.update_value(77).is_ok());
        assert_eq!(observable.get_value(), 77);
    }
}