cs_mwc_bch/util/
rx.rs

1//! Lightweight reactive library
2
3use std::sync::{Arc, RwLock, TryLockError, Weak};
4use std::time::Duration;
5use util::future::{Future, FutureProvider};
6use util::{Error, Result};
7
/// Receiver for events of type `T`
///
/// The `Send + Sync` supertraits let a single observer be shared between threads.
pub trait Observer<T>: Send + Sync {
    /// Invoked once for every occurrence of the event
    fn next(&self, event: &T);
}
13
14/// Event publisher that may be subscribed to
15pub trait Observable<T: Send + Sync + Clone + 'static> {
16    /// Adds a weakly held observer
17    fn subscribe<S: Observer<T> + 'static>(&self, observer: &Arc<S>);
18
19    /// Waits indefinitely for an event to be emitted
20    fn poll(&self) -> T {
21        let (poller, future) = Poller::new();
22        self.subscribe(&poller);
23        future.get()
24    }
25
26    /// Waits for an event to be emitted with a timeout
27    fn poll_timeout(&self, duration: Duration) -> Result<T> {
28        let (poller, future) = Poller::new();
29        self.subscribe(&poller);
30        match future.get_timeout(duration) {
31            Ok(t) => Ok(t),
32            Err(_future) => Err(Error::Timeout),
33        }
34    }
35}
36
37/// Stores the observers for a particular event
38pub struct Subject<T> {
39    observers: RwLock<Vec<Weak<dyn Observer<T>>>>,
40    pending: RwLock<Vec<Weak<dyn Observer<T>>>>,
41}
42
43impl<T> Subject<T> {
44    /// Creates a new empty set of observers
45    pub fn new() -> Subject<T> {
46        Subject {
47            observers: RwLock::new(Vec::new()),
48            pending: RwLock::new(Vec::new()),
49        }
50    }
51}
52
53impl<T> Observer<T> for Subject<T> {
54    fn next(&self, event: &T) {
55        let mut any_to_remove = false;
56
57        {
58            for observer in self.observers.read().unwrap().iter() {
59                match observer.upgrade() {
60                    Some(observer) => observer.next(event),
61                    None => any_to_remove = true,
62                }
63            }
64        }
65
66        if any_to_remove {
67            let mut observers = self.observers.write().unwrap();
68            observers.retain(|observer| observer.upgrade().is_some());
69        }
70
71        let any_pending = { self.pending.read().unwrap().len() > 0 };
72        if any_pending {
73            let mut observers = self.observers.write().unwrap();
74            let mut pending = self.pending.write().unwrap();
75            observers.append(&mut pending);
76        }
77    }
78}
79
80impl<T: Send + Sync + Clone + 'static> Observable<T> for Subject<T> {
81    fn subscribe<S: Observer<T> + 'static>(&self, observer: &Arc<S>) {
82        let weak_observer = Arc::downgrade(observer) as Weak<dyn Observer<T>>;
83
84        match self.observers.try_write() {
85            Ok(mut observers) => observers.push(weak_observer),
86
87            // If we would block, add to a pending set
88            Err(TryLockError::WouldBlock) => {
89                self.pending.write().unwrap().push(weak_observer);
90            }
91
92            // If observer is poisoned, app will be killed soon
93            Err(TryLockError::Poisoned(_)) => panic!("Observer lock poisoned"),
94        }
95    }
96}
97
98/// A subject that only emits a single value
99///
100/// After a value is emmitted once, all future calls to next() will be ignored,
101/// and any future subscriptions will be called with the original value once.
102pub struct Single<T: Sync + Send + Clone> {
103    subject: Subject<T>,
104    value: RwLock<Option<T>>,
105}
106
107impl<T: Sync + Send + Clone> Single<T> {
108    /// Creates a new single with an empty set of observers
109    pub fn new() -> Single<T> {
110        Single {
111            subject: Subject::new(),
112            value: RwLock::new(None),
113        }
114    }
115}
116
117impl<T: Sync + Send + Clone> Observer<T> for Single<T> {
118    fn next(&self, event: &T) {
119        let mut value = self.value.write().unwrap();
120        if let None = *value {
121            *value = Some(event.clone());
122            self.subject.next(event);
123        }
124    }
125}
126
127impl<T: Sync + Send + Clone + 'static> Observable<T> for Single<T> {
128    fn subscribe<S: Observer<T> + 'static>(&self, observer: &Arc<S>) {
129        match &*self.value.read().unwrap() {
130            Some(value) => observer.next(&value),
131            None => self.subject.subscribe(observer),
132        }
133    }
134}
135
136struct Poller<T: Sync + Send + Clone> {
137    future_provider: FutureProvider<T>,
138}
139
140impl<T: Sync + Send + Clone> Poller<T> {
141    pub fn new() -> (Arc<Poller<T>>, Future<T>) {
142        let (future, future_provider) = Future::new();
143        (Arc::new(Poller { future_provider }), future)
144    }
145}
146
147impl<T: Sync + Send + Clone> Observer<T> for Poller<T> {
148    fn next(&self, event: &T) {
149        self.future_provider.put(event.clone());
150    }
151}
152
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

    /// An observer subscribed to a subject is called when an event is published
    #[test]
    fn publish_observe() {
        struct MyObserver {
            observed: AtomicBool,
        }

        impl Observer<u32> for MyObserver {
            fn next(&self, _event: &u32) {
                self.observed.store(true, Ordering::Relaxed);
            }
        }

        let subject = Subject::<u32>::new();
        let observer = Arc::new(MyObserver {
            observed: AtomicBool::new(false),
        });
        subject.subscribe(&observer);

        assert!(!observer.observed.load(Ordering::Relaxed));
        subject.next(&1);
        assert!(observer.observed.load(Ordering::Relaxed));
    }

    /// Subscribing from inside an observer callback must neither deadlock nor
    /// stop later events from being delivered
    #[test]
    fn observe_during_next() {
        struct MyObserver {
            subject: Arc<Subject<u32>>,
            calls: AtomicUsize,
        }

        impl Observer<u32> for MyObserver {
            fn next(&self, _event: &u32) {
                self.calls.fetch_add(1, Ordering::Relaxed);
                // Subscribes while the subject is in the middle of next()
                self.subject.subscribe(&Arc::new(MyObserver {
                    subject: self.subject.clone(),
                    calls: AtomicUsize::new(0),
                }));
            }
        }

        let subject = Arc::new(Subject::<u32>::new());

        // The subject only holds observers weakly, so the observer has to be
        // kept alive here. Subscribing a temporary Arc would drop it before
        // next() runs and the callback under test would never execute.
        let observer = Arc::new(MyObserver {
            subject: subject.clone(),
            calls: AtomicUsize::new(0),
        });
        subject.subscribe(&observer);

        subject.next(&1);
        assert_eq!(observer.calls.load(Ordering::Relaxed), 1);

        subject.next(&2);
        assert_eq!(observer.calls.load(Ordering::Relaxed), 2);
    }

    /// A single emits exactly one value, to early and late subscribers alike
    #[test]
    fn single() {
        struct MyObserver {
            observed: AtomicBool,
        }

        impl Observer<u32> for MyObserver {
            fn next(&self, event: &u32) {
                assert!(event == &5);
                // Fails if the same observer is ever called twice
                assert!(!self.observed.swap(true, Ordering::Relaxed));
            }
        }

        let pre_emit_observer = Arc::new(MyObserver {
            observed: AtomicBool::new(false),
        });

        let post_emit_observer = Arc::new(MyObserver {
            observed: AtomicBool::new(false),
        });

        let single = Single::<u32>::new();
        single.subscribe(&pre_emit_observer);
        single.next(&5);
        assert!(pre_emit_observer.observed.load(Ordering::Relaxed));
        single.subscribe(&post_emit_observer);
        assert!(post_emit_observer.observed.load(Ordering::Relaxed));
        // A second emission must be ignored; the observers assert on it
        single.next(&6);
    }
}