async_watch2/
lib.rs

1//! A single-producer, multi-consumer channel that only retains the *last* sent
2//! value.
3//!
4//! Extracted from [Tokio's](https://github.com/tokio-rs/tokio/) `tokio::sync::watch`
5//! implementation, which was initially written by [Carl Lerche](https://github.com/carllerche).
6//!
7//! This channel is useful for watching for changes to a value from multiple
8//! points in the code base, for example, changes to configuration values.
9//!
10//! # Usage
11//!
12//! [`channel`] returns a [`Sender`] / [`Receiver`] pair. These are
13//! the producer and sender halves of the channel. The channel is
14//! created with an initial value. [`Receiver::recv`] will always
15//! be ready upon creation and will yield either this initial value or
16//! the latest value that has been sent by `Sender`.
17//!
18//! Calls to [`Receiver::recv`] will always yield the latest value.
19//!
20//! # Examples
21//!
22//! ```
23//! # let executor = async_executor::LocalExecutor::new();
24//! # executor.run(async {
25//! let (tx, mut rx) = async_watch2::channel("hello");
26//!
27//! executor.spawn(async move {
28//!     while let Some(value) = rx.recv().await {
29//!         println!("received = {:?}", value);
30//!     }
31//! });
32//!
33//! tx.broadcast("world").unwrap();
34//! # });
35//! ```
36//!
37//! # Closing
38//!
39//! [`Sender::closed`] allows the producer to detect when all [`Receiver`]
40//! handles have been dropped. This indicates that there is no further interest
41//! in the values being produced and work can be stopped.
42//!
43//! # Thread safety
44//!
45//! Both [`Sender`] and [`Receiver`] are thread safe. They can be moved to other
46//! threads and can be used in a concurrent environment. Clones of [`Receiver`]
47//! handles may be moved to separate threads and also used concurrently.
48//!
49//! [`Sender`]: crate::Sender
50//! [`Receiver`]: crate::Receiver
51//! [`Receiver::recv`]: crate::Receiver::recv
52//! [`channel`]: crate::channel
53//! [`Sender::closed`]: crate::Sender::closed
54
55pub mod error;
56
57mod poll_fn;
58use poll_fn::poll_fn;
59
60use atomic_waker::AtomicWaker;
61use fnv::FnvHashSet;
62
63use std::ops;
64use std::sync::atomic::AtomicUsize;
65use std::sync::atomic::Ordering::{Relaxed, SeqCst};
66use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard, Weak};
67use std::task::Poll::{Pending, Ready};
68use std::task::{Context, Poll};
69
70/// The initial version starts at zero.
71const VERSION_0: usize = 0b00;
72
73/// The version counter shifted by one position to the left to leave space for the closed bit.
74const VERSION_1: usize = 0b10;
75
76/// The least significant bit signifies a closed channel.
77const CLOSED: usize = 0b01;
78
79/// Receives values from the associated [`Sender`](struct@Sender).
80///
81/// Instances are created by the [`channel`](fn@channel) function.
82#[derive(Debug)]
83pub struct Receiver<T> {
84    /// Pointer to the shared state
85    shared: Arc<Shared<T>>,
86
87    /// Pointer to the watcher's internal state
88    inner: Watcher,
89}
90
91/// Sends values to the associated [`Receiver`](struct@Receiver).
92///
93/// Instances are created by the [`channel`](fn@channel) function.
94#[derive(Debug)]
95pub struct Sender<T> {
96    shared: Weak<Shared<T>>,
97}
98
99/// Returns a reference to the inner value
100///
101/// Outstanding borrows hold a read lock on the inner value. This means that
102/// long lived borrows could cause the produce half to block. It is recommended
103/// to keep the borrow as short lived as possible.
104#[derive(Debug)]
105pub struct Ref<'a, T> {
106    inner: RwLockReadGuard<'a, T>,
107}
108
109#[derive(Debug)]
110struct Shared<T> {
111    /// The most recent value
112    value: RwLock<T>,
113
114    /// The current version
115    ///
116    /// The lowest bit represents a "closed" state. The rest of the bits
117    /// represent the current version.
118    version: AtomicUsize,
119
120    /// All watchers
121    watchers: Mutex<Watchers>,
122
123    /// Task to notify when all watchers drop
124    cancel: AtomicWaker,
125}
126
127type Watchers = FnvHashSet<Watcher>;
128
129/// The watcher's ID is based on the Arc's pointer.
130#[derive(Clone, Debug)]
131struct Watcher(Arc<WatchInner>);
132
133#[derive(Debug)]
134struct WatchInner {
135    /// Last observed version
136    version: AtomicUsize,
137    waker: AtomicWaker,
138}
139
140/// Creates a new watch channel, returning the "send" and "receive" handles.
141///
142/// All values sent by [`Sender`] will become visible to the [`Receiver`] handles.
143/// Only the last value sent is made available to the [`Receiver`] half. All
144/// intermediate values are dropped.
145///
146/// # Examples
147///
148/// ```
149/// # let executor = async_executor::LocalExecutor::new();
150/// # executor.run(async {
151/// let (tx, mut rx) = async_watch2::channel("hello");
152///
153/// executor.spawn(async move {
154///     while let Some(value) = rx.recv().await {
155///         println!("received = {:?}", value);
156///     }
157/// });
158///
159/// tx.broadcast("world").unwrap();
160/// # });
161/// ```
162///
163/// [`Sender`]: struct@Sender
164/// [`Receiver`]: struct@Receiver
165pub fn channel<T: Clone>(init: T) -> (Sender<T>, Receiver<T>) {
166    // We don't start knowing VERSION_1
167    let inner = Watcher::new_version(VERSION_0);
168
169    // Insert the watcher
170    let mut watchers = Watchers::with_capacity_and_hasher(0, Default::default());
171    watchers.insert(inner.clone());
172
173    let shared = Arc::new(Shared {
174        value: RwLock::new(init),
175        version: AtomicUsize::new(VERSION_1),
176        watchers: Mutex::new(watchers),
177        cancel: AtomicWaker::new(),
178    });
179
180    let tx = Sender {
181        shared: Arc::downgrade(&shared),
182    };
183
184    let rx = Receiver { shared, inner };
185
186    (tx, rx)
187}
188
189impl<T> Receiver<T> {
190    /// Returns a reference to the most recently sent value
191    ///
192    /// Outstanding borrows hold a read lock. This means that long lived borrows
193    /// could cause the send half to block. It is recommended to keep the borrow
194    /// as short lived as possible.
195    ///
196    /// # Examples
197    ///
198    /// ```
199    /// let (_, rx) = async_watch2::channel("hello");
200    /// assert_eq!(*rx.borrow(), "hello");
201    /// ```
202    pub fn borrow(&self) -> Ref<'_, T> {
203        let inner = self.shared.value.read().unwrap();
204        Ref { inner }
205    }
206
207    // TODO: document
208    #[doc(hidden)]
209    pub fn poll_recv_ref<'a>(&'a mut self, cx: &mut Context<'_>) -> Poll<Option<Ref<'a, T>>> {
210        // Make sure the task is up to date
211        self.inner.waker.register(cx.waker());
212
213        let state = self.shared.version.load(SeqCst);
214        let version = state & !CLOSED;
215
216        if self.inner.version.swap(version, Relaxed) != version {
217            let inner = self.shared.value.read().unwrap();
218
219            return Ready(Some(Ref { inner }));
220        }
221
222        if CLOSED == state & CLOSED {
223            // The `Store` handle has been dropped.
224            return Ready(None);
225        }
226
227        Pending
228    }
229}
230
231impl<T: Clone> Receiver<T> {
232    /// Attempts to clone the latest value sent via the channel.
233    ///
234    /// If this is the first time the function is called on a `Receiver`
235    /// instance, then the function completes immediately with the **current**
236    /// value held by the channel. On the next call, the function waits until
237    /// a new value is sent in the channel.
238    ///
239    /// `None` is returned if the `Sender` half is dropped.
240    ///
241    /// # Examples
242    ///
243    /// ```
244    /// # let executor = async_executor::LocalExecutor::new();
245    /// # executor.run(async {
246    /// let (tx, mut rx) = async_watch2::channel("hello");
247    ///
248    /// let v = rx.recv().await.unwrap();
249    /// assert_eq!(v, "hello");
250    ///
251    /// let task = executor.spawn(async move {
252    ///     tx.broadcast("goodbye").unwrap();
253    /// });
254    ///
255    /// // Waits for the new task to spawn and send the value.
256    /// let v = rx.recv().await.unwrap();
257    /// assert_eq!(v, "goodbye");
258    ///
259    /// let v = rx.recv().await;
260    /// assert!(v.is_none());
261    ///
262    /// task.await;
263    /// # });
264    /// ```
265    pub async fn recv(&mut self) -> Option<T> {
266        poll_fn(|cx| {
267            let v_ref = match self.poll_recv_ref(cx) {
268                Ready(v) => v,
269                Pending => return Pending,
270            };
271            Poll::Ready(v_ref.map(|v_ref| (*v_ref).clone()))
272        })
273        .await
274    }
275}
276
277impl<T> Clone for Receiver<T> {
278    fn clone(&self) -> Self {
279        let ver = self.inner.version.load(Relaxed);
280        let inner = Watcher::new_version(ver);
281        let shared = self.shared.clone();
282
283        shared.watchers.lock().unwrap().insert(inner.clone());
284
285        Receiver { shared, inner }
286    }
287}
288
289impl<T> Drop for Receiver<T> {
290    fn drop(&mut self) {
291        self.shared.watchers.lock().unwrap().remove(&self.inner);
292    }
293}
294
295impl<T> Sender<T> {
296    /// Broadcasts a new value via the channel, notifying all receivers.
297    pub fn broadcast(&self, value: T) -> Result<(), error::SendError<T>> {
298        let shared = match self.shared.upgrade() {
299            Some(shared) => shared,
300            // All `Watch` handles have been canceled
301            None => return Err(error::SendError { inner: value }),
302        };
303
304        // Replace the value
305        {
306            let mut lock = shared.value.write().unwrap();
307            *lock = value;
308        }
309
310        // Update the version. 2 (`VERSION_1`) is used so that the CLOSED bit is not set.
311        shared.version.fetch_add(VERSION_1, SeqCst);
312
313        // Notify all watchers
314        notify_all(&*shared);
315
316        Ok(())
317    }
318
319    /// Completes when all receivers have dropped.
320    ///
321    /// This allows the producer to get notified when interest in the produced
322    /// values is canceled and immediately stop doing work.
323    pub async fn closed(&mut self) {
324        poll_fn(|cx| self.poll_close(cx)).await
325    }
326
327    fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<()> {
328        match self.shared.upgrade() {
329            Some(shared) => {
330                shared.cancel.register(cx.waker());
331                Pending
332            }
333            None => Ready(()),
334        }
335    }
336}
337
338/// Notifies all watchers of a change
339fn notify_all<T>(shared: &Shared<T>) {
340    let watchers = shared.watchers.lock().unwrap();
341
342    for watcher in watchers.iter() {
343        // Notify the task
344        watcher.waker.wake();
345    }
346}
347
348impl<T> Drop for Sender<T> {
349    fn drop(&mut self) {
350        if let Some(shared) = self.shared.upgrade() {
351            shared.version.fetch_or(CLOSED, SeqCst);
352            notify_all(&*shared);
353        }
354    }
355}
356
357// ===== impl Ref =====
358
359impl<T> ops::Deref for Ref<'_, T> {
360    type Target = T;
361
362    fn deref(&self) -> &T {
363        self.inner.deref()
364    }
365}
366
367// ===== impl Shared =====
368
369impl<T> Drop for Shared<T> {
370    fn drop(&mut self) {
371        self.cancel.wake();
372    }
373}
374
375// ===== impl Watcher =====
376
377impl Watcher {
378    fn new_version(version: usize) -> Self {
379        Watcher(Arc::new(WatchInner {
380            version: AtomicUsize::new(version),
381            waker: AtomicWaker::new(),
382        }))
383    }
384}
385
386impl std::cmp::PartialEq for Watcher {
387    fn eq(&self, other: &Watcher) -> bool {
388        Arc::ptr_eq(&self.0, &other.0)
389    }
390}
391
392impl std::cmp::Eq for Watcher {}
393
394impl std::hash::Hash for Watcher {
395    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
396        (&*self.0 as *const WatchInner).hash(state)
397    }
398}
399
400impl std::ops::Deref for Watcher {
401    type Target = WatchInner;
402
403    fn deref(&self) -> &Self::Target {
404        &self.0
405    }
406}