#[cfg(not(watcher_loom))]
use std::sync;
use std::{
collections::VecDeque,
future::Future,
pin::Pin,
sync::{Arc, RwLockReadGuard, Weak},
task::{self, ready, Poll, Waker},
};
#[cfg(watcher_loom)]
use loom::sync;
use n0_error::StackError;
use sync::{Mutex, RwLock};
/// A handle to a shared value that can be observed for changes.
///
/// All handles and watchers created from it refer to the same reference-counted
/// shared state.
#[derive(Debug, Default)]
pub struct Watchable<T> {
/// The value, its epoch and the list of registered wakers, behind an `Arc`.
shared: Arc<Shared<T>>,
}
impl<T> Clone for Watchable<T> {
fn clone(&self) -> Self {
Self {
shared: self.shared.clone(),
}
}
}
/// Abstraction over container-like values that may or may not hold a `T`.
pub trait Nullable<T> {
/// Consumes `self` and converts it into an `Option<T>`.
fn into_option(self) -> Option<T>;
}
/// An `Option<T>` already is the target representation.
impl<T> Nullable<T> for Option<T> {
/// Returns the option unchanged.
fn into_option(self) -> Option<T> {
self
}
}
/// A `Vec<T>` is considered "null" when it is empty.
impl<T> Nullable<T> for Vec<T> {
/// Returns the last element of the vector, or `None` if the vector is empty.
///
/// Any other elements are dropped together with the vector.
fn into_option(mut self) -> Option<T> {
self.pop()
}
}
impl<T: Clone + Eq> Watchable<T> {
    /// Creates a new `Watchable` holding `value`, starting at [`INITIAL_EPOCH`].
    pub fn new(value: T) -> Self {
        Self {
            shared: Arc::new(Shared {
                state: RwLock::new(State {
                    value,
                    epoch: INITIAL_EPOCH,
                }),
                wakers: Default::default(),
            }),
        }
    }

    /// Stores `value` if it differs from the current value.
    ///
    /// Returns `Ok(previous_value)` when the stored value was replaced; in that case
    /// the epoch is incremented and every registered waker is woken.
    /// Returns `Err(value)` (handing the argument back) when `value` is equal to the
    /// current value; nothing is modified and nobody is woken.
    ///
    /// # Panics
    ///
    /// Panics if one of the internal locks is poisoned.
    pub fn set(&self, value: T) -> Result<T, T> {
        // The write lock is taken up front: the comparison and the replacement must
        // happen atomically with respect to other writers.
        let mut state = self.shared.state.write().expect("poisoned");
        let changed = state.value != value;
        let ret = if changed {
            let old = std::mem::replace(&mut state.value, value);
            state.epoch += 1;
            Ok(old)
        } else {
            Err(value)
        };
        // Release the state lock before notifying anyone.
        drop(state);

        if changed {
            // Move the wakers out while holding the lock, then release the lock
            // *before* calling `wake()`. A waker may run arbitrary code (including
            // polling a watcher inline, which registers a waker again); doing that
            // while still holding the waker lock could deadlock.
            let wakers = std::mem::take(&mut *self.shared.wakers.lock().expect("poisoned"));
            for waker in wakers {
                waker.wake();
            }
        }
        ret
    }

    /// Creates a [`Direct`] watcher that starts out with a snapshot of the current
    /// state and holds only a weak reference to the shared state.
    pub fn watch(&self) -> Direct<T> {
        Direct {
            state: self.shared.state().clone(),
            shared: Some(Arc::downgrade(&self.shared)),
        }
    }

    /// Returns a clone of the current value.
    pub fn get(&self) -> T {
        self.shared.get()
    }

    /// Returns `true` if at least one weak reference to the shared state exists.
    ///
    /// Watchers created by [`Watchable::watch`] hold such a weak reference; other
    /// `Watchable` handles hold strong references and are not counted.
    pub fn has_watchers(&self) -> bool {
        Arc::weak_count(&self.shared) != 0
    }
}
impl<T> Drop for Shared<T> {
fn drop(&mut self) {
let Ok(mut watchers) = self.wakers.lock() else {
return; };
for watcher in watchers.drain(..) {
watcher.wake();
}
}
}
/// Common interface of everything that can observe a changing value.
pub trait Watcher: Clone {
    /// The type of the observed value.
    type Value: Clone + Eq;

    /// Refreshes the watcher via [`Watcher::update`] and returns a clone of the value
    /// it holds afterwards.
    fn get(&mut self) -> Self::Value {
        let _changed = self.update();
        let latest = self.peek();
        latest.clone()
    }

    /// Brings the watcher up to date and reports whether its stored value changed.
    fn update(&mut self) -> bool;

    /// Returns a reference to the value currently stored in the watcher, without
    /// checking for anything newer.
    fn peek(&self) -> &Self::Value;

    /// Reports whether the watcher is still connected to what it observes.
    fn is_connected(&self) -> bool;

    /// Polls for a new value.
    ///
    /// Resolves to `Ok(())` once the watcher has picked up a new value (readable via
    /// [`Watcher::peek`]) and to `Err(Disconnected)` when it lost its connection.
    fn poll_updated(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Disconnected>>;

    /// Returns a future that resolves with the next value, or with [`Disconnected`].
    fn updated(&mut self) -> NextFut<'_, Self> {
        NextFut { watcher: self }
    }

    /// Returns a future that resolves with the first non-null value.
    ///
    /// The current value is checked right away (after a [`Watcher::get`]); if it
    /// converts to `Some` through [`Nullable::into_option`], the future is
    /// immediately ready with it.
    fn initialized<T, W>(&mut self) -> InitializedFut<'_, T, W, Self>
    where
        W: Nullable<T> + Clone,
        Self: Watcher<Value = W>,
    {
        let initial = self.get().into_option();
        InitializedFut {
            initial,
            watcher: self,
        }
    }

    /// Turns the watcher into a stream that first yields the current value and then
    /// every subsequent update.
    fn stream(mut self) -> Stream<Self>
    where
        Self: Unpin,
    {
        let initial = Some(self.get());
        Stream {
            initial,
            watcher: self,
        }
    }

    /// Turns the watcher into a stream that yields only updates, not the value the
    /// watcher currently holds.
    fn stream_updates_only(self) -> Stream<Self>
    where
        Self: Unpin,
    {
        Stream {
            watcher: self,
            initial: None,
        }
    }

    /// Wraps the watcher so that it observes `map(value)` instead of `value`.
    ///
    /// The mapping function is applied once immediately to the current value.
    fn map<T: Clone + Eq>(
        mut self,
        map: impl Fn(Self::Value) -> T + Send + Sync + 'static,
    ) -> Map<Self, T> {
        let current = map(self.get());
        Map {
            current,
            map: Arc::new(map),
            watcher: self,
        }
    }

    /// Combines this watcher with `other` into a watcher over the pair of values.
    fn or<W: Watcher>(self, other: W) -> Tuple<Self, W> {
        Tuple::new(self, other)
    }
}
/// A watcher holding a local snapshot of a value and its epoch plus an optional weak
/// reference to the shared state it was taken from.
#[derive(Debug, Clone)]
pub struct Direct<T> {
/// The most recently observed value together with its epoch.
state: State<T>,
/// Weak handle to the shared state, or `None` if no handle is held.
shared: Option<Weak<Shared<T>>>,
}
impl<T: Clone + Eq> Watcher for Direct<T> {
type Value = T;
fn update(&mut self) -> bool {
let Some(shared) = self.shared.as_ref().and_then(|weak| weak.upgrade()) else {
self.shared = None; return false;
};
let state = shared.state();
if state.epoch > self.state.epoch {
self.state = state.clone();
true
} else {
false
}
}
fn peek(&self) -> &Self::Value {
&self.state.value
}
fn is_connected(&self) -> bool {
self.shared
.as_ref()
.and_then(|weak| weak.upgrade())
.is_some()
}
fn poll_updated(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Disconnected>> {
let Some(shared) = self.shared.as_ref().and_then(|weak| weak.upgrade()) else {
self.shared = None; return Poll::Ready(Err(Disconnected));
};
self.state = ready!(shared.poll_updated(cx, self.state.epoch));
Poll::Ready(Ok(()))
}
}
/// A watcher combining two watchers into one that observes a pair of values.
#[derive(Debug, Clone)]
pub struct Tuple<S: Watcher, T: Watcher> {
/// The two underlying watchers.
inner: (S, T),
/// Cached pair of values, as returned by `peek`.
current: (S::Value, T::Value),
}
impl<S: Watcher, T: Watcher> Tuple<S, T> {
    /// Builds a combined watcher from `s` and `t`.
    ///
    /// Both watchers are brought up to date (via `get`) to seed the cached pair.
    pub fn new(mut s: S, mut t: T) -> Self {
        let first = s.get();
        let second = t.get();
        Self {
            current: (first, second),
            inner: (s, t),
        }
    }
}
impl<S: Watcher, T: Watcher> Watcher for Tuple<S, T> {
    type Value = (S::Value, T::Value);

    /// Updates both inner watchers and refreshes the cached pair if either changed.
    fn update(&mut self) -> bool {
        let (first, second) = &mut self.inner;
        // Non-short-circuiting `|`: both watchers must always be updated.
        let changed = first.update() | second.update();
        if changed {
            self.current = (first.peek().clone(), second.peek().clone());
        }
        changed
    }

    /// Returns the cached pair of values.
    fn peek(&self) -> &Self::Value {
        &self.current
    }

    /// Connected only while both inner watchers are connected.
    fn is_connected(&self) -> bool {
        let (first, second) = &self.inner;
        first.is_connected() && second.is_connected()
    }

    /// Ready as soon as at least one inner watcher has a new value; a disconnect of
    /// either inner watcher is propagated as an error.
    fn poll_updated(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Disconnected>> {
        let (first, second) = &mut self.inner;
        let first_ready = first.poll_updated(cx)?.is_ready();
        let second_ready = second.poll_updated(cx)?.is_ready();
        if !(first_ready || second_ready) {
            return Poll::Pending;
        }
        if first_ready {
            self.current.0 = first.peek().clone();
        }
        if second_ready {
            self.current.1 = second.peek().clone();
        }
        Poll::Ready(Ok(()))
    }
}
/// A watcher combining three watchers into one that observes a triple of values.
#[derive(Debug, Clone)]
pub struct Triple<S: Watcher, T: Watcher, U: Watcher> {
/// The three underlying watchers.
inner: (S, T, U),
/// Cached triple of values, as returned by `peek`.
current: (S::Value, T::Value, U::Value),
}
impl<S: Watcher, T: Watcher, U: Watcher> Triple<S, T, U> {
    /// Builds a combined watcher from `s`, `t` and `u`.
    ///
    /// All three watchers are brought up to date (via `get`) to seed the cached triple.
    pub fn new(mut s: S, mut t: T, mut u: U) -> Self {
        let first = s.get();
        let second = t.get();
        let third = u.get();
        Self {
            current: (first, second, third),
            inner: (s, t, u),
        }
    }
}
impl<S: Watcher, T: Watcher, U: Watcher> Watcher for Triple<S, T, U> {
    type Value = (S::Value, T::Value, U::Value);

    /// Updates all three inner watchers and refreshes the cached triple if any changed.
    fn update(&mut self) -> bool {
        let (first, second, third) = &mut self.inner;
        // Non-short-circuiting `|`: every watcher must always be updated.
        let changed = first.update() | second.update() | third.update();
        if changed {
            self.current = (
                first.peek().clone(),
                second.peek().clone(),
                third.peek().clone(),
            );
        }
        changed
    }

    /// Returns the cached triple of values.
    fn peek(&self) -> &Self::Value {
        &self.current
    }

    /// Connected only while all three inner watchers are connected.
    fn is_connected(&self) -> bool {
        let (first, second, third) = &self.inner;
        first.is_connected() && second.is_connected() && third.is_connected()
    }

    /// Ready as soon as at least one inner watcher has a new value; a disconnect of
    /// any inner watcher is propagated as an error.
    fn poll_updated(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Disconnected>> {
        let (first, second, third) = &mut self.inner;
        let first_ready = first.poll_updated(cx)?.is_ready();
        let second_ready = second.poll_updated(cx)?.is_ready();
        let third_ready = third.poll_updated(cx)?.is_ready();
        if !(first_ready || second_ready || third_ready) {
            return Poll::Pending;
        }
        if first_ready {
            self.current.0 = first.peek().clone();
        }
        if second_ready {
            self.current.1 = second.peek().clone();
        }
        if third_ready {
            self.current.2 = third.peek().clone();
        }
        Poll::Ready(Ok(()))
    }
}
/// A watcher combining any number of same-typed watchers into one that observes a
/// `Vec` of their values.
#[derive(Debug, Clone)]
pub struct Join<T: Clone + Eq, W: Watcher<Value = T>> {
/// The underlying watchers.
watchers: Vec<W>,
/// Cached values, one per watcher and in the same order as `watchers`.
current: Vec<T>,
}
impl<T: Clone + Eq, W: Watcher<Value = T>> Join<T, W> {
    /// Collects `watchers` and seeds the cached values by calling `get` on each of
    /// them, in iteration order.
    pub fn new(watchers: impl Iterator<Item = W>) -> Self {
        let mut watchers: Vec<W> = watchers.collect();
        let current: Vec<T> = watchers.iter_mut().map(|watcher| watcher.get()).collect();
        Self { watchers, current }
    }
}
impl<T: Clone + Eq, W: Watcher<Value = T>> Watcher for Join<T, W> {
type Value = Vec<T>;
fn update(&mut self) -> bool {
let mut any_updated = false;
for (value, watcher) in self.current.iter_mut().zip(self.watchers.iter_mut()) {
if watcher.update() {
any_updated = true;
*value = watcher.peek().clone();
}
}
any_updated
}
fn peek(&self) -> &Self::Value {
&self.current
}
fn is_connected(&self) -> bool {
self.watchers.iter().all(|w| w.is_connected())
}
fn poll_updated(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Disconnected>> {
let mut any_updated = false;
for (value, watcher) in self.current.iter_mut().zip(self.watchers.iter_mut()) {
if watcher.poll_updated(cx)?.is_ready() {
any_updated = true;
*value = watcher.peek().clone();
}
}
if any_updated {
Poll::Ready(Ok(()))
} else {
Poll::Pending
}
}
}
/// A watcher that observes the result of applying a function to another watcher's
/// value.
#[derive(derive_more::Debug, Clone)]
pub struct Map<W: Watcher, T: Clone + Eq> {
/// The mapping function; reference-counted, so clones of the watcher share it.
#[debug("Arc<dyn Fn(W::Value) -> T>")]
map: Arc<dyn Fn(W::Value) -> T + Send + Sync + 'static>,
/// The wrapped watcher.
watcher: W,
/// The mapped value returned by `peek`.
current: T,
}
impl<W: Watcher, T: Clone + Eq> Watcher for Map<W, T> {
    type Value = T;

    /// Updates the inner watcher and re-applies the mapping function.
    ///
    /// Only reports a change if the *mapped* value differs from the cached one.
    fn update(&mut self) -> bool {
        if !self.watcher.update() {
            return false;
        }
        let mapped = (self.map)(self.watcher.peek().clone());
        let differs = mapped != self.current;
        if differs {
            self.current = mapped;
        }
        differs
    }

    /// Returns the cached mapped value.
    fn peek(&self) -> &Self::Value {
        &self.current
    }

    /// Connected as long as the inner watcher is.
    fn is_connected(&self) -> bool {
        self.watcher.is_connected()
    }

    /// Polls the inner watcher until the mapped value actually changes.
    ///
    /// Inner updates that map to the same value as before are swallowed and the
    /// inner watcher is polled again.
    fn poll_updated(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Disconnected>> {
        loop {
            match self.watcher.poll_updated(cx) {
                Poll::Pending => return Poll::Pending,
                Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
                Poll::Ready(Ok(())) => {}
            }
            let mapped = (self.map)(self.watcher.peek().clone());
            if mapped != self.current {
                self.current = mapped;
                return Poll::Ready(Ok(()));
            }
        }
    }
}
/// Future that resolves with a watcher's next value, or with `Disconnected`.
#[derive(Debug)]
pub struct NextFut<'a, W: Watcher> {
/// The watcher being polled; borrowed mutably for the lifetime of the future.
watcher: &'a mut W,
}
impl<W: Watcher> Future for NextFut<'_, W> {
    type Output = Result<W::Value, Disconnected>;

    /// Delegates to `Watcher::poll_updated` and, on success, yields a clone of the
    /// value the watcher then holds.
    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
        let watcher = &mut *self.watcher;
        match watcher.poll_updated(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
            Poll::Ready(Ok(())) => Poll::Ready(Ok(watcher.peek().clone())),
        }
    }
}
/// Future that resolves with the first value of a watcher that converts to `Some`
/// through `Nullable::into_option`.
#[derive(Debug)]
pub struct InitializedFut<'a, T, V: Nullable<T> + Clone, W: Watcher<Value = V>> {
/// A value that is returned by the first poll, if present.
initial: Option<T>,
/// The watcher being polled; borrowed mutably for the lifetime of the future.
watcher: &'a mut W,
}
impl<T: Clone + Eq + Unpin, V: Nullable<T> + Clone, W: Watcher<Value = V> + Unpin> Future
    for InitializedFut<'_, T, V, W>
{
    type Output = T;

    /// Yields the pre-computed initial value if there is one; otherwise keeps polling
    /// the watcher until one of its values converts to `Some`.
    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
        let this = &mut *self;
        if let Some(ready_value) = this.initial.take() {
            return Poll::Ready(ready_value);
        }
        loop {
            match this.watcher.poll_updated(cx) {
                Poll::Ready(Ok(())) => {}
                // Either nothing new yet, or the watcher is disconnected. In the
                // latter case no value will ever arrive, so the future stays pending.
                Poll::Pending | Poll::Ready(Err(_)) => return Poll::Pending,
            }
            let candidate = this.watcher.peek().clone();
            if let Some(found) = candidate.into_option() {
                return Poll::Ready(found);
            }
        }
    }
}
/// A stream of a watcher's values.
#[derive(Debug, Clone)]
pub struct Stream<W: Watcher + Unpin> {
/// A value that is yielded first, before any update, if present.
initial: Option<W::Value>,
/// The watcher that is polled for updates.
watcher: W,
}
impl<W: Watcher + Unpin> n0_future::Stream for Stream<W>
where
    W::Value: Unpin,
{
    type Item = W::Value;

    /// Yields the initial value (if any) first, then one item per watcher update.
    /// The stream ends once the watcher reports that it is disconnected.
    fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        if let Some(first) = this.initial.take() {
            return Poll::Ready(Some(first));
        }
        match this.watcher.poll_updated(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(Err(Disconnected)) => Poll::Ready(None),
            Poll::Ready(Ok(())) => Poll::Ready(Some(this.watcher.peek().clone())),
        }
    }
}
/// Error returned by watchers once the `Watchable` they observe no longer exists.
#[derive(StackError)]
#[error("Watcher lost connection to underlying Watchable, it was dropped")]
pub struct Disconnected;
/// The epoch given to a freshly created state; each successful `set` increments it.
const INITIAL_EPOCH: u64 = 1;
/// The data shared between a `Watchable`, its clones and its watchers.
#[derive(Debug, Default)]
struct Shared<T> {
/// The current value together with its epoch.
state: RwLock<State<T>>,
/// Wakers of tasks waiting for the next change.
wakers: Mutex<VecDeque<Waker>>,
}
/// A value paired with the epoch at which it was observed.
#[derive(Debug, Clone)]
struct State<T> {
/// The value itself.
value: T,
/// Counter used to tell whether one state is newer than another.
epoch: u64,
}
/// The default state wraps `T::default()` and starts at the initial epoch, exactly
/// like a state created through `Watchable::new`.
impl<T: Default> Default for State<T> {
    fn default() -> Self {
        let value = T::default();
        Self {
            epoch: INITIAL_EPOCH,
            value,
        }
    }
}
impl<T: Clone> Shared<T> {
    /// Returns a clone of the current value.
    fn get(&self) -> T {
        self.state().value.clone()
    }

    /// Acquires the read lock on the state.
    ///
    /// Panics if the lock is poisoned.
    fn state(&self) -> RwLockReadGuard<'_, State<T>> {
        self.state.read().expect("poisoned")
    }

    /// Polls for a state whose epoch is newer than `last_epoch`.
    ///
    /// If there is none yet, the task's waker is registered and `Poll::Pending` is
    /// returned.
    fn poll_updated(&self, cx: &mut task::Context<'_>, last_epoch: u64) -> Poll<State<T>> {
        // Takes the read lock only for the duration of the check.
        let newer_snapshot = || {
            let current = self.state();
            if last_epoch < current.epoch {
                Some(current.clone())
            } else {
                None
            }
        };

        // Wakeups may be spurious, so always compare epochs before reporting a change.
        if let Some(snapshot) = newer_snapshot() {
            return Poll::Ready(snapshot);
        }

        self.add_waker(cx);

        #[cfg(watcher_loom)]
        loom::thread::yield_now();

        // Check once more: a change may have happened between the first check and the
        // registration of the waker, in which case nobody would wake this task.
        match newer_snapshot() {
            Some(snapshot) => Poll::Ready(snapshot),
            None => Poll::Pending,
        }
    }

    /// Registers the task's waker unless an equivalent waker is already registered.
    fn add_waker(&self, cx: &mut task::Context<'_>) {
        let mut wakers = self.wakers.lock().expect("poisoned");
        let already_registered = wakers.iter().any(|known| known.will_wake(cx.waker()));
        if !already_registered {
            wakers.push_back(cx.waker().clone());
        }
    }
}
#[cfg(test)]
mod tests {
use n0_future::{future::poll_once, StreamExt};
use rand::{rng, Rng};
use tokio::{
task::JoinSet,
time::{Duration, Instant},
};
use tokio_util::sync::CancellationToken;
use super::*;
// End-to-end test: several stream consumers observe every value set on a watchable.
#[tokio::test]
async fn test_watcher() {
let cancel = CancellationToken::new();
let watchable = Watchable::new(17);
// A full stream yields the current value first.
assert_eq!(watchable.watch().stream().next().await.unwrap(), 17);
let start = Instant::now();
let mut tasks = JoinSet::new();
// Three consumers of the full stream: they see the initial 17, then 0, 1, 2, ...
for i in 0..3 {
let mut watch = watchable.watch().stream();
let cancel = cancel.clone();
tasks.spawn(async move {
println!("[{i}] spawn");
let mut expected_value = 17;
loop {
tokio::select! {
biased;
Some(value) = &mut watch.next() => {
println!("{:?} [{i}] update: {value}", start.elapsed());
assert_eq!(value, expected_value);
if expected_value == 17 {
expected_value = 0;
} else {
expected_value += 1;
}
},
_ = cancel.cancelled() => {
println!("{:?} [{i}] cancel", start.elapsed());
// All ten values (0..10) must have been observed by now.
assert_eq!(expected_value, 10);
break;
}
}
}
});
}
// Three consumers of the updates-only stream: they start directly at 0.
for i in 0..3 {
let mut watch = watchable.watch().stream_updates_only();
let cancel = cancel.clone();
tasks.spawn(async move {
println!("[{i}] spawn");
let mut expected_value = 0;
loop {
tokio::select! {
biased;
Some(value) = watch.next() => {
println!("{:?} [{i}] stream update: {value}", start.elapsed());
assert_eq!(value, expected_value);
expected_value += 1;
},
_ = cancel.cancelled() => {
println!("{:?} [{i}] cancel", start.elapsed());
assert_eq!(expected_value, 10);
break;
}
else => {
panic!("stream died");
}
}
}
});
}
// Producer: set 0..10 with a random pause (below 100ms) before each set.
for next_value in 0..10 {
let sleep = Duration::from_nanos(rng().random_range(0..100_000_000));
println!("{:?} sleep {sleep:?}", start.elapsed());
tokio::time::sleep(sleep).await;
let changed = watchable.set(next_value);
println!("{:?} set {next_value} changed={changed:?}", start.elapsed());
}
println!("cancel");
cancel.cancel();
while let Some(res) = tasks.join_next().await {
res.expect("task failed");
}
}
// `Watchable::get` returns the initial value and, after `set`, the new value.
#[test]
fn test_get() {
let watchable = Watchable::new(None);
assert!(watchable.get().is_none());
watchable.set(Some(1u8)).ok();
assert_eq!(watchable.get(), Some(1u8));
}
// `initialized()` stays pending while the value is `None` and resolves once it is `Some`.
#[tokio::test]
async fn test_initialize() {
let watchable = Watchable::new(None);
let mut watcher = watchable.watch();
let mut initialized = watcher.initialized();
// Nothing set yet: a single poll must not complete the future.
let poll = poll_once(&mut initialized).await;
assert!(poll.is_none());
watchable.set(Some(1u8)).ok();
let poll = poll_once(&mut initialized).await;
assert_eq!(poll.unwrap(), 1u8);
}
// `initialized()` resolves on the first poll when the value already is `Some`.
#[tokio::test]
async fn test_initialize_already_init() {
let watchable = Watchable::new(Some(1u8));
let mut watcher = watchable.watch();
let mut initialized = watcher.initialized();
let poll = poll_once(&mut initialized).await;
assert_eq!(poll.unwrap(), 1u8);
}
// Races a blocking `initialized()` on another thread against a `set` on this thread.
// The future must resolve with the set value. Under `watcher_loom` the scenario is
// run through `loom::model`.
#[test]
fn test_initialized_always_resolves() {
#[cfg(not(watcher_loom))]
use std::thread;
#[cfg(watcher_loom)]
use loom::thread;
let test_case = || {
let watchable = Watchable::<Option<u8>>::new(None);
let mut watch = watchable.watch();
let thread = thread::spawn(move || n0_future::future::block_on(watch.initialized()));
watchable.set(Some(42)).ok();
thread::yield_now();
let value: u8 = thread.join().unwrap();
assert_eq!(value, 42);
};
#[cfg(watcher_loom)]
loom::model(test_case);
#[cfg(not(watcher_loom))]
test_case();
}
// `updated()` futures are repeatedly cancelled by a competing random sleep. Even so,
// the same value must never be observed twice, and the final value must be reached.
#[tokio::test(flavor = "multi_thread")]
async fn test_update_cancel_safety() {
let watchable = Watchable::new(0);
let mut watch = watchable.watch();
const MAX: usize = 100_000;
let handle = tokio::spawn(async move {
let mut last_observed = 0;
while last_observed != MAX {
tokio::select! {
val = watch.updated() => {
let Ok(val) = val else {
return;
};
assert_ne!(val, last_observed, "never observe the same value twice, even with cancellation");
last_observed = val;
}
// Losing the race against this sleep drops (cancels) the `updated()` future.
_ = tokio::time::sleep(Duration::from_micros(rng().random_range(0..10_000))) => {
continue;
}
}
}
});
// Producer: set 1..=MAX, yielding to the runtime now and then.
for i in 1..=MAX {
watchable.set(i).ok();
if rng().random_bool(0.2) {
tokio::task::yield_now().await;
}
}
tokio::time::timeout(Duration::from_secs(10), handle)
.await
.unwrap()
.unwrap()
}
// A `Join` over two watchers reflects changes of either one, both through `get` and
// through a stream created from a clone of the join.
#[tokio::test]
async fn test_join_simple() {
let a = Watchable::new(1u8);
let b = Watchable::new(1u8);
let mut ab = Join::new([a.watch(), b.watch()].into_iter());
let stream = ab.clone().stream();
let handle = tokio::task::spawn(async move { stream.take(5).collect::<Vec<_>>().await });
assert_eq!(ab.get(), vec![1, 1]);
a.set(2u8).unwrap();
// Yield after every set so that the stream task gets to observe each state.
tokio::task::yield_now().await;
assert_eq!(ab.get(), vec![2, 1]);
b.set(3u8).unwrap();
tokio::task::yield_now().await;
assert_eq!(ab.get(), vec![2, 3]);
a.set(3u8).unwrap();
tokio::task::yield_now().await;
b.set(4u8).unwrap();
tokio::task::yield_now().await;
let values = tokio::time::timeout(Duration::from_secs(5), handle)
.await
.unwrap()
.unwrap();
assert_eq!(
values,
vec![vec![1, 1], vec![2, 1], vec![2, 3], vec![3, 3], vec![3, 4]]
);
}
// After the watchable is dropped, `get` still returns the last value the watcher saw.
#[tokio::test]
async fn test_updated_then_disconnect_then_get() {
let watchable = Watchable::new(10);
let mut watcher = watchable.watch();
assert_eq!(watchable.get(), 10);
watchable.set(42).ok();
assert_eq!(watcher.updated().await.unwrap(), 42);
drop(watchable);
assert_eq!(watcher.get(), 42);
}
// Dropping the watchable must wake a pending `updated()` future, which then resolves
// with an error at the moment of the drop (not only when the timeout fires).
#[tokio::test(start_paused = true)]
async fn test_update_wakeup_on_watchable_drop() {
let watchable = Watchable::new(10);
let mut watcher = watchable.watch();
let start = Instant::now();
let (_, result) = tokio::time::timeout(Duration::from_secs(2), async move {
tokio::join!(
async move {
tokio::time::sleep(Duration::from_secs(1)).await;
drop(watchable);
},
async move { watcher.updated().await }
)
})
.await
.expect("watcher never updated");
// Time is paused, so exactly the one second of sleep has elapsed.
assert_eq!(start.elapsed(), Duration::from_secs(1));
assert!(result.is_err());
}
// Every completion of `updated()` must carry a value different from the previous one,
// even when a clone of the watchable is dropped in between.
#[tokio::test(start_paused = true)]
async fn test_update_wakeup_always_a_change() {
let watchable = Watchable::new(10);
let mut watcher = watchable.watch();
let task = tokio::spawn(async move {
let mut last_value = watcher.get();
let mut values = Vec::new();
// Runs until the watcher is disconnected.
while let Ok(value) = watcher.updated().await {
values.push(value);
if last_value == value {
return Err("value duplicated");
}
last_value = value;
}
Ok(values)
});
tokio::time::sleep(Duration::from_millis(100)).await;
watchable.set(11).ok();
tokio::time::sleep(Duration::from_millis(100)).await;
let clone = watchable.clone();
// Dropping a clone must not show up as an update in the task above.
drop(clone); tokio::time::sleep(Duration::from_millis(100)).await;
for i in 1..=10 {
watchable.set(i + 11).ok();
tokio::time::sleep(Duration::from_millis(100)).await;
}
// Dropping the last handle disconnects the watcher and ends the task's loop.
drop(watchable);
let values = task
.await
.expect("task panicked")
.expect("value duplicated");
assert_eq!(values, vec![11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21]);
}
// `has_watchers` counts watchers only: clones of the watchable do not count, and the
// answer is the same on every handle.
#[test]
fn test_has_watchers() {
let a = Watchable::new(1u8);
assert!(!a.has_watchers());
let b = a.clone();
assert!(!a.has_watchers());
assert!(!b.has_watchers());
let watcher = a.watch();
assert!(a.has_watchers());
assert!(b.has_watchers());
drop(watcher);
assert!(!a.has_watchers());
assert!(!b.has_watchers());
}
// Three independent watchers of the same watchable all observe the same update.
#[tokio::test]
async fn test_three_watchers_basic() {
let watchable = Watchable::new(1u8);
let mut w1 = watchable.watch();
let mut w2 = watchable.watch();
let mut w3 = watchable.watch();
assert_eq!(w1.get(), 1);
assert_eq!(w2.get(), 1);
assert_eq!(w3.get(), 1);
watchable.set(42).unwrap();
assert_eq!(w1.updated().await.unwrap(), 42);
assert_eq!(w2.updated().await.unwrap(), 42);
assert_eq!(w3.updated().await.unwrap(), 42);
}
// A watcher that was not polled between several sets only sees the latest value.
#[tokio::test]
async fn test_three_watchers_skip_intermediate() {
let watchable = Watchable::new(0u8);
let mut watcher = watchable.watch();
watchable.set(1).ok();
watchable.set(2).ok();
watchable.set(3).ok();
watchable.set(4).ok();
let value = watcher.updated().await.unwrap();
assert_eq!(value, 4);
}
// Full streams yield the initial value and then updates; an updates-only stream
// yields just the update.
#[tokio::test]
async fn test_three_watchers_with_streams() {
let watchable = Watchable::new(10u8);
let mut stream1 = watchable.watch().stream();
let mut stream2 = watchable.watch().stream();
let mut stream3 = watchable.watch().stream_updates_only();
assert_eq!(stream1.next().await.unwrap(), 10);
assert_eq!(stream2.next().await.unwrap(), 10);
watchable.set(20).ok();
assert_eq!(stream1.next().await.unwrap(), 20);
assert_eq!(stream2.next().await.unwrap(), 20);
// The first item of the updates-only stream is the update, not the initial 10.
assert_eq!(stream3.next().await.unwrap(), 20);
}
// Watchers track their position independently of each other.
#[tokio::test]
async fn test_three_watchers_independent() {
let watchable = Watchable::new(0u8);
let mut fast_watcher = watchable.watch();
let mut slow_watcher = watchable.watch();
let mut lazy_watcher = watchable.watch();
watchable.set(1).ok();
assert_eq!(fast_watcher.updated().await.unwrap(), 1);
watchable.set(2).ok();
watchable.set(3).ok();
// The slow watcher never saw 1 or 2 and jumps straight to the latest value.
assert_eq!(slow_watcher.updated().await.unwrap(), 3);
assert_eq!(lazy_watcher.get(), 3);
}
// A `Triple` reports an update whenever any one of its three inputs changes.
#[tokio::test]
async fn test_combine_three_watchers() {
let a = Watchable::new(1u8);
let b = Watchable::new(2u8);
let c = Watchable::new(3u8);
let mut combined = Triple::new(a.watch(), b.watch(), c.watch());
assert_eq!(combined.get(), (1, 2, 3));
b.set(20).ok();
assert_eq!(combined.updated().await.unwrap(), (1, 20, 3));
c.set(30).ok();
assert_eq!(combined.updated().await.unwrap(), (1, 20, 30));
}
// Once the only watchable is dropped, all watchers report being disconnected, `get`
// still returns the last known value, and `updated()` resolves with an error.
#[tokio::test]
async fn test_three_watchers_disconnection() {
let watchable = Watchable::new(5u8);
let mut w1 = watchable.watch();
let mut w2 = watchable.watch();
let mut w3 = watchable.watch();
drop(watchable);
assert!(!w1.is_connected());
assert!(!w2.is_connected());
assert!(!w3.is_connected());
assert_eq!(w1.get(), 5);
assert_eq!(w2.get(), 5);
assert!(w3.updated().await.is_err());
}
// Three reader tasks and three writer tasks run concurrently on the same watchable.
// Each reader must observe at least one value.
#[tokio::test]
async fn test_three_watchers_truly_concurrent() {
use tokio::time::sleep;
let watchable = Watchable::new(0u8);
let mut reader_handles = vec![];
// Readers: each collects up to five updates, stopping early on disconnect.
for i in 0..3 {
let mut watcher = watchable.watch();
let handle = tokio::spawn(async move {
let mut values = vec![];
for _ in 0..5 {
if let Ok(value) = watcher.updated().await {
values.push(value);
} else {
break;
}
}
(i, values)
});
reader_handles.push(handle);
}
let mut writer_handles = vec![];
// Writers: writer `i` sets the values `i * 10 + j` for `j` in 0..5.
for i in 0..3 {
let watchable_clone = watchable.clone();
let handle = tokio::spawn(async move {
for j in 0..5 {
let value = (i * 10) + j;
watchable_clone.set(value).ok();
sleep(Duration::from_millis(5)).await;
}
});
writer_handles.push(handle);
}
for handle in writer_handles {
handle.await.unwrap();
}
for handle in reader_handles {
let (task_id, values) = handle.await.unwrap();
println!("Reader {}: saw values {:?}", task_id, values);
assert!(!values.is_empty());
}
}
// `peek` agrees with `get` for direct, mapped and joined watchers.
#[tokio::test]
async fn test_peek() {
let a = Watchable::new(vec![1, 2, 3]);
let mut wa = a.watch();
assert_eq!(wa.get(), vec![1, 2, 3]);
assert_eq!(wa.peek(), &vec![1, 2, 3]);
let mut wa_map = wa.map(|a| a.into_iter().map(|a| a * 2).collect::<Vec<_>>());
assert_eq!(wa_map.get(), vec![2, 4, 6]);
assert_eq!(wa_map.peek(), &vec![2, 4, 6]);
let mut wb = a.watch();
assert_eq!(wb.get(), vec![1, 2, 3]);
assert_eq!(wb.peek(), &vec![1, 2, 3]);
let mut wb_map = wb.map(|a| a.into_iter().map(|a| a * 2).collect::<Vec<_>>());
assert_eq!(wb_map.get(), vec![2, 4, 6]);
assert_eq!(wb_map.peek(), &vec![2, 4, 6]);
let mut w_join = Join::new([wa_map, wb_map].into_iter());
assert_eq!(w_join.get(), vec![vec![2, 4, 6], vec![2, 4, 6]]);
assert_eq!(w_join.peek(), &vec![vec![2, 4, 6], vec![2, 4, 6]]);
}
// `peek` keeps returning the old value until `update` is called. `update` returns
// `true` exactly once per change. Checked for direct, mapped and joined watchers.
#[tokio::test]
async fn test_update_updates_peek() {
let value = Watchable::new(42);
let mut watcher = value.watch();
assert_eq!(watcher.peek(), &42);
assert!(!watcher.update());
value.set(50).ok();
// Stale before `update`, fresh after it.
assert_eq!(watcher.peek(), &42); assert!(watcher.update()); assert_eq!(watcher.peek(), &50);
assert!(!watcher.update());
let mut watcher_map = watcher.clone().map(|v| v * 2);
assert_eq!(watcher_map.peek(), &100);
assert!(!watcher_map.update());
value.set(10).ok();
assert_eq!(watcher_map.peek(), &100);
assert!(watcher_map.update());
assert_eq!(watcher_map.peek(), &20);
assert!(!watcher_map.update());
let value2 = Watchable::new(0);
// `Join::new` brings `watcher` up to date (10) while constructing the join.
let mut watcher_join = Join::new([watcher, value2.watch()].into_iter());
assert_eq!(watcher_join.peek(), &vec![10, 0]);
assert!(!watcher_join.update());
value.set(0).ok();
value2.set(1).ok();
assert_eq!(watcher_join.peek(), &vec![10, 0]);
assert!(watcher_join.update());
assert_eq!(watcher_join.peek(), &vec![0, 1]);
assert!(!watcher_join.update());
}
// `get` refreshes what `peek` returns, and a subsequent `update` reports no change.
// Checked for direct, mapped and joined watchers.
#[tokio::test]
async fn test_get_updates_peek() {
let value = Watchable::new(42);
let mut watcher = value.watch();
assert_eq!(watcher.peek(), &42);
assert!(!watcher.update());
value.set(50).ok();
// Stale before `get`, fresh after it.
assert_eq!(watcher.peek(), &42); assert_eq!(watcher.get(), 50); assert_eq!(watcher.peek(), &50);
assert!(!watcher.update());
let mut watcher_map = watcher.clone().map(|v| v * 2);
assert_eq!(watcher_map.peek(), &100);
assert!(!watcher_map.update());
value.set(10).ok();
assert_eq!(watcher_map.peek(), &100);
assert_eq!(watcher_map.get(), 20);
assert_eq!(watcher_map.peek(), &20);
assert!(!watcher_map.update());
let value2 = Watchable::new(0);
// `Join::new` brings `watcher` up to date (10) while constructing the join.
let mut watcher_join = Join::new([watcher, value2.watch()].into_iter());
assert_eq!(watcher_join.peek(), &vec![10, 0]);
assert!(!watcher_join.update());
value.set(0).ok();
value2.set(1).ok();
assert_eq!(watcher_join.peek(), &vec![10, 0]);
assert_eq!(watcher_join.get(), vec![0, 1]);
assert_eq!(watcher_join.peek(), &vec![0, 1]);
assert!(!watcher_join.update());
}
// Repeatedly polling and cancelling `updated()` from the same task must not grow the
// list of registered wakers: exactly one waker is expected after every iteration.
#[tokio::test]
async fn test_ensure_wakers_bounded() {
use tokio::time::{interval, Duration};
let watchable = Watchable::new(0);
let mut watcher = watchable.watch();
let max_tick = 1000;
let handle = tokio::spawn(async move {
let mut ticker = interval(Duration::from_nanos(1));
let mut tick_no = 0;
loop {
tokio::select! {
// Never completes in this test (nothing is set), but registers a waker when polled.
_ = watcher.updated() => {}
_ = ticker.tick() => {
tick_no += 1;
if tick_no > max_tick{
return
}
}
}
let num_wakers = watchable.shared.wakers.lock().unwrap().len();
assert_eq!(num_wakers, 1);
}
});
tokio::time::timeout(Duration::from_secs(1), handle)
.await
.unwrap()
.unwrap()
}
}