1use super::Signal;
2use std::pin::Pin;
3use std::marker::Unpin;
4use std::fmt::Debug;
5use std::sync::{Arc, Mutex, RwLock, Weak};
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::task::{Poll, Waker, Context};
8use futures_util::task::{self, ArcWake};
9use crate::signal::ChangedWaker;
10
11
12#[derive(Debug)]
15struct BroadcasterNotifier {
16 is_changed: AtomicBool,
17 wakers: Mutex<Vec<Weak<ChangedWaker>>>,
18}
19
20impl BroadcasterNotifier {
21 fn new() -> Self {
22 Self {
23 is_changed: AtomicBool::new(true),
24 wakers: Mutex::new(vec![]),
25 }
26 }
27
28 fn notify(&self) {
29 let mut lock = self.wakers.lock().unwrap();
30
31 self.is_changed.store(true, Ordering::SeqCst);
32
33 lock.retain(|waker| {
35 if let Some(waker) = waker.upgrade() {
36 waker.wake(false);
37 true
38
39 } else {
40 false
41 }
42 });
43 }
44
45 fn is_changed(&self) -> bool {
46 self.is_changed.swap(false, Ordering::SeqCst)
47 }
48}
49
50impl ArcWake for BroadcasterNotifier {
51 #[inline]
52 fn wake_by_ref(arc_self: &Arc<Self>) {
53 arc_self.notify();
54 }
55}
56
57
58#[derive(Debug)]
60struct BroadcasterInnerState<A> where A: Signal {
61 signal: Option<Pin<Box<A>>>,
63 waker: Waker,
64 value: Option<A::Item>,
65 epoch: usize,
67}
68
69impl<A> BroadcasterInnerState<A> where A: Signal {
70 fn new(signal: A, waker: Waker) -> Self {
71 Self {
72 signal: Some(Box::pin(signal)),
73 waker,
74 value: None,
75 epoch: 0,
76 }
77 }
78
79 fn poll_signal(&mut self) {
80 if let Some(ref mut signal) = self.signal {
81 let cx = &mut Context::from_waker(&self.waker);
82
83 let mut changed = false;
84
85 loop {
86 match signal.as_mut().poll_change(cx) {
88 Poll::Ready(Some(value)) => {
89 self.value = Some(value);
90 changed = true;
91 continue;
92 },
93 Poll::Ready(None) => {
94 self.signal = None;
95 break;
96 },
97 Poll::Pending => {
98 break;
99 },
100 }
101 }
102
103 if changed {
104 self.epoch += 1;
105 }
106 }
107 }
108}
109
110
111struct BroadcasterSharedState<A> where A: Signal {
113 inner: RwLock<BroadcasterInnerState<A>>,
114 notifier: Arc<BroadcasterNotifier>,
115}
116
117impl<A> BroadcasterSharedState<A> where A: Signal {
118 fn new(signal: A) -> Self {
119 let notifier = Arc::new(BroadcasterNotifier::new());
120 let waker = task::waker(notifier.clone());
121
122 Self {
123 inner: RwLock::new(BroadcasterInnerState::new(signal, waker)),
124 notifier,
125 }
126 }
127
128 fn poll<B, F>(&self, f: F) -> B where F: FnOnce(&BroadcasterInnerState<A>) -> B {
129 if self.notifier.is_changed() {
130 let mut lock = self.inner.write().unwrap();
131
132 lock.poll_signal();
133
134 f(&lock)
135
136 } else {
137 let lock = self.inner.read().unwrap();
138
139 f(&lock)
140 }
141 }
142}
143
144impl<A> Debug for BroadcasterSharedState<A>
146 where A: Debug + Signal,
147 A::Item: Debug {
148
149 fn fmt(&self, fmt: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
150 fmt.debug_struct("BroadcasterSharedState")
151 .field("inner", &self.inner)
152 .field("notifier", &self.notifier)
153 .finish()
154 }
155}
156
157
158struct BroadcasterState<A> where A: Signal {
159 epoch: usize,
160 waker: Arc<ChangedWaker>,
161 shared_state: Arc<BroadcasterSharedState<A>>,
162}
163
164impl<A> BroadcasterState<A> where A: Signal {
165 fn new(shared_state: &Arc<BroadcasterSharedState<A>>) -> Self {
166 let waker = Arc::new(ChangedWaker::new());
167
168 {
169 let mut lock = shared_state.notifier.wakers.lock().unwrap();
170 lock.push(Arc::downgrade(&waker));
171 }
172
173 Self {
174 epoch: 0,
175 waker,
176 shared_state: shared_state.clone(),
177 }
178 }
179
180 fn poll_change<B, F>(&mut self, cx: &mut Context, f: F) -> Poll<Option<B>> where F: FnOnce(&A::Item) -> B {
181 let BroadcasterState { epoch, waker, shared_state } = self;
182
183 shared_state.poll(|state| {
184 if state.epoch == *epoch {
186 if state.signal.is_none() {
187 Poll::Ready(None)
188
189 } else {
190 waker.set_waker(cx);
191 Poll::Pending
192 }
193
194 } else {
195 *epoch = state.epoch;
196 Poll::Ready(Some(f(state.value.as_ref().unwrap())))
197 }
198 })
199 }
200}
201
202impl<A> Debug for BroadcasterState<A>
204 where A: Debug + Signal,
205 A::Item: Debug {
206
207 fn fmt(&self, fmt: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
208 fmt.debug_struct("BroadcasterState")
209 .field("epoch", &self.epoch)
210 .field("waker", &self.waker)
211 .field("shared_state", &self.shared_state)
212 .finish()
213 }
214}
215
216pub struct Broadcaster<A> where A: Signal {
230 shared_state: Arc<BroadcasterSharedState<A>>,
231}
232
233impl<A> Broadcaster<A> where A: Signal {
234 pub fn new(signal: A) -> Self {
236 Self {
237 shared_state: Arc::new(BroadcasterSharedState::new(signal)),
238 }
239 }
240
241 #[inline]
242 pub fn signal_ref<B, F>(&self, f: F) -> BroadcasterSignalRef<A, F>
243 where F: FnMut(&A::Item) -> B {
244 BroadcasterSignalRef {
245 state: BroadcasterState::new(&self.shared_state),
246 callback: f,
247 }
248 }
249}
250
251impl<A> Broadcaster<A> where A: Signal, A::Item: Copy {
252 #[inline]
254 pub fn signal(&self) -> BroadcasterSignal<A> {
255 BroadcasterSignal {
256 state: BroadcasterState::new(&self.shared_state),
257 }
258 }
259}
260
261impl<A> Broadcaster<A> where A: Signal, A::Item: Clone {
262 #[inline]
264 pub fn signal_cloned(&self) -> BroadcasterSignalCloned<A> {
265 BroadcasterSignalCloned {
266 state: BroadcasterState::new(&self.shared_state),
267 }
268 }
269}
270
271impl<A> Debug for Broadcaster<A>
273 where A: Debug + Signal,
274 A::Item: Debug {
275
276 fn fmt(&self, fmt: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
277 fmt.debug_struct("Broadcaster")
278 .field("shared_state", &self.shared_state)
279 .finish()
280 }
281}
282
283impl<A> Clone for Broadcaster<A> where A: Signal {
284 #[inline]
285 fn clone(&self) -> Self {
286 Self {
287 shared_state: self.shared_state.clone(),
288 }
289 }
290}
291
292#[must_use = "Signals do nothing unless polled"]
295pub struct BroadcasterSignal<A> where A: Signal {
296 state: BroadcasterState<A>,
297}
298
299impl<A> Signal for BroadcasterSignal<A>
300 where A: Signal,
301 A::Item: Copy {
302
303 type Item = A::Item;
304
305 #[inline]
306 fn poll_change(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
307 self.state.poll_change(cx, |value| *value)
308 }
309}
310
311impl<A> Debug for BroadcasterSignal<A>
313 where A: Debug + Signal,
314 A::Item: Debug {
315
316 fn fmt(&self, fmt: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
317 fmt.debug_struct("BroadcasterSignal")
318 .field("state", &self.state)
319 .finish()
320 }
321}
322
323#[must_use = "Signals do nothing unless polled"]
326pub struct BroadcasterSignalCloned<A> where A: Signal {
327 state: BroadcasterState<A>,
328}
329
330impl<A> Signal for BroadcasterSignalCloned<A>
331 where A: Signal,
332 A::Item: Clone {
333
334 type Item = A::Item;
335
336 #[inline]
337 fn poll_change(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
338 self.state.poll_change(cx, |value| value.clone())
339 }
340}
341
342impl<A> Debug for BroadcasterSignalCloned<A>
344 where A: Debug + Signal,
345 A::Item: Debug {
346
347 fn fmt(&self, fmt: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
348 fmt.debug_struct("BroadcasterSignalCloned")
349 .field("state", &self.state)
350 .finish()
351 }
352}
353
354#[must_use = "Signals do nothing unless polled"]
357pub struct BroadcasterSignalRef<A, F> where A: Signal {
358 state: BroadcasterState<A>,
359 callback: F,
360}
361
362impl<A, F> Unpin for BroadcasterSignalRef<A, F> where A: Signal {}
363
364impl<A, B, F> Signal for BroadcasterSignalRef<A, F>
365 where A: Signal,
366 F: FnMut(&A::Item) -> B {
367
368 type Item = B;
369
370 #[inline]
371 fn poll_change(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
372 let BroadcasterSignalRef { state, callback } = &mut *self;
373 state.poll_change(cx, callback)
374 }
375}
376
377impl<A, F> Debug for BroadcasterSignalRef<A, F>
379 where A: Debug + Signal,
380 A::Item: Debug {
381
382 fn fmt(&self, fmt: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
383 fmt.debug_struct("BroadcasterSignalRef")
384 .field("state", &self.state)
385 .finish()
386 }
387}