1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
use alloc::sync::{Arc, Weak};
use crate::{error::Error, util::owner::Owner};
use super::{handle_event, Event, EventHandle, GenericSleep, Mutex, Selectable};
pub struct Broadcast<T: Clone>(Mutex<BroadcastData<T>>);
impl<T: Clone> Broadcast<T> {
#[inline]
pub fn new(data: T) -> Self {
Self::try_new(data).unwrap_or_else(|err| panic!("failed to create broadcast: {:?}", err))
}
pub fn try_new(data: T) -> Result<Self, Error> {
Ok(Self(Mutex::try_new(BroadcastData {
data: Arc::new(data),
event: Event::new(),
})?))
}
pub fn value(&self) -> T {
(*self.0.lock().data).clone()
}
#[inline]
pub fn listen(&self) -> BroadcastListener<'_, T> {
BroadcastListener::new(Weak::new(), &self.0)
}
pub fn publish(&self, data: T) {
let mut lock = self.0.lock();
lock.data = Arc::new(data);
lock.event.notify();
}
}
pub struct BroadcastListener<'a, T: Clone> {
data: Weak<T>,
mtx: &'a Mutex<BroadcastData<T>>,
}
impl<'a, T: Clone> BroadcastListener<'a, T> {
#[inline]
fn new(data: Weak<T>, mtx: &'a Mutex<BroadcastData<T>>) -> Self {
Self { data, mtx }
}
#[inline]
pub fn next_value(&mut self) -> Option<T> {
Self::next_value_impl(&mut self.data, &self.mtx)
}
#[inline]
pub fn select(&'_ mut self) -> impl Selectable<T> + '_ {
struct BroadcastSelect<'b, T: Clone> {
data: &'b mut Weak<T>,
handle: EventHandle<&'b Mutex<BroadcastData<T>>>,
}
impl<'b, T: Clone> Selectable<T> for BroadcastSelect<'b, T> {
#[inline]
fn poll(mut self) -> Result<T, Self> {
let data = &mut self.data;
self.handle
.with_owner(|mtx| BroadcastListener::next_value_impl(data, &mtx))
.flatten()
.ok_or(self)
}
#[inline]
fn sleep(&self) -> GenericSleep {
GenericSleep::NotifyTake(None)
}
}
let mtx: &'_ Mutex<BroadcastData<T>> = self.mtx;
BroadcastSelect {
data: &mut self.data,
handle: handle_event(mtx),
}
}
fn next_value_impl(data: &mut Weak<T>, mtx: &'a Mutex<BroadcastData<T>>) -> Option<T> {
let lock = mtx.lock();
match data.upgrade() {
Some(arc) if Arc::ptr_eq(&arc, &lock.data) => None,
_ => {
*data = Arc::downgrade(&lock.data);
Some((*lock.data).clone())
}
}
}
}
impl<T> Owner<Event> for Mutex<BroadcastData<T>> {
fn with<U>(&self, f: impl for<'b> FnOnce(&'b mut Event) -> U) -> Option<U> {
Some(f(&mut self.try_lock().ok()?.event))
}
}
struct BroadcastData<T> {
data: Arc<T>,
event: Event,
}