Skip to main content

n0_watcher/
lib.rs

1//! Watchable values.
2//!
3//! A [`Watchable`] exists to keep track of a value which may change over time.  It allows
4//! observers to be notified of changes to the value.  The aim is to always be aware of the
5//! **last** value, not to observe *every* value change.
6//!
7//! The reason for this is ergonomics and predictable resource usage: Requiring every
8//! intermediate value to be observable would mean that either the side that sets new values
9//! using [`Watchable::set`] would need to wait for all "receivers" of these intermediate
10//! values to catch up and thus be an async operation, or it would require the receivers
11//! to buffer intermediate values until they've been "received" on the [`Watcher`]s with
12//! an unlimited buffer size and thus potentially unlimited memory growth.
13//!
14//! # Example
15//!
16//! ```
17//! use n0_future::StreamExt;
18//! use n0_watcher::{Watchable, Watcher as _};
19//!
20//! #[tokio::main(flavor = "current_thread", start_paused = true)]
21//! async fn main() {
22//!     let watchable = Watchable::new(None);
23//!
24//!     // A task that waits for the watcher to be initialized to Some(value) before printing it
25//!     let mut watcher = watchable.watch();
26//!     tokio::spawn(async move {
27//!         let initialized_value = watcher.initialized().await;
28//!         println!("initialized: {initialized_value}");
29//!     });
30//!
31//!     // A task that prints every update to the watcher since the initial one:
32//!     let mut updates = watchable.watch().stream_updates_only();
33//!     tokio::spawn(async move {
34//!         while let Some(update) = updates.next().await {
35//!             println!("update: {update:?}");
36//!         }
37//!     });
38//!
39//!     // A task that prints the current value and then every update it can catch,
40//!     // but it also does something else which makes it very slow to pick up new
41//!     // values, so it'll skip some:
42//!     let mut current_and_updates = watchable.watch().stream();
43//!     tokio::spawn(async move {
44//!         while let Some(update) = current_and_updates.next().await {
45//!             println!("update2: {update:?}");
46//!             tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
47//!         }
48//!     });
49//!
50//!     for i in 0..20 {
51//!         println!("Setting watchable to {i}");
52//!         watchable.set(Some(i)).ok();
53//!         tokio::time::sleep(tokio::time::Duration::from_millis(250)).await;
54//!     }
55//! }
56//! ```
57//!
58//! # Similar but different
59//!
60//! - `async_channel`: This is a multi-producer, multi-consumer channel implementation.
61//!   Only at most one consumer will receive each "produced" value.
62//!   What we want is to have every "produced" value to be "broadcast" to every receiver.
63//! - `tokio::broadcast`: Also a multi-producer, multi-consumer channel implementation.
64//!   This is very similar to this crate (`tokio::broadcast::Sender` is like [`Watchable`]
65//!   and `tokio::broadcast::Receiver` is like [`Watcher`]), but you can't get the latest
66//!   value without `.await`ing on the receiver, and it'll internally store a queue of
67//!   intermediate values.
//! - `tokio::watch`: Also a multi-producer, multi-consumer channel, but unlike
//!   `tokio::broadcast` it only retains the latest value. That module has pretty much the
//!   same purpose as this crate, but doesn't implement a poll-based method of getting
//!   updates and doesn't implement combinators.
71//! - [`std::sync::RwLock`]: (wrapped in an [`std::sync::Arc`]) This allows you access
72//!   to the latest values, but might block while it's being set (but that could be short
73//!   enough not to matter for async rust purposes).
74//!   This doesn't allow you to be notified whenever a new value is written.
75//! - The `watchable` crate: We used to use this crate at n0, but we wanted to experiment
76//!   with different APIs and needed Wasm support.
77#[cfg(not(watcher_loom))]
78use std::sync;
79use std::{
80    collections::VecDeque,
81    future::Future,
82    pin::Pin,
83    sync::{Arc, RwLockReadGuard, Weak},
84    task::{self, ready, Poll, Waker},
85};
86
87#[cfg(watcher_loom)]
88use loom::sync;
89use n0_error::StackError;
90use sync::{Mutex, RwLock};
91
92/// A wrapper around a value that notifies [`Watcher`]s when the value is modified.
93///
94/// Only the most recent value is available to any observer, but the observer is guaranteed
95/// to be notified of the most recent value.
96#[derive(Debug, Default)]
97pub struct Watchable<T> {
98    shared: Arc<Shared<T>>,
99}
100
101impl<T> Clone for Watchable<T> {
102    fn clone(&self) -> Self {
103        Self {
104            shared: self.shared.clone(),
105        }
106    }
107}
108
/// Abstraction over containers that may or may not hold a `T`.
///
/// Implemented for `Option<T>` and `Vec<T>`.
pub trait Nullable<T> {
    /// Consumes the container and returns the contained value, if there is one.
    fn into_option(self) -> Option<T>;
}

impl<T> Nullable<T> for Option<T> {
    /// An `Option` already is the target type, so this is the identity.
    fn into_option(self) -> Option<T> {
        self
    }
}

impl<T> Nullable<T> for Vec<T> {
    /// Returns the *last* element of the vector, or `None` if the vector is empty.
    fn into_option(self) -> Option<T> {
        self.into_iter().next_back()
    }
}
126
127impl<T: Clone + Eq> Watchable<T> {
128    /// Creates a [`Watchable`] initialized to given value.
129    pub fn new(value: T) -> Self {
130        Self {
131            shared: Arc::new(Shared {
132                state: RwLock::new(State {
133                    value,
134                    epoch: INITIAL_EPOCH,
135                }),
136                wakers: Default::default(),
137            }),
138        }
139    }
140
141    /// Sets a new value.
142    ///
143    /// Returns `Ok(previous_value)` if the value was different from the one set, or
144    /// returns the provided value back as `Err(value)` if the value didn't change.
145    ///
146    /// Watchers are only notified if the value changed.
147    pub fn set(&self, value: T) -> Result<T, T> {
148        // We don't actually write when the value didn't change, but there's unfortunately
149        // no way to upgrade a read guard to a write guard, and locking as read first, then
150        // dropping and locking as write introduces a possible race condition.
151        let mut state = self.shared.state.write().expect("poisoned");
152
153        // Find out if the value changed
154        let changed = state.value != value;
155
156        let ret = if changed {
157            let old = std::mem::replace(&mut state.value, value);
158            state.epoch += 1;
159            Ok(old)
160        } else {
161            Err(value)
162        };
163        drop(state); // No need to write anymore
164
165        // Notify watchers
166        if changed {
167            for watcher in self.shared.wakers.lock().expect("poisoned").drain(..) {
168                watcher.wake();
169            }
170        }
171        ret
172    }
173
174    /// Creates a [`Direct`] [`Watcher`], allowing the value to be observed, but not modified.
175    pub fn watch(&self) -> Direct<T> {
176        Direct {
177            state: self.shared.state().clone(),
178            shared: Some(Arc::downgrade(&self.shared)),
179        }
180    }
181
182    /// Returns the currently stored value.
183    pub fn get(&self) -> T {
184        self.shared.get()
185    }
186
187    /// Returns true when there are any watchers actively listening on changes,
188    /// or false when all watchers have been dropped or none have been created yet.
189    pub fn has_watchers(&self) -> bool {
190        // `Watchable`s will increase the strong count
191        // `Direct`s watchers (which all watchers descend from) will increase the weak count
192        Arc::weak_count(&self.shared) != 0
193    }
194}
195
196impl<T> Drop for Shared<T> {
197    fn drop(&mut self) {
198        let Ok(mut watchers) = self.wakers.lock() else {
199            return; // Poisoned waking?
200        };
201        // Wake all watchers once we drop Shared (this happens when
202        // the last `Watchable` is dropped).
203        // This allows us to notify `NextFut::poll`s and have that
204        // return `Disconnected`.
205        for watcher in watchers.drain(..) {
206            watcher.wake();
207        }
208    }
209}
210
211/// A handle to a value that's represented by one or more underlying [`Watchable`]s.
212///
213/// A [`Watcher`] can get the current value, and will be notified when the value changes.
214/// Only the most recent value is accessible, and if the threads with the underlying [`Watchable`]s
215/// change the value faster than the threads with the [`Watcher`] can keep up with, then
216/// it'll miss in-between values.
217/// When the thread changing the [`Watchable`] pauses updating, the [`Watcher`] will always
218/// end up reporting the most recent state eventually.
219///
220/// Watchers can be modified via [`Watcher::map`] to observe a value derived from the original
221/// value via a function.
222///
223/// Watchers can be combined via [`Watcher::or`] to allow observing multiple values at once and
224/// getting an update in case any of the values updates.
225///
226/// One of the underlying [`Watchable`]s might already be dropped. In that case,
227/// the watcher will be "disconnected" and return [`Err(Disconnected)`](Disconnected)
228/// on some function calls or, when turned into a stream, that stream will end.
229/// This property can also be checked with [`Watcher::is_connected`].
230pub trait Watcher: Clone {
231    /// The type of value that can change.
232    ///
233    /// We require `Clone`, because we need to be able to make
234    /// the values have a lifetime that's detached from the original [`Watchable`]'s
235    /// lifetime.
236    ///
237    /// We require `Eq`, to be able to check whether the value actually changed or
238    /// not, so we can notify or not notify accordingly.
239    type Value: Clone + Eq;
240
241    /// Updates the watcher to the latest value and returns that value.
242    ///
243    /// If any of the underlying [`Watchable`] values have been dropped, then this
244    /// might return an outdated value for that watchable, specifically, the latest
245    /// value that was fetched for that watchable, as opposed to the latest value
246    /// that was set on the watchable before it was dropped.
247    ///
248    /// The default implementation for this is simply
249    /// ```ignore
250    /// fn get(&mut self) -> Self::Value {
251    ///     self.update();
252    ///     self.peek().clone()
253    /// }
254    /// ```
255    fn get(&mut self) -> Self::Value {
256        self.update();
257        self.peek().clone()
258    }
259
260    /// Updates the watcher to the latest value and returns whether it changed.
261    ///
262    /// Watchers keep track of the "latest known" value they fetched.
263    /// This function updates that internal value by looking up the latest value
264    /// at the [`Watchable`]\(s\) that this watcher is linked to.
265    fn update(&mut self) -> bool;
266
267    /// Returns a reference to the value currently stored in the watcher.
268    ///
269    /// Watchers keep track of the "latest known" value they fetched.
270    /// Calling this won't update the latest value, unlike [`Watcher::get`] or
271    /// [`Watcher::update`].
272    ///
273    /// This can be useful if you want to avoid copying out the internal value
274    /// frequently like what [`Watcher::get`] will end up doing.
275    fn peek(&self) -> &Self::Value;
276
277    /// Whether this watcher is still connected to all of its underlying [`Watchable`]s.
278    ///
279    /// Returns false when any of the underlying watchables has been dropped.
280    fn is_connected(&self) -> bool;
281
282    /// Polls for the next value, or returns [`Disconnected`] if one of the underlying
283    /// [`Watchable`]s has been dropped.
284    fn poll_updated(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Disconnected>>;
285
286    /// Returns a future completing with `Ok(value)` once a new value is set, or with
287    /// [`Err(Disconnected)`](Disconnected) if the connected [`Watchable`] was dropped.
288    ///
289    /// # Cancel Safety
290    ///
291    /// The returned future is cancel-safe.
292    fn updated(&mut self) -> NextFut<'_, Self> {
293        NextFut { watcher: self }
294    }
295
296    /// Returns a future completing once the value is set to [`Some`] value.
297    ///
298    /// If the current value is [`Some`] value, this future will resolve immediately.
299    ///
300    /// This is a utility for the common case of storing an [`Option`] inside a
301    /// [`Watchable`].
302    ///
303    /// # Cancel Safety
304    ///
305    /// The returned future is cancel-safe.
306    fn initialized<T, W>(&mut self) -> InitializedFut<'_, T, W, Self>
307    where
308        W: Nullable<T> + Clone,
309        Self: Watcher<Value = W>,
310    {
311        InitializedFut {
312            initial: self.get().into_option(),
313            watcher: self,
314        }
315    }
316
317    /// Returns a stream which will yield the most recent values as items.
318    ///
319    /// The first item of the stream is the current value, so that this stream can be easily
320    /// used to operate on the most recent value.
321    ///
322    /// Note however, that only the last item is stored.  If the stream is not polled when an
323    /// item is available it can be replaced with another item by the time it is polled.
324    ///
325    /// This stream ends once the original [`Watchable`] has been dropped.
326    ///
327    /// # Cancel Safety
328    ///
329    /// The returned stream is cancel-safe.
330    fn stream(mut self) -> Stream<Self>
331    where
332        Self: Unpin,
333    {
334        Stream {
335            initial: Some(self.get()),
336            watcher: self,
337        }
338    }
339
340    /// Returns a stream which will yield the most recent values as items, starting from
341    /// the next unobserved future value.
342    ///
343    /// This means this stream will only yield values when the watched value changes,
344    /// the value stored at the time the stream is created is not yielded.
345    ///
346    /// Note however, that only the last item is stored.  If the stream is not polled when an
347    /// item is available it can be replaced with another item by the time it is polled.
348    ///
349    /// This stream ends once the original [`Watchable`] has been dropped.
350    ///
351    /// # Cancel Safety
352    ///
353    /// The returned stream is cancel-safe.
354    fn stream_updates_only(self) -> Stream<Self>
355    where
356        Self: Unpin,
357    {
358        Stream {
359            initial: None,
360            watcher: self,
361        }
362    }
363
364    /// Maps this watcher with a function that transforms the observed values.
365    ///
366    /// The returned watcher will only register updates, when the *mapped* value
367    /// observably changes.
368    fn map<T: Clone + Eq>(
369        mut self,
370        map: impl Fn(Self::Value) -> T + Send + Sync + 'static,
371    ) -> Map<Self, T> {
372        Map {
373            current: (map)(self.get()),
374            map: Arc::new(map),
375            watcher: self,
376        }
377    }
378
379    /// Returns a watcher that updates every time this or the other watcher
380    /// updates, and yields both watcher's items together when that happens.
381    fn or<W: Watcher>(self, other: W) -> Tuple<Self, W> {
382        Tuple::new(self, other)
383    }
384}
385
386/// The immediate, direct observer of a [`Watchable`] value.
387///
388/// This type is mainly used via the [`Watcher`] interface.
389#[derive(Debug, Clone)]
390pub struct Direct<T> {
391    state: State<T>,
392    // We wrap the Weak with an Option, so that we can set it to `None` once we
393    // notice that Weak is not upgradable anymore for the first time.
394    // This allows the weak pointer's allocation to be freed in case this makes
395    // the weak count go to zero (even if Direct is still kept around).
396    shared: Option<Weak<Shared<T>>>,
397}
398
399impl<T: Clone + Eq> Watcher for Direct<T> {
400    type Value = T;
401
402    fn update(&mut self) -> bool {
403        let Some(shared) = self.shared.as_ref().and_then(|weak| weak.upgrade()) else {
404            self.shared = None; // Weak won't be upgradable in the future, this way we can allow the allocation to be freed
405            return false;
406        };
407        let state = shared.state();
408        if state.epoch > self.state.epoch {
409            self.state = state.clone();
410            true
411        } else {
412            false
413        }
414    }
415
416    fn peek(&self) -> &Self::Value {
417        &self.state.value
418    }
419
420    fn is_connected(&self) -> bool {
421        self.shared
422            .as_ref()
423            .and_then(|weak| weak.upgrade())
424            .is_some()
425    }
426
427    fn poll_updated(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Disconnected>> {
428        let Some(shared) = self.shared.as_ref().and_then(|weak| weak.upgrade()) else {
429            self.shared = None; // Weak won't be upgradable in the future, this way we can allow the allocation to be freed
430            return Poll::Ready(Err(Disconnected));
431        };
432        self.state = ready!(shared.poll_updated(cx, self.state.epoch));
433        Poll::Ready(Ok(()))
434    }
435}
436
437#[derive(Debug, Clone)]
438pub struct Tuple<S: Watcher, T: Watcher> {
439    inner: (S, T),
440    current: (S::Value, T::Value),
441}
442
443impl<S: Watcher, T: Watcher> Tuple<S, T> {
444    pub fn new(mut s: S, mut t: T) -> Self {
445        let current = (s.get(), t.get());
446        Self {
447            inner: (s, t),
448            current,
449        }
450    }
451}
452
453impl<S: Watcher, T: Watcher> Watcher for Tuple<S, T> {
454    type Value = (S::Value, T::Value);
455
456    fn update(&mut self) -> bool {
457        // We need to update all watchers! So don't early-return
458        let s_updated = self.inner.0.update();
459        let t_updated = self.inner.1.update();
460        let updated = s_updated || t_updated;
461        if updated {
462            self.current = (self.inner.0.peek().clone(), self.inner.1.peek().clone());
463        }
464        updated
465    }
466
467    fn peek(&self) -> &Self::Value {
468        &self.current
469    }
470
471    fn is_connected(&self) -> bool {
472        self.inner.0.is_connected() && self.inner.1.is_connected()
473    }
474
475    fn poll_updated(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Disconnected>> {
476        let poll_0 = self.inner.0.poll_updated(cx)?;
477        let poll_1 = self.inner.1.poll_updated(cx)?;
478        if poll_0.is_pending() && poll_1.is_pending() {
479            return Poll::Pending;
480        }
481        if poll_0.is_ready() {
482            self.current.0 = self.inner.0.peek().clone();
483        }
484        if poll_1.is_ready() {
485            self.current.1 = self.inner.1.peek().clone();
486        }
487        Poll::Ready(Ok(()))
488    }
489}
490
491#[derive(Debug, Clone)]
492pub struct Triple<S: Watcher, T: Watcher, U: Watcher> {
493    inner: (S, T, U),
494    current: (S::Value, T::Value, U::Value),
495}
496
497impl<S: Watcher, T: Watcher, U: Watcher> Triple<S, T, U> {
498    pub fn new(mut s: S, mut t: T, mut u: U) -> Self {
499        let current = (s.get(), t.get(), u.get());
500        Self {
501            inner: (s, t, u),
502            current,
503        }
504    }
505}
506
507impl<S: Watcher, T: Watcher, U: Watcher> Watcher for Triple<S, T, U> {
508    type Value = (S::Value, T::Value, U::Value);
509
510    fn update(&mut self) -> bool {
511        // We need to update all watchers! So don't early-return
512        let s_updated = self.inner.0.update();
513        let t_updated = self.inner.1.update();
514        let u_updated = self.inner.2.update();
515        let updated = s_updated || t_updated || u_updated;
516        if updated {
517            self.current = (
518                self.inner.0.peek().clone(),
519                self.inner.1.peek().clone(),
520                self.inner.2.peek().clone(),
521            );
522        }
523        updated
524    }
525
526    fn peek(&self) -> &Self::Value {
527        &self.current
528    }
529
530    fn is_connected(&self) -> bool {
531        self.inner.0.is_connected() && self.inner.1.is_connected() && self.inner.2.is_connected()
532    }
533
534    fn poll_updated(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Disconnected>> {
535        let poll_0 = self.inner.0.poll_updated(cx)?;
536        let poll_1 = self.inner.1.poll_updated(cx)?;
537        let poll_2 = self.inner.2.poll_updated(cx)?;
538
539        if poll_0.is_pending() && poll_1.is_pending() && poll_2.is_pending() {
540            return Poll::Pending;
541        }
542        if poll_0.is_ready() {
543            self.current.0 = self.inner.0.peek().clone();
544        }
545        if poll_1.is_ready() {
546            self.current.1 = self.inner.1.peek().clone();
547        }
548        if poll_2.is_ready() {
549            self.current.2 = self.inner.2.peek().clone();
550        }
551        Poll::Ready(Ok(()))
552    }
553}
554
555/// Combinator to join two watchers
556#[derive(Debug, Clone)]
557pub struct Join<T: Clone + Eq, W: Watcher<Value = T>> {
558    // invariant: watchers.len() == current.len()
559    watchers: Vec<W>,
560    current: Vec<T>,
561}
562
563impl<T: Clone + Eq, W: Watcher<Value = T>> Join<T, W> {
564    /// Joins a set of watchers into a single watcher
565    pub fn new(watchers: impl Iterator<Item = W>) -> Self {
566        let mut watchers: Vec<W> = watchers.into_iter().collect();
567
568        let mut current = Vec::with_capacity(watchers.len());
569        for watcher in &mut watchers {
570            current.push(watcher.get());
571        }
572        Self { watchers, current }
573    }
574}
575
576impl<T: Clone + Eq, W: Watcher<Value = T>> Watcher for Join<T, W> {
577    type Value = Vec<T>;
578
579    fn update(&mut self) -> bool {
580        let mut any_updated = false;
581        for (value, watcher) in self.current.iter_mut().zip(self.watchers.iter_mut()) {
582            if watcher.update() {
583                any_updated = true;
584                *value = watcher.peek().clone();
585            }
586        }
587        any_updated
588    }
589
590    fn peek(&self) -> &Self::Value {
591        &self.current
592    }
593
594    fn is_connected(&self) -> bool {
595        self.watchers.iter().all(|w| w.is_connected())
596    }
597
598    fn poll_updated(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Disconnected>> {
599        let mut any_updated = false;
600        for (value, watcher) in self.current.iter_mut().zip(self.watchers.iter_mut()) {
601            if watcher.poll_updated(cx)?.is_ready() {
602                any_updated = true;
603                *value = watcher.peek().clone();
604            }
605        }
606
607        if any_updated {
608            Poll::Ready(Ok(()))
609        } else {
610            Poll::Pending
611        }
612    }
613}
614
615/// Wraps a [`Watcher`] to allow observing a derived value.
616///
617/// See [`Watcher::map`].
618#[derive(derive_more::Debug, Clone)]
619pub struct Map<W: Watcher, T: Clone + Eq> {
620    #[debug("Arc<dyn Fn(W::Value) -> T>")]
621    map: Arc<dyn Fn(W::Value) -> T + Send + Sync + 'static>,
622    watcher: W,
623    current: T,
624}
625
626impl<W: Watcher, T: Clone + Eq> Watcher for Map<W, T> {
627    type Value = T;
628
629    fn update(&mut self) -> bool {
630        if self.watcher.update() {
631            let new = (self.map)(self.watcher.peek().clone());
632            if new != self.current {
633                self.current = new;
634                true
635            } else {
636                false
637            }
638        } else {
639            false
640        }
641    }
642
643    fn peek(&self) -> &Self::Value {
644        &self.current
645    }
646
647    fn is_connected(&self) -> bool {
648        self.watcher.is_connected()
649    }
650
651    fn poll_updated(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Disconnected>> {
652        loop {
653            ready!(self.watcher.poll_updated(cx)?);
654            let new = (self.map)(self.watcher.peek().clone());
655            if new != self.current {
656                self.current = new;
657                return Poll::Ready(Ok(()));
658            }
659        }
660    }
661}
662
663/// Future returning the next item after the current one in a [`Watcher`].
664///
665/// See [`Watcher::updated`].
666///
667/// # Cancel Safety
668///
669/// This future is cancel-safe.
670#[derive(Debug)]
671pub struct NextFut<'a, W: Watcher> {
672    watcher: &'a mut W,
673}
674
675impl<W: Watcher> Future for NextFut<'_, W> {
676    type Output = Result<W::Value, Disconnected>;
677
678    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
679        ready!(self.watcher.poll_updated(cx))?;
680        Poll::Ready(Ok(self.watcher.peek().clone()))
681    }
682}
683
684/// Future returning the current or next value that's [`Some`] value.
685/// in a [`Watcher`].
686///
687/// See [`Watcher::initialized`].
688///
689/// # Cancel Safety
690///
691/// This Future is cancel-safe.
692#[derive(Debug)]
693pub struct InitializedFut<'a, T, V: Nullable<T> + Clone, W: Watcher<Value = V>> {
694    initial: Option<T>,
695    watcher: &'a mut W,
696}
697
698impl<T: Clone + Eq + Unpin, V: Nullable<T> + Clone, W: Watcher<Value = V> + Unpin> Future
699    for InitializedFut<'_, T, V, W>
700{
701    type Output = T;
702
703    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
704        let mut this = self.as_mut();
705        if let Some(value) = this.initial.take() {
706            return Poll::Ready(value);
707        }
708        loop {
709            if ready!(this.watcher.poll_updated(cx)).is_err() {
710                // The value will never be initialized
711                return Poll::Pending;
712            };
713            let value = this.watcher.peek();
714            if let Some(value) = value.clone().into_option() {
715                return Poll::Ready(value);
716            }
717        }
718    }
719}
720
721/// A stream for a [`Watcher`]'s next values.
722///
723/// See [`Watcher::stream`] and [`Watcher::stream_updates_only`].
724///
725/// # Cancel Safety
726///
727/// This stream is cancel-safe.
728#[derive(Debug, Clone)]
729pub struct Stream<W: Watcher + Unpin> {
730    initial: Option<W::Value>,
731    watcher: W,
732}
733
734impl<W: Watcher + Unpin> n0_future::Stream for Stream<W>
735where
736    W::Value: Unpin,
737{
738    type Item = W::Value;
739
740    fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
741        if let Some(value) = self.as_mut().initial.take() {
742            return Poll::Ready(Some(value));
743        }
744        match self.as_mut().watcher.poll_updated(cx) {
745            Poll::Ready(Ok(())) => Poll::Ready(Some(self.as_ref().watcher.peek().clone())),
746            Poll::Ready(Err(Disconnected)) => Poll::Ready(None),
747            Poll::Pending => Poll::Pending,
748        }
749    }
750}
751
752/// The error for when a [`Watcher`] is disconnected from its underlying
753/// [`Watchable`] value, because of that watchable having been dropped.
754#[derive(StackError)]
755#[error("Watcher lost connection to underlying Watchable, it was dropped")]
756pub struct Disconnected;
757
758// Private:
759
760const INITIAL_EPOCH: u64 = 1;
761
762/// The shared state for a [`Watchable`].
763#[derive(Debug, Default)]
764struct Shared<T> {
765    /// The value to be watched and its current epoch.
766    state: RwLock<State<T>>,
767    wakers: Mutex<VecDeque<Waker>>,
768}
769
770#[derive(Debug, Clone)]
771struct State<T> {
772    value: T,
773    epoch: u64,
774}
775
776impl<T: Default> Default for State<T> {
777    fn default() -> Self {
778        Self {
779            value: Default::default(),
780            epoch: INITIAL_EPOCH,
781        }
782    }
783}
784
785impl<T: Clone> Shared<T> {
786    fn get(&self) -> T {
787        self.state.read().expect("poisoned").value.clone()
788    }
789
790    fn state(&self) -> RwLockReadGuard<'_, State<T>> {
791        self.state.read().expect("poisoned")
792    }
793
794    fn poll_updated(&self, cx: &mut task::Context<'_>, last_epoch: u64) -> Poll<State<T>> {
795        {
796            let state = self.state();
797
798            // We might get spurious wakeups due to e.g. a second-to-last Watchable being dropped.
799            // This makes sure we don't accidentally return an update that's not actually an update.
800            if last_epoch < state.epoch {
801                return Poll::Ready(state.clone());
802            }
803        }
804
805        self.add_waker(cx);
806
807        #[cfg(watcher_loom)]
808        loom::thread::yield_now();
809
810        // We check for an update again to prevent races between putting in wakers and looking for updates.
811        {
812            let state = self.state();
813
814            if last_epoch < state.epoch {
815                return Poll::Ready(state.clone());
816            }
817        }
818
819        Poll::Pending
820    }
821
822    fn add_waker(&self, cx: &mut task::Context<'_>) {
823        let mut wakers = self.wakers.lock().expect("poisoned");
824        for waker in wakers.iter() {
825            if waker.will_wake(cx.waker()) {
826                return;
827            }
828        }
829        wakers.push_back(cx.waker().clone());
830    }
831}
832
833#[cfg(test)]
834mod tests {
835
836    use n0_future::{future::poll_once, StreamExt};
837    use rand::{rng, Rng};
838    use tokio::{
839        task::JoinSet,
840        time::{Duration, Instant},
841    };
842    use tokio_util::sync::CancellationToken;
843
844    use super::*;
845
846    #[tokio::test]
847    async fn test_watcher() {
848        let cancel = CancellationToken::new();
849        let watchable = Watchable::new(17);
850
851        assert_eq!(watchable.watch().stream().next().await.unwrap(), 17);
852
853        let start = Instant::now();
854        // spawn watchers
855        let mut tasks = JoinSet::new();
856        for i in 0..3 {
857            let mut watch = watchable.watch().stream();
858            let cancel = cancel.clone();
859            tasks.spawn(async move {
860                println!("[{i}] spawn");
861                let mut expected_value = 17;
862                loop {
863                    tokio::select! {
864                        biased;
865                        Some(value) = &mut watch.next() => {
866                            println!("{:?} [{i}] update: {value}", start.elapsed());
867                            assert_eq!(value, expected_value);
868                            if expected_value == 17 {
869                                expected_value = 0;
870                            } else {
871                                expected_value += 1;
872                            }
873                        },
874                        _ = cancel.cancelled() => {
875                            println!("{:?} [{i}] cancel", start.elapsed());
876                            assert_eq!(expected_value, 10);
877                            break;
878                        }
879                    }
880                }
881            });
882        }
883        for i in 0..3 {
884            let mut watch = watchable.watch().stream_updates_only();
885            let cancel = cancel.clone();
886            tasks.spawn(async move {
887                println!("[{i}] spawn");
888                let mut expected_value = 0;
889                loop {
890                    tokio::select! {
891                        biased;
892                        Some(value) = watch.next() => {
893                            println!("{:?} [{i}] stream update: {value}", start.elapsed());
894                            assert_eq!(value, expected_value);
895                            expected_value += 1;
896                        },
897                        _ = cancel.cancelled() => {
898                            println!("{:?} [{i}] cancel", start.elapsed());
899                            assert_eq!(expected_value, 10);
900                            break;
901                        }
902                        else => {
903                            panic!("stream died");
904                        }
905                    }
906                }
907            });
908        }
909
910        // set value
911        for next_value in 0..10 {
912            let sleep = Duration::from_nanos(rng().random_range(0..100_000_000));
913            println!("{:?} sleep {sleep:?}", start.elapsed());
914            tokio::time::sleep(sleep).await;
915
916            let changed = watchable.set(next_value);
917            println!("{:?} set {next_value} changed={changed:?}", start.elapsed());
918        }
919
920        println!("cancel");
921        cancel.cancel();
922        while let Some(res) = tasks.join_next().await {
923            res.expect("task failed");
924        }
925    }
926
927    #[test]
928    fn test_get() {
929        let watchable = Watchable::new(None);
930        assert!(watchable.get().is_none());
931
932        watchable.set(Some(1u8)).ok();
933        assert_eq!(watchable.get(), Some(1u8));
934    }
935
936    #[tokio::test]
937    async fn test_initialize() {
938        let watchable = Watchable::new(None);
939
940        let mut watcher = watchable.watch();
941        let mut initialized = watcher.initialized();
942
943        let poll = poll_once(&mut initialized).await;
944        assert!(poll.is_none());
945
946        watchable.set(Some(1u8)).ok();
947
948        let poll = poll_once(&mut initialized).await;
949        assert_eq!(poll.unwrap(), 1u8);
950    }
951
952    #[tokio::test]
953    async fn test_initialize_already_init() {
954        let watchable = Watchable::new(Some(1u8));
955
956        let mut watcher = watchable.watch();
957        let mut initialized = watcher.initialized();
958
959        let poll = poll_once(&mut initialized).await;
960        assert_eq!(poll.unwrap(), 1u8);
961    }
962
963    #[test]
964    fn test_initialized_always_resolves() {
965        #[cfg(not(watcher_loom))]
966        use std::thread;
967
968        #[cfg(watcher_loom)]
969        use loom::thread;
970
971        let test_case = || {
972            let watchable = Watchable::<Option<u8>>::new(None);
973
974            let mut watch = watchable.watch();
975            let thread = thread::spawn(move || n0_future::future::block_on(watch.initialized()));
976
977            watchable.set(Some(42)).ok();
978
979            thread::yield_now();
980
981            let value: u8 = thread.join().unwrap();
982
983            assert_eq!(value, 42);
984        };
985
986        #[cfg(watcher_loom)]
987        loom::model(test_case);
988        #[cfg(not(watcher_loom))]
989        test_case();
990    }
991
992    #[tokio::test(flavor = "multi_thread")]
993    async fn test_update_cancel_safety() {
994        let watchable = Watchable::new(0);
995        let mut watch = watchable.watch();
996        const MAX: usize = 100_000;
997
998        let handle = tokio::spawn(async move {
999            let mut last_observed = 0;
1000
1001            while last_observed != MAX {
1002                tokio::select! {
1003                    val = watch.updated() => {
1004                        let Ok(val) = val else {
1005                            return;
1006                        };
1007
1008                        assert_ne!(val, last_observed, "never observe the same value twice, even with cancellation");
1009                        last_observed = val;
1010                    }
1011                    _ = tokio::time::sleep(Duration::from_micros(rng().random_range(0..10_000))) => {
1012                        // We cancel the other future and start over again
1013                        continue;
1014                    }
1015                }
1016            }
1017        });
1018
1019        for i in 1..=MAX {
1020            watchable.set(i).ok();
1021            if rng().random_bool(0.2) {
1022                tokio::task::yield_now().await;
1023            }
1024        }
1025
1026        tokio::time::timeout(Duration::from_secs(10), handle)
1027            .await
1028            .unwrap()
1029            .unwrap()
1030    }
1031
1032    #[tokio::test]
1033    async fn test_join_simple() {
1034        let a = Watchable::new(1u8);
1035        let b = Watchable::new(1u8);
1036
1037        let mut ab = Join::new([a.watch(), b.watch()].into_iter());
1038
1039        let stream = ab.clone().stream();
1040        let handle = tokio::task::spawn(async move { stream.take(5).collect::<Vec<_>>().await });
1041
1042        // get
1043        assert_eq!(ab.get(), vec![1, 1]);
1044        // set a
1045        a.set(2u8).unwrap();
1046        tokio::task::yield_now().await;
1047        assert_eq!(ab.get(), vec![2, 1]);
1048        // set b
1049        b.set(3u8).unwrap();
1050        tokio::task::yield_now().await;
1051        assert_eq!(ab.get(), vec![2, 3]);
1052
1053        a.set(3u8).unwrap();
1054        tokio::task::yield_now().await;
1055        b.set(4u8).unwrap();
1056        tokio::task::yield_now().await;
1057
1058        let values = tokio::time::timeout(Duration::from_secs(5), handle)
1059            .await
1060            .unwrap()
1061            .unwrap();
1062        assert_eq!(
1063            values,
1064            vec![vec![1, 1], vec![2, 1], vec![2, 3], vec![3, 3], vec![3, 4]]
1065        );
1066    }
1067
1068    #[tokio::test]
1069    async fn test_updated_then_disconnect_then_get() {
1070        let watchable = Watchable::new(10);
1071        let mut watcher = watchable.watch();
1072        assert_eq!(watchable.get(), 10);
1073        watchable.set(42).ok();
1074        assert_eq!(watcher.updated().await.unwrap(), 42);
1075        drop(watchable);
1076        assert_eq!(watcher.get(), 42);
1077    }
1078
1079    #[tokio::test(start_paused = true)]
1080    async fn test_update_wakeup_on_watchable_drop() {
1081        let watchable = Watchable::new(10);
1082        let mut watcher = watchable.watch();
1083
1084        let start = Instant::now();
1085        let (_, result) = tokio::time::timeout(Duration::from_secs(2), async move {
1086            tokio::join!(
1087                async move {
1088                    tokio::time::sleep(Duration::from_secs(1)).await;
1089                    drop(watchable);
1090                },
1091                async move { watcher.updated().await }
1092            )
1093        })
1094        .await
1095        .expect("watcher never updated");
1096        // We should've updated 1s after start, since that's when the watchable was dropped.
1097        // If this is 2s, then the watchable dropping didn't wake up the `Watcher::updated` future.
1098        assert_eq!(start.elapsed(), Duration::from_secs(1));
1099        assert!(result.is_err());
1100    }
1101
1102    #[tokio::test(start_paused = true)]
1103    async fn test_update_wakeup_always_a_change() {
1104        let watchable = Watchable::new(10);
1105        let mut watcher = watchable.watch();
1106
1107        let task = tokio::spawn(async move {
1108            let mut last_value = watcher.get();
1109            let mut values = Vec::new();
1110            while let Ok(value) = watcher.updated().await {
1111                values.push(value);
1112                if last_value == value {
1113                    return Err("value duplicated");
1114                }
1115                last_value = value;
1116            }
1117            Ok(values)
1118        });
1119
1120        // wait for the task to get set up and polled till pending for once
1121        tokio::time::sleep(Duration::from_millis(100)).await;
1122
1123        watchable.set(11).ok();
1124        tokio::time::sleep(Duration::from_millis(100)).await;
1125        let clone = watchable.clone();
1126        drop(clone); // this shouldn't trigger an update
1127        tokio::time::sleep(Duration::from_millis(100)).await;
1128        for i in 1..=10 {
1129            watchable.set(i + 11).ok();
1130            tokio::time::sleep(Duration::from_millis(100)).await;
1131        }
1132        drop(watchable);
1133
1134        let values = task
1135            .await
1136            .expect("task panicked")
1137            .expect("value duplicated");
1138        assert_eq!(values, vec![11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21]);
1139    }
1140
1141    #[test]
1142    fn test_has_watchers() {
1143        let a = Watchable::new(1u8);
1144        assert!(!a.has_watchers());
1145        let b = a.clone();
1146        assert!(!a.has_watchers());
1147        assert!(!b.has_watchers());
1148
1149        let watcher = a.watch();
1150        assert!(a.has_watchers());
1151        assert!(b.has_watchers());
1152
1153        drop(watcher);
1154
1155        assert!(!a.has_watchers());
1156        assert!(!b.has_watchers());
1157    }
1158
1159    #[tokio::test]
1160    async fn test_three_watchers_basic() {
1161        let watchable = Watchable::new(1u8);
1162
1163        let mut w1 = watchable.watch();
1164        let mut w2 = watchable.watch();
1165        let mut w3 = watchable.watch();
1166
1167        // All see the initial value
1168
1169        assert_eq!(w1.get(), 1);
1170        assert_eq!(w2.get(), 1);
1171        assert_eq!(w3.get(), 1);
1172
1173        // Change  value
1174        watchable.set(42).unwrap();
1175
1176        // All watchers get notified
1177        assert_eq!(w1.updated().await.unwrap(), 42);
1178        assert_eq!(w2.updated().await.unwrap(), 42);
1179        assert_eq!(w3.updated().await.unwrap(), 42);
1180    }
1181
1182    #[tokio::test]
1183    async fn test_three_watchers_skip_intermediate() {
1184        let watchable = Watchable::new(0u8);
1185        let mut watcher = watchable.watch();
1186
1187        watchable.set(1).ok();
1188        watchable.set(2).ok();
1189        watchable.set(3).ok();
1190        watchable.set(4).ok();
1191
1192        let value = watcher.updated().await.unwrap();
1193
1194        assert_eq!(value, 4);
1195    }
1196
1197    #[tokio::test]
1198    async fn test_three_watchers_with_streams() {
1199        let watchable = Watchable::new(10u8);
1200
1201        let mut stream1 = watchable.watch().stream();
1202        let mut stream2 = watchable.watch().stream();
1203        let mut stream3 = watchable.watch().stream_updates_only();
1204
1205        assert_eq!(stream1.next().await.unwrap(), 10);
1206        assert_eq!(stream2.next().await.unwrap(), 10);
1207
1208        // Update the value
1209        watchable.set(20).ok();
1210
1211        // All streams see the update
1212        assert_eq!(stream1.next().await.unwrap(), 20);
1213        assert_eq!(stream2.next().await.unwrap(), 20);
1214        assert_eq!(stream3.next().await.unwrap(), 20);
1215    }
1216
1217    #[tokio::test]
1218    async fn test_three_watchers_independent() {
1219        let watchable = Watchable::new(0u8);
1220
1221        let mut fast_watcher = watchable.watch();
1222        let mut slow_watcher = watchable.watch();
1223        let mut lazy_watcher = watchable.watch();
1224
1225        watchable.set(1).ok();
1226        assert_eq!(fast_watcher.updated().await.unwrap(), 1);
1227
1228        // More updates happen
1229        watchable.set(2).ok();
1230        watchable.set(3).ok();
1231
1232        assert_eq!(slow_watcher.updated().await.unwrap(), 3);
1233        assert_eq!(lazy_watcher.get(), 3);
1234    }
1235
1236    #[tokio::test]
1237    async fn test_combine_three_watchers() {
1238        let a = Watchable::new(1u8);
1239        let b = Watchable::new(2u8);
1240        let c = Watchable::new(3u8);
1241
1242        let mut combined = Triple::new(a.watch(), b.watch(), c.watch());
1243
1244        assert_eq!(combined.get(), (1, 2, 3));
1245
1246        // Update one
1247        b.set(20).ok();
1248
1249        assert_eq!(combined.updated().await.unwrap(), (1, 20, 3));
1250
1251        c.set(30).ok();
1252        assert_eq!(combined.updated().await.unwrap(), (1, 20, 30));
1253    }
1254
1255    #[tokio::test]
1256    async fn test_three_watchers_disconnection() {
1257        let watchable = Watchable::new(5u8);
1258
1259        // All connected
1260        let mut w1 = watchable.watch();
1261        let mut w2 = watchable.watch();
1262        let mut w3 = watchable.watch();
1263
1264        // Drop the watchable
1265        drop(watchable);
1266
1267        // All become disconnected
1268        assert!(!w1.is_connected());
1269        assert!(!w2.is_connected());
1270        assert!(!w3.is_connected());
1271
1272        // Can still get last known value
1273        assert_eq!(w1.get(), 5);
1274        assert_eq!(w2.get(), 5);
1275
1276        // But updates fail
1277        assert!(w3.updated().await.is_err());
1278    }
1279
1280    #[tokio::test]
1281    async fn test_three_watchers_truly_concurrent() {
1282        use tokio::time::sleep;
1283        let watchable = Watchable::new(0u8);
1284
1285        // Spawn three READER tasks
1286        let mut reader_handles = vec![];
1287        for i in 0..3 {
1288            let mut watcher = watchable.watch();
1289            let handle = tokio::spawn(async move {
1290                let mut values = vec![];
1291                // Collect up to 5 updates
1292                for _ in 0..5 {
1293                    if let Ok(value) = watcher.updated().await {
1294                        values.push(value);
1295                    } else {
1296                        break;
1297                    }
1298                }
1299                (i, values)
1300            });
1301            reader_handles.push(handle);
1302        }
1303
1304        // Spawn three WRITER tasks that update concurrently
1305        let mut writer_handles = vec![];
1306        for i in 0..3 {
1307            let watchable_clone = watchable.clone();
1308            let handle = tokio::spawn(async move {
1309                for j in 0..5 {
1310                    let value = (i * 10) + j;
1311                    watchable_clone.set(value).ok();
1312                    sleep(Duration::from_millis(5)).await;
1313                }
1314            });
1315            writer_handles.push(handle);
1316        }
1317
1318        // Wait for writers to finish
1319        for handle in writer_handles {
1320            handle.await.unwrap();
1321        }
1322
1323        // Wait for readers and check results
1324        for handle in reader_handles {
1325            let (task_id, values) = handle.await.unwrap();
1326            println!("Reader {}: saw values {:?}", task_id, values);
1327            assert!(!values.is_empty());
1328        }
1329    }
1330
1331    #[tokio::test]
1332    async fn test_peek() {
1333        let a = Watchable::new(vec![1, 2, 3]);
1334        let mut wa = a.watch();
1335
1336        assert_eq!(wa.get(), vec![1, 2, 3]);
1337        assert_eq!(wa.peek(), &vec![1, 2, 3]);
1338
1339        let mut wa_map = wa.map(|a| a.into_iter().map(|a| a * 2).collect::<Vec<_>>());
1340
1341        assert_eq!(wa_map.get(), vec![2, 4, 6]);
1342        assert_eq!(wa_map.peek(), &vec![2, 4, 6]);
1343
1344        let mut wb = a.watch();
1345
1346        assert_eq!(wb.get(), vec![1, 2, 3]);
1347        assert_eq!(wb.peek(), &vec![1, 2, 3]);
1348
1349        let mut wb_map = wb.map(|a| a.into_iter().map(|a| a * 2).collect::<Vec<_>>());
1350
1351        assert_eq!(wb_map.get(), vec![2, 4, 6]);
1352        assert_eq!(wb_map.peek(), &vec![2, 4, 6]);
1353
1354        let mut w_join = Join::new([wa_map, wb_map].into_iter());
1355
1356        assert_eq!(w_join.get(), vec![vec![2, 4, 6], vec![2, 4, 6]]);
1357        assert_eq!(w_join.peek(), &vec![vec![2, 4, 6], vec![2, 4, 6]]);
1358    }
1359
1360    #[tokio::test]
1361    async fn test_update_updates_peek() {
1362        let value = Watchable::new(42);
1363        let mut watcher = value.watch();
1364
1365        assert_eq!(watcher.peek(), &42);
1366        assert!(!watcher.update());
1367
1368        value.set(50).ok();
1369
1370        assert_eq!(watcher.peek(), &42); // watcher wasn't updated yet
1371        assert!(watcher.update()); // Update returns true, because there was an update
1372        assert_eq!(watcher.peek(), &50);
1373        assert!(!watcher.update());
1374
1375        let mut watcher_map = watcher.clone().map(|v| v * 2);
1376
1377        assert_eq!(watcher_map.peek(), &100);
1378        assert!(!watcher_map.update());
1379
1380        value.set(10).ok();
1381
1382        assert_eq!(watcher_map.peek(), &100);
1383        assert!(watcher_map.update());
1384        assert_eq!(watcher_map.peek(), &20);
1385        assert!(!watcher_map.update());
1386
1387        let value2 = Watchable::new(0);
1388        let mut watcher_join = Join::new([watcher, value2.watch()].into_iter());
1389
1390        assert_eq!(watcher_join.peek(), &vec![10, 0]);
1391        assert!(!watcher_join.update());
1392
1393        value.set(0).ok();
1394        value2.set(1).ok();
1395
1396        assert_eq!(watcher_join.peek(), &vec![10, 0]);
1397        assert!(watcher_join.update());
1398        assert_eq!(watcher_join.peek(), &vec![0, 1]);
1399        assert!(!watcher_join.update());
1400    }
1401
1402    #[tokio::test]
1403    async fn test_get_updates_peek() {
1404        let value = Watchable::new(42);
1405        let mut watcher = value.watch();
1406
1407        assert_eq!(watcher.peek(), &42);
1408        assert!(!watcher.update());
1409
1410        value.set(50).ok();
1411
1412        assert_eq!(watcher.peek(), &42); // watcher wasn't updated yet
1413        assert_eq!(watcher.get(), 50); // Update returns true, because there was an update
1414        assert_eq!(watcher.peek(), &50);
1415        assert!(!watcher.update());
1416
1417        let mut watcher_map = watcher.clone().map(|v| v * 2);
1418
1419        assert_eq!(watcher_map.peek(), &100);
1420        assert!(!watcher_map.update());
1421
1422        value.set(10).ok();
1423
1424        assert_eq!(watcher_map.peek(), &100);
1425        assert_eq!(watcher_map.get(), 20);
1426        assert_eq!(watcher_map.peek(), &20);
1427        assert!(!watcher_map.update());
1428
1429        let value2 = Watchable::new(0);
1430        let mut watcher_join = Join::new([watcher, value2.watch()].into_iter());
1431
1432        assert_eq!(watcher_join.peek(), &vec![10, 0]);
1433        assert!(!watcher_join.update());
1434
1435        value.set(0).ok();
1436        value2.set(1).ok();
1437
1438        assert_eq!(watcher_join.peek(), &vec![10, 0]);
1439        assert_eq!(watcher_join.get(), vec![0, 1]);
1440        assert_eq!(watcher_join.peek(), &vec![0, 1]);
1441        assert!(!watcher_join.update());
1442    }
1443
1444    #[tokio::test]
1445    async fn test_ensure_wakers_bounded() {
1446        use tokio::time::{interval, Duration};
1447        let watchable = Watchable::new(0);
1448        let mut watcher = watchable.watch();
1449        let max_tick = 1000;
1450
1451        let handle = tokio::spawn(async move {
1452            let mut ticker = interval(Duration::from_nanos(1));
1453            let mut tick_no = 0;
1454            loop {
1455                tokio::select! {
1456                    _ = watcher.updated() => {}
1457                    _ = ticker.tick() => {
1458                        // We cancel the other future and start over again
1459                        tick_no += 1;
1460                        if tick_no > max_tick{
1461                            return
1462                        }
1463                    }
1464                }
1465                let num_wakers = watchable.shared.wakers.lock().unwrap().len();
1466                assert_eq!(num_wakers, 1);
1467            }
1468        });
1469
1470        tokio::time::timeout(Duration::from_secs(1), handle)
1471            .await
1472            .unwrap()
1473            .unwrap()
1474    }
1475}