Skip to main content

rust_pattern_components/
observer.rs

1use std::sync::{Arc, Weak};
2
3/// 观察者 trait
4///
5/// 定义观察者必须实现的接口。观察者可以订阅被观察者的状态变化,
6/// 并在状态更新时通过 `update` 方法接收通知。
7///
8/// # 关联类型
9///
10/// - `Subject`: 观察者订阅的被观察者类型,必须实现 [`Observable`] trait
11///
12/// # 方法
13///
14/// - `update`: 接收状态更新通知
15///
16/// # 实现要求
17///
18/// 实现者需要指定具体的被观察者类型,并实现 `update` 方法。
19/// `update` 方法应该快速返回,避免阻塞通知过程。
20///
21/// # 线程安全
22///
23/// 实现者应确保 `update` 方法是线程安全的,因为可能从多个线程调用。
24///
25/// # 示例
26///
27/// ```
28/// use std::sync::Arc;
29/// use rust_patterns_components::{Observer, Observable};
30///
31/// struct TemperatureSensor;
32///
33/// impl Observable for TemperatureSensor {
34///     type State = f64;
35///     type Error = String;
36///
37///     fn attach(&mut self, _observer: Arc<dyn Observer<Subject = Self>>) {}
38///     fn detach(&mut self, _observer: Arc<dyn Observer<Subject = Self>>) {}
39/// }
40///
41/// struct TemperatureDisplay;
42///
43/// impl Observer for TemperatureDisplay {
44///     type Subject = TemperatureSensor;
45///
46///     fn update(&self, state: &f64) -> Result<(), String> {
47///         println!("当前温度: {:.1}°C", state);
48///         Ok(())
49///     }
50/// }
51/// ```
52pub trait Observer {
53    /// 观察者订阅的被观察者类型
54    ///
55    /// 此类型必须实现 [`Observable`] trait,定义了观察者关注的状态和错误类型。
56    type Subject: Observable;
57
58    /// 接收状态更新通知
59    ///
60    /// 当被观察者状态发生变化时调用此方法。实现者应该:
61    ///
62    /// 1. 处理传入的状态
63    /// 2. 返回 `Ok(())` 表示处理成功
64    /// 3. 返回 `Err(error)` 表示处理失败
65    ///
66    /// # 参数
67    ///
68    /// - `state`: 当前的被观察者状态引用
69    ///
70    /// # 返回值
71    ///
72    /// - `Ok(())`: 成功处理状态更新
73    /// - `Err(<Self::Subject as Observable>::Error)`: 处理状态更新时发生错误
74    ///
75    /// # 错误处理
76    ///
77    /// 如果此方法返回错误,被观察者的 `notify` 方法会根据指定的通知策略
78    /// 决定是否继续通知其他观察者。
79    fn update(
80        &self,
81        state: &<Self::Subject as Observable>::State,
82    ) -> Result<(), <Self::Subject as Observable>::Error>;
83}
84
85/// 被观察者 trait
86///
87/// 定义被观察者必须实现的接口。被观察者维护一组观察者,
88/// 并在状态变化时通知它们。
89///
90/// # 关联类型
91///
92/// - `State`: 被观察者的状态类型,观察者通过此类型接收状态更新
93/// - `Error`: 观察者处理更新时可能返回的错误类型
94///
95/// # 方法
96///
97/// - `attach`: 附加观察者到被观察者
98/// - `detach`: 从被观察者分离观察者
99///
100/// # 实现要求
101///
102/// 实现者需要指定状态类型和错误类型,并实现观察者管理方法。
103/// 通常建议使用 [`ObserverRegistry`] 来简化实现。
104///
105/// # 线程安全
106///
107/// 实现者应确保观察者管理方法是线程安全的,因为可能从多个线程调用。
108///
109/// # 示例
110///
111/// ```
112/// use std::sync::Arc;
113/// use rust_patterns_components::{Observable, Observer, ObserverRegistry};
114///
115/// struct WeatherStation {
116///     registry: ObserverRegistry<Self>,
117/// }
118///
119/// impl Observable for WeatherStation {
120///     type State = f64;
121///     type Error = String;
122///
123///     fn attach(&mut self, observer: Arc<dyn Observer<Subject = Self>>) {
124///         self.registry.attach(observer);
125///     }
126///
127///     fn detach(&mut self, observer: Arc<dyn Observer<Subject = Self>>) {
128///         self.registry.detach(observer);
129///     }
130/// }
131/// ```
132pub trait Observable {
133    /// 被观察者的状态类型
134    ///
135    /// 当状态变化时,会传递此类型的值给观察者。
136    type State;
137
138    /// 观察者处理更新时可能返回的错误类型
139    ///
140    /// 如果观察者处理更新失败,可以返回此类型的错误。
141    type Error;
142
143    /// 附加观察者
144    ///
145    /// 将观察者附加到被观察者。附加后,观察者将收到状态更新通知。
146    ///
147    /// # 参数
148    ///
149    /// - `observer`: 要附加的观察者强引用
150    fn attach(&mut self, observer: Arc<dyn Observer<Subject = Self>>);
151
152    /// 分离观察者
153    ///
154    /// 从被观察者分离指定的观察者。分离后,该观察者将不再收到状态更新通知。
155    ///
156    /// # 参数
157    ///
158    /// - `observer`: 要分离的观察者强引用
159    fn detach(&mut self, observer: Arc<dyn Observer<Subject = Self>>);
160}
161
162/// 观察者注册表
163///
164/// 管理一组观察者并在状态变化时通知它们。注册表维护观察者的弱引用列表,
165/// 避免强引用循环导致的内存泄漏。
166///
167/// # 类型参数
168///
169/// - `T`: 被观察者类型,必须实现 [`Observable`] trait
170///
171/// # 设计特点
172///
173/// - **弱引用管理**: 使用 `Weak` 引用存储观察者,允许观察者在不再需要时被释放
174/// - **自动清理**: 在通知时自动跳过已释放的观察者弱引用
175/// - **通知策略**: 支持两种通知策略
176/// - **防止重复**: 检查观察者是否已存在,防止重复添加
177///
178/// # 示例
179///
180/// ```
181/// use std::sync::Arc;
182/// use rust_patterns_components::{Observable, Observer, ObserverRegistry};
183///
184/// struct Sensor;
185///
186/// impl Observable for Sensor {
187///     type State = String;
188///     type Error = String;
189///
190///     fn attach(&mut self, _observer: Arc<dyn Observer<Subject = Self>>) {}
191///     fn detach(&mut self, _observer: Arc<dyn Observer<Subject = Self>>) {}
192/// }
193///
194/// let mut registry = ObserverRegistry::<Sensor>::default();
195/// // 使用 registry.attach() 添加观察者
196/// // 使用 registry.notify() 通知观察者
197/// ```
198pub struct ObserverRegistry<T: Observable> {
199    /// 观察者弱引用列表
200    ///
201    /// 使用弱引用避免循环引用。当观察者被释放时,
202    /// 对应的弱引用会自动变为无效。
203    observers: Vec<Weak<dyn Observer<Subject = T>>>,
204}
205
206impl<T> ObserverRegistry<T>
207where
208    T: Observable,
209{
210    /// 创建新的观察者注册表实例
211    ///
212    /// # 返回值
213    ///
214    /// 返回一个空的观察者注册表,不包含任何观察者。
215    pub fn new() -> Self {
216        Self {
217            observers: Vec::new(),
218        }
219    }
220
221    /// 创建具有初始容量的观察者注册表实例
222    ///
223    /// # 参数
224    ///
225    /// - `capacity`: 初始容量,用于预分配内存
226    ///
227    /// # 返回值
228    ///
229    /// 返回一个具有指定初始容量的空注册表。
230    ///
231    /// # 性能
232    ///
233    /// 预分配容量可以避免后续添加观察者时的多次内存重新分配,
234    /// 当已知观察者数量时建议使用此方法。
235    pub fn with_capacity(capacity: usize) -> Self {
236        Self {
237            observers: Vec::with_capacity(capacity),
238        }
239    }
240
241    /// 附加观察者
242    ///
243    /// 将观察者附加到注册表。方法内部会将观察者的强引用转换为弱引用,
244    /// 并确保不会重复添加相同的观察者。
245    ///
246    /// # 参数
247    ///
248    /// - `observer`: 要附加的观察者强引用
249    ///
250    /// # 注意
251    ///
252    /// - 使用 `Arc::downgrade` 将强引用转换为弱引用,避免循环引用
253    /// - 使用 `Weak::ptr_eq` 检查观察者是否已存在,防止重复添加
254    /// - 观察者将在下次调用 `notify` 方法时收到状态更新通知
255    pub fn attach(&mut self, observer: Arc<dyn Observer<Subject = T>>) {
256        let weak = Arc::downgrade(&observer);
257        if !self.observers.iter().any(|item| weak.ptr_eq(item)) {
258            self.observers.push(weak);
259        }
260    }
261
262    /// 分离观察者
263    ///
264    /// 从注册表中分离指定的观察者。分离后,该观察者将不再收到状态更新通知。
265    ///
266    /// # 参数
267    ///
268    /// - `observer`: 要分离的观察者强引用
269    ///
270    /// # 注意
271    ///
272    /// - 使用 `Arc::downgrade` 将强引用转换为弱引用进行匹配
273    /// - 使用 `Weak::ptr_eq` 进行引用相等性比较
274    /// - 如果观察者不存在于列表中,此方法不会有任何效果
275    pub fn detach(&mut self, observer: Arc<dyn Observer<Subject = T>>) {
276        let weak = Arc::downgrade(&observer);
277        self.observers.retain(|item| !weak.ptr_eq(item));
278    }
279
280    /// 通知所有观察者
281    ///
282    /// 向所有有效的观察者发送状态更新通知。
283    ///
284    /// # 参数
285    ///
286    /// - `state`: 要通知的新状态引用
287    ///
288    /// # 返回值
289    ///
290    /// - `Ok(())`: 所有观察者都成功处理了更新
291    /// - `Err(<T as Observable>::Error)`: 某个观察者处理更新时返回了错误,立即停止通知其他观察者
292    ///
293    /// # 行为
294    ///
295    /// - 遍历所有观察者并调用其 `update` 方法
296    /// - 当某个观察者返回错误时,立即停止并返回该错误
297    /// - 自动清理无效的观察者弱引用(通过 `Weak::upgrade` 过滤)
298    ///
299    /// # 性能
300    ///
301    /// 此方法会自动清理无效的观察者弱引用(通过 `Weak::upgrade` 过滤)。
302    pub fn notify(&self, state: &<T as Observable>::State) -> Result<(), <T as Observable>::Error> {
303        self.observers
304            .iter()
305            .flat_map(Weak::upgrade)
306            .try_for_each(|observer| observer.update(state))
307    }
308
309    /// 通知所有观察者状态变化,忽略错误
310    ///
311    /// 此方法会通知所有已注册的观察者状态变化,但会忽略任何观察者返回的错误。
312    /// 即使某个观察者处理更新失败,也会继续通知其他观察者。
313    ///
314    /// # 参数
315    ///
316    /// * `state` - 要通知的状态变化
317    ///
318    /// # 行为
319    ///
320    /// - 遍历所有观察者并调用其 `update` 方法
321    /// - 忽略所有观察者返回的错误(使用 `let _ = ...`)
322    /// - 自动清理无效的观察者弱引用(通过 `Weak::upgrade` 过滤)
323    ///
324    /// # 使用场景
325    ///
326    /// 适用于以下情况:
327    /// - 观察者的错误不应该阻止其他观察者接收通知
328    /// - 错误处理不是关键,可以安全忽略
329    /// - 需要确保所有观察者都能收到通知,即使某些观察者可能失败
330    ///
331    /// # 与 `notify` 方法的区别
332    ///
333    /// - `notify`: 遇到第一个错误就停止,并返回该错误
334    /// - `notify_ignore_error`: 忽略所有错误,继续通知所有观察者
335    ///
336    /// # 示例
337    ///
338    /// ```rust
339    /// use rust_pattern_components::{Observable, Observer, ObserverRegistry};
340    /// use std::sync::{Arc, Weak};
341    ///
342    /// struct Counter {
343    ///     registry: ObserverRegistry<Self>,
344    ///     value: u64,
345    /// }
346    ///
347    /// impl Observable for Counter {
348    ///     type State = u64;
349    ///     type Error = String;
350    ///
351    ///     fn attach(&mut self, observer: Arc<dyn Observer<Subject = Self>>) {
352    ///         self.registry.attach(observer);
353    ///     }
354    ///
355    ///     fn detach(&mut self, observer: Arc<dyn Observer<Subject = Self>>) {
356    ///         self.registry.detach(observer);
357    ///     }
358    /// }
359    ///
360    /// let counter = Counter {
361    ///     registry: ObserverRegistry::new(),
362    ///     value: 42,
363    /// };
364    ///
365    /// // 即使观察者可能失败,也会通知所有观察者
366    /// counter.registry.notify_ignore_error(&counter.value);
367    /// ```
368    pub fn notify_ignore_error(&self, state: &<T as Observable>::State) {
369        self.observers
370            .iter()
371            .flat_map(Weak::upgrade)
372            .for_each(|observer| {
373                let _ = observer.update(state);
374            })
375    }
376}
377
378impl<T> Default for ObserverRegistry<T>
379where
380    T: Observable,
381{
382    /// 创建默认的观察者注册表实例
383    ///
384    /// 等同于调用 [`ObserverRegistry::new()`]。
385    fn default() -> Self {
386        Self {
387            observers: Default::default(),
388        }
389    }
390}
391
392#[cfg(test)]
393mod tests {
394    use super::*;
395    use std::sync::atomic::{AtomicUsize, Ordering};
396
397    // 测试用的被观察者
398    struct TestObservable {
399        registry: ObserverRegistry<Self>,
400        value: i32,
401    }
402
403    impl TestObservable {
404        fn new(initial_value: i32) -> Self {
405            Self {
406                registry: ObserverRegistry::new(),
407                value: initial_value,
408            }
409        }
410
411        fn update_value(&mut self, new_value: i32) -> Result<(), String> {
412            self.value = new_value;
413            self.registry.notify(&self.value)
414        }
415
416        fn get_value(&self) -> i32 {
417            self.value
418        }
419    }
420
421    impl Observable for TestObservable {
422        type State = i32;
423        type Error = String;
424
425        fn attach(&mut self, observer: Arc<dyn Observer<Subject = Self>>) {
426            self.registry.attach(observer);
427        }
428
429        fn detach(&mut self, observer: Arc<dyn Observer<Subject = Self>>) {
430            self.registry.detach(observer);
431        }
432    }
433
434    // 简单的测试观察者
435    struct TestObserver {
436        _name: String,
437        last_value: AtomicUsize,
438    }
439
440    impl TestObserver {
441        fn new(name: &str) -> Self {
442            Self {
443                _name: name.to_string(),
444                last_value: AtomicUsize::new(0),
445            }
446        }
447
448        fn get_last_value(&self) -> usize {
449            self.last_value.load(Ordering::SeqCst)
450        }
451    }
452
453    impl Observer for TestObserver {
454        type Subject = TestObservable;
455
456        fn update(&self, value: &i32) -> Result<(), String> {
457            self.last_value.store(*value as usize, Ordering::SeqCst);
458            Ok(())
459        }
460    }
461
462    // 可能失败的测试观察者
463    struct FailingObserver {
464        fail_after: usize,
465        call_count: AtomicUsize,
466    }
467
468    impl FailingObserver {
469        fn new(fail_after: usize) -> Self {
470            Self {
471                fail_after,
472                call_count: AtomicUsize::new(0),
473            }
474        }
475
476        fn get_call_count(&self) -> usize {
477            self.call_count.load(Ordering::SeqCst)
478        }
479    }
480
481    impl Observer for FailingObserver {
482        type Subject = TestObservable;
483
484        fn update(&self, _value: &i32) -> Result<(), String> {
485            let count = self.call_count.fetch_add(1, Ordering::SeqCst) + 1;
486            if count >= self.fail_after {
487                Err(format!("Failed after {} calls", count))
488            } else {
489                Ok(())
490            }
491        }
492    }
493
494    #[test]
495    fn test_attach_and_notify() {
496        let mut observable = TestObservable::new(0);
497        let observer = Arc::new(TestObserver::new("test"));
498
499        // 附加观察者
500        observable.attach(observer.clone());
501
502        // 更新值并通知
503        assert!(observable.update_value(42).is_ok());
504        assert_eq!(observable.get_value(), 42);
505        assert_eq!(observer.get_last_value(), 42);
506    }
507
508    #[test]
509    fn test_detach() {
510        let mut observable = TestObservable::new(0);
511        let observer = Arc::new(TestObserver::new("test"));
512
513        // 附加观察者
514        observable.attach(observer.clone());
515
516        // 更新一次
517        assert!(observable.update_value(10).is_ok());
518        assert_eq!(observer.get_last_value(), 10);
519
520        // 分离观察者
521        observable.detach(observer.clone());
522
523        // 再次更新,观察者不应该收到通知
524        assert!(observable.update_value(20).is_ok());
525        assert_eq!(observer.get_last_value(), 10); // 仍然是旧值
526    }
527
528    #[test]
529    fn test_multiple_observers() {
530        let mut observable = TestObservable::new(0);
531        let observer1 = Arc::new(TestObserver::new("observer1"));
532        let observer2 = Arc::new(TestObserver::new("observer2"));
533
534        observable.attach(observer1.clone());
535        observable.attach(observer2.clone());
536
537        assert!(observable.update_value(100).is_ok());
538
539        assert_eq!(observer1.get_last_value(), 100);
540        assert_eq!(observer2.get_last_value(), 100);
541    }
542
543    #[test]
544    fn test_notify_stop_on_error() {
545        let mut observable = TestObservable::new(0);
546        let failing_observer = Arc::new(FailingObserver::new(2)); // 第二次调用失败
547        let normal_observer = Arc::new(TestObserver::new("normal"));
548
549        observable.attach(failing_observer.clone());
550        observable.attach(normal_observer.clone());
551
552        // 第一次更新应该成功
553        assert!(observable.update_value(1).is_ok());
554        assert_eq!(failing_observer.get_call_count(), 1);
555        assert_eq!(normal_observer.get_last_value(), 1);
556
557        // 第二次更新应该失败(StopOnError 策略)
558        assert!(observable.update_value(2).is_err());
559        assert_eq!(failing_observer.get_call_count(), 2);
560        assert_eq!(normal_observer.get_last_value(), 1); // 正常观察者不应该收到第二次通知
561    }
562
563    #[test]
564    fn test_notify_ignore_error() {
565        // 为 IgnoreErrorObservable 创建专门的观察者
566        struct IgnoreErrorObservable {
567            registry: ObserverRegistry<Self>,
568            value: i32,
569        }
570
571        struct IgnoreErrorObserver {
572            call_count: AtomicUsize,
573        }
574
575        impl IgnoreErrorObserver {
576            fn new() -> Self {
577                Self {
578                    call_count: AtomicUsize::new(0),
579                }
580            }
581
582            fn get_call_count(&self) -> usize {
583                self.call_count.load(Ordering::SeqCst)
584            }
585        }
586
587        impl Observer for IgnoreErrorObserver {
588            type Subject = IgnoreErrorObservable;
589
590            fn update(&self, _value: &i32) -> Result<(), String> {
591                let count = self.call_count.fetch_add(1, Ordering::SeqCst) + 1;
592                if count >= 2 {
593                    Err(format!("Failed after {} calls", count))
594                } else {
595                    Ok(())
596                }
597            }
598        }
599
600        struct NormalObserver {
601            last_value: AtomicUsize,
602        }
603
604        impl NormalObserver {
605            fn new() -> Self {
606                Self {
607                    last_value: AtomicUsize::new(0),
608                }
609            }
610
611            fn get_last_value(&self) -> usize {
612                self.last_value.load(Ordering::SeqCst)
613            }
614        }
615
616        impl Observer for NormalObserver {
617            type Subject = IgnoreErrorObservable;
618
619            fn update(&self, value: &i32) -> Result<(), String> {
620                self.last_value.store(*value as usize, Ordering::SeqCst);
621                Ok(())
622            }
623        }
624
625        impl IgnoreErrorObservable {
626            fn new(initial_value: i32) -> Self {
627                Self {
628                    registry: ObserverRegistry::new(),
629                    value: initial_value,
630                }
631            }
632
633            fn update_value(&mut self, new_value: i32) {
634                self.value = new_value;
635                self.registry.notify_ignore_error(&self.value);
636            }
637        }
638
639        impl Observable for IgnoreErrorObservable {
640            type State = i32;
641            type Error = String;
642
643            fn attach(&mut self, observer: Arc<dyn Observer<Subject = Self>>) {
644                self.registry.attach(observer);
645            }
646
647            fn detach(&mut self, observer: Arc<dyn Observer<Subject = Self>>) {
648                self.registry.detach(observer);
649            }
650        }
651
652        let mut observable = IgnoreErrorObservable::new(0);
653        let failing_observer = Arc::new(IgnoreErrorObserver::new());
654        let normal_observer = Arc::new(NormalObserver::new());
655
656        observable.attach(failing_observer.clone());
657        observable.attach(normal_observer.clone());
658
659        // 第一次更新应该成功
660        observable.update_value(1);
661        assert_eq!(failing_observer.get_call_count(), 1);
662        assert_eq!(normal_observer.get_last_value(), 1);
663
664        // 第二次更新应该成功(IgnoreError 策略)
665        observable.update_value(2);
666        assert_eq!(failing_observer.get_call_count(), 2);
667        assert_eq!(normal_observer.get_last_value(), 2); // 正常观察者应该收到第二次通知
668    }
669
670    #[test]
671    fn test_observer_weak_references() {
672        let mut observable = TestObservable::new(0);
673
674        {
675            let observer = Arc::new(TestObserver::new("temp"));
676            observable.attach(observer.clone());
677
678            // 更新一次
679            assert!(observable.update_value(50).is_ok());
680            assert_eq!(observer.get_last_value(), 50);
681        } // observer 在这里被丢弃
682
683        // 再次更新,应该仍然工作(弱引用会被清理)
684        assert!(observable.update_value(60).is_ok());
685    }
686
687    #[test]
688    fn test_duplicate_attach() {
689        let mut observable = TestObservable::new(0);
690        let observer = Arc::new(TestObserver::new("test"));
691
692        // 多次附加同一个观察者
693        observable.attach(observer.clone());
694        observable.attach(observer.clone());
695        observable.attach(observer.clone());
696
697        // 应该只通知一次
698        assert!(observable.update_value(99).is_ok());
699        assert_eq!(observer.get_last_value(), 99);
700    }
701
702    #[test]
703    fn test_detach_non_existent() {
704        let mut observable = TestObservable::new(0);
705        let observer = Arc::new(TestObserver::new("test"));
706        let another_observer = Arc::new(TestObserver::new("another"));
707
708        // 附加一个观察者
709        observable.attach(observer.clone());
710
711        // 尝试分离一个未附加的观察者(应该没有效果)
712        observable.detach(another_observer.clone());
713
714        // 更新应该仍然工作
715        assert!(observable.update_value(33).is_ok());
716        assert_eq!(observer.get_last_value(), 33);
717    }
718
719    #[test]
720    fn test_notify_with_no_observers() {
721        let mut observable = TestObservable::new(0);
722
723        // 没有观察者时更新应该成功
724        assert!(observable.update_value(77).is_ok());
725        assert_eq!(observable.get_value(), 77);
726    }
727}