Skip to main content

rust_pattern_components/
observer.rs

1use std::sync::{Arc, Weak};
2
3/// 观察者 trait
4///
5/// 定义观察者必须实现的接口。观察者可以订阅被观察者的状态变化,
6/// 并在状态更新时通过 `update` 方法接收通知。
7///
8/// # 关联类型
9///
10/// - `Subject`: 观察者订阅的被观察者类型,必须实现 [`Observable`] trait
11///
12/// # 方法
13///
14/// - `update`: 接收状态更新通知
15///
16/// # 实现要求
17///
18/// 实现者需要指定具体的被观察者类型,并实现 `update` 方法。
19/// `update` 方法应该快速返回,避免阻塞通知过程。
20///
21/// # 线程安全
22///
23/// 实现者应确保 `update` 方法是线程安全的,因为可能从多个线程调用。
24///
25/// # 示例
26///
27/// ```
28/// use std::sync::Arc;
29/// use rust_patterns_components::{Observer, Observable};
30///
31/// struct TemperatureSensor;
32///
33/// impl Observable for TemperatureSensor {
34///     type State = f64;
35///     type Error = String;
36///
37///     fn attach(&mut self, _observer: Arc<dyn Observer<Subject = Self>>) {}
38///     fn detach(&mut self, _observer: Arc<dyn Observer<Subject = Self>>) {}
39/// }
40///
41/// struct TemperatureDisplay;
42///
43/// impl Observer for TemperatureDisplay {
44///     type Subject = TemperatureSensor;
45///
46///     fn update(&self, state: &f64) -> Result<(), String> {
47///         println!("当前温度: {:.1}°C", state);
48///         Ok(())
49///     }
50/// }
51/// ```
52pub trait Observer {
53    /// 观察者订阅的被观察者类型
54    ///
55    /// 此类型必须实现 [`Observable`] trait,定义了观察者关注的状态和错误类型。
56    type Subject: Observable;
57
58    /// 接收状态更新通知
59    ///
60    /// 当被观察者状态发生变化时调用此方法。实现者应该:
61    ///
62    /// 1. 处理传入的状态
63    /// 2. 返回 `Ok(())` 表示处理成功
64    /// 3. 返回 `Err(error)` 表示处理失败
65    ///
66    /// # 参数
67    ///
68    /// - `state`: 当前的被观察者状态引用
69    ///
70    /// # 返回值
71    ///
72    /// - `Ok(())`: 成功处理状态更新
73    /// - `Err(<Self::Subject as Observable>::Error)`: 处理状态更新时发生错误
74    ///
75    /// # 错误处理
76    ///
77    /// 如果此方法返回错误,被观察者的 `notify` 方法会根据指定的通知策略
78    /// 决定是否继续通知其他观察者。
79    fn update(
80        &self,
81        state: &<Self::Subject as Observable>::State,
82    ) -> Result<(), <Self::Subject as Observable>::Error>;
83}
84
85/// 被观察者 trait
86///
87/// 定义被观察者必须实现的接口。被观察者维护一组观察者,
88/// 并在状态变化时通知它们。
89///
90/// # 关联类型
91///
92/// - `State`: 被观察者的状态类型,观察者通过此类型接收状态更新
93/// - `Error`: 观察者处理更新时可能返回的错误类型
94///
95/// # 方法
96///
97/// - `attach`: 附加观察者到被观察者
98/// - `detach`: 从被观察者分离观察者
99///
100/// # 实现要求
101///
102/// 实现者需要指定状态类型和错误类型,并实现观察者管理方法。
103/// 通常建议使用 [`ObserverRegistry`] 来简化实现。
104///
105/// # 线程安全
106///
107/// 实现者应确保观察者管理方法是线程安全的,因为可能从多个线程调用。
108///
109/// # 示例
110///
111/// ```
112/// use std::sync::Arc;
113/// use rust_patterns_components::{Observable, Observer, ObserverRegistry};
114///
115/// struct WeatherStation {
116///     registry: ObserverRegistry<Self>,
117/// }
118///
119/// impl Observable for WeatherStation {
120///     type State = f64;
121///     type Error = String;
122///
123///     fn attach(&mut self, observer: Arc<dyn Observer<Subject = Self>>) {
124///         self.registry.attach(observer);
125///     }
126///
127///     fn detach(&mut self, observer: Arc<dyn Observer<Subject = Self>>) {
128///         self.registry.detach(observer);
129///     }
130/// }
131/// ```
132pub trait Observable {
133    /// 被观察者的状态类型
134    ///
135    /// 当状态变化时,会传递此类型的值给观察者。
136    type State;
137
138    /// 观察者处理更新时可能返回的错误类型
139    ///
140    /// 如果观察者处理更新失败,可以返回此类型的错误。
141    type Error;
142
143    /// 附加观察者
144    ///
145    /// 将观察者附加到被观察者。附加后,观察者将收到状态更新通知。
146    ///
147    /// # 参数
148    ///
149    /// - `observer`: 要附加的观察者强引用
150    fn attach(&mut self, observer: Arc<dyn Observer<Subject = Self>>);
151
152    /// 分离观察者
153    ///
154    /// 从被观察者分离指定的观察者。分离后,该观察者将不再收到状态更新通知。
155    ///
156    /// # 参数
157    ///
158    /// - `observer`: 要分离的观察者强引用
159    fn detach(&mut self, observer: Arc<dyn Observer<Subject = Self>>);
160}
161
162/// 通知策略
163///
164/// 定义当观察者处理更新失败时的行为。
165#[derive(Debug, Clone, Copy, PartialEq, Eq)]
166pub enum NotifyStrategy {
167    /// 立即停止并返回错误
168    ///
169    /// 这是默认策略,当一个观察者失败时立即停止通知过程。
170    StopOnError,
171
172    /// 忽略错误并继续通知
173    ///
174    /// 即使某个观察者失败,也继续通知其他观察者。
175    /// 错误会被忽略,继续执行。
176    IgnoreError,
177}
178
179/// 观察者注册表
180///
181/// 管理一组观察者并在状态变化时通知它们。注册表维护观察者的弱引用列表,
182/// 避免强引用循环导致的内存泄漏。
183///
184/// # 类型参数
185///
186/// - `T`: 被观察者类型,必须实现 [`Observable`] trait
187///
188/// # 设计特点
189///
190/// - **弱引用管理**: 使用 `Weak` 引用存储观察者,允许观察者在不再需要时被释放
191/// - **自动清理**: 在通知时自动跳过已释放的观察者弱引用
192/// - **通知策略**: 支持两种通知策略
193/// - **防止重复**: 检查观察者是否已存在,防止重复添加
194///
195/// # 示例
196///
197/// ```
198/// use std::sync::Arc;
199/// use rust_patterns_components::{Observable, Observer, ObserverRegistry, NotifyStrategy};
200///
201/// struct Sensor;
202///
203/// impl Observable for Sensor {
204///     type State = String;
205///     type Error = String;
206///
207///     fn attach(&mut self, _observer: Arc<dyn Observer<Subject = Self>>) {}
208///     fn detach(&mut self, _observer: Arc<dyn Observer<Subject = Self>>) {}
209/// }
210///
211/// let mut registry = ObserverRegistry::<Sensor>::default();
212/// // 使用 registry.attach() 添加观察者
213/// // 使用 registry.notify() 通知观察者
214/// ```
215pub struct ObserverRegistry<T: Observable> {
216    /// 观察者弱引用列表
217    ///
218    /// 使用弱引用避免循环引用。当观察者被释放时,
219    /// 对应的弱引用会自动变为无效。
220    observers: Vec<Weak<dyn Observer<Subject = T>>>,
221}
222
223impl<T> ObserverRegistry<T>
224where
225    T: Observable,
226{
227    /// 创建新的观察者注册表实例
228    ///
229    /// # 返回值
230    ///
231    /// 返回一个空的观察者注册表,不包含任何观察者。
232    pub fn new() -> Self {
233        Self {
234            observers: Vec::new(),
235        }
236    }
237
238    /// 创建具有初始容量的观察者注册表实例
239    ///
240    /// # 参数
241    ///
242    /// - `capacity`: 初始容量,用于预分配内存
243    ///
244    /// # 返回值
245    ///
246    /// 返回一个具有指定初始容量的空注册表。
247    ///
248    /// # 性能
249    ///
250    /// 预分配容量可以避免后续添加观察者时的多次内存重新分配,
251    /// 当已知观察者数量时建议使用此方法。
252    pub fn with_capacity(capacity: usize) -> Self {
253        Self {
254            observers: Vec::with_capacity(capacity),
255        }
256    }
257
258    /// 附加观察者
259    ///
260    /// 将观察者附加到注册表。方法内部会将观察者的强引用转换为弱引用,
261    /// 并确保不会重复添加相同的观察者。
262    ///
263    /// # 参数
264    ///
265    /// - `observer`: 要附加的观察者强引用
266    ///
267    /// # 注意
268    ///
269    /// - 使用 `Arc::downgrade` 将强引用转换为弱引用,避免循环引用
270    /// - 使用 `Weak::ptr_eq` 检查观察者是否已存在,防止重复添加
271    /// - 观察者将在下次调用 `notify` 方法时收到状态更新通知
272    pub fn attach(&mut self, observer: Arc<dyn Observer<Subject = T>>) {
273        let weak = Arc::downgrade(&observer);
274        if !self.observers.iter().any(|item| weak.ptr_eq(item)) {
275            self.observers.push(weak);
276        }
277    }
278
279    /// 分离观察者
280    ///
281    /// 从注册表中分离指定的观察者。分离后,该观察者将不再收到状态更新通知。
282    ///
283    /// # 参数
284    ///
285    /// - `observer`: 要分离的观察者强引用
286    ///
287    /// # 注意
288    ///
289    /// - 使用 `Arc::downgrade` 将强引用转换为弱引用进行匹配
290    /// - 使用 `Weak::ptr_eq` 进行引用相等性比较
291    /// - 如果观察者不存在于列表中,此方法不会有任何效果
292    pub fn detach(&mut self, observer: Arc<dyn Observer<Subject = T>>) {
293        let weak = Arc::downgrade(&observer);
294        self.observers.retain(|item| !weak.ptr_eq(item));
295    }
296
297    /// 通知所有观察者
298    ///
299    /// 向所有有效的观察者发送状态更新通知。
300    ///
301    /// # 参数
302    ///
303    /// - `state`: 要通知的新状态引用
304    /// - `strategy`: 通知策略,指定观察者失败时的行为
305    ///   使用 [`NotifyStrategy::StopOnError`] 或 [`NotifyStrategy::IgnoreError`]
306    ///
307    /// # 返回值
308    ///
309    /// - `Ok(())`: 所有观察者都成功处理了更新,或错误被忽略
310    /// - `Err(<T as Observable>::Error)`: 某个观察者处理更新时返回了错误(当使用 `StopOnError` 策略时)
311    ///
312    /// # 通知策略
313    ///
314    /// 根据指定的通知策略决定行为:
315    /// - `NotifyStrategy::StopOnError`: 立即返回第一个错误,停止通知其他观察者
316    /// - `NotifyStrategy::IgnoreError`: 忽略错误,继续通知其他观察者,总是返回 `Ok(())`
317    ///
318    /// # 性能
319    ///
320    /// 此方法会自动清理无效的观察者弱引用(通过 `Weak::upgrade` 过滤)。
321    pub fn notify(
322        &self,
323        state: &<T as Observable>::State,
324        strategy: NotifyStrategy,
325    ) -> Result<(), <T as Observable>::Error> {
326        self.observers
327            .iter()
328            .flat_map(Weak::upgrade)
329            .try_for_each(|observer| match observer.update(state) {
330                ok @ Ok(_) => ok,
331                err @ Err(_) => match strategy {
332                    NotifyStrategy::StopOnError => err,
333                    NotifyStrategy::IgnoreError => Ok(()),
334                },
335            })
336    }
337}
338
339impl<T> Default for ObserverRegistry<T>
340where
341    T: Observable,
342{
343    /// 创建默认的观察者注册表实例
344    ///
345    /// 等同于调用 [`ObserverRegistry::new()`]。
346    fn default() -> Self {
347        Self {
348            observers: Default::default(),
349        }
350    }
351}
352
353#[cfg(test)]
354mod tests {
355    use super::*;
356    use std::sync::atomic::{AtomicUsize, Ordering};
357
358    // 测试用的被观察者
359    struct TestObservable {
360        registry: ObserverRegistry<Self>,
361        value: i32,
362    }
363
364    impl TestObservable {
365        fn new(initial_value: i32) -> Self {
366            Self {
367                registry: ObserverRegistry::new(),
368                value: initial_value,
369            }
370        }
371
372        fn update_value(&mut self, new_value: i32) -> Result<(), String> {
373            self.value = new_value;
374            self.registry
375                .notify(&self.value, NotifyStrategy::StopOnError)
376        }
377
378        fn get_value(&self) -> i32 {
379            self.value
380        }
381    }
382
383    impl Observable for TestObservable {
384        type State = i32;
385        type Error = String;
386
387        fn attach(&mut self, observer: Arc<dyn Observer<Subject = Self>>) {
388            self.registry.attach(observer);
389        }
390
391        fn detach(&mut self, observer: Arc<dyn Observer<Subject = Self>>) {
392            self.registry.detach(observer);
393        }
394    }
395
396    // 简单的测试观察者
397    struct TestObserver {
398        _name: String,
399        last_value: AtomicUsize,
400    }
401
402    impl TestObserver {
403        fn new(name: &str) -> Self {
404            Self {
405                _name: name.to_string(),
406                last_value: AtomicUsize::new(0),
407            }
408        }
409
410        fn get_last_value(&self) -> usize {
411            self.last_value.load(Ordering::SeqCst)
412        }
413    }
414
415    impl Observer for TestObserver {
416        type Subject = TestObservable;
417
418        fn update(&self, value: &i32) -> Result<(), String> {
419            self.last_value.store(*value as usize, Ordering::SeqCst);
420            Ok(())
421        }
422    }
423
424    // 可能失败的测试观察者
425    struct FailingObserver {
426        fail_after: usize,
427        call_count: AtomicUsize,
428    }
429
430    impl FailingObserver {
431        fn new(fail_after: usize) -> Self {
432            Self {
433                fail_after,
434                call_count: AtomicUsize::new(0),
435            }
436        }
437
438        fn get_call_count(&self) -> usize {
439            self.call_count.load(Ordering::SeqCst)
440        }
441    }
442
443    impl Observer for FailingObserver {
444        type Subject = TestObservable;
445
446        fn update(&self, _value: &i32) -> Result<(), String> {
447            let count = self.call_count.fetch_add(1, Ordering::SeqCst) + 1;
448            if count >= self.fail_after {
449                Err(format!("Failed after {} calls", count))
450            } else {
451                Ok(())
452            }
453        }
454    }
455
456    #[test]
457    fn test_attach_and_notify() {
458        let mut observable = TestObservable::new(0);
459        let observer = Arc::new(TestObserver::new("test"));
460
461        // 附加观察者
462        observable.attach(observer.clone());
463
464        // 更新值并通知
465        assert!(observable.update_value(42).is_ok());
466        assert_eq!(observable.get_value(), 42);
467        assert_eq!(observer.get_last_value(), 42);
468    }
469
470    #[test]
471    fn test_detach() {
472        let mut observable = TestObservable::new(0);
473        let observer = Arc::new(TestObserver::new("test"));
474
475        // 附加观察者
476        observable.attach(observer.clone());
477
478        // 更新一次
479        assert!(observable.update_value(10).is_ok());
480        assert_eq!(observer.get_last_value(), 10);
481
482        // 分离观察者
483        observable.detach(observer.clone());
484
485        // 再次更新,观察者不应该收到通知
486        assert!(observable.update_value(20).is_ok());
487        assert_eq!(observer.get_last_value(), 10); // 仍然是旧值
488    }
489
490    #[test]
491    fn test_multiple_observers() {
492        let mut observable = TestObservable::new(0);
493        let observer1 = Arc::new(TestObserver::new("observer1"));
494        let observer2 = Arc::new(TestObserver::new("observer2"));
495
496        observable.attach(observer1.clone());
497        observable.attach(observer2.clone());
498
499        assert!(observable.update_value(100).is_ok());
500
501        assert_eq!(observer1.get_last_value(), 100);
502        assert_eq!(observer2.get_last_value(), 100);
503    }
504
505    #[test]
506    fn test_notify_strategy_stop_on_error() {
507        let mut observable = TestObservable::new(0);
508        let failing_observer = Arc::new(FailingObserver::new(2)); // 第二次调用失败
509        let normal_observer = Arc::new(TestObserver::new("normal"));
510
511        observable.attach(failing_observer.clone());
512        observable.attach(normal_observer.clone());
513
514        // 第一次更新应该成功
515        assert!(observable.update_value(1).is_ok());
516        assert_eq!(failing_observer.get_call_count(), 1);
517        assert_eq!(normal_observer.get_last_value(), 1);
518
519        // 第二次更新应该失败(StopOnError 策略)
520        assert!(observable.update_value(2).is_err());
521        assert_eq!(failing_observer.get_call_count(), 2);
522        assert_eq!(normal_observer.get_last_value(), 1); // 正常观察者不应该收到第二次通知
523    }
524
525    #[test]
526    fn test_notify_strategy_ignore_error() {
527        // 为 IgnoreErrorObservable 创建专门的观察者
528        struct IgnoreErrorObservable {
529            registry: ObserverRegistry<Self>,
530            value: i32,
531        }
532
533        struct IgnoreErrorObserver {
534            call_count: AtomicUsize,
535        }
536
537        impl IgnoreErrorObserver {
538            fn new() -> Self {
539                Self {
540                    call_count: AtomicUsize::new(0),
541                }
542            }
543
544            fn get_call_count(&self) -> usize {
545                self.call_count.load(Ordering::SeqCst)
546            }
547        }
548
549        impl Observer for IgnoreErrorObserver {
550            type Subject = IgnoreErrorObservable;
551
552            fn update(&self, _value: &i32) -> Result<(), String> {
553                let count = self.call_count.fetch_add(1, Ordering::SeqCst) + 1;
554                if count >= 2 {
555                    Err(format!("Failed after {} calls", count))
556                } else {
557                    Ok(())
558                }
559            }
560        }
561
562        struct NormalObserver {
563            last_value: AtomicUsize,
564        }
565
566        impl NormalObserver {
567            fn new() -> Self {
568                Self {
569                    last_value: AtomicUsize::new(0),
570                }
571            }
572
573            fn get_last_value(&self) -> usize {
574                self.last_value.load(Ordering::SeqCst)
575            }
576        }
577
578        impl Observer for NormalObserver {
579            type Subject = IgnoreErrorObservable;
580
581            fn update(&self, value: &i32) -> Result<(), String> {
582                self.last_value.store(*value as usize, Ordering::SeqCst);
583                Ok(())
584            }
585        }
586
587        impl IgnoreErrorObservable {
588            fn new(initial_value: i32) -> Self {
589                Self {
590                    registry: ObserverRegistry::new(),
591                    value: initial_value,
592                }
593            }
594
595            fn update_value(&mut self, new_value: i32) -> Result<(), String> {
596                self.value = new_value;
597                self.registry
598                    .notify(&self.value, NotifyStrategy::IgnoreError)
599            }
600        }
601
602        impl Observable for IgnoreErrorObservable {
603            type State = i32;
604            type Error = String;
605
606            fn attach(&mut self, observer: Arc<dyn Observer<Subject = Self>>) {
607                self.registry.attach(observer);
608            }
609
610            fn detach(&mut self, observer: Arc<dyn Observer<Subject = Self>>) {
611                self.registry.detach(observer);
612            }
613        }
614
615        let mut observable = IgnoreErrorObservable::new(0);
616        let failing_observer = Arc::new(IgnoreErrorObserver::new());
617        let normal_observer = Arc::new(NormalObserver::new());
618
619        observable.attach(failing_observer.clone());
620        observable.attach(normal_observer.clone());
621
622        // 第一次更新应该成功
623        assert!(observable.update_value(1).is_ok());
624        assert_eq!(failing_observer.get_call_count(), 1);
625        assert_eq!(normal_observer.get_last_value(), 1);
626
627        // 第二次更新应该返回 Ok(IgnoreError 策略)
628        assert!(observable.update_value(2).is_ok());
629        assert_eq!(failing_observer.get_call_count(), 2);
630        assert_eq!(normal_observer.get_last_value(), 2); // 正常观察者应该收到第二次通知
631    }
632
633    #[test]
634    fn test_observer_weak_references() {
635        let mut observable = TestObservable::new(0);
636
637        {
638            let observer = Arc::new(TestObserver::new("temp"));
639            observable.attach(observer.clone());
640
641            // 更新一次
642            assert!(observable.update_value(50).is_ok());
643            assert_eq!(observer.get_last_value(), 50);
644        } // observer 在这里被丢弃
645
646        // 再次更新,应该仍然工作(弱引用会被清理)
647        assert!(observable.update_value(60).is_ok());
648    }
649
650    #[test]
651    fn test_duplicate_attach() {
652        let mut observable = TestObservable::new(0);
653        let observer = Arc::new(TestObserver::new("test"));
654
655        // 多次附加同一个观察者
656        observable.attach(observer.clone());
657        observable.attach(observer.clone());
658        observable.attach(observer.clone());
659
660        // 应该只通知一次
661        assert!(observable.update_value(99).is_ok());
662        assert_eq!(observer.get_last_value(), 99);
663    }
664
665    #[test]
666    fn test_detach_non_existent() {
667        let mut observable = TestObservable::new(0);
668        let observer = Arc::new(TestObserver::new("test"));
669        let another_observer = Arc::new(TestObserver::new("another"));
670
671        // 附加一个观察者
672        observable.attach(observer.clone());
673
674        // 尝试分离一个未附加的观察者(应该没有效果)
675        observable.detach(another_observer.clone());
676
677        // 更新应该仍然工作
678        assert!(observable.update_value(33).is_ok());
679        assert_eq!(observer.get_last_value(), 33);
680    }
681
682    #[test]
683    fn test_notify_with_no_observers() {
684        let mut observable = TestObservable::new(0);
685
686        // 没有观察者时更新应该成功
687        assert!(observable.update_value(77).is_ok());
688        assert_eq!(observable.get_value(), 77);
689    }
690}