rxrust/observable/
from_fn.rs1use crate::prelude::*;
2
3pub fn create<F, O, U, Item, Err>(
9 subscribe: F,
10) -> ObservableBase<FnEmitter<F, Item, Err>>
11where
12 F: FnOnce(Subscriber<O, U>),
13 O: Observer<Item = Item, Err = Err>,
14 U: SubscriptionLike,
15{
16 ObservableBase::new(FnEmitter(subscribe, TypeHint::new()))
17}
18
19#[derive(Clone)]
20pub struct FnEmitter<F, Item, Err>(F, TypeHint<(Item, Err)>);
21
22impl<F, Item, Err> Emitter for FnEmitter<F, Item, Err> {
23 type Item = Item;
24 type Err = Err;
25}
26
27impl<'a, F, Item, Err> LocalEmitter<'a> for FnEmitter<F, Item, Err>
28where
29 F: FnOnce(
30 Subscriber<
31 Box<dyn Observer<Item = Item, Err = Err> + 'a>,
32 Box<dyn SubscriptionLike + 'a>,
33 >,
34 ),
35{
36 fn emit<O>(self, subscriber: Subscriber<O, LocalSubscription>)
37 where
38 O: Observer<Item = Self::Item, Err = Self::Err> + 'a,
39 {
40 (self.0)(Subscriber {
41 observer: Box::new(subscriber.observer),
42 subscription: Box::new(subscriber.subscription),
43 })
44 }
45}
46
47impl<F, Item, Err> SharedEmitter for FnEmitter<F, Item, Err>
48where
49 F: FnOnce(
50 Subscriber<
51 Box<dyn Observer<Item = Item, Err = Err> + Send + Sync + 'static>,
52 SharedSubscription,
53 >,
54 ),
55{
56 fn emit<O>(self, subscriber: Subscriber<O, SharedSubscription>)
57 where
58 O: Observer<Item = Self::Item, Err = Self::Err> + Send + Sync + 'static,
59 {
60 (self.0)(Subscriber {
61 observer: Box::new(subscriber.observer),
62 subscription: subscriber.subscription,
63 })
64 }
65}
66
#[cfg(test)]
mod test {
    use crate::prelude::*;
    use bencher::Bencher;
    use std::sync::{Arc, Mutex};

    /// Values are forwarded until `complete`; the `next` and `error` calls
    /// made after it must not reach the observers.
    #[test]
    fn proxy_call() {
        let next_hits = Arc::new(Mutex::new(0));
        let err_hits = Arc::new(Mutex::new(0));
        let complete_hits = Arc::new(Mutex::new(0));
        // The callbacks below take ownership of the counters, so keep a
        // second handle to each one for the assertions.
        let next_probe = Arc::clone(&next_hits);
        let err_probe = Arc::clone(&err_hits);
        let complete_probe = Arc::clone(&complete_hits);

        observable::create(|mut subscriber| {
            subscriber.next(&1);
            subscriber.next(&2);
            subscriber.next(&3);
            subscriber.complete();
            subscriber.next(&3);
            subscriber.error("never dispatch error");
        })
        .into_shared()
        .subscribe_all(
            move |_| *next_hits.lock().unwrap() += 1,
            move |_: &str| *err_hits.lock().unwrap() += 1,
            move || *complete_hits.lock().unwrap() += 1,
        );

        assert_eq!(*next_probe.lock().unwrap(), 3);
        assert_eq!(*complete_probe.lock().unwrap(), 1);
        assert_eq!(*err_probe.lock().unwrap(), 0);
    }

    /// Two clones of one observable each replay the full sequence to their
    /// own subscriber.
    #[test]
    fn support_fork() {
        let source = observable::create(|mut subscriber| {
            subscriber.next(&1);
            subscriber.next(&2);
            subscriber.next(&3);
            subscriber.next(&4);
        });
        let first_sum = Arc::new(Mutex::new(0));
        let second_sum = Arc::new(Mutex::new(0));
        let first_probe = Arc::clone(&first_sum);
        let second_probe = Arc::clone(&second_sum);
        source
            .clone()
            .subscribe(move |v| *first_sum.lock().unwrap() += v);
        source
            .clone()
            .subscribe(move |v| *second_sum.lock().unwrap() += v);

        assert_eq!(*first_probe.lock().unwrap(), 10);
        assert_eq!(*second_probe.lock().unwrap(), 10);
    }

    /// Compile-level check: cloning works both before and after
    /// `into_shared`.
    #[test]
    fn fork_and_share() {
        // Clone first, then convert each clone into a shared observable.
        let local = observable::create(|_| {});
        local.clone().into_shared().subscribe(|_: i32| {});
        local.clone().into_shared().subscribe(|_| {});

        // Convert first, then clone the shared observable.
        let shared = observable::create(|_| {}).into_shared();
        shared.clone().subscribe(|_: i32| {});
        shared.clone().subscribe(|_| {});
    }

    /// Runs the benchmark group once as part of the ordinary test suite.
    #[test]
    fn bench() {
        do_bench();
    }

    benchmark_group!(do_bench, bench_from_fn);

    /// Benchmark body: repeatedly drives the `proxy_call` scenario.
    fn bench_from_fn(b: &mut Bencher) {
        b.iter(proxy_call);
    }
}