async_watch2/lib.rs
1//! A single-producer, multi-consumer channel that only retains the *last* sent
2//! value.
3//!
4//! Extracted from [Tokio's](https://github.com/tokio-rs/tokio/) `tokio::sync::watch`
5//! implementation, which was initially written by [Carl Lerche](https://github.com/carllerche).
6//!
7//! This channel is useful for watching for changes to a value from multiple
8//! points in the code base, for example, changes to configuration values.
9//!
10//! # Usage
11//!
//! [`channel`] returns a [`Sender`] / [`Receiver`] pair. These are
//! the producer and consumer halves of the channel. The channel is
//! created with an initial value. [`Receiver::recv`] will always
//! be ready upon creation and will yield either this initial value or
//! the latest value that has been sent by `Sender`.
17//!
18//! Calls to [`Receiver::recv`] will always yield the latest value.
19//!
20//! # Examples
21//!
22//! ```
23//! # let executor = async_executor::LocalExecutor::new();
24//! # executor.run(async {
25//! let (tx, mut rx) = async_watch2::channel("hello");
26//!
27//! executor.spawn(async move {
28//! while let Some(value) = rx.recv().await {
29//! println!("received = {:?}", value);
30//! }
31//! });
32//!
33//! tx.broadcast("world").unwrap();
34//! # });
35//! ```
36//!
37//! # Closing
38//!
39//! [`Sender::closed`] allows the producer to detect when all [`Receiver`]
40//! handles have been dropped. This indicates that there is no further interest
41//! in the values being produced and work can be stopped.
42//!
43//! # Thread safety
44//!
45//! Both [`Sender`] and [`Receiver`] are thread safe. They can be moved to other
46//! threads and can be used in a concurrent environment. Clones of [`Receiver`]
47//! handles may be moved to separate threads and also used concurrently.
48//!
49//! [`Sender`]: crate::Sender
50//! [`Receiver`]: crate::Receiver
51//! [`Receiver::recv`]: crate::Receiver::recv
52//! [`channel`]: crate::channel
53//! [`Sender::closed`]: crate::Sender::closed
54
55pub mod error;
56
57mod poll_fn;
58use poll_fn::poll_fn;
59
60use atomic_waker::AtomicWaker;
61use fnv::FnvHashSet;
62
63use std::ops;
64use std::sync::atomic::AtomicUsize;
65use std::sync::atomic::Ordering::{Relaxed, SeqCst};
66use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard, Weak};
67use std::task::Poll::{Pending, Ready};
68use std::task::{Context, Poll};
69
/// Version a brand-new watcher starts from, one step behind the channel's
/// initial version so that the initial value counts as unseen.
const VERSION_0: usize = 0;

/// One version step. The counter lives in the bits above the closed flag, so
/// a step is `1` shifted left by one position.
const VERSION_1: usize = 1 << 1;

/// Flag kept in the least significant bit; set once the channel is closed.
const CLOSED: usize = 1;
78
79/// Receives values from the associated [`Sender`](struct@Sender).
80///
81/// Instances are created by the [`channel`](fn@channel) function.
82#[derive(Debug)]
83pub struct Receiver<T> {
84 /// Pointer to the shared state
85 shared: Arc<Shared<T>>,
86
87 /// Pointer to the watcher's internal state
88 inner: Watcher,
89}
90
91/// Sends values to the associated [`Receiver`](struct@Receiver).
92///
93/// Instances are created by the [`channel`](fn@channel) function.
94#[derive(Debug)]
95pub struct Sender<T> {
96 shared: Weak<Shared<T>>,
97}
98
/// A read-only view of the value currently stored in the channel.
///
/// A live `Ref` keeps a read lock on the inner value, which means a
/// long-lived borrow can make the producer half block. Keep each borrow as
/// short-lived as possible.
#[derive(Debug)]
pub struct Ref<'a, T> {
    /// Read guard over the shared value.
    inner: RwLockReadGuard<'a, T>,
}
108
109#[derive(Debug)]
110struct Shared<T> {
111 /// The most recent value
112 value: RwLock<T>,
113
114 /// The current version
115 ///
116 /// The lowest bit represents a "closed" state. The rest of the bits
117 /// represent the current version.
118 version: AtomicUsize,
119
120 /// All watchers
121 watchers: Mutex<Watchers>,
122
123 /// Task to notify when all watchers drop
124 cancel: AtomicWaker,
125}
126
127type Watchers = FnvHashSet<Watcher>;
128
129/// The watcher's ID is based on the Arc's pointer.
130#[derive(Clone, Debug)]
131struct Watcher(Arc<WatchInner>);
132
133#[derive(Debug)]
134struct WatchInner {
135 /// Last observed version
136 version: AtomicUsize,
137 waker: AtomicWaker,
138}
139
140/// Creates a new watch channel, returning the "send" and "receive" handles.
141///
142/// All values sent by [`Sender`] will become visible to the [`Receiver`] handles.
143/// Only the last value sent is made available to the [`Receiver`] half. All
144/// intermediate values are dropped.
145///
146/// # Examples
147///
148/// ```
149/// # let executor = async_executor::LocalExecutor::new();
150/// # executor.run(async {
151/// let (tx, mut rx) = async_watch2::channel("hello");
152///
153/// executor.spawn(async move {
154/// while let Some(value) = rx.recv().await {
155/// println!("received = {:?}", value);
156/// }
157/// });
158///
159/// tx.broadcast("world").unwrap();
160/// # });
161/// ```
162///
163/// [`Sender`]: struct@Sender
164/// [`Receiver`]: struct@Receiver
165pub fn channel<T: Clone>(init: T) -> (Sender<T>, Receiver<T>) {
166 // We don't start knowing VERSION_1
167 let inner = Watcher::new_version(VERSION_0);
168
169 // Insert the watcher
170 let mut watchers = Watchers::with_capacity_and_hasher(0, Default::default());
171 watchers.insert(inner.clone());
172
173 let shared = Arc::new(Shared {
174 value: RwLock::new(init),
175 version: AtomicUsize::new(VERSION_1),
176 watchers: Mutex::new(watchers),
177 cancel: AtomicWaker::new(),
178 });
179
180 let tx = Sender {
181 shared: Arc::downgrade(&shared),
182 };
183
184 let rx = Receiver { shared, inner };
185
186 (tx, rx)
187}
188
189impl<T> Receiver<T> {
190 /// Returns a reference to the most recently sent value
191 ///
192 /// Outstanding borrows hold a read lock. This means that long lived borrows
193 /// could cause the send half to block. It is recommended to keep the borrow
194 /// as short lived as possible.
195 ///
196 /// # Examples
197 ///
198 /// ```
199 /// let (_, rx) = async_watch2::channel("hello");
200 /// assert_eq!(*rx.borrow(), "hello");
201 /// ```
202 pub fn borrow(&self) -> Ref<'_, T> {
203 let inner = self.shared.value.read().unwrap();
204 Ref { inner }
205 }
206
207 // TODO: document
208 #[doc(hidden)]
209 pub fn poll_recv_ref<'a>(&'a mut self, cx: &mut Context<'_>) -> Poll<Option<Ref<'a, T>>> {
210 // Make sure the task is up to date
211 self.inner.waker.register(cx.waker());
212
213 let state = self.shared.version.load(SeqCst);
214 let version = state & !CLOSED;
215
216 if self.inner.version.swap(version, Relaxed) != version {
217 let inner = self.shared.value.read().unwrap();
218
219 return Ready(Some(Ref { inner }));
220 }
221
222 if CLOSED == state & CLOSED {
223 // The `Store` handle has been dropped.
224 return Ready(None);
225 }
226
227 Pending
228 }
229}
230
231impl<T: Clone> Receiver<T> {
232 /// Attempts to clone the latest value sent via the channel.
233 ///
234 /// If this is the first time the function is called on a `Receiver`
235 /// instance, then the function completes immediately with the **current**
236 /// value held by the channel. On the next call, the function waits until
237 /// a new value is sent in the channel.
238 ///
239 /// `None` is returned if the `Sender` half is dropped.
240 ///
241 /// # Examples
242 ///
243 /// ```
244 /// # let executor = async_executor::LocalExecutor::new();
245 /// # executor.run(async {
246 /// let (tx, mut rx) = async_watch2::channel("hello");
247 ///
248 /// let v = rx.recv().await.unwrap();
249 /// assert_eq!(v, "hello");
250 ///
251 /// let task = executor.spawn(async move {
252 /// tx.broadcast("goodbye").unwrap();
253 /// });
254 ///
255 /// // Waits for the new task to spawn and send the value.
256 /// let v = rx.recv().await.unwrap();
257 /// assert_eq!(v, "goodbye");
258 ///
259 /// let v = rx.recv().await;
260 /// assert!(v.is_none());
261 ///
262 /// task.await;
263 /// # });
264 /// ```
265 pub async fn recv(&mut self) -> Option<T> {
266 poll_fn(|cx| {
267 let v_ref = match self.poll_recv_ref(cx) {
268 Ready(v) => v,
269 Pending => return Pending,
270 };
271 Poll::Ready(v_ref.map(|v_ref| (*v_ref).clone()))
272 })
273 .await
274 }
275}
276
277impl<T> Clone for Receiver<T> {
278 fn clone(&self) -> Self {
279 let ver = self.inner.version.load(Relaxed);
280 let inner = Watcher::new_version(ver);
281 let shared = self.shared.clone();
282
283 shared.watchers.lock().unwrap().insert(inner.clone());
284
285 Receiver { shared, inner }
286 }
287}
288
289impl<T> Drop for Receiver<T> {
290 fn drop(&mut self) {
291 self.shared.watchers.lock().unwrap().remove(&self.inner);
292 }
293}
294
295impl<T> Sender<T> {
296 /// Broadcasts a new value via the channel, notifying all receivers.
297 pub fn broadcast(&self, value: T) -> Result<(), error::SendError<T>> {
298 let shared = match self.shared.upgrade() {
299 Some(shared) => shared,
300 // All `Watch` handles have been canceled
301 None => return Err(error::SendError { inner: value }),
302 };
303
304 // Replace the value
305 {
306 let mut lock = shared.value.write().unwrap();
307 *lock = value;
308 }
309
310 // Update the version. 2 (`VERSION_1`) is used so that the CLOSED bit is not set.
311 shared.version.fetch_add(VERSION_1, SeqCst);
312
313 // Notify all watchers
314 notify_all(&*shared);
315
316 Ok(())
317 }
318
319 /// Completes when all receivers have dropped.
320 ///
321 /// This allows the producer to get notified when interest in the produced
322 /// values is canceled and immediately stop doing work.
323 pub async fn closed(&mut self) {
324 poll_fn(|cx| self.poll_close(cx)).await
325 }
326
327 fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<()> {
328 match self.shared.upgrade() {
329 Some(shared) => {
330 shared.cancel.register(cx.waker());
331 Pending
332 }
333 None => Ready(()),
334 }
335 }
336}
337
338/// Notifies all watchers of a change
339fn notify_all<T>(shared: &Shared<T>) {
340 let watchers = shared.watchers.lock().unwrap();
341
342 for watcher in watchers.iter() {
343 // Notify the task
344 watcher.waker.wake();
345 }
346}
347
348impl<T> Drop for Sender<T> {
349 fn drop(&mut self) {
350 if let Some(shared) = self.shared.upgrade() {
351 shared.version.fetch_or(CLOSED, SeqCst);
352 notify_all(&*shared);
353 }
354 }
355}
356
357// ===== impl Ref =====
358
359impl<T> ops::Deref for Ref<'_, T> {
360 type Target = T;
361
362 fn deref(&self) -> &T {
363 self.inner.deref()
364 }
365}
366
367// ===== impl Shared =====
368
369impl<T> Drop for Shared<T> {
370 fn drop(&mut self) {
371 self.cancel.wake();
372 }
373}
374
375// ===== impl Watcher =====
376
377impl Watcher {
378 fn new_version(version: usize) -> Self {
379 Watcher(Arc::new(WatchInner {
380 version: AtomicUsize::new(version),
381 waker: AtomicWaker::new(),
382 }))
383 }
384}
385
386impl std::cmp::PartialEq for Watcher {
387 fn eq(&self, other: &Watcher) -> bool {
388 Arc::ptr_eq(&self.0, &other.0)
389 }
390}
391
392impl std::cmp::Eq for Watcher {}
393
394impl std::hash::Hash for Watcher {
395 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
396 (&*self.0 as *const WatchInner).hash(state)
397 }
398}
399
400impl std::ops::Deref for Watcher {
401 type Target = WatchInner;
402
403 fn deref(&self) -> &Self::Target {
404 &self.0
405 }
406}