async_watch/lib.rs
1//! A single-producer, multi-consumer channel that only retains the *last* sent
2//! value.
3//!
4//! Extracted from [Tokio's](https://github.com/tokio-rs/tokio/) `tokio::sync::watch`
5//! implementation, which was written by [Carl Lerche](https://github.com/carllerche).
6//!
7//! This channel is useful for watching for changes to a value from multiple
8//! points in the code base, for example, changes to configuration values.
9//!
10//! # Usage
11//!
12//! [`channel`] returns a [`Sender`] / [`Receiver`] pair. These are
13//! the producer and sender halves of the channel. The channel is
14//! created with an initial value. The **latest** value stored in the channel is accessed with
15//! [`Receiver::borrow()`]. Awaiting [`Receiver::changed()`] waits for a new
16//! value to sent by the [`Sender`] half. Awaiting [`Receiver::recv()`] combines
17//! [`Receiver::changed()`] and [`Receiver::borrow()`] where the borrowed value
18//! is cloned and returned.
19//!
20//!
21//! # Examples
22//!
23//! ```
24//! # let executor = async_executor::LocalExecutor::new();
25//! # executor.run(async {
26//! let (tx, mut rx) = async_watch::channel("hello");
27//! let mut rx2 = rx.clone();
28//!
29//! // First variant
30//! executor.spawn(async move {
31//!     while let Ok(value) = rx.recv().await {
32//!         println!("received = {:?}", value);
33//!     }
34//! });
35//!
36//! // Second variant
37//! executor.spawn(async move {
38//!     while rx2.changed().await.is_ok() {
39//!         println!("received = {:?}", *rx2.borrow());
40//!     }
41//! });
42//!
43//! tx.send("world").unwrap();
44//! # });
45//! ```
46//!
47//! # Closing
48//!
49//! [`Sender::closed`] allows the producer to detect when all [`Receiver`]
50//! handles have been dropped. This indicates that there is no further interest
51//! in the values being produced and work can be stopped.
52//!
53//! # Thread safety
54//!
55//! Both [`Sender`] and [`Receiver`] are thread safe. They can be moved to other
56//! threads and can be used in a concurrent environment. Clones of [`Receiver`]
57//! handles may be moved to separate threads and also used concurrently.
58//!
59//! [`Sender`]: crate::Sender
60//! [`Receiver`]: crate::Receiver
61//! [`Receiver::recv`]: crate::Receiver::recv
62//! [`channel`]: crate::channel
63//! [`Sender::closed`]: crate::Sender::closed
64
65pub mod error;
66
67use event_listener::Event;
68
69use std::ops;
70use std::sync::atomic::AtomicUsize;
71use std::sync::atomic::Ordering::{Relaxed, SeqCst};
72use std::sync::{Arc, RwLock, RwLockReadGuard};
73
74/// The initial version starts at zero.
75const VERSION_0: usize = 0b00;
76
77/// The version counter shifted by one position to the left to leave space for the closed bit.
78const VERSION_1: usize = 0b10;
79
80/// The least significant bit signifies a closed channel.
81const CLOSED: usize = 0b01;
82
83/// Receives values from the associated [`Sender`](struct@Sender).
84///
85/// Instances are created by the [`channel`](fn@channel) function.
86#[derive(Debug)]
87pub struct Receiver<T> {
88    /// Pointer to the shared state
89    shared: Arc<Shared<T>>,
90
91    /// Last observed version.
92    version: usize,
93}
94
95/// Sends values to the associated [`Receiver`](struct@Receiver).
96///
97/// Instances are created by the [`channel`](fn@channel) function.
98#[derive(Debug)]
99pub struct Sender<T> {
100    shared: Arc<Shared<T>>,
101}
102
103/// Returns a reference to the inner value.
104///
105/// Outstanding borrows hold a read lock on the inner value. This means that
106/// long lived borrows could cause the produce half to block. It is recommended
107/// to keep the borrow as short lived as possible.
108#[derive(Debug)]
109pub struct Ref<'a, T> {
110    inner: RwLockReadGuard<'a, T>,
111}
112
113#[derive(Debug)]
114struct Shared<T> {
115    /// The most recent value
116    value: RwLock<T>,
117
118    /// The current version
119    ///
120    /// The lowest bit represents a "closed" state. The rest of the bits
121    /// represent the current version.
122    version: AtomicUsize,
123
124    /// Tracks the number of `Receiver` instances.
125    ref_count_rx: AtomicUsize,
126
127    /// Event when the value has changed or the `Sender` has been dropped.
128    event_value_changed: Event,
129
130    /// Event when all `Receiver`s have been dropped.
131    event_all_recv_dropped: Event,
132}
133
134/// Creates a new watch channel, returning the "send" and "receive" handles.
135///
136/// All values sent by [`Sender`] will become visible to the [`Receiver`] handles.
137/// Only the last value sent is made available to the [`Receiver`] half. All
138/// intermediate values are dropped.
139///
140/// # Examples
141///
142/// ```
143/// # let executor = async_executor::LocalExecutor::new();
144/// # executor.run(async {
145/// let (tx, mut rx) = async_watch::channel("hello");
146///
147/// executor.spawn(async move {
148///     while let Ok(value) = rx.recv().await {
149///         println!("received = {:?}", value);
150///     }
151/// });
152///
153/// tx.send("world").unwrap();
154/// # });
155/// ```
156///
157/// [`Sender`]: struct@Sender
158/// [`Receiver`]: struct@Receiver
159pub fn channel<T>(init: T) -> (Sender<T>, Receiver<T>) {
160    let shared = Arc::new(Shared {
161        value: RwLock::new(init),
162        version: AtomicUsize::new(VERSION_0),
163        ref_count_rx: AtomicUsize::new(1),
164        event_value_changed: Event::new(),
165        event_all_recv_dropped: Event::new(),
166    });
167
168    let tx = Sender {
169        shared: shared.clone(),
170    };
171
172    let rx = Receiver {
173        shared,
174        version: VERSION_0,
175    };
176
177    (tx, rx)
178}
179
180impl<T> Receiver<T> {
181    /// Returns a reference to the most recently sent value.
182    ///
183    /// Outstanding borrows hold a read lock. This means that long lived borrows
184    /// could cause the send half to block. It is recommended to keep the borrow
185    /// as short lived as possible.
186    ///
187    /// # Examples
188    ///
189    /// ```
190    /// let (_, rx) = async_watch::channel("hello");
191    /// assert_eq!(*rx.borrow(), "hello");
192    /// ```
193    pub fn borrow(&self) -> Ref<'_, T> {
194        let inner = self.shared.value.read().unwrap();
195        Ref { inner }
196    }
197
198    /// Wait for a change notification.
199    ///
200    /// Returns when a new value has been sent by the [`Sender`] since the last
201    /// time `changed()` was called. When the `Sender` half is dropped, `Err` is
202    /// returned.
203    ///
204    /// [`Sender`]: struct@Sender
205    ///
206    /// # Examples
207    ///
208    /// ```
209    /// # let executor = async_executor::LocalExecutor::new();
210    /// # executor.run(async {
211    /// let (tx, mut rx) = async_watch::channel("hello");
212    ///
213    /// let task = executor.spawn(async move {
214    ///     tx.send("goodbye").unwrap();
215    /// });
216    ///
217    /// assert!(rx.changed().await.is_ok());
218    /// assert_eq!(*rx.borrow(), "goodbye");
219    ///
220    /// // The `tx` handle has been dropped
221    /// assert!(rx.changed().await.is_err());
222    ///
223    /// task.await;
224    /// });
225    /// ```
226    pub async fn changed(&mut self) -> Result<(), error::RecvError> {
227        // Fast path: Check the state first.
228        if let Some(ret) = self.maybe_changed() {
229            return ret;
230        }
231
232        // In order to avoid a race condition, we first request a notification,
233        // **then** check the current value's version. If a new version exists,
234        // the notification request is dropped.
235        let listener = self.shared.event_value_changed.listen();
236
237        if let Some(ret) = self.maybe_changed() {
238            return ret;
239        }
240
241        listener.await;
242
243        self.maybe_changed()
244            .expect("[bug] failed to observe change after notificaton.")
245    }
246
247    fn maybe_changed(&mut self) -> Option<Result<(), error::RecvError>> {
248        // Load the version from the state
249        let state = self.shared.version.load(SeqCst);
250        let new_version = state & !CLOSED;
251
252        if self.version != new_version {
253            // Observe the new version and return
254            self.version = new_version;
255            return Some(Ok(()));
256        }
257
258        if CLOSED == state & CLOSED {
259            // All receivers have dropped.
260            return Some(Err(error::RecvError {}));
261        }
262
263        // No changes.
264        None
265    }
266}
267
268impl<T: Clone> Receiver<T> {
269    /// A convenience helper which combines calling [`Receiver::changed()`] and
270    /// [`Receiver::borrow()`] where the borrowed value is cloned and returned.
271    ///
272    /// Note: If this is the first time the function is called on a `Receiver`
273    /// instance, then the function **will wait** until a new value is sent into the channel.
274    ///
275    /// `None` is returned if the `Sender` half is dropped.
276    ///
277    /// # Examples
278    ///
279    /// ```
280    /// # let executor = async_executor::LocalExecutor::new();
281    /// # executor.run(async {
282    /// let (tx, mut rx) = async_watch::channel("hello");
283    ///
284    /// let task = executor.spawn(async move {
285    ///     tx.send("goodbye").unwrap();
286    /// });
287    ///
288    /// assert_eq!(*rx.borrow(), "hello");
289    ///
290    /// // Waits for the new task to spawn and send the value.
291    /// let v = rx.recv().await.unwrap();
292    /// assert_eq!(v, "goodbye");
293    ///
294    /// let v = rx.recv().await;
295    /// assert!(v.is_err());
296    ///
297    /// task.await;
298    /// # });
299    /// ```
300    pub async fn recv(&mut self) -> Result<T, error::RecvError> {
301        self.changed().await?;
302        Ok(self.borrow().clone())
303    }
304}
305
306impl<T> Clone for Receiver<T> {
307    fn clone(&self) -> Self {
308        self.shared.ref_count_rx.fetch_add(1, Relaxed);
309        Receiver {
310            shared: self.shared.clone(),
311            version: self.version,
312        }
313    }
314}
315
316impl<T> Drop for Receiver<T> {
317    fn drop(&mut self) {
318        if self.shared.ref_count_rx.fetch_sub(1, Relaxed) == 1 {
319            // Notify the single sender.
320            self.shared.event_all_recv_dropped.notify(usize::MAX);
321        }
322    }
323}
324
325impl<T> Sender<T> {
326    /// Sends a new value via the channel, notifying all receivers.
327    pub fn send(&self, value: T) -> Result<(), error::SendError<T>> {
328        if self.shared.ref_count_rx.load(Relaxed) == 0 {
329            // All watchers (`Receiver`s) have been dropped.
330            return Err(error::SendError { inner: value });
331        }
332
333        // Replace the value.
334        *self.shared.value.write().unwrap() = value;
335
336        // Update the version. 2 (`VERSION_1`) is used so that the CLOSED bit is not set.
337        self.shared.version.fetch_add(VERSION_1, SeqCst);
338
339        // Notify all watchers.
340        self.shared.event_value_changed.notify(usize::MAX);
341
342        Ok(())
343    }
344
345    /// Completes when all receivers have dropped.
346    ///
347    /// This allows the producer to get notified when interest in the produced
348    /// values is canceled and immediately stop doing work.
349    pub async fn closed(&self) {
350        // Fast path.
351        if self.shared.ref_count_rx.load(Relaxed) == 0 {
352            return;
353        }
354
355        // Listen for events now and check the reference count afterwards to avoid race condition.
356        let listener = self.shared.event_all_recv_dropped.listen();
357
358        if self.shared.ref_count_rx.load(Relaxed) == 0 {
359            return;
360        }
361
362        listener.await;
363        debug_assert_eq!(self.shared.ref_count_rx.load(Relaxed), 0);
364    }
365}
366
367impl<T> Drop for Sender<T> {
368    fn drop(&mut self) {
369        self.shared.version.fetch_or(CLOSED, SeqCst);
370        self.shared.event_value_changed.notify(usize::MAX);
371    }
372}
373
374// ===== impl Ref =====
375
376impl<T> ops::Deref for Ref<'_, T> {
377    type Target = T;
378
379    fn deref(&self) -> &T {
380        self.inner.deref()
381    }
382}