1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
use crate::prelude::*;
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
pub type SimpleSubscription = Arc<AtomicBool>;
pub struct Subscriber<Sub> {
stopped: SimpleSubscription,
subscribe: Sub,
}
unsafe impl<Sub> Send for Subscriber<Sub> where Sub: Send {}
unsafe impl<Sub> Sync for Subscriber<Sub> where Sub: Send {}
impl<Sub> Subscriber<Sub> {
pub fn new(subscribe: Sub) -> Self {
Subscriber {
stopped: Arc::new(AtomicBool::new(false)),
subscribe,
}
}
#[inline(always)]
pub fn clone_subscription(&self) -> SimpleSubscription {
self.stopped.clone()
}
}
impl<Item, Err, Sub> Observer<Item, Err> for Subscriber<Sub>
where
Sub: RxFn(RxValue<&'_ Item, &'_ Err>),
{
fn next(&self, v: &Item) {
if !self.stopped.load(Ordering::Relaxed) {
self.subscribe.call((RxValue::Next(v),))
}
}
fn complete(&mut self) {
if self.stopped.load(Ordering::Relaxed) {
return;
}
self.stopped.store(true, Ordering::Relaxed);
self.subscribe.call((RxValue::Complete,));
}
fn error(&mut self, err: &Err) {
if self.stopped.load(Ordering::Relaxed) {
return;
}
self.stopped.store(true, Ordering::Relaxed);
self.subscribe.call((RxValue::Err(err),));
}
fn is_stopped(&self) -> bool { self.stopped.load(Ordering::Relaxed) }
}
impl Subscription for SimpleSubscription {
#[inline(always)]
fn unsubscribe(&mut self) { self.store(true, Ordering::Relaxed); }
}
impl<Sub> Subscription for Subscriber<Sub> {
#[inline(always)]
fn unsubscribe(&mut self) { self.stopped.unsubscribe(); }
}
#[cfg(test)]
mod test {
use crate::prelude::*;
use std::cell::Cell;
macro_rules! create_subscriber {
($next:ident, $err: ident, $complete: ident) => {{
Subscriber::new(RxFnWrapper::new(|v: RxValue<&'_ _, &'_ ()>| {
match v {
RxValue::Next(_) => $next.set($next.get() + 1),
RxValue::Complete => $complete.set($complete.get() + 1),
RxValue::Err(_) => $err.set($err.get() + 1),
};
}))
}};
}
#[test]
fn next_and_complete() {
let next = Cell::new(0);
let err = Cell::new(0);
let complete = Cell::new(0);
let mut subscriber = create_subscriber!(next, err, complete);
subscriber.next(&1);
subscriber.next(&2);
subscriber.complete();
subscriber.next(&3);
subscriber.next(&4);
assert_eq!(next.get(), 2);
assert_eq!(complete.get(), 1);
}
#[test]
fn next_and_error() {
let next = Cell::new(0);
let err = Cell::new(0);
let complete = Cell::new(0);
let mut subscriber = create_subscriber!(next, err, complete);
subscriber.next(&1);
subscriber.next(&2);
subscriber.error(&());
subscriber.next(&3);
subscriber.next(&4);
assert_eq!(next.get(), 2);
assert_eq!(err.get(), 1);
}
}