Skip to main content

async_observe/
lib.rs

1#![cfg_attr(all(doc, not(doctest)), doc = include_str!("../README.md"))]
2#![cfg_attr(
3    any(not(doc), doctest),
4    doc = "Async single-producer, multi-consumer channel that only retains the last sent value"
5)]
6
7use {
8    event_listener::Event,
9    futures_lite::{Stream, StreamExt, stream},
10    std::{
11        error, fmt,
12        pin::Pin,
13        sync::{
14            Arc, RwLock, RwLockReadGuard, RwLockWriteGuard,
15            atomic::{AtomicUsize, Ordering},
16        },
17        task::{Context, Poll},
18    },
19};
20
21/// Creates a new observer channel, returning the sender and receiver halves.
22///
23/// All values sent by [`Sender`] will become visible to the [`Receiver`]
24/// handles. Only the last value sent is made available to the [`Receiver`]
25/// half. All intermediate values are dropped.
26///
27/// # Examples
28///
29/// This example prints numbers from 0 to 9:
30///
31/// ```
32/// # fn f() {
33/// use {
34///     futures_lite::future,
35///     std::{thread, time::Duration},
36/// };
37///
38/// let (tx, mut rx) = async_observe::channel(0);
39///
40/// // Perform computations in another thread
41/// thread::spawn(move || {
42///     for n in 1..10 {
43///         thread::sleep(Duration::from_secs(1));
44///
45///         // Send a new value without blocking the thread.
46///         // If sending fails, it means the sender was dropped.
47///         // In that case stop the computation.
48///         if tx.send(n).is_err() {
49///             break;
50///         }
51///     }
52/// });
53///
54/// // Print the initial value (0)
55/// let n = rx.observe(|n| *n);
56/// println!("{n}");
57///
58/// future::block_on(async {
59///     // Print the value whenever it changes
60///     while let Ok(n) = rx.recv().await {
61///         println!("{n}");
62///     }
63/// });
64/// # }
65/// ```
66///
67/// In this example a new thread is spawned, but you can also use the channel
68/// to update values from a future/async task.
69///
70/// Note that this channel does not have a message queue - it only stores the
71/// latest update. Therefore, it does *not* guarantee that you will observe
72/// every intermediate value. If you need to observe each change, use a
73/// message-queue channel such as [`async-channel`].
74///
75/// [`async-channel`]: https://docs.rs/async-channel/latest/async_channel
76pub fn channel<T>(init: T) -> (Sender<T>, Receiver<T>) {
77    let shared = Arc::new(Shared {
78        value: RwLock::new(init),
79        state: State::new(),
80        rx_count: AtomicUsize::new(1),
81        changed: Event::new(),
82        all_receivers_dropped: Event::new(),
83    });
84
85    let tx = Sender {
86        shared: shared.clone(),
87    };
88
89    let rx = Receiver {
90        shared,
91        last_version: 0,
92    };
93
94    (tx, rx)
95}
96
97/// Sends values to the associated [`Receiver`]s.
98///
99/// Created by the [`channel`] function.
100#[derive(Debug)]
101pub struct Sender<T> {
102    /// The inner shared state.
103    shared: Arc<Shared<T>>,
104}
105
106impl<T> Sender<T> {
107    /// Sends a new value to the channel and notifies all receivers.
108    ///
109    /// # Examples
110    ///
111    /// ```
112    /// let (tx, rx) = async_observe::channel(0);
113    /// assert_eq!(rx.observe(|n| *n), 0);
114    ///
115    /// // Send a new value
116    /// tx.send(1);
117    ///
118    /// // Now the receiver can see it
119    /// assert_eq!(rx.observe(|n| *n), 1);
120    /// ```
121    ///
122    /// To wait until the value is updated, use the receiver's async methods
123    /// [`changed`](Receiver::changed) or [`recv`](Receiver::recv).
124    pub fn send(&self, value: T) -> Result<(), SendError<T>> {
125        if self.shared.rx_count.load(Ordering::Relaxed) == 0 {
126            // All receivers have been dropped
127            return Err(SendError(value));
128        }
129
130        // Replace the value
131        *self.shared.write_value() = value;
132        self.shared.state.increment_version();
133
134        // Notify all receivers
135        self.shared.changed.notify(usize::MAX);
136
137        Ok(())
138    }
139
140    /// Waits until all receivers are dropped.
141    ///
142    /// # Examples
143    ///
144    /// A producer can wait until no consumers is interested in its updates
145    /// anymore and then stop working.
146    ///
147    /// ```
148    /// # futures_lite::future::block_on(async {
149    /// # use futures_lite::future::yield_now as wait_long_time;
150    /// use futures_lite::future;
151    ///
152    /// let (tx, mut rx) = async_observe::channel(0);
153    ///
154    /// // The producer runs concurrently and waits until the channel
155    /// // is closed, then cancels the main future
156    /// let producer = future::or(
157    ///     async {
158    ///         let mut n = 0;
159    ///         loop {
160    ///             wait_long_time().await;
161    ///
162    ///             // The producer could check if the channel is closed on send,
163    ///             // but computing a new value might take a long time.
164    ///             // Instead, we cancel the whole working future.
165    ///             _ = tx.send(n);
166    ///             n += 1;
167    ///         }
168    ///     },
169    ///     tx.closed(),
170    /// );
171    ///
172    /// let consumer = async move {
173    /// //                   ^^^^
174    /// // Note: rx is moved into this future,
175    /// // so it will be dropped when the loop ends.
176    /// // Alternatively, you could call `drop(rx);` explicitly.
177    ///
178    ///     while let Ok(n) = rx.recv().await {
179    ///         // After receiving number 5,
180    ///         // the consumer is no longer interested.
181    ///         if n == 5 {
182    ///             break;
183    ///         }
184    ///     }
185    /// };
186    ///
187    /// future::zip(producer, consumer).await;
188    /// # });
189    /// ```
190    ///
191    /// The receiver is dropped after getting the number 5. Since there are no
192    /// other receivers, this makes `tx.closed()` finish and the sender stops
193    /// sending new values.
194    pub async fn closed(&self) {
195        if self.shared.rx_count.load(Ordering::Relaxed) == 0 {
196            return;
197        }
198
199        // In order to avoid a notification loss, we first request a notification,
200        // **then** check the current `rx_count`.
201        // If `rx_count` is 0, the notification request is dropped.
202        event_listener::listener!(self.shared.all_receivers_dropped => listener);
203
204        if self.shared.rx_count.load(Ordering::Relaxed) == 0 {
205            return;
206        }
207
208        listener.await;
209
210        debug_assert_eq!(self.shared.rx_count.load(Ordering::Relaxed), 0);
211    }
212}
213
214impl<T> Drop for Sender<T> {
215    fn drop(&mut self) {
216        self.shared.state.close();
217        self.shared.changed.notify(usize::MAX);
218    }
219}
220
/// Error produced when sending a value fails.
///
/// Sending fails when all receivers have been dropped. The value that could
/// not be sent is handed back in the public field.
#[derive(PartialEq, Eq, Clone, Copy)]
pub struct SendError<T>(pub T);

impl<T> fmt::Display for SendError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("sending on a closed channel")
    }
}

// Implemented by hand so the error is `Debug` for every `T`; the payload is
// intentionally not printed.
impl<T> fmt::Debug for SendError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("sending on a closed channel")
    }
}

impl<T> error::Error for SendError<T> {}
238
239/// Receives values from the associated [`Sender`].
240///
241/// Created by the [`channel`] function.
242#[derive(Debug)]
243pub struct Receiver<T> {
244    /// The inner shared state.
245    shared: Arc<Shared<T>>,
246
247    /// Last observed version.
248    last_version: usize,
249}
250
251impl<T> Receiver<T> {
252    /// Observes the latest value sent to the channel.
253    ///
254    /// This method takes a closure and calls it with a reference to the value.
255    /// While the closure is running [`send`](Sender::send) calls are blocked.
256    /// Because of this, the closure should run only as long as needed.
257    /// A common pattern is to copy or clone the value inside the closure, then
258    /// return and work with the copy outside.
259    ///
260    /// You can observe the value at any time, but usually you want to wait
261    /// until it changes. For that, use the [`changed`](Receiver::changed)
262    /// async method.
263    ///
264    /// # Examples
265    ///
266    /// ```
267    /// # futures_lite::future::block_on(async {
268    /// let (tx, mut rx) = async_observe::channel(0);
269    ///
270    /// // Send a new value
271    /// tx.send(1);
272    ///
273    /// // Wait until the value changes
274    /// rx.changed().await?;
275    ///
276    /// // Now we can read the new value
277    /// let n = rx.observe(|n| *n);
278    /// assert_eq!(n, 1);
279    /// # Ok::<_, async_observe::RecvError>(())
280    /// # });
281    /// ```
282    ///
283    /// If the value type implements `Clone`, you can use
284    /// [`recv`](Receiver::recv) instead, which waits for a change and returns
285    /// the new value.
286    ///
287    /// ```
288    /// # futures_lite::future::block_on(async {
289    /// let (tx, mut rx) = async_observe::channel(0);
290    ///
291    /// // Send a new value
292    /// tx.send(1);
293    ///
294    /// // Wait until the value changes and read it
295    /// let n = rx.recv().await?;
296    /// assert_eq!(n, 1);
297    /// # Ok::<_, async_observe::RecvError>(())
298    /// # });
299    /// ```
300    ///
301    /// # Possible deadlock
302    ///
303    /// Calling [`send`](Sender::send) inside the closure will deadlock:
304    ///
305    /// ```no_run
306    /// let (tx, rx) = async_observe::channel(0);
307    /// rx.observe(|n| {
308    ///     _ = tx.send(n + 1);
309    /// });
310    /// ```
311    pub fn observe<F, R>(&self, f: F) -> R
312    where
313        F: FnOnce(&T) -> R,
314    {
315        f(&self.shared.read_value())
316    }
317
318    /// Waits for the value to change.
319    ///
320    /// Call [`observe`](Receiver::observe) to read the new value.
321    pub async fn changed(&mut self) -> Result<(), RecvError> {
322        if self
323            .shared
324            .state
325            .version_changed(&mut self.last_version)
326            .ok_or(RecvError)?
327        {
328            return Ok(());
329        }
330
331        // In order to avoid a notification loss, we first request a notification,
332        // **then** check the current value's version.
333        // If a new version exists, the notification request is dropped.
334        event_listener::listener!(self.shared.changed => listener);
335
336        if self
337            .shared
338            .state
339            .version_changed(&mut self.last_version)
340            .ok_or(RecvError)?
341        {
342            return Ok(());
343        }
344
345        listener.await;
346
347        let changed = self
348            .shared
349            .state
350            .version_changed(&mut self.last_version)
351            .ok_or(RecvError)?;
352
353        debug_assert!(changed);
354        Ok(())
355    }
356
357    /// Waits for the value to change and then returns a clone of it.
358    ///
359    /// # Examples
360    ///
361    /// ```
362    /// # futures_lite::future::block_on(async {
363    /// let (tx, mut rx) = async_observe::channel(0);
364    ///
365    /// // Send a new value
366    /// tx.send(1);
367    ///
368    /// // Wait until the value changes and read it
369    /// let n = rx.recv().await?;
370    /// assert_eq!(n, 1);
371    /// # Ok::<_, async_observe::RecvError>(())
372    /// # });
373    /// ```
374    pub async fn recv(&mut self) -> Result<T, RecvError>
375    where
376        T: Clone,
377    {
378        self.changed().await?;
379        Ok(self.observe(T::clone))
380    }
381
382    /// Creates a [stream](Stream) from the receiver.
383    ///
384    /// The stream ends when the [`Sender`] is dropped.
385    ///
386    /// # Examples
387    ///
388    /// ```
389    /// # futures_lite::future::block_on(async {
390    /// # use futures_lite::future::yield_now as wait_long_time;
391    /// use futures_lite::{StreamExt, future};
392    ///
393    /// let (tx, rx) = async_observe::channel(0);
394    ///
395    /// let producer = async move {
396    /// //                   ^^^^
397    /// // Move tx into the future so it is dropped after the loop ends.
398    /// // Dropping the sender is important so the receiver can
399    /// // see it and stop the stream.
400    ///
401    ///     for n in 1..10 {
402    ///         wait_long_time().await;
403    ///         _ = tx.send(n);
404    ///     }
405    /// };
406    ///
407    /// // Create a stream from the receiver
408    /// let consumer = rx
409    ///     .into_stream()
410    ///     .for_each(|n| println!("{n}"));
411    ///
412    /// future::zip(producer, consumer).await;
413    /// # });
414    /// ```
415    pub fn into_stream<'item>(self) -> Updates<'item, T>
416    where
417        T: Clone + Send + Sync + 'item,
418    {
419        Updates {
420            inner: Box::pin(stream::unfold(self, async |mut me| {
421                let value = me.recv().await.ok()?;
422                Some((value, me))
423            })),
424        }
425    }
426}
427
428impl<T> Clone for Receiver<T> {
429    fn clone(&self) -> Self {
430        self.shared.rx_count.fetch_add(1, Ordering::Relaxed);
431        Self {
432            shared: self.shared.clone(),
433            last_version: self.last_version,
434        }
435    }
436}
437
438impl<T> Drop for Receiver<T> {
439    fn drop(&mut self) {
440        if self.shared.rx_count.fetch_sub(1, Ordering::Relaxed) == 1 {
441            // Notify all senders.
442            // Even though `Sender` is not `Clone`, it can still wait for
443            // the channel to close from multiple places, since the `closed`
444            // method takes `&self`.
445            self.shared.all_receivers_dropped.notify(usize::MAX);
446        }
447    }
448}
449
/// Error produced when receiving a change notification.
///
/// It signals that the channel is closed and no further change will be
/// reported.
#[derive(PartialEq, Eq, Clone, Copy)]
pub struct RecvError;

impl fmt::Display for RecvError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("receiving on a closed channel")
    }
}

// `Debug` deliberately prints the same message as `Display`.
impl fmt::Debug for RecvError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("receiving on a closed channel")
    }
}

impl error::Error for RecvError {}
467
468/// The stream type from [`into_stream`](Receiver::into_stream) method.
469pub struct Updates<'item, T> {
470    inner: Pin<Box<dyn Stream<Item = T> + Send + Sync + 'item>>,
471}
472
473impl<T> Stream for Updates<'_, T> {
474    type Item = T;
475
476    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
477        self.inner.poll_next(cx)
478    }
479}
480
481#[derive(Debug)]
482struct Shared<T> {
483    /// The most recent value.
484    value: RwLock<T>,
485
486    /// The current state.
487    state: State,
488
489    /// Tracks the number of `Receiver` instances.
490    rx_count: AtomicUsize,
491
492    /// Event when the value has changed or the `Sender` has been dropped.
493    changed: Event,
494
495    /// Event when all `Receiver`s have been dropped.
496    all_receivers_dropped: Event,
497}
498
499impl<T> Shared<T> {
500    fn read_value(&self) -> RwLockReadGuard<'_, T> {
501        // The `RwLock` has no poisoned state
502        match self.value.read() {
503            Ok(guard) => guard,
504            Err(e) => e.into_inner(),
505        }
506    }
507
508    fn write_value(&self) -> RwLockWriteGuard<'_, T> {
509        // The `RwLock` has no poisoned state
510        match self.value.write() {
511            Ok(guard) => guard,
512            Err(e) => e.into_inner(),
513        }
514    }
515}
516
/// Channel state packed into one atomic integer.
///
/// The least significant bit is the "closed" flag; the remaining bits hold
/// the version of the value.
#[derive(Debug)]
struct State(AtomicUsize);

impl State {
    /// Using 2 as the version step preserves the `CLOSED_BIT`.
    const VERSION_STEP: usize = 2;

    /// The least significant bit signifies a closed channel.
    const CLOSED_BIT: usize = 1;

    /// Creates an open state at version 0.
    fn new() -> Self {
        Self(AtomicUsize::new(0))
    }

    /// Advances the version by one step, leaving the closed flag untouched.
    fn increment_version(&self) {
        self.0.fetch_add(Self::VERSION_STEP, Ordering::Release);
    }

    /// Compares the current version with `last_version`.
    ///
    /// Returns:
    /// - `Some(true)` if the version differs; `last_version` is updated to
    ///   the current version. This takes precedence over the closed flag, so
    ///   a change made before closing is still reported.
    /// - `None` if the version is unchanged and the state is closed.
    /// - `Some(false)` if the version is unchanged and the state is open.
    fn version_changed(&self, last_version: &mut usize) -> Option<bool> {
        let state = self.0.load(Ordering::Acquire);

        // Mask out the closed flag to get the bare version.
        let new_version = state & !Self::CLOSED_BIT;

        if *last_version != new_version {
            *last_version = new_version;
            return Some(true);
        }

        if Self::CLOSED_BIT == state & Self::CLOSED_BIT {
            return None;
        }

        Some(false)
    }

    /// Sets the closed flag; the version is left unchanged.
    fn close(&self) {
        self.0.fetch_or(Self::CLOSED_BIT, Ordering::Release);
    }
}
554}