async_watch/lib.rs
1//! A single-producer, multi-consumer channel that only retains the *last* sent
2//! value.
3//!
4//! Extracted from [Tokio's](https://github.com/tokio-rs/tokio/) `tokio::sync::watch`
5//! implementation, which was written by [Carl Lerche](https://github.com/carllerche).
6//!
7//! This channel is useful for watching for changes to a value from multiple
8//! points in the code base, for example, changes to configuration values.
9//!
10//! # Usage
11//!
//! [`channel`] returns a [`Sender`] / [`Receiver`] pair. These are
//! the producer and consumer halves of the channel. The channel is
//! created with an initial value. The **latest** value stored in the channel is accessed with
//! [`Receiver::borrow()`]. Awaiting [`Receiver::changed()`] waits for a new
//! value to be sent by the [`Sender`] half. Awaiting [`Receiver::recv()`] combines
//! [`Receiver::changed()`] and [`Receiver::borrow()`] where the borrowed value
//! is cloned and returned.
19//!
20//!
21//! # Examples
22//!
23//! ```
24//! # let executor = async_executor::LocalExecutor::new();
25//! # executor.run(async {
26//! let (tx, mut rx) = async_watch::channel("hello");
27//! let mut rx2 = rx.clone();
28//!
29//! // First variant
30//! executor.spawn(async move {
31//! while let Ok(value) = rx.recv().await {
32//! println!("received = {:?}", value);
33//! }
34//! });
35//!
36//! // Second variant
37//! executor.spawn(async move {
38//! while rx2.changed().await.is_ok() {
39//! println!("received = {:?}", *rx2.borrow());
40//! }
41//! });
42//!
43//! tx.send("world").unwrap();
44//! # });
45//! ```
46//!
47//! # Closing
48//!
49//! [`Sender::closed`] allows the producer to detect when all [`Receiver`]
50//! handles have been dropped. This indicates that there is no further interest
51//! in the values being produced and work can be stopped.
52//!
53//! # Thread safety
54//!
55//! Both [`Sender`] and [`Receiver`] are thread safe. They can be moved to other
56//! threads and can be used in a concurrent environment. Clones of [`Receiver`]
57//! handles may be moved to separate threads and also used concurrently.
58//!
59//! [`Sender`]: crate::Sender
60//! [`Receiver`]: crate::Receiver
61//! [`Receiver::recv`]: crate::Receiver::recv
62//! [`channel`]: crate::channel
63//! [`Sender::closed`]: crate::Sender::closed
64
65pub mod error;
66
67use event_listener::Event;
68
69use std::ops;
70use std::sync::atomic::AtomicUsize;
71use std::sync::atomic::Ordering::{Relaxed, SeqCst};
72use std::sync::{Arc, RwLock, RwLockReadGuard};
73
/// Version assigned to a freshly created channel.
const VERSION_0: usize = 0;

/// Increment applied to the version word on every send.
///
/// The counter lives one bit to the left so that bit 0 stays free for the
/// [`CLOSED`] flag.
const VERSION_1: usize = 1 << 1;

/// Flag stored in the least significant bit of the version word; set when the
/// channel is closed.
const CLOSED: usize = 1;
82
83/// Receives values from the associated [`Sender`](struct@Sender).
84///
85/// Instances are created by the [`channel`](fn@channel) function.
86#[derive(Debug)]
87pub struct Receiver<T> {
88 /// Pointer to the shared state
89 shared: Arc<Shared<T>>,
90
91 /// Last observed version.
92 version: usize,
93}
94
95/// Sends values to the associated [`Receiver`](struct@Receiver).
96///
97/// Instances are created by the [`channel`](fn@channel) function.
98#[derive(Debug)]
99pub struct Sender<T> {
100 shared: Arc<Shared<T>>,
101}
102
/// A borrowed view of the value currently stored in the channel.
///
/// A `Ref` wraps a read guard on the lock protecting the value, so the read
/// lock stays held for as long as the `Ref` is alive. A long-lived `Ref` can
/// therefore cause the producer half to block; keep the borrow as short as
/// possible.
#[derive(Debug)]
pub struct Ref<'a, T> {
    /// Read guard over the stored value.
    inner: RwLockReadGuard<'a, T>,
}
112
113#[derive(Debug)]
114struct Shared<T> {
115 /// The most recent value
116 value: RwLock<T>,
117
118 /// The current version
119 ///
120 /// The lowest bit represents a "closed" state. The rest of the bits
121 /// represent the current version.
122 version: AtomicUsize,
123
124 /// Tracks the number of `Receiver` instances.
125 ref_count_rx: AtomicUsize,
126
127 /// Event when the value has changed or the `Sender` has been dropped.
128 event_value_changed: Event,
129
130 /// Event when all `Receiver`s have been dropped.
131 event_all_recv_dropped: Event,
132}
133
134/// Creates a new watch channel, returning the "send" and "receive" handles.
135///
136/// All values sent by [`Sender`] will become visible to the [`Receiver`] handles.
137/// Only the last value sent is made available to the [`Receiver`] half. All
138/// intermediate values are dropped.
139///
140/// # Examples
141///
142/// ```
143/// # let executor = async_executor::LocalExecutor::new();
144/// # executor.run(async {
145/// let (tx, mut rx) = async_watch::channel("hello");
146///
147/// executor.spawn(async move {
148/// while let Ok(value) = rx.recv().await {
149/// println!("received = {:?}", value);
150/// }
151/// });
152///
153/// tx.send("world").unwrap();
154/// # });
155/// ```
156///
157/// [`Sender`]: struct@Sender
158/// [`Receiver`]: struct@Receiver
159pub fn channel<T>(init: T) -> (Sender<T>, Receiver<T>) {
160 let shared = Arc::new(Shared {
161 value: RwLock::new(init),
162 version: AtomicUsize::new(VERSION_0),
163 ref_count_rx: AtomicUsize::new(1),
164 event_value_changed: Event::new(),
165 event_all_recv_dropped: Event::new(),
166 });
167
168 let tx = Sender {
169 shared: shared.clone(),
170 };
171
172 let rx = Receiver {
173 shared,
174 version: VERSION_0,
175 };
176
177 (tx, rx)
178}
179
180impl<T> Receiver<T> {
181 /// Returns a reference to the most recently sent value.
182 ///
183 /// Outstanding borrows hold a read lock. This means that long lived borrows
184 /// could cause the send half to block. It is recommended to keep the borrow
185 /// as short lived as possible.
186 ///
187 /// # Examples
188 ///
189 /// ```
190 /// let (_, rx) = async_watch::channel("hello");
191 /// assert_eq!(*rx.borrow(), "hello");
192 /// ```
193 pub fn borrow(&self) -> Ref<'_, T> {
194 let inner = self.shared.value.read().unwrap();
195 Ref { inner }
196 }
197
198 /// Wait for a change notification.
199 ///
200 /// Returns when a new value has been sent by the [`Sender`] since the last
201 /// time `changed()` was called. When the `Sender` half is dropped, `Err` is
202 /// returned.
203 ///
204 /// [`Sender`]: struct@Sender
205 ///
206 /// # Examples
207 ///
208 /// ```
209 /// # let executor = async_executor::LocalExecutor::new();
210 /// # executor.run(async {
211 /// let (tx, mut rx) = async_watch::channel("hello");
212 ///
213 /// let task = executor.spawn(async move {
214 /// tx.send("goodbye").unwrap();
215 /// });
216 ///
217 /// assert!(rx.changed().await.is_ok());
218 /// assert_eq!(*rx.borrow(), "goodbye");
219 ///
220 /// // The `tx` handle has been dropped
221 /// assert!(rx.changed().await.is_err());
222 ///
223 /// task.await;
224 /// });
225 /// ```
226 pub async fn changed(&mut self) -> Result<(), error::RecvError> {
227 // Fast path: Check the state first.
228 if let Some(ret) = self.maybe_changed() {
229 return ret;
230 }
231
232 // In order to avoid a race condition, we first request a notification,
233 // **then** check the current value's version. If a new version exists,
234 // the notification request is dropped.
235 let listener = self.shared.event_value_changed.listen();
236
237 if let Some(ret) = self.maybe_changed() {
238 return ret;
239 }
240
241 listener.await;
242
243 self.maybe_changed()
244 .expect("[bug] failed to observe change after notificaton.")
245 }
246
247 fn maybe_changed(&mut self) -> Option<Result<(), error::RecvError>> {
248 // Load the version from the state
249 let state = self.shared.version.load(SeqCst);
250 let new_version = state & !CLOSED;
251
252 if self.version != new_version {
253 // Observe the new version and return
254 self.version = new_version;
255 return Some(Ok(()));
256 }
257
258 if CLOSED == state & CLOSED {
259 // All receivers have dropped.
260 return Some(Err(error::RecvError {}));
261 }
262
263 // No changes.
264 None
265 }
266}
267
268impl<T: Clone> Receiver<T> {
269 /// A convenience helper which combines calling [`Receiver::changed()`] and
270 /// [`Receiver::borrow()`] where the borrowed value is cloned and returned.
271 ///
272 /// Note: If this is the first time the function is called on a `Receiver`
273 /// instance, then the function **will wait** until a new value is sent into the channel.
274 ///
275 /// `None` is returned if the `Sender` half is dropped.
276 ///
277 /// # Examples
278 ///
279 /// ```
280 /// # let executor = async_executor::LocalExecutor::new();
281 /// # executor.run(async {
282 /// let (tx, mut rx) = async_watch::channel("hello");
283 ///
284 /// let task = executor.spawn(async move {
285 /// tx.send("goodbye").unwrap();
286 /// });
287 ///
288 /// assert_eq!(*rx.borrow(), "hello");
289 ///
290 /// // Waits for the new task to spawn and send the value.
291 /// let v = rx.recv().await.unwrap();
292 /// assert_eq!(v, "goodbye");
293 ///
294 /// let v = rx.recv().await;
295 /// assert!(v.is_err());
296 ///
297 /// task.await;
298 /// # });
299 /// ```
300 pub async fn recv(&mut self) -> Result<T, error::RecvError> {
301 self.changed().await?;
302 Ok(self.borrow().clone())
303 }
304}
305
306impl<T> Clone for Receiver<T> {
307 fn clone(&self) -> Self {
308 self.shared.ref_count_rx.fetch_add(1, Relaxed);
309 Receiver {
310 shared: self.shared.clone(),
311 version: self.version,
312 }
313 }
314}
315
316impl<T> Drop for Receiver<T> {
317 fn drop(&mut self) {
318 if self.shared.ref_count_rx.fetch_sub(1, Relaxed) == 1 {
319 // Notify the single sender.
320 self.shared.event_all_recv_dropped.notify(usize::MAX);
321 }
322 }
323}
324
325impl<T> Sender<T> {
326 /// Sends a new value via the channel, notifying all receivers.
327 pub fn send(&self, value: T) -> Result<(), error::SendError<T>> {
328 if self.shared.ref_count_rx.load(Relaxed) == 0 {
329 // All watchers (`Receiver`s) have been dropped.
330 return Err(error::SendError { inner: value });
331 }
332
333 // Replace the value.
334 *self.shared.value.write().unwrap() = value;
335
336 // Update the version. 2 (`VERSION_1`) is used so that the CLOSED bit is not set.
337 self.shared.version.fetch_add(VERSION_1, SeqCst);
338
339 // Notify all watchers.
340 self.shared.event_value_changed.notify(usize::MAX);
341
342 Ok(())
343 }
344
345 /// Completes when all receivers have dropped.
346 ///
347 /// This allows the producer to get notified when interest in the produced
348 /// values is canceled and immediately stop doing work.
349 pub async fn closed(&self) {
350 // Fast path.
351 if self.shared.ref_count_rx.load(Relaxed) == 0 {
352 return;
353 }
354
355 // Listen for events now and check the reference count afterwards to avoid race condition.
356 let listener = self.shared.event_all_recv_dropped.listen();
357
358 if self.shared.ref_count_rx.load(Relaxed) == 0 {
359 return;
360 }
361
362 listener.await;
363 debug_assert_eq!(self.shared.ref_count_rx.load(Relaxed), 0);
364 }
365}
366
367impl<T> Drop for Sender<T> {
368 fn drop(&mut self) {
369 self.shared.version.fetch_or(CLOSED, SeqCst);
370 self.shared.event_value_changed.notify(usize::MAX);
371 }
372}
373
374// ===== impl Ref =====
375
376impl<T> ops::Deref for Ref<'_, T> {
377 type Target = T;
378
379 fn deref(&self) -> &T {
380 self.inner.deref()
381 }
382}