vex_rt/rtos/
broadcast.rs

1use core::ops::{Deref, DerefMut};
2
3use alloc::sync::{Arc, Weak};
4use owner_monad::{Owner, OwnerMut};
5
6use super::{handle_event, Event, EventHandle, GenericSleep, Mutex, Selectable};
7use crate::error::Error;
8
9/// Represents a source of data which notifies listeners on a new value.
10pub struct Broadcast<T: Clone>(Arc<Mutex<BroadcastData<T>>>);
11
12impl<T: Clone> Broadcast<T> {
13    #[inline]
14    /// Creates a new broadcast event with the associated initial value. Panics
15    /// on failure; see [`Broadcast::try_new()`].
16    pub fn new(data: T) -> Self {
17        Self::try_new(data).unwrap_or_else(|err| panic!("failed to create broadcast: {:?}", err))
18    }
19
20    /// Creates a new broadcast event with the associated initial value.
21    pub fn try_new(data: T) -> Result<Self, Error> {
22        Ok(Self(Arc::new(Mutex::try_new(BroadcastData {
23            data: Arc::new(data),
24            event: Event::new(),
25        })?)))
26    }
27
28    /// Gets a copy of the current value of the broadcast event.
29    pub fn value(&self) -> T {
30        (*self.0.lock().data).clone()
31    }
32
33    #[inline]
34    /// Creates a new listener for the broadcast event.
35    pub fn listen(&self) -> BroadcastListener<T> {
36        BroadcastListener::new(Weak::new(), Arc::downgrade(&self.0))
37    }
38
39    /// Publishes a new value for the broadcast event.
40    pub fn publish(&self, data: T) {
41        let mut lock = self.0.lock();
42        lock.data = Arc::new(data);
43        lock.event.notify();
44    }
45}
46
47#[derive(Clone)]
48/// Provides a means of listening to updates from a [`Broadcast`] event.
49pub struct BroadcastListener<T: Clone> {
50    value: Weak<T>,
51    data: Weak<Mutex<BroadcastData<T>>>,
52}
53
54impl<T: Clone> BroadcastListener<T> {
55    #[inline]
56    fn new(value: Weak<T>, data: Weak<Mutex<BroadcastData<T>>>) -> Self {
57        Self { value, data }
58    }
59
60    #[inline]
61    /// Get the latest unprocessed value from the event, if there is one.
62    pub fn next_value(&mut self) -> Option<T> {
63        Self::next_value_impl(&mut self.value, &self.data)
64    }
65
66    #[inline]
67    /// A [`Selectable`] event which occurs when new data is published to the
68    /// underlying [`Broadcast`] event.
69    pub fn select(&'_ mut self) -> impl Selectable<T> + '_ {
70        struct BroadcastSelect<'b, T: Clone> {
71            value: &'b mut Weak<T>,
72            handle: EventHandle<&'b Weak<Mutex<BroadcastData<T>>>>,
73        }
74
75        impl<'b, T: Clone> Selectable<T> for BroadcastSelect<'b, T> {
76            #[inline]
77            fn poll(mut self) -> Result<T, Self> {
78                let value = &mut self.value;
79                self.handle
80                    .with(|data| BroadcastListener::next_value_impl(value, *data))
81                    .flatten()
82                    .ok_or(self)
83            }
84            #[inline]
85            fn sleep(&self) -> GenericSleep {
86                GenericSleep::NotifyTake(None)
87            }
88        }
89
90        BroadcastSelect {
91            value: &mut self.value,
92            handle: handle_event(&self.data),
93        }
94    }
95
96    fn next_value_impl(value: &mut Weak<T>, data: &Weak<Mutex<BroadcastData<T>>>) -> Option<T> {
97        let data = data.upgrade()?;
98        let lock = data.lock();
99        match value.upgrade() {
100            Some(arc) if Arc::ptr_eq(&arc, &lock.data) => None,
101            _ => {
102                *value = Arc::downgrade(&lock.data);
103                Some((*lock.data).clone())
104            }
105        }
106    }
107}
108
109/// Describes an object which is a source of data, such as a sensor.
110///
111/// Used to facilitate broadcasting readings via [`IntoBroadcast`].
112pub trait DataSource {
113    /// The type of data produced by the data source.
114    type Data: Clone + 'static;
115
116    /// The type of errors which could occur while reading data.
117    type Error;
118
119    /// Tries to read a new value form the data source.
120    fn read(&self) -> Result<Self::Data, Self::Error>;
121}
122
123/// Extension trait for converting any [`DataSource`] into a
124/// [`BroadcastWrapper`] to facilitate broadcasting readings.
125pub trait IntoBroadcast: Sized + DataSource {
126    /// Converts the data source into a [`BroadcastWrapper`].
127    ///
128    /// This internally calls [`DataSource::read()`]; if that call fails, the
129    /// error is propagated and the data source object is returned.
130    fn into_broadcast(self) -> Result<BroadcastWrapper<Self>, (Self::Error, Self)>;
131}
132
133impl<T: Sized + DataSource> IntoBroadcast for T {
134    fn into_broadcast(self) -> Result<BroadcastWrapper<Self>, (Self::Error, Self)> {
135        let data = match self.read() {
136            Ok(data) => data,
137            Err(err) => return Err((err, self)),
138        };
139
140        Ok(BroadcastWrapper {
141            inner: self,
142            broadcast: Broadcast::new(data),
143        })
144    }
145}
146
147/// Wraps a [`DataSource`], exposing the ability to broadcast readings.
148pub struct BroadcastWrapper<T: DataSource> {
149    inner: T,
150    broadcast: Broadcast<T::Data>,
151}
152
153impl<T: DataSource> BroadcastWrapper<T> {
154    /// Converts the [`BroadcastWrapper`] back into the internal [`DataSource`].
155    pub fn into_inner(self) -> T {
156        self.inner
157    }
158
159    /// Tries to take a new reading and publish it to all listeners.
160    pub fn update(&self) -> Result<T::Data, T::Error> {
161        let data = self.inner.read()?;
162        self.broadcast.publish(data.clone());
163        Ok(data)
164    }
165
166    /// Creates a [`BroadcastListener`] which receives any new readings.
167    pub fn listen(&self) -> BroadcastListener<T::Data> {
168        self.broadcast.listen()
169    }
170}
171
172impl<T: DataSource> Deref for BroadcastWrapper<T> {
173    type Target = T;
174
175    fn deref(&self) -> &Self::Target {
176        &self.inner
177    }
178}
179
180impl<T: DataSource> DerefMut for BroadcastWrapper<T> {
181    fn deref_mut(&mut self) -> &mut Self::Target {
182        &mut self.inner
183    }
184}
185
186impl<T> OwnerMut<Event> for &Weak<Mutex<BroadcastData<T>>> {
187    fn with<'a, U>(&'a mut self, f: impl FnOnce(&mut Event) -> U) -> Option<U>
188    where
189        Event: 'a,
190    {
191        Some(f(&mut self.upgrade()?.try_lock().ok()?.event))
192    }
193}
194
195struct BroadcastData<T> {
196    data: Arc<T>,
197    event: Event,
198}