1use std::sync::{Arc, Weak};
2
3pub trait Observer {
53 type Subject: Observable;
57
58 fn update(
80 &self,
81 state: &<Self::Subject as Observable>::State,
82 ) -> Result<(), <Self::Subject as Observable>::Error>;
83}
84
85pub trait Observable {
133 type State;
137
138 type Error;
142
143 fn attach(&mut self, observer: Arc<dyn Observer<Subject = Self>>);
151
152 fn detach(&mut self, observer: Arc<dyn Observer<Subject = Self>>);
160}
161
/// How [`ObserverRegistry::notify`] reacts when an observer returns an error.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum NotifyStrategy {
    /// Return the first error immediately; observers later in the list are not
    /// notified.
    StopOnError,

    /// Discard observer errors and keep notifying the remaining observers.
    IgnoreError,
}
178
179pub struct ObserverRegistry<T: Observable> {
216 observers: Vec<Weak<dyn Observer<Subject = T>>>,
221}
222
223impl<T> ObserverRegistry<T>
224where
225 T: Observable,
226{
227 pub fn new() -> Self {
233 Self {
234 observers: Vec::new(),
235 }
236 }
237
238 pub fn with_capacity(capacity: usize) -> Self {
253 Self {
254 observers: Vec::with_capacity(capacity),
255 }
256 }
257
258 pub fn attach(&mut self, observer: Arc<dyn Observer<Subject = T>>) {
273 let weak = Arc::downgrade(&observer);
274 if !self.observers.iter().any(|item| weak.ptr_eq(item)) {
275 self.observers.push(weak);
276 }
277 }
278
279 pub fn detach(&mut self, observer: Arc<dyn Observer<Subject = T>>) {
293 let weak = Arc::downgrade(&observer);
294 self.observers.retain(|item| !weak.ptr_eq(item));
295 }
296
297 pub fn notify(
322 &self,
323 state: &<T as Observable>::State,
324 strategy: NotifyStrategy,
325 ) -> Result<(), <T as Observable>::Error> {
326 self.observers
327 .iter()
328 .flat_map(Weak::upgrade)
329 .try_for_each(|observer| match observer.update(state) {
330 ok @ Ok(_) => ok,
331 err @ Err(_) => match strategy {
332 NotifyStrategy::StopOnError => err,
333 NotifyStrategy::IgnoreError => Ok(()),
334 },
335 })
336 }
337}
338
339impl<T> Default for ObserverRegistry<T>
340where
341 T: Observable,
342{
343 fn default() -> Self {
347 Self {
348 observers: Default::default(),
349 }
350 }
351}
352
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicI32, AtomicUsize, Ordering};
    use std::sync::Arc;

    /// Subject fixture: an `i32` value that notifies its registry on every
    /// change, using the strategy chosen at construction time.
    struct TestObservable {
        registry: ObserverRegistry<Self>,
        strategy: NotifyStrategy,
        value: i32,
    }

    impl TestObservable {
        /// Subject that stops notifying at the first observer error.
        fn new(initial_value: i32) -> Self {
            Self::with_strategy(initial_value, NotifyStrategy::StopOnError)
        }

        /// Subject that notifies with the given `strategy`.
        fn with_strategy(initial_value: i32, strategy: NotifyStrategy) -> Self {
            Self {
                registry: ObserverRegistry::new(),
                strategy,
                value: initial_value,
            }
        }

        /// Stores `new_value` and forwards it to all attached observers.
        fn update_value(&mut self, new_value: i32) -> Result<(), String> {
            self.value = new_value;
            self.registry.notify(&self.value, self.strategy)
        }

        fn value(&self) -> i32 {
            self.value
        }
    }

    impl Observable for TestObservable {
        type State = i32;
        type Error = String;

        fn attach(&mut self, observer: Arc<dyn Observer<Subject = Self>>) {
            self.registry.attach(observer);
        }

        fn detach(&mut self, observer: Arc<dyn Observer<Subject = Self>>) {
            self.registry.detach(observer);
        }
    }

    /// Observer fixture that never fails; it records the last value it saw and
    /// how many notifications it received.
    struct TestObserver {
        _name: String,
        // Kept as `i32` so negative states are recorded without a lossy cast.
        last_value: AtomicI32,
        notifications: AtomicUsize,
    }

    impl TestObserver {
        fn new(name: &str) -> Self {
            Self {
                _name: name.to_string(),
                last_value: AtomicI32::new(0),
                notifications: AtomicUsize::new(0),
            }
        }

        fn last_value(&self) -> i32 {
            self.last_value.load(Ordering::SeqCst)
        }

        fn notification_count(&self) -> usize {
            self.notifications.load(Ordering::SeqCst)
        }
    }

    impl Observer for TestObserver {
        type Subject = TestObservable;

        fn update(&self, value: &i32) -> Result<(), String> {
            self.last_value.store(*value, Ordering::SeqCst);
            self.notifications.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }
    }

    /// Observer fixture that succeeds until its `fail_after`-th call and fails
    /// on that call and every later one.
    struct FailingObserver {
        fail_after: usize,
        call_count: AtomicUsize,
    }

    impl FailingObserver {
        fn new(fail_after: usize) -> Self {
            Self {
                fail_after,
                call_count: AtomicUsize::new(0),
            }
        }

        fn call_count(&self) -> usize {
            self.call_count.load(Ordering::SeqCst)
        }
    }

    impl Observer for FailingObserver {
        type Subject = TestObservable;

        fn update(&self, _value: &i32) -> Result<(), String> {
            let count = self.call_count.fetch_add(1, Ordering::SeqCst) + 1;
            if count >= self.fail_after {
                Err(format!("Failed after {} calls", count))
            } else {
                Ok(())
            }
        }
    }

    #[test]
    fn test_attach_and_notify() {
        let mut observable = TestObservable::new(0);
        let observer = Arc::new(TestObserver::new("test"));

        observable.attach(observer.clone());

        assert!(observable.update_value(42).is_ok());
        assert_eq!(observable.value(), 42);
        assert_eq!(observer.last_value(), 42);

        // Negative states must arrive unchanged.
        assert!(observable.update_value(-7).is_ok());
        assert_eq!(observer.last_value(), -7);
        assert_eq!(observer.notification_count(), 2);
    }

    #[test]
    fn test_detach() {
        let mut observable = TestObservable::new(0);
        let observer = Arc::new(TestObserver::new("test"));

        observable.attach(observer.clone());

        assert!(observable.update_value(10).is_ok());
        assert_eq!(observer.last_value(), 10);

        observable.detach(observer.clone());

        // A detached observer keeps the last value it saw before detaching.
        assert!(observable.update_value(20).is_ok());
        assert_eq!(observer.last_value(), 10);
        assert_eq!(observer.notification_count(), 1);
    }

    #[test]
    fn test_multiple_observers() {
        let mut observable = TestObservable::new(0);
        let observer1 = Arc::new(TestObserver::new("observer1"));
        let observer2 = Arc::new(TestObserver::new("observer2"));

        observable.attach(observer1.clone());
        observable.attach(observer2.clone());

        assert!(observable.update_value(100).is_ok());

        assert_eq!(observer1.last_value(), 100);
        assert_eq!(observer2.last_value(), 100);
    }

    #[test]
    fn test_notify_strategy_stop_on_error() {
        let mut observable = TestObservable::new(0);
        let failing_observer = Arc::new(FailingObserver::new(2));
        let normal_observer = Arc::new(TestObserver::new("normal"));

        observable.attach(failing_observer.clone());
        observable.attach(normal_observer.clone());

        assert!(observable.update_value(1).is_ok());
        assert_eq!(failing_observer.call_count(), 1);
        assert_eq!(normal_observer.last_value(), 1);

        // The failing observer is first in line, so the second notification
        // stops before it reaches the normal observer.
        assert!(observable.update_value(2).is_err());
        assert_eq!(failing_observer.call_count(), 2);
        assert_eq!(normal_observer.last_value(), 1);
        assert_eq!(normal_observer.notification_count(), 1);
    }

    #[test]
    fn test_notify_strategy_ignore_error() {
        let mut observable = TestObservable::with_strategy(0, NotifyStrategy::IgnoreError);
        let failing_observer = Arc::new(FailingObserver::new(2));
        let normal_observer = Arc::new(TestObserver::new("normal"));

        observable.attach(failing_observer.clone());
        observable.attach(normal_observer.clone());

        assert!(observable.update_value(1).is_ok());
        assert_eq!(failing_observer.call_count(), 1);
        assert_eq!(normal_observer.last_value(), 1);

        // The failure is swallowed and the normal observer is still notified.
        assert!(observable.update_value(2).is_ok());
        assert_eq!(failing_observer.call_count(), 2);
        assert_eq!(normal_observer.last_value(), 2);
    }

    #[test]
    fn test_observer_weak_references() {
        let mut observable = TestObservable::new(0);

        let observer = Arc::new(TestObserver::new("temp"));
        let probe = Arc::downgrade(&observer);
        observable.attach(observer.clone());

        assert!(observable.update_value(50).is_ok());
        assert_eq!(observer.last_value(), 50);

        // Dropping the only strong handle must free the observer: the registry
        // is expected to hold nothing but a weak reference.
        drop(observer);
        assert!(probe.upgrade().is_none());

        assert!(observable.update_value(60).is_ok());
    }

    #[test]
    fn test_duplicate_attach() {
        let mut observable = TestObservable::new(0);
        let observer = Arc::new(TestObserver::new("test"));

        observable.attach(observer.clone());
        observable.attach(observer.clone());
        observable.attach(observer.clone());

        assert!(observable.update_value(99).is_ok());
        assert_eq!(observer.last_value(), 99);
        // Attaching three times must still yield exactly one notification.
        assert_eq!(observer.notification_count(), 1);
    }

    #[test]
    fn test_detach_non_existent() {
        let mut observable = TestObservable::new(0);
        let observer = Arc::new(TestObserver::new("test"));
        let another_observer = Arc::new(TestObserver::new("another"));

        observable.attach(observer.clone());

        observable.detach(another_observer.clone());

        assert!(observable.update_value(33).is_ok());
        assert_eq!(observer.last_value(), 33);
        assert_eq!(another_observer.notification_count(), 0);
    }

    #[test]
    fn test_notify_with_no_observers() {
        let mut observable = TestObservable::new(0);

        assert!(observable.update_value(77).is_ok());
        assert_eq!(observable.value(), 77);
    }
}