1use std::sync::{Arc, RwLock, TryLockError, Weak};
4use std::time::Duration;
5use util::future::{Future, FutureProvider};
6use util::{Error, Result};
7
8pub trait Observer<T>: Sync + Send {
10 fn next(&self, event: &T);
12}
13
14pub trait Observable<T: Send + Sync + Clone + 'static> {
16 fn subscribe<S: Observer<T> + 'static>(&self, observer: &Arc<S>);
18
19 fn poll(&self) -> T {
21 let (poller, future) = Poller::new();
22 self.subscribe(&poller);
23 future.get()
24 }
25
26 fn poll_timeout(&self, duration: Duration) -> Result<T> {
28 let (poller, future) = Poller::new();
29 self.subscribe(&poller);
30 match future.get_timeout(duration) {
31 Ok(t) => Ok(t),
32 Err(_future) => Err(Error::Timeout),
33 }
34 }
35}
36
37pub struct Subject<T> {
39 observers: RwLock<Vec<Weak<dyn Observer<T>>>>,
40 pending: RwLock<Vec<Weak<dyn Observer<T>>>>,
41}
42
43impl<T> Subject<T> {
44 pub fn new() -> Subject<T> {
46 Subject {
47 observers: RwLock::new(Vec::new()),
48 pending: RwLock::new(Vec::new()),
49 }
50 }
51}
52
53impl<T> Observer<T> for Subject<T> {
54 fn next(&self, event: &T) {
55 let mut any_to_remove = false;
56
57 {
58 for observer in self.observers.read().unwrap().iter() {
59 match observer.upgrade() {
60 Some(observer) => observer.next(event),
61 None => any_to_remove = true,
62 }
63 }
64 }
65
66 if any_to_remove {
67 let mut observers = self.observers.write().unwrap();
68 observers.retain(|observer| observer.upgrade().is_some());
69 }
70
71 let any_pending = { self.pending.read().unwrap().len() > 0 };
72 if any_pending {
73 let mut observers = self.observers.write().unwrap();
74 let mut pending = self.pending.write().unwrap();
75 observers.append(&mut pending);
76 }
77 }
78}
79
80impl<T: Send + Sync + Clone + 'static> Observable<T> for Subject<T> {
81 fn subscribe<S: Observer<T> + 'static>(&self, observer: &Arc<S>) {
82 let weak_observer = Arc::downgrade(observer) as Weak<dyn Observer<T>>;
83
84 match self.observers.try_write() {
85 Ok(mut observers) => observers.push(weak_observer),
86
87 Err(TryLockError::WouldBlock) => {
89 self.pending.write().unwrap().push(weak_observer);
90 }
91
92 Err(TryLockError::Poisoned(_)) => panic!("Observer lock poisoned"),
94 }
95 }
96}
97
98pub struct Single<T: Sync + Send + Clone> {
103 subject: Subject<T>,
104 value: RwLock<Option<T>>,
105}
106
107impl<T: Sync + Send + Clone> Single<T> {
108 pub fn new() -> Single<T> {
110 Single {
111 subject: Subject::new(),
112 value: RwLock::new(None),
113 }
114 }
115}
116
117impl<T: Sync + Send + Clone> Observer<T> for Single<T> {
118 fn next(&self, event: &T) {
119 let mut value = self.value.write().unwrap();
120 if let None = *value {
121 *value = Some(event.clone());
122 self.subject.next(event);
123 }
124 }
125}
126
127impl<T: Sync + Send + Clone + 'static> Observable<T> for Single<T> {
128 fn subscribe<S: Observer<T> + 'static>(&self, observer: &Arc<S>) {
129 match &*self.value.read().unwrap() {
130 Some(value) => observer.next(&value),
131 None => self.subject.subscribe(observer),
132 }
133 }
134}
135
136struct Poller<T: Sync + Send + Clone> {
137 future_provider: FutureProvider<T>,
138}
139
140impl<T: Sync + Send + Clone> Poller<T> {
141 pub fn new() -> (Arc<Poller<T>>, Future<T>) {
142 let (future, future_provider) = Future::new();
143 (Arc::new(Poller { future_provider }), future)
144 }
145}
146
147impl<T: Sync + Send + Clone> Observer<T> for Poller<T> {
148 fn next(&self, event: &T) {
149 self.future_provider.put(event.clone());
150 }
151}
152
153#[cfg(test)]
154mod tests {
155 use super::*;
156 use std::sync::atomic::{AtomicBool, Ordering};
157
158 #[test]
159 fn publish_observe() {
160 struct MyObserver {
161 observed: AtomicBool,
162 }
163
164 impl<'a> Observer<u32> for MyObserver {
165 fn next(&self, _event: &u32) {
166 self.observed.store(true, Ordering::Relaxed);
167 }
168 }
169
170 let subject = Subject::<u32>::new();
171 let observer = Arc::new(MyObserver {
172 observed: AtomicBool::new(false),
173 });
174 subject.subscribe(&observer);
175
176 assert!(!observer.observed.load(Ordering::Relaxed));
177 subject.next(&1);
178 assert!(observer.observed.load(Ordering::Relaxed));
179 }
180
181 #[test]
182 fn observe_during_next() {
183 let subject = Arc::new(Subject::<u32>::new());
184 struct MyObserver {
185 subject: Arc<Subject<u32>>,
186 }
187 impl<'a> Observer<u32> for MyObserver {
188 fn next(&self, _event: &u32) {
189 self.subject.subscribe(&Arc::new(MyObserver {
190 subject: self.subject.clone(),
191 }));
192 }
193 }
194 subject.subscribe(&Arc::new(MyObserver {
195 subject: subject.clone(),
196 }));
197 subject.next(&1);
198 }
199
200 #[test]
201 fn single() {
202 struct MyObserver {
203 observed: AtomicBool,
204 }
205
206 impl<'a> Observer<u32> for MyObserver {
207 fn next(&self, event: &u32) {
208 assert!(event == &5);
209 assert!(!self.observed.swap(true, Ordering::Relaxed));
210 }
211 }
212
213 let pre_emit_observer = Arc::new(MyObserver {
214 observed: AtomicBool::new(false),
215 });
216
217 let post_emit_observer = Arc::new(MyObserver {
218 observed: AtomicBool::new(false),
219 });
220
221 let single = Single::<u32>::new();
222 single.subscribe(&pre_emit_observer);
223 single.next(&5);
224 assert!(pre_emit_observer.observed.load(Ordering::Relaxed));
225 single.subscribe(&post_emit_observer);
226 assert!(post_emit_observer.observed.load(Ordering::Relaxed));
227 single.next(&6);
228 }
229}