async_observe/lib.rs
1#![cfg_attr(all(doc, not(doctest)), doc = include_str!("../README.md"))]
2#![cfg_attr(
3 any(not(doc), doctest),
4 doc = "Async single-producer, multi-consumer channel that only retains the last sent value"
5)]
6
7use {
8 event_listener::Event,
9 futures_lite::{Stream, StreamExt, stream},
10 std::{
11 error, fmt,
12 pin::Pin,
13 sync::{
14 Arc, RwLock, RwLockReadGuard, RwLockWriteGuard,
15 atomic::{AtomicUsize, Ordering},
16 },
17 task::{Context, Poll},
18 },
19};
20
21/// Creates a new observer channel, returning the sender and receiver halves.
22///
23/// All values sent by [`Sender`] will become visible to the [`Receiver`]
24/// handles. Only the last value sent is made available to the [`Receiver`]
25/// half. All intermediate values are dropped.
26///
27/// # Examples
28///
29/// This example prints numbers from 0 to 9:
30///
31/// ```
32/// # fn f() {
33/// use {
34/// futures_lite::future,
35/// std::{thread, time::Duration},
36/// };
37///
38/// let (tx, mut rx) = async_observe::channel(0);
39///
40/// // Perform computations in another thread
41/// thread::spawn(move || {
42/// for n in 1..10 {
43/// thread::sleep(Duration::from_secs(1));
44///
45/// // Send a new value without blocking the thread.
46/// // If sending fails, it means the sender was dropped.
47/// // In that case stop the computation.
48/// if tx.send(n).is_err() {
49/// break;
50/// }
51/// }
52/// });
53///
54/// // Print the initial value (0)
55/// let n = rx.observe(|n| *n);
56/// println!("{n}");
57///
58/// future::block_on(async {
59/// // Print the value whenever it changes
60/// while let Ok(n) = rx.recv().await {
61/// println!("{n}");
62/// }
63/// });
64/// # }
65/// ```
66///
67/// In this example a new thread is spawned, but you can also use the channel
68/// to update values from a future/async task.
69///
70/// Note that this channel does not have a message queue - it only stores the
71/// latest update. Therefore, it does *not* guarantee that you will observe
72/// every intermediate value. If you need to observe each change, use a
73/// message-queue channel such as [`async-channel`].
74///
75/// [`async-channel`]: https://docs.rs/async-channel/latest/async_channel
76pub fn channel<T>(init: T) -> (Sender<T>, Receiver<T>) {
77 let shared = Arc::new(Shared {
78 value: RwLock::new(init),
79 state: State::new(),
80 rx_count: AtomicUsize::new(1),
81 changed: Event::new(),
82 all_receivers_dropped: Event::new(),
83 });
84
85 let tx = Sender {
86 shared: shared.clone(),
87 };
88
89 let rx = Receiver {
90 shared,
91 last_version: 0,
92 };
93
94 (tx, rx)
95}
96
97/// Sends values to the associated [`Receiver`]s.
98///
99/// Created by the [`channel`] function.
100#[derive(Debug)]
101pub struct Sender<T> {
102 /// The inner shared state.
103 shared: Arc<Shared<T>>,
104}
105
106impl<T> Sender<T> {
107 /// Sends a new value to the channel and notifies all receivers.
108 ///
109 /// # Examples
110 ///
111 /// ```
112 /// let (tx, rx) = async_observe::channel(0);
113 /// assert_eq!(rx.observe(|n| *n), 0);
114 ///
115 /// // Send a new value
116 /// tx.send(1);
117 ///
118 /// // Now the receiver can see it
119 /// assert_eq!(rx.observe(|n| *n), 1);
120 /// ```
121 ///
122 /// To wait until the value is updated, use the receiver's async methods
123 /// [`changed`](Receiver::changed) or [`recv`](Receiver::recv).
124 pub fn send(&self, value: T) -> Result<(), SendError<T>> {
125 if self.shared.rx_count.load(Ordering::Relaxed) == 0 {
126 // All receivers have been dropped
127 return Err(SendError(value));
128 }
129
130 // Replace the value
131 *self.shared.write_value() = value;
132 self.shared.state.increment_version();
133
134 // Notify all receivers
135 self.shared.changed.notify(usize::MAX);
136
137 Ok(())
138 }
139
140 /// Waits until all receivers are dropped.
141 ///
142 /// # Examples
143 ///
144 /// A producer can wait until no consumers is interested in its updates
145 /// anymore and then stop working.
146 ///
147 /// ```
148 /// # futures_lite::future::block_on(async {
149 /// # use futures_lite::future::yield_now as wait_long_time;
150 /// use futures_lite::future;
151 ///
152 /// let (tx, mut rx) = async_observe::channel(0);
153 ///
154 /// // The producer runs concurrently and waits until the channel
155 /// // is closed, then cancels the main future
156 /// let producer = future::or(
157 /// async {
158 /// let mut n = 0;
159 /// loop {
160 /// wait_long_time().await;
161 ///
162 /// // The producer could check if the channel is closed on send,
163 /// // but computing a new value might take a long time.
164 /// // Instead, we cancel the whole working future.
165 /// _ = tx.send(n);
166 /// n += 1;
167 /// }
168 /// },
169 /// tx.closed(),
170 /// );
171 ///
172 /// let consumer = async move {
173 /// // ^^^^
174 /// // Note: rx is moved into this future,
175 /// // so it will be dropped when the loop ends.
176 /// // Alternatively, you could call `drop(rx);` explicitly.
177 ///
178 /// while let Ok(n) = rx.recv().await {
179 /// // After receiving number 5,
180 /// // the consumer is no longer interested.
181 /// if n == 5 {
182 /// break;
183 /// }
184 /// }
185 /// };
186 ///
187 /// future::zip(producer, consumer).await;
188 /// # });
189 /// ```
190 ///
191 /// The receiver is dropped after getting the number 5. Since there are no
192 /// other receivers, this makes `tx.closed()` finish and the sender stops
193 /// sending new values.
194 pub async fn closed(&self) {
195 if self.shared.rx_count.load(Ordering::Relaxed) == 0 {
196 return;
197 }
198
199 // In order to avoid a notification loss, we first request a notification,
200 // **then** check the current `rx_count`.
201 // If `rx_count` is 0, the notification request is dropped.
202 event_listener::listener!(self.shared.all_receivers_dropped => listener);
203
204 if self.shared.rx_count.load(Ordering::Relaxed) == 0 {
205 return;
206 }
207
208 listener.await;
209
210 debug_assert_eq!(self.shared.rx_count.load(Ordering::Relaxed), 0);
211 }
212}
213
214impl<T> Drop for Sender<T> {
215 fn drop(&mut self) {
216 self.shared.state.close();
217 self.shared.changed.notify(usize::MAX);
218 }
219}
220
/// Error produced when sending a value fails.
///
/// The value that could not be sent is handed back in the public field.
#[derive(Clone, Copy, PartialEq, Eq)]
pub struct SendError<T>(pub T);

impl<T> fmt::Display for SendError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("sending on a closed channel")
    }
}

impl<T> fmt::Debug for SendError<T> {
    /// Prints the same message as `Display`. The payload is left out, which
    /// keeps this impl free of a `T: Debug` bound.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Display::fmt(self, f)
    }
}

impl<T> error::Error for SendError<T> {}
238
239/// Receives values from the associated [`Sender`].
240///
241/// Created by the [`channel`] function.
242#[derive(Debug)]
243pub struct Receiver<T> {
244 /// The inner shared state.
245 shared: Arc<Shared<T>>,
246
247 /// Last observed version.
248 last_version: usize,
249}
250
251impl<T> Receiver<T> {
252 /// Observes the latest value sent to the channel.
253 ///
254 /// This method takes a closure and calls it with a reference to the value.
255 /// While the closure is running [`send`](Sender::send) calls are blocked.
256 /// Because of this, the closure should run only as long as needed.
257 /// A common pattern is to copy or clone the value inside the closure, then
258 /// return and work with the copy outside.
259 ///
260 /// You can observe the value at any time, but usually you want to wait
261 /// until it changes. For that, use the [`changed`](Receiver::changed)
262 /// async method.
263 ///
264 /// # Examples
265 ///
266 /// ```
267 /// # futures_lite::future::block_on(async {
268 /// let (tx, mut rx) = async_observe::channel(0);
269 ///
270 /// // Send a new value
271 /// tx.send(1);
272 ///
273 /// // Wait until the value changes
274 /// rx.changed().await?;
275 ///
276 /// // Now we can read the new value
277 /// let n = rx.observe(|n| *n);
278 /// assert_eq!(n, 1);
279 /// # Ok::<_, async_observe::RecvError>(())
280 /// # });
281 /// ```
282 ///
283 /// If the value type implements `Clone`, you can use
284 /// [`recv`](Receiver::recv) instead, which waits for a change and returns
285 /// the new value.
286 ///
287 /// ```
288 /// # futures_lite::future::block_on(async {
289 /// let (tx, mut rx) = async_observe::channel(0);
290 ///
291 /// // Send a new value
292 /// tx.send(1);
293 ///
294 /// // Wait until the value changes and read it
295 /// let n = rx.recv().await?;
296 /// assert_eq!(n, 1);
297 /// # Ok::<_, async_observe::RecvError>(())
298 /// # });
299 /// ```
300 ///
301 /// # Possible deadlock
302 ///
303 /// Calling [`send`](Sender::send) inside the closure will deadlock:
304 ///
305 /// ```no_run
306 /// let (tx, rx) = async_observe::channel(0);
307 /// rx.observe(|n| {
308 /// _ = tx.send(n + 1);
309 /// });
310 /// ```
311 pub fn observe<F, R>(&self, f: F) -> R
312 where
313 F: FnOnce(&T) -> R,
314 {
315 f(&self.shared.read_value())
316 }
317
318 /// Waits for the value to change.
319 ///
320 /// Call [`observe`](Receiver::observe) to read the new value.
321 pub async fn changed(&mut self) -> Result<(), RecvError> {
322 if self
323 .shared
324 .state
325 .version_changed(&mut self.last_version)
326 .ok_or(RecvError)?
327 {
328 return Ok(());
329 }
330
331 // In order to avoid a notification loss, we first request a notification,
332 // **then** check the current value's version.
333 // If a new version exists, the notification request is dropped.
334 event_listener::listener!(self.shared.changed => listener);
335
336 if self
337 .shared
338 .state
339 .version_changed(&mut self.last_version)
340 .ok_or(RecvError)?
341 {
342 return Ok(());
343 }
344
345 listener.await;
346
347 let changed = self
348 .shared
349 .state
350 .version_changed(&mut self.last_version)
351 .ok_or(RecvError)?;
352
353 debug_assert!(changed);
354 Ok(())
355 }
356
357 /// Waits for the value to change and then returns a clone of it.
358 ///
359 /// # Examples
360 ///
361 /// ```
362 /// # futures_lite::future::block_on(async {
363 /// let (tx, mut rx) = async_observe::channel(0);
364 ///
365 /// // Send a new value
366 /// tx.send(1);
367 ///
368 /// // Wait until the value changes and read it
369 /// let n = rx.recv().await?;
370 /// assert_eq!(n, 1);
371 /// # Ok::<_, async_observe::RecvError>(())
372 /// # });
373 /// ```
374 pub async fn recv(&mut self) -> Result<T, RecvError>
375 where
376 T: Clone,
377 {
378 self.changed().await?;
379 Ok(self.observe(T::clone))
380 }
381
382 /// Creates a [stream](Stream) from the receiver.
383 ///
384 /// The stream ends when the [`Sender`] is dropped.
385 ///
386 /// # Examples
387 ///
388 /// ```
389 /// # futures_lite::future::block_on(async {
390 /// # use futures_lite::future::yield_now as wait_long_time;
391 /// use futures_lite::{StreamExt, future};
392 ///
393 /// let (tx, rx) = async_observe::channel(0);
394 ///
395 /// let producer = async move {
396 /// // ^^^^
397 /// // Move tx into the future so it is dropped after the loop ends.
398 /// // Dropping the sender is important so the receiver can
399 /// // see it and stop the stream.
400 ///
401 /// for n in 1..10 {
402 /// wait_long_time().await;
403 /// _ = tx.send(n);
404 /// }
405 /// };
406 ///
407 /// // Create a stream from the receiver
408 /// let consumer = rx
409 /// .into_stream()
410 /// .for_each(|n| println!("{n}"));
411 ///
412 /// future::zip(producer, consumer).await;
413 /// # });
414 /// ```
415 pub fn into_stream<'item>(self) -> Updates<'item, T>
416 where
417 T: Clone + Send + Sync + 'item,
418 {
419 Updates {
420 inner: Box::pin(stream::unfold(self, async |mut me| {
421 let value = me.recv().await.ok()?;
422 Some((value, me))
423 })),
424 }
425 }
426}
427
428impl<T> Clone for Receiver<T> {
429 fn clone(&self) -> Self {
430 self.shared.rx_count.fetch_add(1, Ordering::Relaxed);
431 Self {
432 shared: self.shared.clone(),
433 last_version: self.last_version,
434 }
435 }
436}
437
438impl<T> Drop for Receiver<T> {
439 fn drop(&mut self) {
440 if self.shared.rx_count.fetch_sub(1, Ordering::Relaxed) == 1 {
441 // Notify all senders.
442 // Even though `Sender` is not `Clone`, it can still wait for
443 // the channel to close from multiple places, since the `closed`
444 // method takes `&self`.
445 self.shared.all_receivers_dropped.notify(usize::MAX);
446 }
447 }
448}
449
/// Error produced when receiving a change notification.
#[derive(Clone, Copy, PartialEq, Eq)]
pub struct RecvError;

impl fmt::Display for RecvError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("receiving on a closed channel")
    }
}

impl fmt::Debug for RecvError {
    /// Prints the same message as `Display`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Display::fmt(self, f)
    }
}

impl error::Error for RecvError {}
467
468/// The stream type from [`into_stream`](Receiver::into_stream) method.
469pub struct Updates<'item, T> {
470 inner: Pin<Box<dyn Stream<Item = T> + Send + Sync + 'item>>,
471}
472
473impl<T> Stream for Updates<'_, T> {
474 type Item = T;
475
476 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
477 self.inner.poll_next(cx)
478 }
479}
480
481#[derive(Debug)]
482struct Shared<T> {
483 /// The most recent value.
484 value: RwLock<T>,
485
486 /// The current state.
487 state: State,
488
489 /// Tracks the number of `Receiver` instances.
490 rx_count: AtomicUsize,
491
492 /// Event when the value has changed or the `Sender` has been dropped.
493 changed: Event,
494
495 /// Event when all `Receiver`s have been dropped.
496 all_receivers_dropped: Event,
497}
498
499impl<T> Shared<T> {
500 fn read_value(&self) -> RwLockReadGuard<'_, T> {
501 // The `RwLock` has no poisoned state
502 match self.value.read() {
503 Ok(guard) => guard,
504 Err(e) => e.into_inner(),
505 }
506 }
507
508 fn write_value(&self) -> RwLockWriteGuard<'_, T> {
509 // The `RwLock` has no poisoned state
510 match self.value.write() {
511 Ok(guard) => guard,
512 Err(e) => e.into_inner(),
513 }
514 }
515}
516
/// Channel state packed into one atomic integer: the lowest bit is the
/// closed flag, the remaining bits hold the value version.
#[derive(Debug)]
struct State(AtomicUsize);

impl State {
    /// The least significant bit signifies a closed channel.
    const CLOSED_BIT: usize = 1;

    /// Versions advance in steps of 2, which leaves the `CLOSED_BIT`
    /// untouched.
    const VERSION_STEP: usize = 2;

    /// Creates an open state at version 0.
    fn new() -> Self {
        Self(AtomicUsize::new(0))
    }

    /// Advances the version by one step.
    fn increment_version(&self) {
        self.0.fetch_add(Self::VERSION_STEP, Ordering::Release);
    }

    /// Compares the current version with `last_version`.
    ///
    /// - `Some(true)`: the version differs; `last_version` is updated to it.
    /// - `Some(false)`: same version and the channel is still open.
    /// - `None`: same version and the channel is closed.
    ///
    /// A differing version is reported even when the channel is already
    /// closed, so the final value is not lost.
    fn version_changed(&self, last_version: &mut usize) -> Option<bool> {
        let snapshot = self.0.load(Ordering::Acquire);
        let is_closed = (snapshot & Self::CLOSED_BIT) != 0;
        let current = snapshot & !Self::CLOSED_BIT;

        if current != *last_version {
            *last_version = current;
            Some(true)
        } else if is_closed {
            None
        } else {
            Some(false)
        }
    }

    /// Sets the closed flag; the version bits are left as they are.
    fn close(&self) {
        self.0.fetch_or(Self::CLOSED_BIT, Ordering::Release);
    }
}