rxrust/observable/
observable_comp.rs1use crate::prelude::*;
2
3#[derive(Clone)]
4pub struct ObserverComp<N, C, Item> {
5 next: N,
6 complete: C,
7 is_stopped: bool,
8 _marker: TypeHint<*const Item>,
9}
10
11impl<N, C, Item> Observer for ObserverComp<N, C, Item>
12where
13 C: FnMut(),
14 N: FnMut(Item),
15{
16 type Item = Item;
17 type Err = ();
18 #[inline]
19 fn next(&mut self, value: Item) { (self.next)(value); }
20 #[inline]
21 fn error(&mut self, _err: ()) { self.is_stopped = true; }
22 fn complete(&mut self) {
23 (self.complete)();
24 self.is_stopped = true;
25 }
26 fn is_stopped(&self) -> bool { self.is_stopped }
27}
28
29pub trait SubscribeComplete<'a, N, C> {
30 type Unsub: SubscriptionLike;
32
33 fn subscribe_complete(
36 self,
37 next: N,
38 complete: C,
39 ) -> SubscriptionWrapper<Self::Unsub>;
40}
41
42impl<'a, S, N, C> SubscribeComplete<'a, N, C> for S
43where
44 S: LocalObservable<'a, Err = ()>,
45 C: FnMut() + 'a,
46 N: FnMut(S::Item) + 'a,
47 S::Item: 'a,
48{
49 type Unsub = S::Unsub;
50 fn subscribe_complete(
51 self,
52 next: N,
53 complete: C,
54 ) -> SubscriptionWrapper<Self::Unsub>
55 where
56 Self: Sized,
57 S::Item: 'a,
58 {
59 let unsub = self.actual_subscribe(Subscriber::local(ObserverComp {
60 next,
61 complete,
62 is_stopped: false,
63 _marker: TypeHint::new(),
64 }));
65 SubscriptionWrapper(unsub)
66 }
67}
68
69impl<'a, S, N, C> SubscribeComplete<'a, N, C> for Shared<S>
70where
71 S: SharedObservable<Err = ()>,
72 C: FnMut() + Send + Sync + 'static,
73 N: FnMut(S::Item) + Send + Sync + 'static,
74 S::Item: 'static,
75{
76 type Unsub = S::Unsub;
77 fn subscribe_complete(
78 self,
79 next: N,
80 complete: C,
81 ) -> SubscriptionWrapper<Self::Unsub>
82 where
83 Self: Sized,
84 {
85 let unsub = self.0.actual_subscribe(Subscriber::shared(ObserverComp {
86 next,
87 complete,
88 is_stopped: false,
89 _marker: TypeHint::new(),
90 }));
91 SubscriptionWrapper(unsub)
92 }
93}
94
95#[test]
96fn raii() {
97 let mut times = 0;
98 {
99 let mut subject = LocalSubject::new();
100 {
101 let _ = subject
102 .clone()
103 .subscribe_complete(|_| times += 1, || {})
104 .unsubscribe_when_dropped();
105 } subject.next(());
107 }
108 assert_eq!(times, 0);
109}