just_watch/
lib.rs

1//! Initially a fork of the [`async_watch`](https://github.com/cynecx/async-watch) crate.
2//!
3//! A single-producer, multi-consumer channel that only retains the *last* sent
4//! value.
5//!
6//! Extracted from [Tokio's](https://github.com/tokio-rs/tokio/) `tokio::sync::watch`
7//! implementation, which was written by [Carl Lerche](https://github.com/carllerche).
8//!
9//! This channel is useful for watching for changes to a value from multiple
10//! points in the code base, for example, changes to configuration values.
11//!
12//! # Usage
13//!
14//! [`channel`] returns a [`Sender`] / [`Receiver`] pair. These are
15//! the producer and sender halves of the channel. The channel is
16//! created with an initial value. The **latest** value stored in the channel is accessed with
17//! [`Receiver::borrow()`]. Awaiting [`Receiver::changed()`] waits for a new
18//! value to sent by the [`Sender`] half. Awaiting [`Receiver::recv()`] combines
19//! [`Receiver::changed()`] and [`Receiver::borrow()`] where the borrowed value
20//! is cloned and returned.
21//!
22//!
23//! # Examples
24//!
25//! ```
26//! # let executor = async_executor::LocalExecutor::new();
27//! # executor.run(async {
28//! let (tx, mut rx) = just_watch::channel("hello");
29//! let mut rx2 = rx.clone();
30//!
31//! // First variant
32//! executor.spawn(async move {
33//!     while let Ok(value) = rx.recv().await {
34//!         println!("received = {:?}", value);
35//!     }
36//! });
37//!
38//! // Second variant
39//! executor.spawn(async move {
40//!     while rx2.changed().await.is_ok() {
41//!         println!("received = {:?}", *rx2.borrow());
42//!     }
43//! });
44//!
45//! tx.send("world").unwrap();
46//! # });
47//! ```
48//!
49//! # Closing
50//!
51//! [`Sender::closed`] allows the producer to detect when all [`Receiver`]
52//! handles have been dropped. This indicates that there is no further interest
53//! in the values being produced and work can be stopped.
54//!
55//! # Thread safety
56//!
57//! Both [`Sender`] and [`Receiver`] are thread safe. They can be moved to other
58//! threads and can be used in a concurrent environment. Clones of [`Receiver`]
59//! handles may be moved to separate threads and also used concurrently.
60//!
61//! [`Sender`]: crate::Sender
62//! [`Receiver`]: crate::Receiver
63//! [`Receiver::recv`]: crate::Receiver::recv
64//! [`channel`]: crate::channel
65//! [`Sender::closed`]: crate::Sender::closed
66
67#![warn(missing_docs)]
68
69pub mod error;
70
71use event_listener::Event;
72
73use std::ops;
74use std::sync::atomic::AtomicUsize;
75use std::sync::atomic::Ordering::{Relaxed, SeqCst};
76use std::sync::{Arc, RwLock, RwLockReadGuard};
77
78/// The initial version starts at zero.
79const VERSION_0: usize = 0b00;
80
81/// The version counter shifted by one position to the left to leave space for the closed bit.
82const VERSION_1: usize = 0b10;
83
84/// The least significant bit signifies a closed channel.
85const CLOSED: usize = 0b01;
86
87/// Receives values from the associated [`Sender`](struct@Sender).
88///
89/// Instances are created by the [`channel`](fn@channel) function.
90#[derive(Debug)]
91pub struct Receiver<T> {
92    /// Pointer to the shared state
93    shared: Arc<Shared<T>>,
94
95    /// Last observed version.
96    version: usize,
97}
98
99/// Sends values to the associated [`Receiver`](struct@Receiver).
100///
101/// Instances are created by the [`channel`](fn@channel) function.
102#[derive(Debug)]
103pub struct Sender<T> {
104    shared: Arc<Shared<T>>,
105}
106
107/// Returns a reference to the inner value.
108///
109/// Outstanding borrows hold a read lock on the inner value. This means that
110/// long lived borrows could cause the produce half to block. It is recommended
111/// to keep the borrow as short lived as possible.
112#[derive(Debug)]
113pub struct Ref<'a, T> {
114    inner: RwLockReadGuard<'a, T>,
115}
116
117#[derive(Debug)]
118struct Shared<T> {
119    /// The most recent value
120    value: RwLock<T>,
121
122    /// The current version
123    ///
124    /// The lowest bit represents a "closed" state. The rest of the bits
125    /// represent the current version.
126    version: AtomicUsize,
127
128    /// Tracks the number of `Receiver` instances.
129    ref_count_rx: AtomicUsize,
130
131    /// Event when the value has changed or the `Sender` has been dropped.
132    event_value_changed: Event,
133
134    /// Event when all `Receiver`s have been dropped.
135    event_all_recv_dropped: Event,
136}
137
138/// Creates a new watch channel, returning the "send" and "receive" handles.
139///
140/// All values sent by [`Sender`] will become visible to the [`Receiver`] handles.
141/// Only the last value sent is made available to the [`Receiver`] half. All
142/// intermediate values are dropped.
143///
144/// # Examples
145///
146/// ```
147/// # let executor = async_executor::LocalExecutor::new();
148/// # executor.run(async {
149/// let (tx, mut rx) = just_watch::channel("hello");
150///
151/// executor.spawn(async move {
152///     while let Ok(value) = rx.recv().await {
153///         println!("received = {:?}", value);
154///     }
155/// });
156///
157/// tx.send("world").unwrap();
158/// # });
159/// ```
160///
161/// [`Sender`]: struct@Sender
162/// [`Receiver`]: struct@Receiver
163pub fn channel<T>(init: T) -> (Sender<T>, Receiver<T>) {
164    let shared = Arc::new(Shared {
165        value: RwLock::new(init),
166        version: AtomicUsize::new(VERSION_0),
167        ref_count_rx: AtomicUsize::new(1),
168        event_value_changed: Event::new(),
169        event_all_recv_dropped: Event::new(),
170    });
171
172    let tx = Sender {
173        shared: shared.clone(),
174    };
175
176    let rx = Receiver {
177        shared,
178        version: VERSION_0,
179    };
180
181    (tx, rx)
182}
183
184impl<T> Receiver<T> {
185    /// Returns a reference to the most recently sent value.
186    ///
187    /// Outstanding borrows hold a read lock. This means that long lived borrows
188    /// could cause the send half to block. It is recommended to keep the borrow
189    /// as short lived as possible.
190    ///
191    /// # Examples
192    ///
193    /// ```
194    /// let (_, rx) = just_watch::channel("hello");
195    /// assert_eq!(*rx.borrow(), "hello");
196    /// ```
197    pub fn borrow(&self) -> Ref<'_, T> {
198        let inner = self.shared.value.read().unwrap();
199        Ref { inner }
200    }
201
202    /// Wait for a change notification.
203    ///
204    /// Returns when a new value has been sent by the [`Sender`] since the last
205    /// time `changed()` was called. When the `Sender` half is dropped, `Err` is
206    /// returned.
207    ///
208    /// [`Sender`]: struct@Sender
209    ///
210    /// # Examples
211    ///
212    /// ```
213    /// # let executor = async_executor::LocalExecutor::new();
214    /// # executor.run(async {
215    /// let (tx, mut rx) = just_watch::channel("hello");
216    ///
217    /// let task = executor.spawn(async move {
218    ///     tx.send("goodbye").unwrap();
219    /// });
220    ///
221    /// assert!(rx.changed().await.is_ok());
222    /// assert_eq!(*rx.borrow(), "goodbye");
223    ///
224    /// // The `tx` handle has been dropped
225    /// assert!(rx.changed().await.is_err());
226    ///
227    /// task.await;
228    /// });
229    /// ```
230    pub async fn changed(&mut self) -> Result<(), error::RecvError> {
231        // Fast path: Check the state first.
232        if let Some(ret) = self.maybe_changed() {
233            return ret;
234        }
235
236        // In order to avoid a race condition, we first request a notification,
237        // **then** check the current value's version. If a new version exists,
238        // the notification request is dropped.
239        let listener = self.shared.event_value_changed.listen();
240
241        if let Some(ret) = self.maybe_changed() {
242            return ret;
243        }
244
245        listener.await;
246
247        self.maybe_changed()
248            .expect("[bug] failed to observe change after notificaton.")
249    }
250
251    fn maybe_changed(&mut self) -> Option<Result<(), error::RecvError>> {
252        // Load the version from the state
253        let state = self.shared.version.load(SeqCst);
254        let new_version = state & !CLOSED;
255
256        if self.version != new_version {
257            // Observe the new version and return
258            self.version = new_version;
259            return Some(Ok(()));
260        }
261
262        if CLOSED == state & CLOSED {
263            // All receivers have dropped.
264            return Some(Err(error::RecvError {}));
265        }
266
267        // No changes.
268        None
269    }
270}
271
272impl<T: Clone> Receiver<T> {
273    /// A convenience helper which combines calling [`Receiver::changed()`] and
274    /// [`Receiver::borrow()`] where the borrowed value is cloned and returned.
275    ///
276    /// Note: If this is the first time the function is called on a `Receiver`
277    /// instance, then the function **will wait** until a new value is sent into the channel.
278    ///
279    /// `None` is returned if the `Sender` half is dropped.
280    ///
281    /// # Examples
282    ///
283    /// ```
284    /// # let executor = async_executor::LocalExecutor::new();
285    /// # executor.run(async {
286    /// let (tx, mut rx) = just_watch::channel("hello");
287    ///
288    /// let task = executor.spawn(async move {
289    ///     tx.send("goodbye").unwrap();
290    /// });
291    ///
292    /// assert_eq!(*rx.borrow(), "hello");
293    ///
294    /// // Waits for the new task to spawn and send the value.
295    /// let v = rx.recv().await.unwrap();
296    /// assert_eq!(v, "goodbye");
297    ///
298    /// let v = rx.recv().await;
299    /// assert!(v.is_err());
300    ///
301    /// task.await;
302    /// # });
303    /// ```
304    pub async fn recv(&mut self) -> Result<T, error::RecvError> {
305        self.changed().await?;
306        Ok(self.borrow().clone())
307    }
308}
309
310impl<T> Clone for Receiver<T> {
311    fn clone(&self) -> Self {
312        self.shared.ref_count_rx.fetch_add(1, Relaxed);
313        Receiver {
314            shared: self.shared.clone(),
315            version: self.version,
316        }
317    }
318}
319
320impl<T> Drop for Receiver<T> {
321    fn drop(&mut self) {
322        if self.shared.ref_count_rx.fetch_sub(1, Relaxed) == 1 {
323            // Notify the single sender.
324            self.shared.event_all_recv_dropped.notify(usize::MAX);
325        }
326    }
327}
328
329impl<T> Sender<T> {
330    /// Creates a new `Receiver` connected to a `Sender`.
331    pub fn subscribe(&self) -> Receiver<T> {
332        self.shared.ref_count_rx.fetch_add(1, Relaxed);
333        let version = self.shared.version.load(SeqCst);
334        Receiver {
335            shared: self.shared.clone(),
336            version,
337        }
338    }
339
340    /// Sends a new value via the channel, notifying all receivers.
341    pub fn send(&self, value: T) -> Result<(), error::SendError<T>> {
342        if self.shared.ref_count_rx.load(Relaxed) == 0 {
343            // All watchers (`Receiver`s) have been dropped.
344            return Err(error::SendError { inner: value });
345        }
346
347        // Replace the value.
348        *self.shared.value.write().unwrap() = value;
349
350        // Update the version. 2 (`VERSION_1`) is used so that the CLOSED bit is not set.
351        self.shared.version.fetch_add(VERSION_1, SeqCst);
352
353        // Notify all watchers.
354        self.shared.event_value_changed.notify(usize::MAX);
355
356        Ok(())
357    }
358
359    /// Applies a function to the existing value.
360    ///
361    /// A function gets a mutable reference that can be used to change a stored value.
362    pub fn send_modify<F>(&self, func: F)
363    where
364        F: FnOnce(&mut T),
365    {
366        // Replace the value.
367        func(&mut *self.shared.value.write().unwrap());
368
369        // Update the version. 2 (`VERSION_1`) is used so that the CLOSED bit is not set.
370        self.shared.version.fetch_add(VERSION_1, SeqCst);
371
372        // Notify all watchers.
373        self.shared.event_value_changed.notify(usize::MAX);
374    }
375
376    /// Completes when all receivers have dropped.
377    ///
378    /// This allows the producer to get notified when interest in the produced
379    /// values is canceled and immediately stop doing work.
380    pub async fn closed(&self) {
381        // Fast path.
382        if self.shared.ref_count_rx.load(Relaxed) == 0 {
383            return;
384        }
385
386        // Listen for events now and check the reference count afterwards to avoid race condition.
387        let listener = self.shared.event_all_recv_dropped.listen();
388
389        if self.shared.ref_count_rx.load(Relaxed) == 0 {
390            return;
391        }
392
393        listener.await;
394    }
395}
396
397impl<T> Drop for Sender<T> {
398    fn drop(&mut self) {
399        self.shared.version.fetch_or(CLOSED, SeqCst);
400        self.shared.event_value_changed.notify(usize::MAX);
401    }
402}
403
404// ===== impl Ref =====
405
406impl<T> ops::Deref for Ref<'_, T> {
407    type Target = T;
408
409    fn deref(&self) -> &T {
410        self.inner.deref()
411    }
412}