1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
use crate::{impl_local_shared_both, prelude::*};
/// Creates an observable whose real source is built lazily by
/// `observable_supplier`.
///
/// This function only stores the supplier inside an [`ObservableDeref`]; it
/// does not call it. The subscribe implementation in this file invokes the
/// stored closure and subscribes the observer to the observable it returns.
///
/// * `observable_supplier` - closure producing the observable `U` to
///   subscribe to.
pub fn defer<F, U>(observable_supplier: F) -> ObservableDeref<F>
where
  F: FnOnce() -> U,
{
  // Wrap the closure as-is; nothing is evaluated at construction time.
  ObservableDeref::<F>(observable_supplier)
}
/// Observable type returned by [`defer`].
///
/// It is a thin wrapper around the supplier closure `F`. Because `Clone` is
/// derived, the wrapper can be cloned whenever `F` itself is `Clone`.
#[derive(Clone)]
pub struct ObservableDeref<F>(F);
// The deferred observable exposes exactly the item and error types of the
// observable `U` that its supplier `F` returns.
impl<F, U> Observable for ObservableDeref<F>
where
  U: Observable,
  F: FnOnce() -> U,
{
  type Err = U::Err;
  type Item = U::Item;
}
// Subscription logic for `ObservableDeref`, generated through the
// project-defined `impl_local_shared_both!` macro.
// NOTE(review): judging by its name the macro emits both the local and the
// shared subscribe implementations — confirm against the macro definition.
//
// The `method` body calls the stored supplier (`$self.0`) to obtain the
// observable `U`, then forwards the observer to `U::actual_subscribe`. The
// unsubscribe handle type is therefore taken from `U` (`U::Unsub`).
impl_local_shared_both! {
impl<F, U> ObservableDeref<F>;
type Unsub = U::Unsub;
macro method($self: ident, $observer: ident, $ctx: ident) {
($self.0)().actual_subscribe($observer)
}
where F: FnOnce()-> U, U: @ctx::Observable
}
// Compiled only for test builds on non-wasm32 targets.
#[cfg(all(test, not(target_arch = "wasm32")))]
mod test {
  use std::ops::Deref;
  use std::sync::{Arc, Mutex};

  use crate::prelude::*;
  use bencher::Bencher;

  // The supplier must stay untouched until the first subscription and must
  // then run once per subscription.
  #[test]
  fn no_results_before_deferred_subscribe() {
    let supplier_runs = Arc::new(Mutex::new(0));
    let total = Arc::new(Mutex::new(0));
    let error_count = Arc::new(Mutex::new(0));
    let complete_count = Arc::new(Mutex::new(0));

    let shared_deferred = observable::defer(|| {
      *supplier_runs.lock().unwrap() += 1;
      observable::of(&2)
    })
    .into_shared();

    // Building the deferred observable alone does not invoke the supplier.
    assert_eq!(supplier_runs.lock().unwrap().deref(), &0);

    for expected_runs in 1..=3 {
      // Per-iteration handles that get moved into the observer callbacks.
      let total = Arc::clone(&total);
      let error_count = Arc::clone(&error_count);
      let complete_count = Arc::clone(&complete_count);

      shared_deferred.clone().subscribe_all(
        move |v| *total.lock().unwrap() += v,
        move |_| *error_count.lock().unwrap() += 1,
        move || *complete_count.lock().unwrap() += 1,
      );

      // Each subscription triggers exactly one more supplier call.
      assert_eq!(*supplier_runs.lock().unwrap(), expected_runs);
    }

    assert_eq!(*supplier_runs.lock().unwrap().deref(), 3);
    assert_eq!(*total.lock().unwrap().deref(), 6);
    assert_eq!(*error_count.lock().unwrap().deref(), 0);
    assert_eq!(*complete_count.lock().unwrap().deref(), 3);
  }

  // Cloning a deferred observable yields independent subscriptions, each of
  // which runs the supplier on its own.
  #[test]
  fn support_fork() {
    let supplier_runs = Arc::new(Mutex::new(0));
    let source = observable::defer(|| {
      *supplier_runs.lock().unwrap() += 1;
      observable::of(10)
    });

    let first_total = Arc::new(Mutex::new(0));
    let second_total = Arc::new(Mutex::new(0));
    let first_sink = Arc::clone(&first_total);
    let second_sink = Arc::clone(&second_total);

    source
      .clone()
      .subscribe(move |v| *first_sink.lock().unwrap() += v);
    source
      .clone()
      .subscribe(move |v| *second_sink.lock().unwrap() += v);

    assert_eq!(*first_total.lock().unwrap(), 10);
    assert_eq!(*second_total.lock().unwrap(), 10);
    assert_eq!(*supplier_runs.lock().unwrap().deref(), 2);
  }

  // Type-level check: clone-then-share and share-then-clone both compile and
  // can be subscribed to.
  #[test]
  fn fork_and_share() {
    // Clone the local observable first, convert each copy afterwards.
    let local = observable::defer(observable::empty);
    local.clone().into_shared().subscribe(|_: i32| {});
    local.into_shared().subscribe(|_| {});

    // Convert to a shared observable first, clone it afterwards.
    let shared = observable::defer(observable::empty).into_shared();
    shared.clone().subscribe(|_: i32| {});
    shared.subscribe(|_| {});
  }

  // Runs the benchmark group as an ordinary test.
  #[test]
  fn bench() {
    do_bench();
  }

  benchmark_group!(do_bench, bench_deref);

  // Benchmark body: repeatedly runs the deferred-subscribe scenario above.
  fn bench_deref(runner: &mut Bencher) {
    runner.iter(no_results_before_deferred_subscribe);
  }
}