just_watch/lib.rs
1//! Initially a fork of the [`async_watch`](https://github.com/cynecx/async-watch) crate.
2//!
3//! A single-producer, multi-consumer channel that only retains the *last* sent
4//! value.
5//!
6//! Extracted from [Tokio's](https://github.com/tokio-rs/tokio/) `tokio::sync::watch`
7//! implementation, which was written by [Carl Lerche](https://github.com/carllerche).
8//!
9//! This channel is useful for watching for changes to a value from multiple
10//! points in the code base, for example, changes to configuration values.
11//!
12//! # Usage
13//!
//! [`channel`] returns a [`Sender`] / [`Receiver`] pair. These are
//! the producer and consumer halves of the channel. The channel is
//! created with an initial value. The **latest** value stored in the channel is accessed with
//! [`Receiver::borrow()`]. Awaiting [`Receiver::changed()`] waits for a new
//! value to be sent by the [`Sender`] half. Awaiting [`Receiver::recv()`] combines
//! [`Receiver::changed()`] and [`Receiver::borrow()`] where the borrowed value
//! is cloned and returned.
21//!
22//!
23//! # Examples
24//!
25//! ```
26//! # let executor = async_executor::LocalExecutor::new();
27//! # executor.run(async {
28//! let (tx, mut rx) = just_watch::channel("hello");
29//! let mut rx2 = rx.clone();
30//!
31//! // First variant
32//! executor.spawn(async move {
33//! while let Ok(value) = rx.recv().await {
34//! println!("received = {:?}", value);
35//! }
36//! });
37//!
38//! // Second variant
39//! executor.spawn(async move {
40//! while rx2.changed().await.is_ok() {
41//! println!("received = {:?}", *rx2.borrow());
42//! }
43//! });
44//!
45//! tx.send("world").unwrap();
46//! # });
47//! ```
48//!
49//! # Closing
50//!
51//! [`Sender::closed`] allows the producer to detect when all [`Receiver`]
52//! handles have been dropped. This indicates that there is no further interest
53//! in the values being produced and work can be stopped.
54//!
55//! # Thread safety
56//!
57//! Both [`Sender`] and [`Receiver`] are thread safe. They can be moved to other
58//! threads and can be used in a concurrent environment. Clones of [`Receiver`]
59//! handles may be moved to separate threads and also used concurrently.
60//!
61//! [`Sender`]: crate::Sender
62//! [`Receiver`]: crate::Receiver
63//! [`Receiver::recv`]: crate::Receiver::recv
64//! [`channel`]: crate::channel
65//! [`Sender::closed`]: crate::Sender::closed
66
67#![warn(missing_docs)]
68
69pub mod error;
70
71use event_listener::Event;
72
73use std::ops;
74use std::sync::atomic::AtomicUsize;
75use std::sync::atomic::Ordering::{Relaxed, SeqCst};
76use std::sync::{Arc, RwLock, RwLockReadGuard};
77
/// The initial version starts at zero.
///
/// Both the shared version counter and the first `Receiver` created by
/// `channel` start out at this value.
const VERSION_0: usize = 0b00;

/// The version counter shifted by one position to the left to leave space for the closed bit.
///
/// This is the amount added to the shared counter on every update, so a
/// version bump never touches the lowest bit.
const VERSION_1: usize = 0b10;

/// The least significant bit signifies a closed channel.
///
/// It is set on the shared counter when the `Sender` is dropped.
const CLOSED: usize = 0b01;
86
/// Receives values from the associated [`Sender`](struct@Sender).
///
/// Instances are created by the [`channel`](fn@channel) function. Additional
/// handles are obtained by cloning an existing `Receiver` or by calling
/// [`Sender::subscribe`].
#[derive(Debug)]
pub struct Receiver<T> {
    /// Pointer to the shared state
    shared: Arc<Shared<T>>,

    /// Last observed version.
    ///
    /// Compared against the shared version counter to decide whether a new
    /// value has been sent since this handle last checked.
    version: usize,
}
98
/// Sends values to the associated [`Receiver`](struct@Receiver).
///
/// Instances are created by the [`channel`](fn@channel) function. Dropping the
/// `Sender` marks the channel as closed and notifies all waiting receivers.
#[derive(Debug)]
pub struct Sender<T> {
    /// Pointer to the state shared with every `Receiver`.
    shared: Arc<Shared<T>>,
}
106
/// A read guard over the value currently stored in the channel.
///
/// Returned by [`Receiver::borrow`]; it dereferences to the inner value.
///
/// Outstanding borrows hold a read lock on the inner value. This means that
/// long lived borrows could cause the producer half to block. It is recommended
/// to keep the borrow as short lived as possible.
#[derive(Debug)]
pub struct Ref<'a, T> {
    /// The read guard that keeps the value locked for as long as this `Ref` lives.
    inner: RwLockReadGuard<'a, T>,
}
116
/// State shared between the `Sender` and every `Receiver` of one channel.
#[derive(Debug)]
struct Shared<T> {
    /// The most recent value
    value: RwLock<T>,

    /// The current version
    ///
    /// The lowest bit represents a "closed" state. The rest of the bits
    /// represent the current version.
    version: AtomicUsize,

    /// Tracks the number of `Receiver` instances.
    ///
    /// Starts at one for the receiver returned by `channel`, is incremented
    /// when a receiver is cloned or subscribed and decremented when one is
    /// dropped.
    ref_count_rx: AtomicUsize,

    /// Event when the value has changed or the `Sender` has been dropped.
    event_value_changed: Event,

    /// Event when all `Receiver`s have been dropped.
    event_all_recv_dropped: Event,
}
137
138/// Creates a new watch channel, returning the "send" and "receive" handles.
139///
140/// All values sent by [`Sender`] will become visible to the [`Receiver`] handles.
141/// Only the last value sent is made available to the [`Receiver`] half. All
142/// intermediate values are dropped.
143///
144/// # Examples
145///
146/// ```
147/// # let executor = async_executor::LocalExecutor::new();
148/// # executor.run(async {
149/// let (tx, mut rx) = just_watch::channel("hello");
150///
151/// executor.spawn(async move {
152/// while let Ok(value) = rx.recv().await {
153/// println!("received = {:?}", value);
154/// }
155/// });
156///
157/// tx.send("world").unwrap();
158/// # });
159/// ```
160///
161/// [`Sender`]: struct@Sender
162/// [`Receiver`]: struct@Receiver
163pub fn channel<T>(init: T) -> (Sender<T>, Receiver<T>) {
164 let shared = Arc::new(Shared {
165 value: RwLock::new(init),
166 version: AtomicUsize::new(VERSION_0),
167 ref_count_rx: AtomicUsize::new(1),
168 event_value_changed: Event::new(),
169 event_all_recv_dropped: Event::new(),
170 });
171
172 let tx = Sender {
173 shared: shared.clone(),
174 };
175
176 let rx = Receiver {
177 shared,
178 version: VERSION_0,
179 };
180
181 (tx, rx)
182}
183
184impl<T> Receiver<T> {
185 /// Returns a reference to the most recently sent value.
186 ///
187 /// Outstanding borrows hold a read lock. This means that long lived borrows
188 /// could cause the send half to block. It is recommended to keep the borrow
189 /// as short lived as possible.
190 ///
191 /// # Examples
192 ///
193 /// ```
194 /// let (_, rx) = just_watch::channel("hello");
195 /// assert_eq!(*rx.borrow(), "hello");
196 /// ```
197 pub fn borrow(&self) -> Ref<'_, T> {
198 let inner = self.shared.value.read().unwrap();
199 Ref { inner }
200 }
201
202 /// Wait for a change notification.
203 ///
204 /// Returns when a new value has been sent by the [`Sender`] since the last
205 /// time `changed()` was called. When the `Sender` half is dropped, `Err` is
206 /// returned.
207 ///
208 /// [`Sender`]: struct@Sender
209 ///
210 /// # Examples
211 ///
212 /// ```
213 /// # let executor = async_executor::LocalExecutor::new();
214 /// # executor.run(async {
215 /// let (tx, mut rx) = just_watch::channel("hello");
216 ///
217 /// let task = executor.spawn(async move {
218 /// tx.send("goodbye").unwrap();
219 /// });
220 ///
221 /// assert!(rx.changed().await.is_ok());
222 /// assert_eq!(*rx.borrow(), "goodbye");
223 ///
224 /// // The `tx` handle has been dropped
225 /// assert!(rx.changed().await.is_err());
226 ///
227 /// task.await;
228 /// });
229 /// ```
230 pub async fn changed(&mut self) -> Result<(), error::RecvError> {
231 // Fast path: Check the state first.
232 if let Some(ret) = self.maybe_changed() {
233 return ret;
234 }
235
236 // In order to avoid a race condition, we first request a notification,
237 // **then** check the current value's version. If a new version exists,
238 // the notification request is dropped.
239 let listener = self.shared.event_value_changed.listen();
240
241 if let Some(ret) = self.maybe_changed() {
242 return ret;
243 }
244
245 listener.await;
246
247 self.maybe_changed()
248 .expect("[bug] failed to observe change after notificaton.")
249 }
250
251 fn maybe_changed(&mut self) -> Option<Result<(), error::RecvError>> {
252 // Load the version from the state
253 let state = self.shared.version.load(SeqCst);
254 let new_version = state & !CLOSED;
255
256 if self.version != new_version {
257 // Observe the new version and return
258 self.version = new_version;
259 return Some(Ok(()));
260 }
261
262 if CLOSED == state & CLOSED {
263 // All receivers have dropped.
264 return Some(Err(error::RecvError {}));
265 }
266
267 // No changes.
268 None
269 }
270}
271
272impl<T: Clone> Receiver<T> {
273 /// A convenience helper which combines calling [`Receiver::changed()`] and
274 /// [`Receiver::borrow()`] where the borrowed value is cloned and returned.
275 ///
276 /// Note: If this is the first time the function is called on a `Receiver`
277 /// instance, then the function **will wait** until a new value is sent into the channel.
278 ///
279 /// `None` is returned if the `Sender` half is dropped.
280 ///
281 /// # Examples
282 ///
283 /// ```
284 /// # let executor = async_executor::LocalExecutor::new();
285 /// # executor.run(async {
286 /// let (tx, mut rx) = just_watch::channel("hello");
287 ///
288 /// let task = executor.spawn(async move {
289 /// tx.send("goodbye").unwrap();
290 /// });
291 ///
292 /// assert_eq!(*rx.borrow(), "hello");
293 ///
294 /// // Waits for the new task to spawn and send the value.
295 /// let v = rx.recv().await.unwrap();
296 /// assert_eq!(v, "goodbye");
297 ///
298 /// let v = rx.recv().await;
299 /// assert!(v.is_err());
300 ///
301 /// task.await;
302 /// # });
303 /// ```
304 pub async fn recv(&mut self) -> Result<T, error::RecvError> {
305 self.changed().await?;
306 Ok(self.borrow().clone())
307 }
308}
309
310impl<T> Clone for Receiver<T> {
311 fn clone(&self) -> Self {
312 self.shared.ref_count_rx.fetch_add(1, Relaxed);
313 Receiver {
314 shared: self.shared.clone(),
315 version: self.version,
316 }
317 }
318}
319
320impl<T> Drop for Receiver<T> {
321 fn drop(&mut self) {
322 if self.shared.ref_count_rx.fetch_sub(1, Relaxed) == 1 {
323 // Notify the single sender.
324 self.shared.event_all_recv_dropped.notify(usize::MAX);
325 }
326 }
327}
328
329impl<T> Sender<T> {
330 /// Creates a new `Receiver` connected to a `Sender`.
331 pub fn subscribe(&self) -> Receiver<T> {
332 self.shared.ref_count_rx.fetch_add(1, Relaxed);
333 let version = self.shared.version.load(SeqCst);
334 Receiver {
335 shared: self.shared.clone(),
336 version,
337 }
338 }
339
340 /// Sends a new value via the channel, notifying all receivers.
341 pub fn send(&self, value: T) -> Result<(), error::SendError<T>> {
342 if self.shared.ref_count_rx.load(Relaxed) == 0 {
343 // All watchers (`Receiver`s) have been dropped.
344 return Err(error::SendError { inner: value });
345 }
346
347 // Replace the value.
348 *self.shared.value.write().unwrap() = value;
349
350 // Update the version. 2 (`VERSION_1`) is used so that the CLOSED bit is not set.
351 self.shared.version.fetch_add(VERSION_1, SeqCst);
352
353 // Notify all watchers.
354 self.shared.event_value_changed.notify(usize::MAX);
355
356 Ok(())
357 }
358
359 /// Applies a function to the existing value.
360 ///
361 /// A function gets a mutable reference that can be used to change a stored value.
362 pub fn send_modify<F>(&self, func: F)
363 where
364 F: FnOnce(&mut T),
365 {
366 // Replace the value.
367 func(&mut *self.shared.value.write().unwrap());
368
369 // Update the version. 2 (`VERSION_1`) is used so that the CLOSED bit is not set.
370 self.shared.version.fetch_add(VERSION_1, SeqCst);
371
372 // Notify all watchers.
373 self.shared.event_value_changed.notify(usize::MAX);
374 }
375
376 /// Completes when all receivers have dropped.
377 ///
378 /// This allows the producer to get notified when interest in the produced
379 /// values is canceled and immediately stop doing work.
380 pub async fn closed(&self) {
381 // Fast path.
382 if self.shared.ref_count_rx.load(Relaxed) == 0 {
383 return;
384 }
385
386 // Listen for events now and check the reference count afterwards to avoid race condition.
387 let listener = self.shared.event_all_recv_dropped.listen();
388
389 if self.shared.ref_count_rx.load(Relaxed) == 0 {
390 return;
391 }
392
393 listener.await;
394 }
395}
396
397impl<T> Drop for Sender<T> {
398 fn drop(&mut self) {
399 self.shared.version.fetch_or(CLOSED, SeqCst);
400 self.shared.event_value_changed.notify(usize::MAX);
401 }
402}
403
404// ===== impl Ref =====
405
406impl<T> ops::Deref for Ref<'_, T> {
407 type Target = T;
408
409 fn deref(&self) -> &T {
410 self.inner.deref()
411 }
412}