Skip to main content

rust_patterns/
observer.rs

1use std::sync::{Arc, Weak};
2
3/// 观察者 trait
4///
5/// 定义观察者必须实现的接口。观察者可以订阅主题的状态变化,
6/// 并在状态更新时通过 `update` 方法接收通知。
7///
8/// # 类型参数
9///
10/// - `State`: 观察者关注的状态类型
11/// - `Error`: 观察者处理更新时可能返回的错误类型
12///
13/// # 实现要求
14///
15/// 实现者需要提供具体的状态类型和错误类型,并实现 `update` 方法。
16/// `update` 方法应该快速返回,避免阻塞通知过程。
17///
18/// # 线程安全
19///
20/// 实现者应确保 `update` 方法是线程安全的,因为可能从多个线程调用。
21pub trait Observer {
22    /// 观察者关注的状态类型
23    ///
24    /// 当主题状态变化时,会传递此类型的值给观察者。
25    type State;
26
27    /// 观察者处理更新时可能返回的错误类型
28    ///
29    /// 如果观察者处理更新失败,可以返回此类型的错误。
30    type Error;
31
32    /// 接收状态更新通知
33    ///
34    /// 当主题状态发生变化时调用此方法。实现者应该:
35    ///
36    /// 1. 处理传入的状态
37    /// 2. 返回 `Ok(())` 表示处理成功
38    /// 3. 返回 `Err(error)` 表示处理失败
39    ///
40    /// # 参数
41    ///
42    /// - `state`: 当前的主题状态引用
43    ///
44    /// # 返回值
45    ///
46    /// - `Ok(())`: 成功处理状态更新
47    /// - `Err(Self::Error)`: 处理状态更新时发生错误
48    ///
49    /// # 错误处理
50    ///
51    /// 如果此方法返回错误,主题的 `notify` 方法会根据指定的通知策略
52    /// 决定是否继续通知其他观察者。
53    fn update(&self, state: &Self::State) -> Result<(), Self::Error>;
54}
55
56/// 通知策略
57///
58/// 定义当观察者处理更新失败时的行为。
59#[derive(Debug, Clone, Copy, PartialEq, Eq)]
60pub enum NotifyStrategy {
61    /// 立即停止并返回错误
62    ///
63    /// 这是默认策略,当一个观察者失败时立即停止通知过程。
64    StopOnError,
65
66    /// 忽略错误并继续通知
67    ///
68    /// 即使某个观察者失败,也继续通知其他观察者。
69    /// 错误会被忽略,继续执行。
70    IgnoreError,
71}
72
73/// 主题(被观察者)
74///
75/// 管理一组观察者并在状态变化时通知它们。主题维护观察者的弱引用列表,
76/// 避免强引用循环导致的内存泄漏。
77///
78/// # 类型参数
79///
80/// - `T`: 状态类型,必须与观察者的 `State` 类型匹配
81/// - `E`: 错误类型,必须与观察者的 `Error` 类型匹配
82///
83/// # 设计特点
84///
85/// - **弱引用管理**: 使用 `Weak` 引用存储观察者,允许观察者在不再需要时被释放
86/// - **自动清理**: 可选地自动清理已释放的观察者弱引用
87/// - **通知策略**: 支持两种通知策略
88pub struct Subject<T, E> {
89    /// 观察者弱引用列表
90    ///
91    /// 使用弱引用避免循环引用。当观察者被释放时,
92    /// 对应的弱引用会自动变为无效。
93    observers: Vec<Weak<dyn Observer<State = T, Error = E>>>,
94}
95
96impl<T, E> Subject<T, E> {
97    /// 创建新的主题实例
98    ///
99    /// # 返回值
100    ///
101    /// 返回一个空的主题,不包含任何观察者。
102    pub fn new() -> Self {
103        Self {
104            observers: Vec::new(),
105        }
106    }
107
108    /// 创建具有初始容量的主题实例
109    ///
110    /// # 参数
111    ///
112    /// - `capacity`: 初始容量,用于预分配内存
113    ///
114    /// # 返回值
115    ///
116    /// 返回一个具有指定初始容量的空主题。
117    ///
118    /// # 性能
119    ///
120    /// 预分配容量可以避免后续添加观察者时的多次内存重新分配,
121    /// 当已知观察者数量时建议使用此方法。
122    pub fn with_capacity(capacity: usize) -> Self {
123        Self {
124            observers: Vec::with_capacity(capacity),
125        }
126    }
127
128    /// 附加观察者
129    ///
130    /// 将观察者附加到主题。方法内部会将观察者的强引用转换为弱引用,
131    /// 并确保不会重复添加相同的观察者。
132    ///
133    /// # 参数
134    ///
135    /// - `observer`: 要附加的观察者强引用
136    ///
137    /// # 注意
138    ///
139    /// - 使用 `Arc::downgrade` 将强引用转换为弱引用,避免循环引用
140    /// - 使用 `Weak::ptr_eq` 检查观察者是否已存在,防止重复添加
141    /// - 观察者将在下次调用 `notify` 方法时收到状态更新通知
142    pub fn attach(&mut self, observer: Arc<dyn Observer<State = T, Error = E>>) {
143        let weak = Arc::downgrade(&observer);
144        if !self.observers.iter().any(|item| item.ptr_eq(&weak)) {
145            self.observers.push(weak);
146        }
147    }
148
149    /// 分离观察者
150    ///
151    /// 从主题中分离指定的观察者。分离后,该观察者将不再收到状态更新通知。
152    ///
153    /// # 参数
154    ///
155    /// - `observer`: 要分离的观察者强引用
156    ///
157    /// # 注意
158    ///
159    /// - 使用 `Arc::downgrade` 将强引用转换为弱引用进行匹配
160    /// - 使用 `Weak::ptr_eq` 进行引用相等性比较
161    /// - 如果观察者不存在于列表中,此方法不会有任何效果
162    pub fn detach(&mut self, observer: Arc<dyn Observer<State = T, Error = E>>) {
163        let weak = Arc::downgrade(&observer);
164        self.observers.retain(|item| !item.ptr_eq(&weak));
165    }
166
167    /// 通知所有观察者
168    ///
169    /// 向所有有效的观察者发送状态更新通知。
170    ///
171    /// # 参数
172    ///
173    /// - `state`: 要通知的新状态引用
174    /// - `error_strategy`: 通知策略,指定观察者失败时的行为
175    ///   使用 `NotifyStrategy::StopOnError` 或 `NotifyStrategy::IgnoreError`
176    ///
177    /// # 返回值
178    ///
179    /// - `Ok(())`: 所有观察者都成功处理了更新,或错误被忽略
180    /// - `Err(E)`: 某个观察者处理更新时返回了错误(当使用 `StopOnError` 策略时)
181    ///
182    /// # 通知策略
183    ///
184    /// 根据指定的通知策略决定行为:
185    /// - `StopOnError`: 立即返回第一个错误,停止通知其他观察者
186    /// - `IgnoreError`: 忽略错误,继续通知其他观察者,总是返回 `Ok(())`
187    pub fn notify(&self, state: &T, strategy: NotifyStrategy) -> Result<(), E> {
188        self.observers
189            .iter()
190            .flat_map(Weak::upgrade)
191            .try_for_each(|observer| match observer.update(state) {
192                Ok(()) => Ok(()),
193                Err(e) => match strategy {
194                    NotifyStrategy::StopOnError => Err(e),
195                    NotifyStrategy::IgnoreError => Ok(()),
196                },
197            })
198    }
199}
200
201impl<T, E> Default for Subject<T, E> {
202    /// 创建默认的主题实例
203    ///
204    /// 等同于调用 `Subject::new()`。
205    fn default() -> Self {
206        Self::new()
207    }
208}