futures_signals/signal/
broadcaster.rs

1use super::Signal;
2use std::pin::Pin;
3use std::marker::Unpin;
4use std::fmt::Debug;
5use std::sync::{Arc, Mutex, RwLock, Weak};
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::task::{Poll, Waker, Context};
8use futures_util::task::{self, ArcWake};
9use crate::signal::ChangedWaker;
10
11
12/// When the Signal changes it will wake up the BroadcasterNotifier, which will
13/// then wake up all of the child ChangedWaker.
14#[derive(Debug)]
15struct BroadcasterNotifier {
16    is_changed: AtomicBool,
17    wakers: Mutex<Vec<Weak<ChangedWaker>>>,
18}
19
20impl BroadcasterNotifier {
21    fn new() -> Self {
22        Self {
23            is_changed: AtomicBool::new(true),
24            wakers: Mutex::new(vec![]),
25        }
26    }
27
28    fn notify(&self) {
29        let mut lock = self.wakers.lock().unwrap();
30
31        self.is_changed.store(true, Ordering::SeqCst);
32
33        // Take this opportunity to GC dead wakers
34        lock.retain(|waker| {
35            if let Some(waker) = waker.upgrade() {
36                waker.wake(false);
37                true
38
39            } else {
40                false
41            }
42        });
43    }
44
45    fn is_changed(&self) -> bool {
46        self.is_changed.swap(false, Ordering::SeqCst)
47    }
48}
49
50impl ArcWake for BroadcasterNotifier {
51    #[inline]
52    fn wake_by_ref(arc_self: &Arc<Self>) {
53        arc_self.notify();
54    }
55}
56
57
58/// This will poll the input Signal and keep track of the most recent value.
59#[derive(Debug)]
60struct BroadcasterInnerState<A> where A: Signal {
61    // TODO is there a more efficient way to implement this ?
62    signal: Option<Pin<Box<A>>>,
63    waker: Waker,
64    value: Option<A::Item>,
65    // This is used to keep track of when the value changes
66    epoch: usize,
67}
68
69impl<A> BroadcasterInnerState<A> where A: Signal {
70    fn new(signal: A, waker: Waker) -> Self {
71        Self {
72            signal: Some(Box::pin(signal)),
73            waker,
74            value: None,
75            epoch: 0,
76        }
77    }
78
79    fn poll_signal(&mut self) {
80        if let Some(ref mut signal) = self.signal {
81            let cx = &mut Context::from_waker(&self.waker);
82
83            let mut changed = false;
84
85            loop {
86                // TODO what if it is woken up while polling ?
87                match signal.as_mut().poll_change(cx) {
88                    Poll::Ready(Some(value)) => {
89                        self.value = Some(value);
90                        changed = true;
91                        continue;
92                    },
93                    Poll::Ready(None) => {
94                        self.signal = None;
95                        break;
96                    },
97                    Poll::Pending => {
98                        break;
99                    },
100                }
101            }
102
103            if changed {
104                self.epoch += 1;
105            }
106        }
107    }
108}
109
110
111/// Shared state for the Broadcaster and all child signals.
112struct BroadcasterSharedState<A> where A: Signal {
113    inner: RwLock<BroadcasterInnerState<A>>,
114    notifier: Arc<BroadcasterNotifier>,
115}
116
117impl<A> BroadcasterSharedState<A> where A: Signal {
118    fn new(signal: A) -> Self {
119        let notifier = Arc::new(BroadcasterNotifier::new());
120        let waker = task::waker(notifier.clone());
121
122        Self {
123            inner: RwLock::new(BroadcasterInnerState::new(signal, waker)),
124            notifier,
125        }
126    }
127
128    fn poll<B, F>(&self, f: F) -> B where F: FnOnce(&BroadcasterInnerState<A>) -> B {
129        if self.notifier.is_changed() {
130            let mut lock = self.inner.write().unwrap();
131
132            lock.poll_signal();
133
134            f(&lock)
135
136        } else {
137            let lock = self.inner.read().unwrap();
138
139            f(&lock)
140        }
141    }
142}
143
144// TODO use derive
145impl<A> Debug for BroadcasterSharedState<A>
146    where A: Debug + Signal,
147          A::Item: Debug {
148
149    fn fmt(&self, fmt: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
150        fmt.debug_struct("BroadcasterSharedState")
151            .field("inner", &self.inner)
152            .field("notifier", &self.notifier)
153            .finish()
154    }
155}
156
157
158struct BroadcasterState<A> where A: Signal {
159    epoch: usize,
160    waker: Arc<ChangedWaker>,
161    shared_state: Arc<BroadcasterSharedState<A>>,
162}
163
164impl<A> BroadcasterState<A> where A: Signal {
165    fn new(shared_state: &Arc<BroadcasterSharedState<A>>) -> Self {
166        let waker = Arc::new(ChangedWaker::new());
167
168        {
169            let mut lock = shared_state.notifier.wakers.lock().unwrap();
170            lock.push(Arc::downgrade(&waker));
171        }
172
173        Self {
174            epoch: 0,
175            waker,
176            shared_state: shared_state.clone(),
177        }
178    }
179
180    fn poll_change<B, F>(&mut self, cx: &mut Context, f: F) -> Poll<Option<B>> where F: FnOnce(&A::Item) -> B {
181        let BroadcasterState { epoch, waker, shared_state } = self;
182
183        shared_state.poll(|state| {
184            // Value hasn't changed
185            if state.epoch == *epoch {
186                if state.signal.is_none() {
187                    Poll::Ready(None)
188
189                } else {
190                    waker.set_waker(cx);
191                    Poll::Pending
192                }
193
194            } else {
195                *epoch = state.epoch;
196                Poll::Ready(Some(f(state.value.as_ref().unwrap())))
197            }
198        })
199    }
200}
201
202// TODO use derive
203impl<A> Debug for BroadcasterState<A>
204    where A: Debug + Signal,
205          A::Item: Debug {
206
207    fn fmt(&self, fmt: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
208        fmt.debug_struct("BroadcasterState")
209            .field("epoch", &self.epoch)
210            .field("waker", &self.waker)
211            .field("shared_state", &self.shared_state)
212            .finish()
213    }
214}
215
216// ---------------------------------------------------------------------------
217
218/// Splits an input `Signal` into multiple output `Signals`.
219///
220/// `Broadcaster` provides `.signal()`, `.signal_cloned()`, and `.signal_ref()`
221/// methods which can be used to create multiple signals from a single signal.
222///
223/// This is useful because `Signal` usually does not implement `Clone`, so it is
224/// necessary to use `Broadcaster` to "clone" a `Signal`.
225///
226/// If you are using a `Mutable` then you don't need `Broadcaster`, because
227/// `Mutable` already supports the `.signal()`, `.signal_cloned()` and
228/// `.signal_ref()` methods (they are faster than `Broadcaster`).
229pub struct Broadcaster<A> where A: Signal {
230    shared_state: Arc<BroadcasterSharedState<A>>,
231}
232
233impl<A> Broadcaster<A> where A: Signal {
234    /// Create a new `Broadcaster`
235    pub fn new(signal: A) -> Self {
236        Self {
237            shared_state: Arc::new(BroadcasterSharedState::new(signal)),
238        }
239    }
240
241    #[inline]
242    pub fn signal_ref<B, F>(&self, f: F) -> BroadcasterSignalRef<A, F>
243        where F: FnMut(&A::Item) -> B {
244        BroadcasterSignalRef {
245            state: BroadcasterState::new(&self.shared_state),
246            callback: f,
247        }
248    }
249}
250
251impl<A> Broadcaster<A> where A: Signal, A::Item: Copy {
252    /// Returns a new `Signal` which copies values from the input `Signal`
253    #[inline]
254    pub fn signal(&self) -> BroadcasterSignal<A> {
255        BroadcasterSignal {
256            state: BroadcasterState::new(&self.shared_state),
257        }
258    }
259}
260
261impl<A> Broadcaster<A> where A: Signal, A::Item: Clone {
262    /// Returns a new `Signal` which clones values from the input `Signal`
263    #[inline]
264    pub fn signal_cloned(&self) -> BroadcasterSignalCloned<A> {
265        BroadcasterSignalCloned {
266            state: BroadcasterState::new(&self.shared_state),
267        }
268    }
269}
270
271// TODO use derive
272impl<A> Debug for Broadcaster<A>
273    where A: Debug + Signal,
274          A::Item: Debug {
275
276    fn fmt(&self, fmt: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
277        fmt.debug_struct("Broadcaster")
278            .field("shared_state", &self.shared_state)
279            .finish()
280    }
281}
282
283impl<A> Clone for Broadcaster<A> where A: Signal {
284    #[inline]
285    fn clone(&self) -> Self {
286        Self {
287            shared_state: self.shared_state.clone(),
288        }
289    }
290}
291
292// ---------------------------------------------------------------------------
293
294#[must_use = "Signals do nothing unless polled"]
295pub struct BroadcasterSignal<A> where A: Signal {
296    state: BroadcasterState<A>,
297}
298
299impl<A> Signal for BroadcasterSignal<A>
300    where A: Signal,
301          A::Item: Copy {
302
303    type Item = A::Item;
304
305    #[inline]
306    fn poll_change(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
307        self.state.poll_change(cx, |value| *value)
308    }
309}
310
311// TODO use derive
312impl<A> Debug for BroadcasterSignal<A>
313    where A: Debug + Signal,
314          A::Item: Debug {
315
316    fn fmt(&self, fmt: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
317        fmt.debug_struct("BroadcasterSignal")
318            .field("state", &self.state)
319            .finish()
320    }
321}
322
323// --------------------------------------------------------------------------
324
325#[must_use = "Signals do nothing unless polled"]
326pub struct BroadcasterSignalCloned<A> where A: Signal {
327    state: BroadcasterState<A>,
328}
329
330impl<A> Signal for BroadcasterSignalCloned<A>
331    where A: Signal,
332          A::Item: Clone {
333
334    type Item = A::Item;
335
336    #[inline]
337    fn poll_change(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
338        self.state.poll_change(cx, |value| value.clone())
339    }
340}
341
342// TODO use derive
343impl<A> Debug for BroadcasterSignalCloned<A>
344    where A: Debug + Signal,
345          A::Item: Debug {
346
347    fn fmt(&self, fmt: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
348        fmt.debug_struct("BroadcasterSignalCloned")
349            .field("state", &self.state)
350            .finish()
351    }
352}
353
354// --------------------------------------------------------------------------
355
356#[must_use = "Signals do nothing unless polled"]
357pub struct BroadcasterSignalRef<A, F> where A: Signal {
358    state: BroadcasterState<A>,
359    callback: F,
360}
361
362impl<A, F> Unpin for BroadcasterSignalRef<A, F> where A: Signal {}
363
364impl<A, B, F> Signal for BroadcasterSignalRef<A, F>
365    where A: Signal,
366          F: FnMut(&A::Item) -> B {
367
368    type Item = B;
369
370    #[inline]
371    fn poll_change(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
372        let BroadcasterSignalRef { state, callback } = &mut *self;
373        state.poll_change(cx, callback)
374    }
375}
376
377// TODO use derive
378impl<A, F> Debug for BroadcasterSignalRef<A, F>
379    where A: Debug + Signal,
380          A::Item: Debug {
381
382    fn fmt(&self, fmt: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
383        fmt.debug_struct("BroadcasterSignalRef")
384            .field("state", &self.state)
385            .finish()
386    }
387}