rxrust/observable/
defer.rs

1use crate::prelude::*;
2
3/// Creates an observable that will on subscription defer to another observable
4/// that is supplied by a supplier-function which will be run once at each
5/// subscription
6///
7/// ```rust
8/// # use rxrust::prelude::*;
9///
10/// observable::defer(|| {
11///   println!("Hi!");
12///   observable::of("Hello!")
13/// })
14///   .subscribe(move |v| {
15///     println!("{}", v);
16///   });
17/// // Prints: Hi!\nHello!\n
18/// ```
19pub fn defer<F, Item, Err, Emit>(
20  observable_supplier: F,
21) -> ObservableBase<DeferEmitter<F, Item, Err>>
22where
23  F: FnOnce() -> ObservableBase<Emit>,
24  Emit: Emitter,
25{
26  ObservableBase::new(DeferEmitter(observable_supplier, TypeHint::new()))
27}
28
29#[derive(Clone)]
30pub struct DeferEmitter<F, Item, Err>(F, TypeHint<(Item, Err)>);
31
32impl<F, Item, Err> Emitter for DeferEmitter<F, Item, Err> {
33  type Item = Item;
34  type Err = Err;
35}
36
37impl<'a, F, Emit, Item, Err> LocalEmitter<'a> for DeferEmitter<F, Item, Err>
38where
39  F: FnOnce() -> Emit,
40  Emit: LocalObservable<'a> + observable::Observable<Item = Item, Err = Err>,
41{
42  fn emit<O>(self, subscriber: Subscriber<O, LocalSubscription>)
43  where
44    O: Observer<Item = Self::Item, Err = Self::Err> + 'a,
45  {
46    (self.0)().actual_subscribe(subscriber);
47  }
48}
49
50impl<F, Item: 'static, Emit, Err: 'static> SharedEmitter
51  for DeferEmitter<F, Item, Err>
52where
53  F: FnOnce() -> Emit,
54  Emit: SharedObservable + observable::Observable<Item = Item, Err = Err>,
55{
56  fn emit<O>(self, subscriber: Subscriber<O, SharedSubscription>)
57  where
58    O: Observer<Item = Self::Item, Err = Self::Err> + Send + Sync + 'static,
59  {
60    (self.0)().actual_subscribe(subscriber);
61  }
62}
63
#[cfg(test)]
mod test {
  use std::sync::{Arc, Mutex};

  use crate::prelude::*;
  use bencher::Bencher;

  /// The supplier must stay untouched until somebody subscribes, and must
  /// then run exactly once per subscription.
  #[test]
  fn no_results_before_deferred_subscribe() {
    let calls = Arc::new(Mutex::new(0));
    let sum = Arc::new(Mutex::new(0));
    let errs = Arc::new(Mutex::new(0));
    let completes = Arc::new(Mutex::new(0));

    let deferred = observable::defer(|| {
      *calls.lock().unwrap() += 1;
      observable::of(&2)
    })
    .into_shared();

    // Building the observable alone must not invoke the supplier.
    assert_eq!(*calls.lock().unwrap(), 0);

    for i in 1..4 {
      let sum_copy = Arc::clone(&sum);
      let errs_copy = Arc::clone(&errs);
      let completes_copy = Arc::clone(&completes);
      deferred.clone().subscribe_all(
        move |v| *sum_copy.lock().unwrap() += v,
        move |_| *errs_copy.lock().unwrap() += 1,
        move || *completes_copy.lock().unwrap() += 1,
      );
      // Every subscription adds exactly one supplier call.
      assert_eq!(*calls.lock().unwrap(), i);
    }

    assert_eq!(*calls.lock().unwrap(), 3);
    assert_eq!(*sum.lock().unwrap(), 6);
    assert_eq!(*errs.lock().unwrap(), 0);
    assert_eq!(*completes.lock().unwrap(), 3);
  }

  /// Cloning a deferred observable gives independent subscriptions, each of
  /// which runs the supplier on its own.
  #[test]
  fn support_fork() {
    let calls = Arc::new(Mutex::new(0));
    let o = observable::defer(|| {
      *calls.lock().unwrap() += 1;
      observable::of(10)
    });
    let sum1 = Arc::new(Mutex::new(0));
    let sum2 = Arc::new(Mutex::new(0));
    // Keep handles for the assertions; the originals move into the closures.
    let c_sum1 = Arc::clone(&sum1);
    let c_sum2 = Arc::clone(&sum2);
    o.clone().subscribe(move |v| *sum1.lock().unwrap() += v);
    o.clone().subscribe(move |v| *sum2.lock().unwrap() += v);

    assert_eq!(*c_sum1.lock().unwrap(), 10);
    assert_eq!(*c_sum2.lock().unwrap(), 10);
    assert_eq!(*calls.lock().unwrap(), 2);
  }

  /// Compile-time check: forking works both before and after converting the
  /// deferred observable into a shared one.
  #[test]
  fn fork_and_share() {
    let observable = observable::defer(observable::empty);
    observable.clone().into_shared().subscribe(|_: i32| {});
    observable.into_shared().subscribe(|_| {});

    let observable = observable::defer(observable::empty).into_shared();
    observable.clone().subscribe(|_: i32| {});
    observable.subscribe(|_| {});
  }

  #[test]
  fn bench() { do_bench(); }

  benchmark_group!(do_bench, bench_defer);

  // Benchmarks the subscribe-three-times scenario of the first test.
  fn bench_defer(b: &mut Bencher) {
    b.iter(no_results_before_deferred_subscribe);
  }
}