rxrust/observable/
from_fn.rs

1use crate::prelude::*;
2
3/// param `subscribe`: the function that is called when the Observable is
4/// initially subscribed to. This function is given a Subscriber, to which
5/// new values can be `next`ed, or an `error` method can be called to raise
6/// an error, or `complete` can be called to notify of a successful
7/// completion.
8pub fn create<F, O, U, Item, Err>(
9  subscribe: F,
10) -> ObservableBase<FnEmitter<F, Item, Err>>
11where
12  F: FnOnce(Subscriber<O, U>),
13  O: Observer<Item = Item, Err = Err>,
14  U: SubscriptionLike,
15{
16  ObservableBase::new(FnEmitter(subscribe, TypeHint::new()))
17}
18
19#[derive(Clone)]
20pub struct FnEmitter<F, Item, Err>(F, TypeHint<(Item, Err)>);
21
22impl<F, Item, Err> Emitter for FnEmitter<F, Item, Err> {
23  type Item = Item;
24  type Err = Err;
25}
26
27impl<'a, F, Item, Err> LocalEmitter<'a> for FnEmitter<F, Item, Err>
28where
29  F: FnOnce(
30    Subscriber<
31      Box<dyn Observer<Item = Item, Err = Err> + 'a>,
32      Box<dyn SubscriptionLike + 'a>,
33    >,
34  ),
35{
36  fn emit<O>(self, subscriber: Subscriber<O, LocalSubscription>)
37  where
38    O: Observer<Item = Self::Item, Err = Self::Err> + 'a,
39  {
40    (self.0)(Subscriber {
41      observer: Box::new(subscriber.observer),
42      subscription: Box::new(subscriber.subscription),
43    })
44  }
45}
46
47impl<F, Item, Err> SharedEmitter for FnEmitter<F, Item, Err>
48where
49  F: FnOnce(
50    Subscriber<
51      Box<dyn Observer<Item = Item, Err = Err> + Send + Sync + 'static>,
52      SharedSubscription,
53    >,
54  ),
55{
56  fn emit<O>(self, subscriber: Subscriber<O, SharedSubscription>)
57  where
58    O: Observer<Item = Self::Item, Err = Self::Err> + Send + Sync + 'static,
59  {
60    (self.0)(Subscriber {
61      observer: Box::new(subscriber.observer),
62      subscription: subscriber.subscription,
63    })
64  }
65}
66
67#[cfg(test)]
68mod test {
69  use crate::prelude::*;
70  use bencher::Bencher;
71  use std::sync::{Arc, Mutex};
72
73  #[test]
74  fn proxy_call() {
75    let next = Arc::new(Mutex::new(0));
76    let err = Arc::new(Mutex::new(0));
77    let complete = Arc::new(Mutex::new(0));
78    let c_next = next.clone();
79    let c_err = err.clone();
80    let c_complete = complete.clone();
81
82    observable::create(|mut subscriber| {
83      subscriber.next(&1);
84      subscriber.next(&2);
85      subscriber.next(&3);
86      subscriber.complete();
87      subscriber.next(&3);
88      subscriber.error("never dispatch error");
89    })
90    .into_shared()
91    .subscribe_all(
92      move |_| *next.lock().unwrap() += 1,
93      move |_: &str| *err.lock().unwrap() += 1,
94      move || *complete.lock().unwrap() += 1,
95    );
96
97    assert_eq!(*c_next.lock().unwrap(), 3);
98    assert_eq!(*c_complete.lock().unwrap(), 1);
99    assert_eq!(*c_err.lock().unwrap(), 0);
100  }
101  #[test]
102  fn support_fork() {
103    let o = observable::create(|mut subscriber| {
104      subscriber.next(&1);
105      subscriber.next(&2);
106      subscriber.next(&3);
107      subscriber.next(&4);
108    });
109    let sum1 = Arc::new(Mutex::new(0));
110    let sum2 = Arc::new(Mutex::new(0));
111    let c_sum1 = sum1.clone();
112    let c_sum2 = sum2.clone();
113    o.clone().subscribe(move |v| *sum1.lock().unwrap() += v);
114    o.clone().subscribe(move |v| *sum2.lock().unwrap() += v);
115
116    assert_eq!(*c_sum1.lock().unwrap(), 10);
117    assert_eq!(*c_sum2.lock().unwrap(), 10);
118  }
119
120  #[test]
121  fn fork_and_share() {
122    let observable = observable::create(|_| {});
123    observable.clone().into_shared().subscribe(|_: i32| {});
124    observable.clone().into_shared().subscribe(|_| {});
125
126    let observable = observable::create(|_| {}).into_shared();
127    observable.clone().subscribe(|_: i32| {});
128    observable.clone().subscribe(|_| {});
129  }
130
131  #[test]
132  fn bench() { do_bench(); }
133
134  benchmark_group!(do_bench, bench_from_fn);
135
136  fn bench_from_fn(b: &mut Bencher) { b.iter(proxy_call); }
137}