rxrust/observable/
defer.rs1use crate::prelude::*;
2
3pub fn defer<F, Item, Err, Emit>(
20 observable_supplier: F,
21) -> ObservableBase<DeferEmitter<F, Item, Err>>
22where
23 F: FnOnce() -> ObservableBase<Emit>,
24 Emit: Emitter,
25{
26 ObservableBase::new(DeferEmitter(observable_supplier, TypeHint::new()))
27}
28
29#[derive(Clone)]
30pub struct DeferEmitter<F, Item, Err>(F, TypeHint<(Item, Err)>);
31
32impl<F, Item, Err> Emitter for DeferEmitter<F, Item, Err> {
33 type Item = Item;
34 type Err = Err;
35}
36
37impl<'a, F, Emit, Item, Err> LocalEmitter<'a> for DeferEmitter<F, Item, Err>
38where
39 F: FnOnce() -> Emit,
40 Emit: LocalObservable<'a> + observable::Observable<Item = Item, Err = Err>,
41{
42 fn emit<O>(self, subscriber: Subscriber<O, LocalSubscription>)
43 where
44 O: Observer<Item = Self::Item, Err = Self::Err> + 'a,
45 {
46 (self.0)().actual_subscribe(subscriber);
47 }
48}
49
50impl<F, Item: 'static, Emit, Err: 'static> SharedEmitter
51 for DeferEmitter<F, Item, Err>
52where
53 F: FnOnce() -> Emit,
54 Emit: SharedObservable + observable::Observable<Item = Item, Err = Err>,
55{
56 fn emit<O>(self, subscriber: Subscriber<O, SharedSubscription>)
57 where
58 O: Observer<Item = Self::Item, Err = Self::Err> + Send + Sync + 'static,
59 {
60 (self.0)().actual_subscribe(subscriber);
61 }
62}
63
#[cfg(test)]
mod test {
  use std::sync::{Arc, Mutex};

  use crate::prelude::*;
  use bencher::Bencher;

  /// The supplier must not run when the observable is built, and must have
  /// run exactly once more after each subscription.
  #[test]
  fn no_results_before_deferred_subscribe() {
    let supplier_calls = Arc::new(Mutex::new(0));
    let total = Arc::new(Mutex::new(0));
    let error_count = Arc::new(Mutex::new(0));
    let complete_count = Arc::new(Mutex::new(0));

    let deferred = observable::defer(|| {
      *supplier_calls.lock().unwrap() += 1;
      observable::of(&2)
    })
    .into_shared();

    // Nothing has subscribed yet, so the supplier must not have been called.
    assert_eq!(*supplier_calls.lock().unwrap(), 0);

    for expected_calls in 1..4 {
      let on_next_total = Arc::clone(&total);
      let on_error_count = Arc::clone(&error_count);
      let on_complete_count = Arc::clone(&complete_count);
      deferred.clone().subscribe_all(
        move |v| *on_next_total.lock().unwrap() += v,
        move |_| *on_error_count.lock().unwrap() += 1,
        move || *on_complete_count.lock().unwrap() += 1,
      );
      // One supplier call per subscription so far.
      assert_eq!(*supplier_calls.lock().unwrap(), expected_calls);
    }

    // Three subscriptions, each receiving the single value `2`.
    assert_eq!(*supplier_calls.lock().unwrap(), 3);
    assert_eq!(*total.lock().unwrap(), 6);
    assert_eq!(*error_count.lock().unwrap(), 0);
    assert_eq!(*complete_count.lock().unwrap(), 3);
  }

  /// Two clones of one deferred observable each call the supplier and each
  /// deliver the value to their own subscriber.
  #[test]
  fn support_fork() {
    let supplier_calls = Arc::new(Mutex::new(0));
    let source = observable::defer(|| {
      *supplier_calls.lock().unwrap() += 1;
      observable::of(10)
    });
    let first_total = Arc::new(Mutex::new(0));
    let second_total = Arc::new(Mutex::new(0));
    // The subscribers own these handles; the originals stay here for the
    // assertions below.
    let first_sink = Arc::clone(&first_total);
    let second_sink = Arc::clone(&second_total);
    source
      .clone()
      .subscribe(move |v| *first_sink.lock().unwrap() += v);
    source
      .clone()
      .subscribe(move |v| *second_sink.lock().unwrap() += v);

    assert_eq!(*first_total.lock().unwrap(), 10);
    assert_eq!(*second_total.lock().unwrap(), 10);
    assert_eq!(*supplier_calls.lock().unwrap(), 2);
  }

  /// Smoke test without assertions: cloning and `into_shared` can be applied
  /// to a deferred observable in either order.
  #[test]
  fn fork_and_share() {
    // Clone first, then share each clone.
    let deferred = observable::defer(observable::empty);
    deferred.clone().into_shared().subscribe(|_: i32| {});
    deferred.into_shared().subscribe(|_| {});

    // Share first, then clone the shared observable.
    let shared = observable::defer(observable::empty).into_shared();
    shared.clone().subscribe(|_: i32| {});
    shared.subscribe(|_| {});
  }

  /// Calls the group function generated by `benchmark_group!` from a regular
  /// test.
  #[test]
  fn bench() { do_bench(); }

  benchmark_group!(do_bench, bench_deref);

  // NOTE(review): `bench_deref` reads like a typo for `bench_defer`. The name
  // is kept because `benchmark_group!` may use the identifier as the reported
  // benchmark name; confirm before renaming.
  fn bench_deref(b: &mut Bencher) {
    b.iter(no_results_before_deferred_subscribe);
  }
}