use core::cell::RefCell;
use core::future::{Future, poll_fn};
use core::marker::PhantomData;
use core::ops::{Deref, DerefMut};
use core::task::{Context, Poll};
use crate::blocking_mutex::Mutex;
use crate::blocking_mutex::raw::RawMutex;
use crate::waitqueue::MultiWakerRegistration;
/// A single-slot watch: stores at most one value of type `T` and hands out
/// sender and receiver handles that observe it.
///
/// `M` is the raw mutex used to guard the shared state. `N` is the maximum
/// number of counted receivers ([`Receiver`] / [`DynReceiver`]) that may exist
/// at the same time, and also the const parameter of the waker registry.
#[derive(Debug)]
pub struct Watch<M: RawMutex, T: Clone, const N: usize> {
/// Shared state; every access goes through `Mutex::lock` plus a `RefCell` borrow.
mutex: Mutex<M, RefCell<WatchState<T, N>>>,
}
/// Internal state of a [`Watch`], kept behind its mutex.
#[derive(Debug)]
struct WatchState<T: Clone, const N: usize> {
/// Currently stored value; `None` until something is sent, and again after `clear`.
data: Option<T>,
/// Message counter. Starts at 0 and is incremented by `send`, `send_modify`
/// and (when the closure returns `true`) `send_if_modified`.
current_id: u64,
/// Wakers of tasks waiting in one of the `poll_*` methods.
wakers: MultiWakerRegistration<N>,
/// Number of counted receivers currently alive; compared against `N` when a
/// new receiver is requested and decremented by `drop_receiver`.
receiver_count: usize,
}
/// Private half of the watch interface.
///
/// The trait is not `pub`, so code outside this module cannot name it. The `id`
/// arguments are a receiver's "last seen" message id, which the methods update
/// in place when they hand out a value.
trait SealedWatchBehavior<T> {
/// Polls for the stored value; ready as soon as any value is present.
fn poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T>;
/// Polls for the stored value; ready only if a value is present and `f` accepts it.
fn poll_get_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll<T>;
/// Polls for a value whose message id is greater than `*id`.
fn poll_changed(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T>;
/// Non-blocking variant of `poll_changed`.
fn try_changed(&self, id: &mut u64) -> Option<T>;
/// Polls for a value whose message id is greater than `*id` and which `f` accepts.
fn poll_changed_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll<T>;
/// Non-blocking variant of `poll_changed_and`.
fn try_changed_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool) -> Option<T>;
/// Releases one counted receiver slot.
fn drop_receiver(&self);
/// Removes the stored value.
fn clear(&self);
/// Stores `val` as the new value.
fn send(&self, val: T);
/// Lets `f` edit the stored value in place and treats that as a new message.
fn send_modify(&self, f: &mut dyn Fn(&mut Option<T>));
/// Lets `f` edit the stored value in place; only a `true` return counts as a new message.
fn send_if_modified(&self, f: &mut dyn Fn(&mut Option<T>) -> bool);
}
/// Public half of the watch interface.
///
/// The supertrait `SealedWatchBehavior` is private to this module, which is
/// what the `private_bounds` lint would otherwise report on this public trait.
#[allow(private_bounds)]
pub trait WatchBehavior<T: Clone>: SealedWatchBehavior<T> {
/// Returns a clone of the stored value, if any. When `id` is supplied it is
/// set to the current message id.
fn try_get(&self, id: Option<&mut u64>) -> Option<T>;
/// Returns a clone of the stored value if one is present and `f` accepts it.
/// When that happens and `id` is supplied, it is set to the current message id.
fn try_get_and(&self, id: Option<&mut u64>, f: &mut dyn Fn(&T) -> bool) -> Option<T>;
/// Reports whether a value is currently stored.
fn contains_value(&self) -> bool;
}
/// Core watch operations, implemented directly on the shared [`WatchState`].
///
/// Every method runs its body inside `self.mutex.lock(..)` and holds the
/// `RefCell` borrow only for the duration of that closure.
impl<M: RawMutex, T: Clone, const N: usize> SealedWatchBehavior<T> for Watch<M, T, N> {
    /// Ready with a clone of the stored value as soon as one exists, writing
    /// the current message id to `id`. Otherwise registers `cx`'s waker and
    /// stays pending.
    fn poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T> {
        self.mutex.lock(|cell| {
            let mut guard = cell.borrow_mut();
            let state = &mut *guard;
            if let Some(value) = &state.data {
                *id = state.current_id;
                return Poll::Ready(value.clone());
            }
            state.wakers.register(cx.waker());
            Poll::Pending
        })
    }

    /// Like `poll_get`, but the stored value must also satisfy `f`; `f` is only
    /// called when a value is present.
    fn poll_get_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll<T> {
        self.mutex.lock(|cell| {
            let mut guard = cell.borrow_mut();
            let state = &mut *guard;
            if let Some(value) = &state.data {
                if f(value) {
                    *id = state.current_id;
                    return Poll::Ready(value.clone());
                }
            }
            state.wakers.register(cx.waker());
            Poll::Pending
        })
    }

    /// Ready only when a value is stored and its message id is greater than
    /// `*id`; on success `id` is advanced to the current message id.
    fn poll_changed(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T> {
        self.mutex.lock(|cell| {
            let mut guard = cell.borrow_mut();
            let state = &mut *guard;
            if state.current_id > *id {
                if let Some(value) = &state.data {
                    *id = state.current_id;
                    return Poll::Ready(value.clone());
                }
            }
            state.wakers.register(cx.waker());
            Poll::Pending
        })
    }

    /// Non-blocking change check. When the message id is newer than `*id`, the
    /// id is advanced and a clone of whatever is stored is returned, which may
    /// be `None` if the slot is empty.
    fn try_changed(&self, id: &mut u64) -> Option<T> {
        self.mutex.lock(|cell| {
            let guard = cell.borrow();
            if guard.current_id > *id {
                *id = guard.current_id;
                guard.data.clone()
            } else {
                None
            }
        })
    }

    /// Like `poll_changed`, but the new value must also satisfy `f`; `f` is
    /// only consulted for a present value with a newer message id.
    fn poll_changed_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll<T> {
        self.mutex.lock(|cell| {
            let mut guard = cell.borrow_mut();
            let state = &mut *guard;
            if state.current_id > *id {
                if let Some(value) = &state.data {
                    if f(value) {
                        *id = state.current_id;
                        return Poll::Ready(value.clone());
                    }
                }
            }
            state.wakers.register(cx.waker());
            Poll::Pending
        })
    }

    /// Non-blocking variant of `poll_changed_and`. `id` is left untouched
    /// unless a value is actually returned.
    fn try_changed_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool) -> Option<T> {
        self.mutex.lock(|cell| {
            let guard = cell.borrow();
            match &guard.data {
                Some(value) if guard.current_id > *id && f(value) => {
                    *id = guard.current_id;
                    Some(value.clone())
                }
                _ => None,
            }
        })
    }

    /// Gives back one counted receiver slot.
    fn drop_receiver(&self) {
        self.mutex.lock(|cell| {
            cell.borrow_mut().receiver_count -= 1;
        })
    }

    /// Empties the slot. The message id is not changed and no wakers are woken.
    fn clear(&self) {
        self.mutex.lock(|cell| {
            cell.borrow_mut().data = None;
        })
    }

    /// Stores `val`, bumps the message id and calls `wakers.wake()`.
    fn send(&self, val: T) {
        self.mutex.lock(|cell| {
            let mut guard = cell.borrow_mut();
            let state = &mut *guard;
            state.data = Some(val);
            state.current_id += 1;
            state.wakers.wake();
        })
    }

    /// Runs `f` on the slot, then unconditionally bumps the message id and
    /// calls `wakers.wake()`.
    fn send_modify(&self, f: &mut dyn Fn(&mut Option<T>)) {
        self.mutex.lock(|cell| {
            let mut guard = cell.borrow_mut();
            let state = &mut *guard;
            f(&mut state.data);
            state.current_id += 1;
            state.wakers.wake();
        })
    }

    /// Runs `f` on the slot; the message id is bumped and `wakers.wake()` is
    /// called only when `f` returns `true`.
    fn send_if_modified(&self, f: &mut dyn Fn(&mut Option<T>) -> bool) {
        self.mutex.lock(|cell| {
            let mut guard = cell.borrow_mut();
            let state = &mut *guard;
            if !f(&mut state.data) {
                return;
            }
            state.current_id += 1;
            state.wakers.wake();
        })
    }
}
/// Public, non-blocking accessors of a [`Watch`].
impl<M: RawMutex, T: Clone, const N: usize> WatchBehavior<T> for Watch<M, T, N> {
    /// Clones out whatever is stored (possibly nothing). A supplied `id` is
    /// always synchronised to the current message id, even if the slot is empty.
    fn try_get(&self, id: Option<&mut u64>) -> Option<T> {
        self.mutex.lock(|cell| {
            let guard = cell.borrow();
            if let Some(seen) = id {
                *seen = guard.current_id;
            }
            guard.data.clone()
        })
    }

    /// Clones out the stored value only if it exists and `f` accepts it. A
    /// supplied `id` is synchronised only in that case.
    fn try_get_and(&self, id: Option<&mut u64>, f: &mut dyn Fn(&T) -> bool) -> Option<T> {
        self.mutex.lock(|cell| {
            let guard = cell.borrow();
            if let Some(value) = &guard.data {
                if f(value) {
                    if let Some(seen) = id {
                        *seen = guard.current_id;
                    }
                    return Some(value.clone());
                }
            }
            None
        })
    }

    /// `true` while the slot holds a value.
    fn contains_value(&self) -> bool {
        self.mutex.lock(|cell| {
            let guard = cell.borrow();
            guard.data.is_some()
        })
    }
}
impl<M: RawMutex, T: Clone, const N: usize> Watch<M, T, N> {
pub const fn new() -> Self {
Self {
mutex: Mutex::new(RefCell::new(WatchState {
data: None,
current_id: 0,
wakers: MultiWakerRegistration::new(),
receiver_count: 0,
})),
}
}
pub const fn new_with(data: T) -> Self {
Self {
mutex: Mutex::new(RefCell::new(WatchState {
data: Some(data),
current_id: 0,
wakers: MultiWakerRegistration::new(),
receiver_count: 0,
})),
}
}
pub fn sender(&self) -> Sender<'_, M, T, N> {
Sender(Snd::new(self))
}
pub fn dyn_sender(&self) -> DynSender<'_, T> {
DynSender(Snd::new(self))
}
pub fn receiver(&self) -> Option<Receiver<'_, M, T, N>> {
self.mutex.lock(|state| {
let mut s = state.borrow_mut();
if s.receiver_count < N {
s.receiver_count += 1;
Some(Receiver(Rcv::new(self, 0)))
} else {
None
}
})
}
pub fn dyn_receiver(&self) -> Option<DynReceiver<'_, T>> {
self.mutex.lock(|state| {
let mut s = state.borrow_mut();
if s.receiver_count < N {
s.receiver_count += 1;
Some(DynReceiver(Rcv::new(self, 0)))
} else {
None
}
})
}
pub fn anon_receiver(&self) -> AnonReceiver<'_, M, T, N> {
AnonReceiver(AnonRcv::new(self, 0))
}
pub fn dyn_anon_receiver(&self) -> DynAnonReceiver<'_, T> {
DynAnonReceiver(AnonRcv::new(self, 0))
}
pub fn get_msg_id(&self) -> u64 {
self.mutex.lock(|state| state.borrow().current_id)
}
pub fn try_get(&self) -> Option<T> {
WatchBehavior::try_get(self, None)
}
pub fn try_get_and<F>(&self, mut f: F) -> Option<T>
where
F: Fn(&T) -> bool,
{
WatchBehavior::try_get_and(self, None, &mut f)
}
}
/// Sender core shared by [`Sender`] and [`DynSender`]: a borrowed handle to
/// anything implementing [`WatchBehavior`].
#[derive(Debug)]
pub struct Snd<'a, T: Clone, W: WatchBehavior<T> + ?Sized> {
/// The watch this sender writes to.
watch: &'a W,
/// Ties the value type `T` to the struct; `T` appears in no other field.
_phantom: PhantomData<T>,
}
/// Senders are plain shared handles; a clone refers to the same watch.
impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Clone for Snd<'a, T, W> {
    fn clone(&self) -> Self {
        let watch = self.watch;
        Self {
            watch,
            _phantom: PhantomData,
        }
    }
}
impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Snd<'a, T, W> {
    /// Wraps a reference to the watch.
    fn new(watch: &'a W) -> Self {
        let _phantom = PhantomData;
        Self { watch, _phantom }
    }

    /// Stores `val` in the watch as a new message.
    pub fn send(&self, val: T) {
        let watch = self.watch;
        watch.send(val)
    }

    /// Removes the stored value from the watch.
    pub fn clear(&self) {
        let watch = self.watch;
        watch.clear()
    }

    /// Returns a clone of the stored value, if any.
    pub fn try_get(&self) -> Option<T> {
        let watch = self.watch;
        watch.try_get(None)
    }

    /// Returns a clone of the stored value if one exists and `f` accepts it.
    pub fn try_get_and<F>(&self, mut f: F) -> Option<T>
    where
        F: Fn(&T) -> bool,
    {
        let watch = self.watch;
        watch.try_get_and(None, &mut f)
    }

    /// Reports whether the watch currently holds a value.
    pub fn contains_value(&self) -> bool {
        let watch = self.watch;
        watch.contains_value()
    }

    /// Edits the stored value in place with `f`; this always counts as a new message.
    pub fn send_modify<F>(&self, mut f: F)
    where
        F: Fn(&mut Option<T>),
    {
        let watch = self.watch;
        watch.send_modify(&mut f)
    }

    /// Edits the stored value in place with `f`; it counts as a new message
    /// only when `f` returns `true`.
    pub fn send_if_modified<F>(&self, mut f: F)
    where
        F: Fn(&mut Option<T>) -> bool,
    {
        let watch = self.watch;
        watch.send_if_modified(&mut f)
    }
}
/// Sender bound to a concrete [`Watch`] type. It dereferences to [`Snd`],
/// which carries the methods.
#[derive(Debug)]
pub struct Sender<'a, M: RawMutex, T: Clone, const N: usize>(Snd<'a, T, Watch<M, T, N>>);
impl<'a, M: RawMutex, T: Clone, const N: usize> Clone for Sender<'a, M, T, N> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
impl<'a, M: RawMutex, T: Clone, const N: usize> Sender<'a, M, T, N> {
    /// Converts this sender into a [`DynSender`] for the same watch, erasing
    /// the `M` and `N` parameters.
    pub fn as_dyn(self) -> DynSender<'a, T> {
        let Self(inner) = self;
        DynSender(Snd::new(inner.watch))
    }
}
/// Type-erasing conversion from a [`Sender`].
///
/// Implemented as `From` rather than a hand-written `Into`: the standard
/// blanket impl still provides `Sender::into()`, and `DynSender::from(sender)`
/// becomes available as well.
impl<'a, M: RawMutex, T: Clone, const N: usize> From<Sender<'a, M, T, N>> for DynSender<'a, T> {
    fn from(sender: Sender<'a, M, T, N>) -> Self {
        sender.as_dyn()
    }
}
impl<'a, M: RawMutex, T: Clone, const N: usize> Deref for Sender<'a, M, T, N> {
    type Target = Snd<'a, T, Watch<M, T, N>>;

    /// Borrows the wrapped [`Snd`], which provides the sender methods.
    fn deref(&self) -> &Self::Target {
        let Self(inner) = self;
        inner
    }
}
impl<'a, M: RawMutex, T: Clone, const N: usize> DerefMut for Sender<'a, M, T, N> {
    /// Mutably borrows the wrapped [`Snd`].
    fn deref_mut(&mut self) -> &mut Self::Target {
        let Self(inner) = self;
        inner
    }
}
/// Sender whose watch is held as a `dyn WatchBehavior<T>` trait object, so the
/// type no longer mentions the mutex type or receiver capacity.
pub struct DynSender<'a, T: Clone>(Snd<'a, T, dyn WatchBehavior<T> + 'a>);
impl<'a, T: Clone> Clone for DynSender<'a, T> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
impl<'a, T: Clone> Deref for DynSender<'a, T> {
type Target = Snd<'a, T, dyn WatchBehavior<T> + 'a>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl<'a, T: Clone> DerefMut for DynSender<'a, T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
/// Counted receiver core shared by [`Receiver`] and [`DynReceiver`].
///
/// It remembers the id of the last message it has seen, and its `Drop` impl
/// gives the receiver slot back to the watch.
///
/// `Debug` is derived so this public type matches its siblings [`Snd`] and
/// [`AnonRcv`], which already derive it.
#[derive(Debug)]
pub struct Rcv<'a, T: Clone, W: WatchBehavior<T> + ?Sized> {
    /// The watch this receiver reads from.
    watch: &'a W,
    /// Id of the most recent message this receiver has observed.
    at_id: u64,
    /// Ties the value type `T` to the struct; `T` appears in no other field.
    _phantom: PhantomData<T>,
}
impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Rcv<'a, T, W> {
    /// Wraps a reference to the watch, starting from message id `at_id`.
    fn new(watch: &'a W, at_id: u64) -> Self {
        let _phantom = PhantomData;
        Self {
            watch,
            at_id,
            _phantom,
        }
    }

    /// Returns a future that resolves with the stored value as soon as one is
    /// present, marking it as seen.
    pub fn get(&mut self) -> impl Future<Output = T> + '_ {
        poll_fn(|cx| self.watch.poll_get(&mut self.at_id, cx))
    }

    /// Returns the stored value without waiting, if any, and syncs the
    /// "last seen" id to the current message id.
    pub fn try_get(&mut self) -> Option<T> {
        let seen = &mut self.at_id;
        self.watch.try_get(Some(seen))
    }

    /// Waits until a value is present that `f` accepts, then returns it and
    /// marks it as seen.
    pub async fn get_and<F>(&mut self, mut f: F) -> T
    where
        F: Fn(&T) -> bool,
    {
        let pending = poll_fn(|cx| self.watch.poll_get_and(&mut self.at_id, &mut f, cx));
        pending.await
    }

    /// Returns the stored value without waiting if one is present and `f`
    /// accepts it; only then is it marked as seen.
    pub fn try_get_and<F>(&mut self, mut f: F) -> Option<T>
    where
        F: Fn(&T) -> bool,
    {
        let seen = &mut self.at_id;
        self.watch.try_get_and(Some(seen), &mut f)
    }

    /// Waits for a value newer than the last one this receiver has seen.
    pub async fn changed(&mut self) -> T {
        let pending = poll_fn(|cx| self.watch.poll_changed(&mut self.at_id, cx));
        pending.await
    }

    /// Non-blocking variant of [`Rcv::changed`].
    pub fn try_changed(&mut self) -> Option<T> {
        let seen = &mut self.at_id;
        self.watch.try_changed(seen)
    }

    /// Waits for a value that is newer than the last one seen and that `f` accepts.
    pub async fn changed_and<F>(&mut self, mut f: F) -> T
    where
        F: Fn(&T) -> bool,
    {
        let pending = poll_fn(|cx| self.watch.poll_changed_and(&mut self.at_id, &mut f, cx));
        pending.await
    }

    /// Non-blocking variant of [`Rcv::changed_and`].
    pub fn try_changed_and<F>(&mut self, mut f: F) -> Option<T>
    where
        F: Fn(&T) -> bool,
    {
        let seen = &mut self.at_id;
        self.watch.try_changed_and(seen, &mut f)
    }

    /// Reports whether the watch currently holds a value.
    pub fn contains_value(&self) -> bool {
        let watch = self.watch;
        watch.contains_value()
    }
}
/// Dropping a counted receiver returns its slot to the watch.
impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Drop for Rcv<'a, T, W> {
    fn drop(&mut self) {
        let watch = self.watch;
        watch.drop_receiver();
    }
}
/// Anonymous receiver core shared by [`AnonReceiver`] and [`DynAnonReceiver`].
///
/// It tracks its own "last seen" message id. In this file it has no `Drop`
/// impl and exposes only non-blocking `try_*` methods.
#[derive(Debug)]
pub struct AnonRcv<'a, T: Clone, W: WatchBehavior<T> + ?Sized> {
/// The watch this receiver reads from.
watch: &'a W,
/// Id of the most recent message this receiver has observed.
at_id: u64,
/// Ties the value type `T` to the struct; `T` appears in no other field.
_phantom: PhantomData<T>,
}
impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> AnonRcv<'a, T, W> {
    /// Wraps a reference to the watch, starting from message id `at_id`.
    fn new(watch: &'a W, at_id: u64) -> Self {
        let _phantom = PhantomData;
        Self {
            watch,
            at_id,
            _phantom,
        }
    }

    /// Returns the stored value, if any, and syncs the "last seen" id to the
    /// current message id.
    pub fn try_get(&mut self) -> Option<T> {
        let seen = &mut self.at_id;
        self.watch.try_get(Some(seen))
    }

    /// Returns the stored value if one is present and `f` accepts it; only
    /// then is it marked as seen.
    pub fn try_get_and<F>(&mut self, mut f: F) -> Option<T>
    where
        F: Fn(&T) -> bool,
    {
        let seen = &mut self.at_id;
        self.watch.try_get_and(Some(seen), &mut f)
    }

    /// Returns the stored value if its message id is newer than the last one seen.
    pub fn try_changed(&mut self) -> Option<T> {
        let seen = &mut self.at_id;
        self.watch.try_changed(seen)
    }

    /// Returns the stored value if it is newer than the last one seen and `f`
    /// accepts it.
    pub fn try_changed_and<F>(&mut self, mut f: F) -> Option<T>
    where
        F: Fn(&T) -> bool,
    {
        let seen = &mut self.at_id;
        self.watch.try_changed_and(seen, &mut f)
    }

    /// Reports whether the watch currently holds a value.
    pub fn contains_value(&self) -> bool {
        let watch = self.watch;
        watch.contains_value()
    }
}
/// Counted receiver bound to a concrete [`Watch`] type. It dereferences to
/// [`Rcv`], which carries the methods.
pub struct Receiver<'a, M: RawMutex, T: Clone, const N: usize>(Rcv<'a, T, Watch<M, T, N>>);
impl<'a, M: RawMutex, T: Clone, const N: usize> Receiver<'a, M, T, N> {
    /// Converts this receiver into a [`DynReceiver`] for the same watch,
    /// keeping its "last seen" message id.
    pub fn as_dyn(self) -> DynReceiver<'a, T> {
        let (watch, at_id) = (self.0.watch, self.0.at_id);
        // The receiver slot moves to the new handle, so `Rcv::drop` must not
        // run for the old one (it would decrement the receiver count).
        core::mem::forget(self);
        DynReceiver(Rcv::new(watch, at_id))
    }
}
/// Type-erasing conversion from a [`Receiver`].
///
/// Implemented as `From` rather than a hand-written `Into`: the standard
/// blanket impl still provides `Receiver::into()`, and
/// `DynReceiver::from(receiver)` becomes available as well.
impl<'a, M: RawMutex, T: Clone, const N: usize> From<Receiver<'a, M, T, N>> for DynReceiver<'a, T> {
    fn from(receiver: Receiver<'a, M, T, N>) -> Self {
        receiver.as_dyn()
    }
}
impl<'a, M: RawMutex, T: Clone, const N: usize> Deref for Receiver<'a, M, T, N> {
    type Target = Rcv<'a, T, Watch<M, T, N>>;

    /// Borrows the wrapped [`Rcv`], which provides the receiver methods.
    fn deref(&self) -> &Self::Target {
        let Self(inner) = self;
        inner
    }
}
impl<'a, M: RawMutex, T: Clone, const N: usize> DerefMut for Receiver<'a, M, T, N> {
    /// Mutably borrows the wrapped [`Rcv`].
    fn deref_mut(&mut self) -> &mut Self::Target {
        let Self(inner) = self;
        inner
    }
}
/// Counted receiver whose watch is held as a `dyn WatchBehavior<T>` trait
/// object, so the type no longer mentions the mutex type or receiver capacity.
pub struct DynReceiver<'a, T: Clone>(Rcv<'a, T, dyn WatchBehavior<T> + 'a>);
impl<'a, T: Clone> Deref for DynReceiver<'a, T> {
type Target = Rcv<'a, T, dyn WatchBehavior<T> + 'a>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl<'a, T: Clone> DerefMut for DynReceiver<'a, T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
/// Anonymous receiver bound to a concrete [`Watch`] type. It dereferences to
/// [`AnonRcv`], which carries the methods.
#[derive(Debug)]
pub struct AnonReceiver<'a, M: RawMutex, T: Clone, const N: usize>(AnonRcv<'a, T, Watch<M, T, N>>);
impl<'a, M: RawMutex, T: Clone, const N: usize> AnonReceiver<'a, M, T, N> {
pub fn as_dyn(self) -> DynAnonReceiver<'a, T> {
let rcv = DynAnonReceiver(AnonRcv::new(self.0.watch, self.at_id));
core::mem::forget(self); rcv
}
}
/// Type-erasing conversion from an [`AnonReceiver`].
///
/// Implemented as `From` rather than a hand-written `Into`: the standard
/// blanket impl still provides `AnonReceiver::into()`, and
/// `DynAnonReceiver::from(receiver)` becomes available as well.
impl<'a, M: RawMutex, T: Clone, const N: usize> From<AnonReceiver<'a, M, T, N>> for DynAnonReceiver<'a, T> {
    fn from(receiver: AnonReceiver<'a, M, T, N>) -> Self {
        receiver.as_dyn()
    }
}
impl<'a, M: RawMutex, T: Clone, const N: usize> Deref for AnonReceiver<'a, M, T, N> {
    type Target = AnonRcv<'a, T, Watch<M, T, N>>;

    /// Borrows the wrapped [`AnonRcv`], which provides the receiver methods.
    fn deref(&self) -> &Self::Target {
        let Self(inner) = self;
        inner
    }
}
impl<'a, M: RawMutex, T: Clone, const N: usize> DerefMut for AnonReceiver<'a, M, T, N> {
    /// Mutably borrows the wrapped [`AnonRcv`].
    fn deref_mut(&mut self) -> &mut Self::Target {
        let Self(inner) = self;
        inner
    }
}
/// Anonymous receiver whose watch is held as a `dyn WatchBehavior<T>` trait
/// object, so the type no longer mentions the mutex type or receiver capacity.
pub struct DynAnonReceiver<'a, T: Clone>(AnonRcv<'a, T, dyn WatchBehavior<T> + 'a>);
impl<'a, T: Clone> Deref for DynAnonReceiver<'a, T> {
type Target = AnonRcv<'a, T, dyn WatchBehavior<T> + 'a>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl<'a, T: Clone> DerefMut for DynAnonReceiver<'a, T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
// Unit tests for `Watch`. Each test declares its own `static WATCH`, so tests
// do not share state, and drives its async body with `block_on`.
#[cfg(test)]
mod tests {
use futures_executor::block_on;
use super::Watch;
use crate::blocking_mutex::raw::CriticalSectionRawMutex;
// Consecutive sends are each observed exactly once by a receiver.
#[test]
fn multiple_sends() {
let f = async {
static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();
let mut rcv = WATCH.receiver().unwrap();
let snd = WATCH.sender();
// Nothing has been sent yet.
assert_eq!(rcv.try_changed(), None);
snd.send(10);
assert_eq!(rcv.changed().await, 10);
snd.send(20);
assert_eq!(rcv.try_changed(), Some(20));
// The value was already marked as seen by the previous call.
assert_eq!(rcv.try_changed(), None);
};
block_on(f);
}
// `try_get` / `try_get_and` agree across the watch, a receiver and a sender.
#[test]
fn all_try_get() {
let f = async {
static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();
let mut rcv = WATCH.receiver().unwrap();
let snd = WATCH.sender();
assert_eq!(WATCH.try_get(), None);
assert_eq!(rcv.try_get(), None);
assert_eq!(snd.try_get(), None);
snd.send(10);
assert_eq!(WATCH.try_get(), Some(10));
assert_eq!(rcv.try_get(), Some(10));
assert_eq!(snd.try_get(), Some(10));
// Predicate accepts the stored value.
assert_eq!(WATCH.try_get_and(|x| x > &5), Some(10));
assert_eq!(rcv.try_get_and(|x| x > &5), Some(10));
assert_eq!(snd.try_get_and(|x| x > &5), Some(10));
// Predicate rejects the stored value.
assert_eq!(WATCH.try_get_and(|x| x < &5), None);
assert_eq!(rcv.try_get_and(|x| x < &5), None);
assert_eq!(snd.try_get_and(|x| x < &5), None);
};
block_on(f);
}
// A watch of `&'static` references hands out the references that were sent.
#[test]
fn once_lock_like() {
let f = async {
static CONFIG0: u8 = 10;
static CONFIG1: u8 = 20;
static WATCH: Watch<CriticalSectionRawMutex, &'static u8, 1> = Watch::new();
let mut rcv = WATCH.receiver().unwrap();
let snd = WATCH.sender();
assert_eq!(rcv.try_changed(), None);
snd.send(&CONFIG0);
let rcv0 = rcv.changed().await;
assert_eq!(rcv0, &10);
snd.send(&CONFIG1);
let rcv1 = rcv.try_changed();
assert_eq!(rcv1, Some(&20));
assert_eq!(rcv.try_changed(), None);
// Previously received references are unaffected by later sends.
assert_eq!(rcv0, &CONFIG0);
assert_eq!(rcv1, Some(&CONFIG1));
};
block_on(f);
}
// `send_modify` edits the value in place and is seen as a change.
#[test]
fn sender_modify() {
let f = async {
static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();
let mut rcv = WATCH.receiver().unwrap();
let snd = WATCH.sender();
snd.send(10);
assert_eq!(rcv.try_changed(), Some(10));
snd.send_modify(|opt| {
if let Some(inner) = opt {
*inner += 5;
}
});
assert_eq!(rcv.try_changed(), Some(15));
assert_eq!(rcv.try_changed(), None);
};
block_on(f);
}
// Predicate variants only return, and only mark as seen, accepted values.
#[test]
fn predicate_fn() {
let f = async {
static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();
let mut rcv = WATCH.receiver().unwrap();
let snd = WATCH.sender();
snd.send(15);
assert_eq!(rcv.try_get_and(|x| x > &5), Some(15));
assert_eq!(rcv.try_get_and(|x| x < &5), None);
// The successful `try_get_and` above already marked 15 as seen.
assert!(rcv.try_changed().is_none());
snd.send(20);
assert_eq!(rcv.try_changed_and(|x| x > &5), Some(20));
assert_eq!(rcv.try_changed_and(|x| x > &5), None);
snd.send(25);
// A rejected value is not marked as seen, so `try_changed` still returns it.
assert_eq!(rcv.try_changed_and(|x| x < &5), None);
assert_eq!(rcv.try_changed(), Some(25));
snd.send(30);
assert_eq!(rcv.changed_and(|x| x > &5).await, 30);
assert_eq!(rcv.get_and(|x| x > &5).await, 30);
};
block_on(f);
}
// A receiver created after a send still observes that value as a change.
#[test]
fn receive_after_create() {
let f = async {
static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();
let snd = WATCH.sender();
snd.send(10);
let mut rcv = WATCH.receiver().unwrap();
assert_eq!(rcv.try_changed(), Some(10));
};
block_on(f);
}
// With N = 2 a third receiver is refused until one is dropped.
#[test]
fn max_receivers_drop() {
let f = async {
static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
let rcv0 = WATCH.receiver();
let rcv1 = WATCH.receiver();
let rcv2 = WATCH.receiver();
assert!(rcv0.is_some());
assert!(rcv1.is_some());
assert!(rcv2.is_none());
// Dropping a receiver frees its slot.
drop(rcv0);
let rcv3 = WATCH.receiver();
assert!(rcv3.is_some());
};
block_on(f);
}
// A counted and an anonymous receiver both observe the same send.
#[test]
fn multiple_receivers() {
let f = async {
static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
let mut rcv0 = WATCH.receiver().unwrap();
let mut rcv1 = WATCH.anon_receiver();
let snd = WATCH.sender();
assert_eq!(rcv0.try_changed(), None);
assert_eq!(rcv1.try_changed(), None);
snd.send(0);
assert_eq!(rcv0.try_changed(), Some(0));
assert_eq!(rcv1.try_changed(), Some(0));
};
block_on(f);
}
// A cloned sender feeds the same watch as the original.
#[test]
fn clone_senders() {
let f = async {
static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();
let snd0 = WATCH.sender();
let snd1 = snd0.clone();
let mut rcv = WATCH.receiver().unwrap().as_dyn();
snd0.send(10);
assert_eq!(rcv.try_changed(), Some(10));
snd1.send(20);
assert_eq!(rcv.try_changed(), Some(20));
};
block_on(f);
}
// The `dyn_*` constructors produce working type-erased handles.
#[test]
fn use_dynamics() {
let f = async {
static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
let mut anon_rcv = WATCH.dyn_anon_receiver();
let mut dyn_rcv = WATCH.dyn_receiver().unwrap();
let dyn_snd = WATCH.dyn_sender();
dyn_snd.send(10);
assert_eq!(anon_rcv.try_changed(), Some(10));
assert_eq!(dyn_rcv.try_changed(), Some(10));
assert_eq!(dyn_rcv.try_changed(), None);
};
block_on(f);
}
// Handles converted with `as_dyn` keep working against the same watch.
#[test]
fn convert_to_dyn() {
let f = async {
static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
let anon_rcv = WATCH.anon_receiver();
let rcv = WATCH.receiver().unwrap();
let snd = WATCH.sender();
let mut dyn_anon_rcv = anon_rcv.as_dyn();
let mut dyn_rcv = rcv.as_dyn();
let dyn_snd = snd.as_dyn();
dyn_snd.send(10);
assert_eq!(dyn_anon_rcv.try_changed(), Some(10));
assert_eq!(dyn_rcv.try_changed(), Some(10));
assert_eq!(dyn_rcv.try_changed(), None);
};
block_on(f);
}
// Converting to dyn does not free a slot; dropping the dyn receiver frees exactly one.
#[test]
fn dynamic_receiver_count() {
let f = async {
static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
let rcv0 = WATCH.receiver();
let rcv1 = WATCH.receiver();
let rcv2 = WATCH.receiver();
assert!(rcv0.is_some());
assert!(rcv1.is_some());
assert!(rcv2.is_none());
let dyn_rcv0 = rcv0.unwrap().as_dyn();
drop(dyn_rcv0);
// `rcv1` is still alive, so only one slot is available again.
let rcv3 = WATCH.receiver();
let rcv4 = WATCH.receiver();
assert!(rcv3.is_some());
assert!(rcv4.is_none());
};
block_on(f);
}
// `contains_value` flips from false to true after the first send.
#[test]
fn contains_value() {
let f = async {
static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
let rcv = WATCH.receiver().unwrap();
let snd = WATCH.sender();
assert_eq!(rcv.contains_value(), false);
assert_eq!(snd.contains_value(), false);
snd.send(10);
assert_eq!(rcv.contains_value(), true);
assert_eq!(snd.contains_value(), true);
};
block_on(f);
}
}