rust_pattern_components/observer.rs
1use std::sync::{Arc, Weak};
2
3/// 观察者 trait
4///
5/// 定义观察者必须实现的接口。观察者可以订阅被观察者的状态变化,
6/// 并在状态更新时通过 `update` 方法接收通知。
7///
8/// # 关联类型
9///
10/// - `Subject`: 观察者订阅的被观察者类型,必须实现 [`Observable`] trait
11///
12/// # 方法
13///
14/// - `update`: 接收状态更新通知
15///
16/// # 实现要求
17///
18/// 实现者需要指定具体的被观察者类型,并实现 `update` 方法。
19/// `update` 方法应该快速返回,避免阻塞通知过程。
20///
21/// # 线程安全
22///
23/// 实现者应确保 `update` 方法是线程安全的,因为可能从多个线程调用。
24///
25/// # 示例
26///
27/// ```
28/// use std::sync::Arc;
29/// use rust_patterns_components::{Observer, Observable};
30///
31/// struct TemperatureSensor;
32///
33/// impl Observable for TemperatureSensor {
34/// type State = f64;
35/// type Error = String;
36///
37/// fn attach(&mut self, _observer: Arc<dyn Observer<Subject = Self>>) {}
38/// fn detach(&mut self, _observer: Arc<dyn Observer<Subject = Self>>) {}
39/// }
40///
41/// struct TemperatureDisplay;
42///
43/// impl Observer for TemperatureDisplay {
44/// type Subject = TemperatureSensor;
45///
46/// fn update(&self, state: &f64) -> Result<(), String> {
47/// println!("当前温度: {:.1}°C", state);
48/// Ok(())
49/// }
50/// }
51/// ```
52pub trait Observer {
53 /// 观察者订阅的被观察者类型
54 ///
55 /// 此类型必须实现 [`Observable`] trait,定义了观察者关注的状态和错误类型。
56 type Subject: Observable;
57
58 /// 接收状态更新通知
59 ///
60 /// 当被观察者状态发生变化时调用此方法。实现者应该:
61 ///
62 /// 1. 处理传入的状态
63 /// 2. 返回 `Ok(())` 表示处理成功
64 /// 3. 返回 `Err(error)` 表示处理失败
65 ///
66 /// # 参数
67 ///
68 /// - `state`: 当前的被观察者状态引用
69 ///
70 /// # 返回值
71 ///
72 /// - `Ok(())`: 成功处理状态更新
73 /// - `Err(<Self::Subject as Observable>::Error)`: 处理状态更新时发生错误
74 ///
75 /// # 错误处理
76 ///
77 /// 如果此方法返回错误,被观察者的 `notify` 方法会根据指定的通知策略
78 /// 决定是否继续通知其他观察者。
79 fn update(
80 &self,
81 state: &<Self::Subject as Observable>::State,
82 ) -> Result<(), <Self::Subject as Observable>::Error>;
83}
84
85/// 被观察者 trait
86///
87/// 定义被观察者必须实现的接口。被观察者维护一组观察者,
88/// 并在状态变化时通知它们。
89///
90/// # 关联类型
91///
92/// - `State`: 被观察者的状态类型,观察者通过此类型接收状态更新
93/// - `Error`: 观察者处理更新时可能返回的错误类型
94///
95/// # 方法
96///
97/// - `attach`: 附加观察者到被观察者
98/// - `detach`: 从被观察者分离观察者
99///
100/// # 实现要求
101///
102/// 实现者需要指定状态类型和错误类型,并实现观察者管理方法。
103/// 通常建议使用 [`ObserverRegistry`] 来简化实现。
104///
105/// # 线程安全
106///
107/// 实现者应确保观察者管理方法是线程安全的,因为可能从多个线程调用。
108///
109/// # 示例
110///
111/// ```
112/// use std::sync::Arc;
113/// use rust_patterns_components::{Observable, Observer, ObserverRegistry};
114///
115/// struct WeatherStation {
116/// registry: ObserverRegistry<Self>,
117/// }
118///
119/// impl Observable for WeatherStation {
120/// type State = f64;
121/// type Error = String;
122///
123/// fn attach(&mut self, observer: Arc<dyn Observer<Subject = Self>>) {
124/// self.registry.attach(observer);
125/// }
126///
127/// fn detach(&mut self, observer: Arc<dyn Observer<Subject = Self>>) {
128/// self.registry.detach(observer);
129/// }
130/// }
131/// ```
132pub trait Observable {
133 /// 被观察者的状态类型
134 ///
135 /// 当状态变化时,会传递此类型的值给观察者。
136 type State;
137
138 /// 观察者处理更新时可能返回的错误类型
139 ///
140 /// 如果观察者处理更新失败,可以返回此类型的错误。
141 type Error;
142
143 /// 附加观察者
144 ///
145 /// 将观察者附加到被观察者。附加后,观察者将收到状态更新通知。
146 ///
147 /// # 参数
148 ///
149 /// - `observer`: 要附加的观察者强引用
150 fn attach(&mut self, observer: Arc<dyn Observer<Subject = Self>>);
151
152 /// 分离观察者
153 ///
154 /// 从被观察者分离指定的观察者。分离后,该观察者将不再收到状态更新通知。
155 ///
156 /// # 参数
157 ///
158 /// - `observer`: 要分离的观察者强引用
159 fn detach(&mut self, observer: Arc<dyn Observer<Subject = Self>>);
160}
161
162/// 观察者注册表
163///
164/// 管理一组观察者并在状态变化时通知它们。注册表维护观察者的弱引用列表,
165/// 避免强引用循环导致的内存泄漏。
166///
167/// # 类型参数
168///
169/// - `T`: 被观察者类型,必须实现 [`Observable`] trait
170///
171/// # 设计特点
172///
173/// - **弱引用管理**: 使用 `Weak` 引用存储观察者,允许观察者在不再需要时被释放
174/// - **自动清理**: 在通知时自动跳过已释放的观察者弱引用
175/// - **通知策略**: 支持两种通知策略
176/// - **防止重复**: 检查观察者是否已存在,防止重复添加
177///
178/// # 示例
179///
180/// ```
181/// use std::sync::Arc;
182/// use rust_patterns_components::{Observable, Observer, ObserverRegistry};
183///
184/// struct Sensor;
185///
186/// impl Observable for Sensor {
187/// type State = String;
188/// type Error = String;
189///
190/// fn attach(&mut self, _observer: Arc<dyn Observer<Subject = Self>>) {}
191/// fn detach(&mut self, _observer: Arc<dyn Observer<Subject = Self>>) {}
192/// }
193///
194/// let mut registry = ObserverRegistry::<Sensor>::default();
195/// // 使用 registry.attach() 添加观察者
196/// // 使用 registry.notify() 通知观察者
197/// ```
198pub struct ObserverRegistry<T: Observable> {
199 /// 观察者弱引用列表
200 ///
201 /// 使用弱引用避免循环引用。当观察者被释放时,
202 /// 对应的弱引用会自动变为无效。
203 observers: Vec<Weak<dyn Observer<Subject = T>>>,
204}
205
206impl<T> ObserverRegistry<T>
207where
208 T: Observable,
209{
210 /// 创建新的观察者注册表实例
211 ///
212 /// # 返回值
213 ///
214 /// 返回一个空的观察者注册表,不包含任何观察者。
215 pub fn new() -> Self {
216 Self {
217 observers: Vec::new(),
218 }
219 }
220
221 /// 创建具有初始容量的观察者注册表实例
222 ///
223 /// # 参数
224 ///
225 /// - `capacity`: 初始容量,用于预分配内存
226 ///
227 /// # 返回值
228 ///
229 /// 返回一个具有指定初始容量的空注册表。
230 ///
231 /// # 性能
232 ///
233 /// 预分配容量可以避免后续添加观察者时的多次内存重新分配,
234 /// 当已知观察者数量时建议使用此方法。
235 pub fn with_capacity(capacity: usize) -> Self {
236 Self {
237 observers: Vec::with_capacity(capacity),
238 }
239 }
240
241 /// 附加观察者
242 ///
243 /// 将观察者附加到注册表。方法内部会将观察者的强引用转换为弱引用,
244 /// 并确保不会重复添加相同的观察者。
245 ///
246 /// # 参数
247 ///
248 /// - `observer`: 要附加的观察者强引用
249 ///
250 /// # 注意
251 ///
252 /// - 使用 `Arc::downgrade` 将强引用转换为弱引用,避免循环引用
253 /// - 使用 `Weak::ptr_eq` 检查观察者是否已存在,防止重复添加
254 /// - 观察者将在下次调用 `notify` 方法时收到状态更新通知
255 pub fn attach(&mut self, observer: Arc<dyn Observer<Subject = T>>) {
256 let weak = Arc::downgrade(&observer);
257 if !self.observers.iter().any(|item| weak.ptr_eq(item)) {
258 self.observers.push(weak);
259 }
260 }
261
262 /// 分离观察者
263 ///
264 /// 从注册表中分离指定的观察者。分离后,该观察者将不再收到状态更新通知。
265 ///
266 /// # 参数
267 ///
268 /// - `observer`: 要分离的观察者强引用
269 ///
270 /// # 注意
271 ///
272 /// - 使用 `Arc::downgrade` 将强引用转换为弱引用进行匹配
273 /// - 使用 `Weak::ptr_eq` 进行引用相等性比较
274 /// - 如果观察者不存在于列表中,此方法不会有任何效果
275 pub fn detach(&mut self, observer: Arc<dyn Observer<Subject = T>>) {
276 let weak = Arc::downgrade(&observer);
277 self.observers.retain(|item| !weak.ptr_eq(item));
278 }
279
280 /// 通知所有观察者
281 ///
282 /// 向所有有效的观察者发送状态更新通知。
283 ///
284 /// # 参数
285 ///
286 /// - `state`: 要通知的新状态引用
287 ///
288 /// # 返回值
289 ///
290 /// - `Ok(())`: 所有观察者都成功处理了更新
291 /// - `Err(<T as Observable>::Error)`: 某个观察者处理更新时返回了错误,立即停止通知其他观察者
292 ///
293 /// # 行为
294 ///
295 /// - 遍历所有观察者并调用其 `update` 方法
296 /// - 当某个观察者返回错误时,立即停止并返回该错误
297 /// - 自动清理无效的观察者弱引用(通过 `Weak::upgrade` 过滤)
298 ///
299 /// # 性能
300 ///
301 /// 此方法会自动清理无效的观察者弱引用(通过 `Weak::upgrade` 过滤)。
302 pub fn notify(&self, state: &<T as Observable>::State) -> Result<(), <T as Observable>::Error> {
303 self.observers
304 .iter()
305 .flat_map(Weak::upgrade)
306 .try_for_each(|observer| observer.update(state))
307 }
308
309 /// 通知所有观察者状态变化,忽略错误
310 ///
311 /// 此方法会通知所有已注册的观察者状态变化,但会忽略任何观察者返回的错误。
312 /// 即使某个观察者处理更新失败,也会继续通知其他观察者。
313 ///
314 /// # 参数
315 ///
316 /// * `state` - 要通知的状态变化
317 ///
318 /// # 行为
319 ///
320 /// - 遍历所有观察者并调用其 `update` 方法
321 /// - 忽略所有观察者返回的错误(使用 `let _ = ...`)
322 /// - 自动清理无效的观察者弱引用(通过 `Weak::upgrade` 过滤)
323 ///
324 /// # 使用场景
325 ///
326 /// 适用于以下情况:
327 /// - 观察者的错误不应该阻止其他观察者接收通知
328 /// - 错误处理不是关键,可以安全忽略
329 /// - 需要确保所有观察者都能收到通知,即使某些观察者可能失败
330 ///
331 /// # 与 `notify` 方法的区别
332 ///
333 /// - `notify`: 遇到第一个错误就停止,并返回该错误
334 /// - `notify_ignore_error`: 忽略所有错误,继续通知所有观察者
335 ///
336 /// # 示例
337 ///
338 /// ```rust
339 /// use rust_pattern_components::{Observable, Observer, ObserverRegistry};
340 /// use std::sync::{Arc, Weak};
341 ///
342 /// struct Counter {
343 /// registry: ObserverRegistry<Self>,
344 /// value: u64,
345 /// }
346 ///
347 /// impl Observable for Counter {
348 /// type State = u64;
349 /// type Error = String;
350 ///
351 /// fn attach(&mut self, observer: Arc<dyn Observer<Subject = Self>>) {
352 /// self.registry.attach(observer);
353 /// }
354 ///
355 /// fn detach(&mut self, observer: Arc<dyn Observer<Subject = Self>>) {
356 /// self.registry.detach(observer);
357 /// }
358 /// }
359 ///
360 /// let counter = Counter {
361 /// registry: ObserverRegistry::new(),
362 /// value: 42,
363 /// };
364 ///
365 /// // 即使观察者可能失败,也会通知所有观察者
366 /// counter.registry.notify_ignore_error(&counter.value);
367 /// ```
368 pub fn notify_ignore_error(&self, state: &<T as Observable>::State) {
369 self.observers
370 .iter()
371 .flat_map(Weak::upgrade)
372 .for_each(|observer| {
373 let _ = observer.update(state);
374 })
375 }
376}
377
378impl<T> Default for ObserverRegistry<T>
379where
380 T: Observable,
381{
382 /// 创建默认的观察者注册表实例
383 ///
384 /// 等同于调用 [`ObserverRegistry::new()`]。
385 fn default() -> Self {
386 Self {
387 observers: Default::default(),
388 }
389 }
390}
391
392#[cfg(test)]
393mod tests {
394 use super::*;
395 use std::sync::atomic::{AtomicUsize, Ordering};
396
397 // 测试用的被观察者
398 struct TestObservable {
399 registry: ObserverRegistry<Self>,
400 value: i32,
401 }
402
403 impl TestObservable {
404 fn new(initial_value: i32) -> Self {
405 Self {
406 registry: ObserverRegistry::new(),
407 value: initial_value,
408 }
409 }
410
411 fn update_value(&mut self, new_value: i32) -> Result<(), String> {
412 self.value = new_value;
413 self.registry.notify(&self.value)
414 }
415
416 fn get_value(&self) -> i32 {
417 self.value
418 }
419 }
420
421 impl Observable for TestObservable {
422 type State = i32;
423 type Error = String;
424
425 fn attach(&mut self, observer: Arc<dyn Observer<Subject = Self>>) {
426 self.registry.attach(observer);
427 }
428
429 fn detach(&mut self, observer: Arc<dyn Observer<Subject = Self>>) {
430 self.registry.detach(observer);
431 }
432 }
433
434 // 简单的测试观察者
435 struct TestObserver {
436 _name: String,
437 last_value: AtomicUsize,
438 }
439
440 impl TestObserver {
441 fn new(name: &str) -> Self {
442 Self {
443 _name: name.to_string(),
444 last_value: AtomicUsize::new(0),
445 }
446 }
447
448 fn get_last_value(&self) -> usize {
449 self.last_value.load(Ordering::SeqCst)
450 }
451 }
452
453 impl Observer for TestObserver {
454 type Subject = TestObservable;
455
456 fn update(&self, value: &i32) -> Result<(), String> {
457 self.last_value.store(*value as usize, Ordering::SeqCst);
458 Ok(())
459 }
460 }
461
462 // 可能失败的测试观察者
463 struct FailingObserver {
464 fail_after: usize,
465 call_count: AtomicUsize,
466 }
467
468 impl FailingObserver {
469 fn new(fail_after: usize) -> Self {
470 Self {
471 fail_after,
472 call_count: AtomicUsize::new(0),
473 }
474 }
475
476 fn get_call_count(&self) -> usize {
477 self.call_count.load(Ordering::SeqCst)
478 }
479 }
480
481 impl Observer for FailingObserver {
482 type Subject = TestObservable;
483
484 fn update(&self, _value: &i32) -> Result<(), String> {
485 let count = self.call_count.fetch_add(1, Ordering::SeqCst) + 1;
486 if count >= self.fail_after {
487 Err(format!("Failed after {} calls", count))
488 } else {
489 Ok(())
490 }
491 }
492 }
493
494 #[test]
495 fn test_attach_and_notify() {
496 let mut observable = TestObservable::new(0);
497 let observer = Arc::new(TestObserver::new("test"));
498
499 // 附加观察者
500 observable.attach(observer.clone());
501
502 // 更新值并通知
503 assert!(observable.update_value(42).is_ok());
504 assert_eq!(observable.get_value(), 42);
505 assert_eq!(observer.get_last_value(), 42);
506 }
507
508 #[test]
509 fn test_detach() {
510 let mut observable = TestObservable::new(0);
511 let observer = Arc::new(TestObserver::new("test"));
512
513 // 附加观察者
514 observable.attach(observer.clone());
515
516 // 更新一次
517 assert!(observable.update_value(10).is_ok());
518 assert_eq!(observer.get_last_value(), 10);
519
520 // 分离观察者
521 observable.detach(observer.clone());
522
523 // 再次更新,观察者不应该收到通知
524 assert!(observable.update_value(20).is_ok());
525 assert_eq!(observer.get_last_value(), 10); // 仍然是旧值
526 }
527
528 #[test]
529 fn test_multiple_observers() {
530 let mut observable = TestObservable::new(0);
531 let observer1 = Arc::new(TestObserver::new("observer1"));
532 let observer2 = Arc::new(TestObserver::new("observer2"));
533
534 observable.attach(observer1.clone());
535 observable.attach(observer2.clone());
536
537 assert!(observable.update_value(100).is_ok());
538
539 assert_eq!(observer1.get_last_value(), 100);
540 assert_eq!(observer2.get_last_value(), 100);
541 }
542
543 #[test]
544 fn test_notify_stop_on_error() {
545 let mut observable = TestObservable::new(0);
546 let failing_observer = Arc::new(FailingObserver::new(2)); // 第二次调用失败
547 let normal_observer = Arc::new(TestObserver::new("normal"));
548
549 observable.attach(failing_observer.clone());
550 observable.attach(normal_observer.clone());
551
552 // 第一次更新应该成功
553 assert!(observable.update_value(1).is_ok());
554 assert_eq!(failing_observer.get_call_count(), 1);
555 assert_eq!(normal_observer.get_last_value(), 1);
556
557 // 第二次更新应该失败(StopOnError 策略)
558 assert!(observable.update_value(2).is_err());
559 assert_eq!(failing_observer.get_call_count(), 2);
560 assert_eq!(normal_observer.get_last_value(), 1); // 正常观察者不应该收到第二次通知
561 }
562
563 #[test]
564 fn test_notify_ignore_error() {
565 // 为 IgnoreErrorObservable 创建专门的观察者
566 struct IgnoreErrorObservable {
567 registry: ObserverRegistry<Self>,
568 value: i32,
569 }
570
571 struct IgnoreErrorObserver {
572 call_count: AtomicUsize,
573 }
574
575 impl IgnoreErrorObserver {
576 fn new() -> Self {
577 Self {
578 call_count: AtomicUsize::new(0),
579 }
580 }
581
582 fn get_call_count(&self) -> usize {
583 self.call_count.load(Ordering::SeqCst)
584 }
585 }
586
587 impl Observer for IgnoreErrorObserver {
588 type Subject = IgnoreErrorObservable;
589
590 fn update(&self, _value: &i32) -> Result<(), String> {
591 let count = self.call_count.fetch_add(1, Ordering::SeqCst) + 1;
592 if count >= 2 {
593 Err(format!("Failed after {} calls", count))
594 } else {
595 Ok(())
596 }
597 }
598 }
599
600 struct NormalObserver {
601 last_value: AtomicUsize,
602 }
603
604 impl NormalObserver {
605 fn new() -> Self {
606 Self {
607 last_value: AtomicUsize::new(0),
608 }
609 }
610
611 fn get_last_value(&self) -> usize {
612 self.last_value.load(Ordering::SeqCst)
613 }
614 }
615
616 impl Observer for NormalObserver {
617 type Subject = IgnoreErrorObservable;
618
619 fn update(&self, value: &i32) -> Result<(), String> {
620 self.last_value.store(*value as usize, Ordering::SeqCst);
621 Ok(())
622 }
623 }
624
625 impl IgnoreErrorObservable {
626 fn new(initial_value: i32) -> Self {
627 Self {
628 registry: ObserverRegistry::new(),
629 value: initial_value,
630 }
631 }
632
633 fn update_value(&mut self, new_value: i32) {
634 self.value = new_value;
635 self.registry.notify_ignore_error(&self.value);
636 }
637 }
638
639 impl Observable for IgnoreErrorObservable {
640 type State = i32;
641 type Error = String;
642
643 fn attach(&mut self, observer: Arc<dyn Observer<Subject = Self>>) {
644 self.registry.attach(observer);
645 }
646
647 fn detach(&mut self, observer: Arc<dyn Observer<Subject = Self>>) {
648 self.registry.detach(observer);
649 }
650 }
651
652 let mut observable = IgnoreErrorObservable::new(0);
653 let failing_observer = Arc::new(IgnoreErrorObserver::new());
654 let normal_observer = Arc::new(NormalObserver::new());
655
656 observable.attach(failing_observer.clone());
657 observable.attach(normal_observer.clone());
658
659 // 第一次更新应该成功
660 observable.update_value(1);
661 assert_eq!(failing_observer.get_call_count(), 1);
662 assert_eq!(normal_observer.get_last_value(), 1);
663
664 // 第二次更新应该成功(IgnoreError 策略)
665 observable.update_value(2);
666 assert_eq!(failing_observer.get_call_count(), 2);
667 assert_eq!(normal_observer.get_last_value(), 2); // 正常观察者应该收到第二次通知
668 }
669
670 #[test]
671 fn test_observer_weak_references() {
672 let mut observable = TestObservable::new(0);
673
674 {
675 let observer = Arc::new(TestObserver::new("temp"));
676 observable.attach(observer.clone());
677
678 // 更新一次
679 assert!(observable.update_value(50).is_ok());
680 assert_eq!(observer.get_last_value(), 50);
681 } // observer 在这里被丢弃
682
683 // 再次更新,应该仍然工作(弱引用会被清理)
684 assert!(observable.update_value(60).is_ok());
685 }
686
687 #[test]
688 fn test_duplicate_attach() {
689 let mut observable = TestObservable::new(0);
690 let observer = Arc::new(TestObserver::new("test"));
691
692 // 多次附加同一个观察者
693 observable.attach(observer.clone());
694 observable.attach(observer.clone());
695 observable.attach(observer.clone());
696
697 // 应该只通知一次
698 assert!(observable.update_value(99).is_ok());
699 assert_eq!(observer.get_last_value(), 99);
700 }
701
702 #[test]
703 fn test_detach_non_existent() {
704 let mut observable = TestObservable::new(0);
705 let observer = Arc::new(TestObserver::new("test"));
706 let another_observer = Arc::new(TestObserver::new("another"));
707
708 // 附加一个观察者
709 observable.attach(observer.clone());
710
711 // 尝试分离一个未附加的观察者(应该没有效果)
712 observable.detach(another_observer.clone());
713
714 // 更新应该仍然工作
715 assert!(observable.update_value(33).is_ok());
716 assert_eq!(observer.get_last_value(), 33);
717 }
718
719 #[test]
720 fn test_notify_with_no_observers() {
721 let mut observable = TestObservable::new(0);
722
723 // 没有观察者时更新应该成功
724 assert!(observable.update_value(77).is_ok());
725 assert_eq!(observable.get_value(), 77);
726 }
727}