1use core::ops::{Deref, DerefMut};
2
3use alloc::sync::{Arc, Weak};
4use owner_monad::{Owner, OwnerMut};
5
6use super::{handle_event, Event, EventHandle, GenericSleep, Mutex, Selectable};
7use crate::error::Error;
8
/// A shared value that can be replaced over time, with each replacement
/// signalled to listeners.
///
/// Internally this is a reference-counted, mutex-protected `BroadcastData`
/// holding the current value together with the [`Event`] that is notified
/// whenever a new value is published.
pub struct Broadcast<T: Clone>(Arc<Mutex<BroadcastData<T>>>);
11
12impl<T: Clone> Broadcast<T> {
13 #[inline]
14 pub fn new(data: T) -> Self {
17 Self::try_new(data).unwrap_or_else(|err| panic!("failed to create broadcast: {:?}", err))
18 }
19
20 pub fn try_new(data: T) -> Result<Self, Error> {
22 Ok(Self(Arc::new(Mutex::try_new(BroadcastData {
23 data: Arc::new(data),
24 event: Event::new(),
25 })?)))
26 }
27
28 pub fn value(&self) -> T {
30 (*self.0.lock().data).clone()
31 }
32
33 #[inline]
34 pub fn listen(&self) -> BroadcastListener<T> {
36 BroadcastListener::new(Weak::new(), Arc::downgrade(&self.0))
37 }
38
39 pub fn publish(&self, data: T) {
41 let mut lock = self.0.lock();
42 lock.data = Arc::new(data);
43 lock.event.notify();
44 }
45}
46
/// A handle for observing the values published through a [`Broadcast`].
///
/// A listener holds only weak references, so it does not keep the broadcast
/// or any published value alive. Cloning a listener copies its record of the
/// last value it returned.
#[derive(Clone)]
pub struct BroadcastListener<T: Clone> {
    /// Weak reference to the value most recently returned to this listener
    /// (empty for a listener that has not returned one yet). It is compared
    /// by pointer against the broadcast's current value to detect a new
    /// publication.
    value: Weak<T>,
    /// Weak reference to the broadcast's shared state; upgrading fails once
    /// that state has been dropped.
    data: Weak<Mutex<BroadcastData<T>>>,
}
53
54impl<T: Clone> BroadcastListener<T> {
55 #[inline]
56 fn new(value: Weak<T>, data: Weak<Mutex<BroadcastData<T>>>) -> Self {
57 Self { value, data }
58 }
59
60 #[inline]
61 pub fn next_value(&mut self) -> Option<T> {
63 Self::next_value_impl(&mut self.value, &self.data)
64 }
65
66 #[inline]
67 pub fn select(&'_ mut self) -> impl Selectable<T> + '_ {
70 struct BroadcastSelect<'b, T: Clone> {
71 value: &'b mut Weak<T>,
72 handle: EventHandle<&'b Weak<Mutex<BroadcastData<T>>>>,
73 }
74
75 impl<'b, T: Clone> Selectable<T> for BroadcastSelect<'b, T> {
76 #[inline]
77 fn poll(mut self) -> Result<T, Self> {
78 let value = &mut self.value;
79 self.handle
80 .with(|data| BroadcastListener::next_value_impl(value, *data))
81 .flatten()
82 .ok_or(self)
83 }
84 #[inline]
85 fn sleep(&self) -> GenericSleep {
86 GenericSleep::NotifyTake(None)
87 }
88 }
89
90 BroadcastSelect {
91 value: &mut self.value,
92 handle: handle_event(&self.data),
93 }
94 }
95
96 fn next_value_impl(value: &mut Weak<T>, data: &Weak<Mutex<BroadcastData<T>>>) -> Option<T> {
97 let data = data.upgrade()?;
98 let lock = data.lock();
99 match value.upgrade() {
100 Some(arc) if Arc::ptr_eq(&arc, &lock.data) => None,
101 _ => {
102 *value = Arc::downgrade(&lock.data);
103 Some((*lock.data).clone())
104 }
105 }
106 }
107}
108
/// A provider of values that can be read on demand.
///
/// Any type implementing this trait can be converted into a
/// [`BroadcastWrapper`] via [`IntoBroadcast`].
pub trait DataSource {
    /// The type of value produced by a successful [`DataSource::read()`].
    type Data: Clone + 'static;

    /// The type of error produced by a failed [`DataSource::read()`].
    type Error;

    /// Attempts to read a value from the source.
    fn read(&self) -> Result<Self::Data, Self::Error>;
}
122
/// Conversion of a [`DataSource`] into a [`BroadcastWrapper`].
///
/// A blanket implementation in this module covers every sized
/// [`DataSource`].
pub trait IntoBroadcast: Sized + DataSource {
    /// Consumes the data source and wraps it in a [`BroadcastWrapper`].
    ///
    /// # Errors
    ///
    /// On failure, returns the error together with the original data source
    /// so the caller does not lose ownership of it.
    fn into_broadcast(self) -> Result<BroadcastWrapper<Self>, (Self::Error, Self)>;
}
132
133impl<T: Sized + DataSource> IntoBroadcast for T {
134 fn into_broadcast(self) -> Result<BroadcastWrapper<Self>, (Self::Error, Self)> {
135 let data = match self.read() {
136 Ok(data) => data,
137 Err(err) => return Err((err, self)),
138 };
139
140 Ok(BroadcastWrapper {
141 inner: self,
142 broadcast: Broadcast::new(data),
143 })
144 }
145}
146
/// A [`DataSource`] paired with a [`Broadcast`] of its readings.
///
/// The wrapper dereferences to the wrapped source, so the source's own
/// methods remain callable on it.
pub struct BroadcastWrapper<T: DataSource> {
    /// The wrapped data source.
    inner: T,
    /// Broadcast carrying the value most recently published from `inner`.
    broadcast: Broadcast<T::Data>,
}
152
153impl<T: DataSource> BroadcastWrapper<T> {
154 pub fn into_inner(self) -> T {
156 self.inner
157 }
158
159 pub fn update(&self) -> Result<T::Data, T::Error> {
161 let data = self.inner.read()?;
162 self.broadcast.publish(data.clone());
163 Ok(data)
164 }
165
166 pub fn listen(&self) -> BroadcastListener<T::Data> {
168 self.broadcast.listen()
169 }
170}
171
172impl<T: DataSource> Deref for BroadcastWrapper<T> {
173 type Target = T;
174
175 fn deref(&self) -> &Self::Target {
176 &self.inner
177 }
178}
179
180impl<T: DataSource> DerefMut for BroadcastWrapper<T> {
181 fn deref_mut(&mut self) -> &mut Self::Target {
182 &mut self.inner
183 }
184}
185
186impl<T> OwnerMut<Event> for &Weak<Mutex<BroadcastData<T>>> {
187 fn with<'a, U>(&'a mut self, f: impl FnOnce(&mut Event) -> U) -> Option<U>
188 where
189 Event: 'a,
190 {
191 Some(f(&mut self.upgrade()?.try_lock().ok()?.event))
192 }
193}
194
/// The state shared between a [`Broadcast`] and its listeners.
struct BroadcastData<T> {
    /// The current value. Each publication replaces this with a new `Arc`,
    /// so pointer identity distinguishes one publication from the next.
    data: Arc<T>,
    /// Event notified each time a new value is published.
    event: Event,
}