use core::ops::{Deref, DerefMut};
use alloc::sync::{Arc, Weak};
use owner_monad::{Owner, OwnerMut};
use super::{handle_event, Event, EventHandle, GenericSleep, Mutex, Selectable};
use crate::error::Error;
/// A shared value of type `T` together with an event, stored behind a
/// reference-counted mutex.
///
/// The current value is replaced with `publish`, read with `value`, and
/// observed through listeners created by `listen`.
pub struct Broadcast<T: Clone>(Arc<Mutex<BroadcastData<T>>>);
impl<T: Clone> Broadcast<T> {
    /// Creates a new broadcast whose initial value is `data`.
    ///
    /// # Panics
    ///
    /// Panics if [`Broadcast::try_new`] returns an error.
    #[inline]
    pub fn new(data: T) -> Self {
        Self::try_new(data).unwrap_or_else(|err| panic!("failed to create broadcast: {:?}", err))
    }

    /// Creates a new broadcast whose initial value is `data`.
    ///
    /// # Errors
    ///
    /// Propagates the error from `Mutex::try_new` if the mutex guarding the
    /// shared state cannot be created.
    pub fn try_new(data: T) -> Result<Self, Error> {
        Ok(Self(Arc::new(Mutex::try_new(BroadcastData {
            data: Arc::new(data),
            event: Event::new(),
        })?)))
    }

    /// Returns a clone of the most recently published value.
    pub fn value(&self) -> T {
        // Only the reference-counted handle is copied under the lock: the
        // guard is a temporary that is dropped at the end of this statement,
        // so the (arbitrary, possibly slow) `T::clone` below runs without
        // blocking publishers or listeners.
        let current = Arc::clone(&self.0.lock().data);
        (*current).clone()
    }

    /// Creates a listener for this broadcast.
    ///
    /// The listener starts out with a dangling "last seen" handle
    /// (`Weak::new()`), and holds only a weak reference to the shared state.
    #[inline]
    pub fn listen(&self) -> BroadcastListener<T> {
        BroadcastListener::new(Weak::new(), Arc::downgrade(&self.0))
    }

    /// Replaces the current value with `data` and calls `notify` on the
    /// associated event.
    pub fn publish(&self, data: T) {
        // Allocate before taking the lock to keep the critical section short.
        let fresh = Arc::new(data);
        let stale = {
            let mut lock = self.0.lock();
            let stale = core::mem::replace(&mut lock.data, fresh);
            lock.event.notify();
            stale
        };
        // The previous value (and thus possibly `T`'s destructor) is released
        // only after the lock has been given up.
        drop(stale);
    }
}
/// A listener for a [`Broadcast`], created by `Broadcast::listen`.
///
/// Both fields are weak references, so a listener does not keep the
/// broadcast's shared state or any published value alive.
#[derive(Clone)]
pub struct BroadcastListener<T: Clone> {
/// Weak handle to the value this listener most recently returned; it is
/// compared against the broadcast's current value to detect new data.
value: Weak<T>,
/// Weak handle to the broadcast's mutex-protected shared state.
data: Weak<Mutex<BroadcastData<T>>>,
}
impl<T: Clone> BroadcastListener<T> {
    /// Builds a listener from its "last seen value" handle and a weak handle
    /// to the broadcast's shared state.
    #[inline]
    fn new(value: Weak<T>, data: Weak<Mutex<BroadcastData<T>>>) -> Self {
        Self { value, data }
    }

    /// Returns a clone of the broadcast's current value if this listener has
    /// not returned it yet.
    ///
    /// Returns `None` if the current value was already returned by this
    /// listener, or if the broadcast's shared state no longer exists.
    #[inline]
    pub fn next_value(&mut self) -> Option<T> {
        Self::next_value_impl(&mut self.value, &self.data)
    }

    /// Returns a [`Selectable`] that resolves to the next value not yet seen
    /// by this listener.
    ///
    /// The selectable mutably borrows the listener for as long as it exists.
    #[inline]
    pub fn select(&'_ mut self) -> impl Selectable<T> + '_ {
        /// Selection state: the listener's "last seen" handle plus an event
        /// handle obtained from `handle_event` for the broadcast's state.
        struct BroadcastSelect<'b, T: Clone> {
            value: &'b mut Weak<T>,
            handle: EventHandle<&'b Weak<Mutex<BroadcastData<T>>>>,
        }

        impl<'b, T: Clone> Selectable<T> for BroadcastSelect<'b, T> {
            /// Yields the next unseen value, or hands `self` back when there
            /// is none (or when `EventHandle::with` yields nothing).
            #[inline]
            fn poll(mut self) -> Result<T, Self> {
                let value = &mut self.value;
                self.handle
                    .with(|data| BroadcastListener::next_value_impl(value, *data))
                    .flatten()
                    .ok_or(self)
            }

            /// Always requests `GenericSleep::NotifyTake(None)`.
            // NOTE(review): `None` presumably means "no timeout"; confirm
            // against the definition of `GenericSleep`.
            #[inline]
            fn sleep(&self) -> GenericSleep {
                GenericSleep::NotifyTake(None)
            }
        }

        BroadcastSelect {
            value: &mut self.value,
            handle: handle_event(&self.data),
        }
    }

    /// Shared implementation of [`BroadcastListener::next_value`] and the
    /// selectable's `poll`.
    ///
    /// `value` is the listener's "last seen" handle and is updated to point
    /// at the current value whenever a new value is returned.
    fn next_value_impl(value: &mut Weak<T>, data: &Weak<Mutex<BroadcastData<T>>>) -> Option<T> {
        let data = data.upgrade()?;
        // Keep the critical section down to pointer bookkeeping: take a
        // reference-counted handle under the lock and run the (arbitrary,
        // possibly slow) `T::clone` only after the guard has been dropped.
        let fresh = {
            let lock = data.lock();
            if let Some(seen) = value.upgrade() {
                if Arc::ptr_eq(&seen, &lock.data) {
                    // The current value is the one already handed out.
                    return None;
                }
            }
            *value = Arc::downgrade(&lock.data);
            Arc::clone(&lock.data)
        };
        Some((*fresh).clone())
    }
}
/// A source from which data can be read on demand.
pub trait DataSource {
/// The type of data produced by a successful read.
type Data: Clone + 'static;
/// The type of error produced by a failed read.
type Error;
/// Attempts to read a value from the source.
fn read(&self) -> Result<Self::Data, Self::Error>;
}
/// Conversion of a [`DataSource`] into a [`BroadcastWrapper`].
pub trait IntoBroadcast: Sized + DataSource {
/// Consumes the data source and wraps it in a [`BroadcastWrapper`].
///
/// On failure, returns the error together with the original data source so
/// the caller gets ownership back.
fn into_broadcast(self) -> Result<BroadcastWrapper<Self>, (Self::Error, Self)>;
}
/// Blanket implementation: every [`DataSource`] can be turned into a
/// [`BroadcastWrapper`].
impl<T: Sized + DataSource> IntoBroadcast for T {
    /// Reads the source once to obtain the broadcast's initial value.
    ///
    /// If that read fails, the error is returned along with the untouched
    /// source. Otherwise the source is wrapped together with a new broadcast
    /// created by `Broadcast::new`.
    fn into_broadcast(self) -> Result<BroadcastWrapper<Self>, (Self::Error, Self)> {
        match self.read() {
            Ok(initial) => Ok(BroadcastWrapper {
                inner: self,
                broadcast: Broadcast::new(initial),
            }),
            Err(failure) => Err((failure, self)),
        }
    }
}
/// A [`DataSource`] paired with a [`Broadcast`] of the data it produces.
pub struct BroadcastWrapper<T: DataSource> {
/// The wrapped data source.
inner: T,
/// Broadcast holding the value most recently published from `inner`.
broadcast: Broadcast<T::Data>,
}
impl<T: DataSource> BroadcastWrapper<T> {
    /// Discards the broadcast and returns the wrapped data source.
    pub fn into_inner(self) -> T {
        let Self { inner, .. } = self;
        inner
    }

    /// Reads the data source and publishes the result to the broadcast.
    ///
    /// Returns the freshly read value on success. If the read fails, the
    /// error is returned and nothing is published.
    pub fn update(&self) -> Result<T::Data, T::Error> {
        self.inner.read().map(|fresh| {
            self.broadcast.publish(fresh.clone());
            fresh
        })
    }

    /// Creates a listener on the underlying broadcast.
    pub fn listen(&self) -> BroadcastListener<T::Data> {
        let Self { broadcast, .. } = self;
        broadcast.listen()
    }
}
/// Gives shared access to the wrapped data source.
impl<T: DataSource> Deref for BroadcastWrapper<T> {
    type Target = T;

    /// Borrows the wrapped data source.
    fn deref(&self) -> &T {
        let Self { inner, .. } = self;
        inner
    }
}
/// Gives exclusive access to the wrapped data source.
impl<T: DataSource> DerefMut for BroadcastWrapper<T> {
    /// Mutably borrows the wrapped data source.
    fn deref_mut(&mut self) -> &mut T {
        let Self { inner, .. } = self;
        inner
    }
}
/// Lets a weak handle to the broadcast's shared state act as an owner of the
/// broadcast's [`Event`].
impl<T> OwnerMut<Event> for &Weak<Mutex<BroadcastData<T>>> {
    /// Runs `f` on the broadcast's event while holding the state mutex.
    ///
    /// Returns `None` without calling `f` if the shared state has already
    /// been dropped, or if `try_lock` on the mutex fails.
    fn with<'a, U>(&'a mut self, f: impl FnOnce(&mut Event) -> U) -> Option<U>
    where
        Event: 'a,
    {
        let shared = self.upgrade()?;
        // Declared after `shared`, so the guard is released before the
        // strong reference it borrows from.
        let mut guard = shared.try_lock().ok()?;
        let outcome = f(&mut guard.event);
        Some(outcome)
    }
}
/// Mutex-protected state shared between a `Broadcast` and its listeners.
struct BroadcastData<T> {
/// The most recently published value. Listeners hold `Weak` handles to it
/// and compare pointers to tell whether they have already seen it.
data: Arc<T>,
/// Event that `Broadcast::publish` notifies after replacing `data`.
event: Event,
}