bee_runtime/
shutdown_stream.rs

1// Copyright 2020-2021 IOTA Stiftung
2// SPDX-License-Identifier: Apache-2.0
3
4//! A module to simplify selecting between a shutdown signal and a stream.
5//!
6//! The `ShutdownStream` type can be used to replace this pattern:
7//! ```ignore
8//! loop {
9//!     select! {
10//!         _ = shutdown => break,
11//!         item = stream.next() => { /* actual logic */ },
12//!     }
13//! }
14//! ```
15//! by this one:
16//! ```ignore
17//! let mut shutdown_stream = ShutdownStream::new(shutdown, stream);
18//!
19//! while let Some(item) = shutdown_stream.next().await {
20//!     /* actual logic */
21//! }
22//! ```
23
24use std::{marker::Unpin, pin::Pin};
25
26use futures::{
27    channel::oneshot,
28    future::{self, FusedFuture},
29    stream::{self, FusedStream},
30    task::{Context, Poll},
31    FutureExt, Stream, StreamExt,
32};
33
34type Shutdown = oneshot::Receiver<()>;
35type FusedShutdown = future::Fuse<Shutdown>;
36
37/// A stream with a shutdown.
38///
39/// This type wraps a shutdown receiver and a stream to produce a new stream that ends when the
40/// shutdown receiver is triggered or when the stream ends.
41pub struct ShutdownStream<S> {
42    shutdown: FusedShutdown,
43    stream: S,
44}
45
46impl<S: Stream> ShutdownStream<stream::Fuse<S>> {
47    /// Create a new `ShutdownStream` from a shutdown receiver and an unfused stream.
48    ///
49    /// This method receives the stream to be wrapped and a `oneshot::Receiver` for the shutdown.
50    /// Both the stream and the shutdown receiver are fused to avoid polling already completed
51    /// futures.
52    pub fn new(shutdown: Shutdown, stream: S) -> Self {
53        Self {
54            shutdown: shutdown.fuse(),
55            stream: stream.fuse(),
56        }
57    }
58
59    /// Create a new `ShutdownStream` from a fused shutdown receiver and a fused stream.
60    ///
61    /// This method receives the fused stream to be wrapped and a fused `oneshot::Receiver` for the shutdown.
62    pub fn from_fused(shutdown: FusedShutdown, stream: stream::Fuse<S>) -> Self {
63        Self { shutdown, stream }
64    }
65
66    /// Consume and split the `ShutdownStream` into its shutdown receiver and stream.
67    pub fn split(self) -> (FusedShutdown, stream::Fuse<S>) {
68        (self.shutdown, self.stream)
69    }
70}
71
72impl<S: Stream<Item = T> + FusedStream + Unpin, T> Stream for ShutdownStream<S> {
73    type Item = T;
74    /// The shutdown receiver is polled first, if it is not ready, the stream is polled. This
75    /// guarantees that checking for shutdown always happens first.
76    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
77        if !self.shutdown.is_terminated() {
78            if self.shutdown.poll_unpin(cx).is_ready() {
79                return Poll::Ready(None);
80            }
81
82            if !self.stream.is_terminated() {
83                return self.stream.poll_next_unpin(cx);
84            }
85        }
86
87        Poll::Ready(None)
88    }
89}
90
91impl<S: Stream<Item = T> + FusedStream + Unpin, T> FusedStream for ShutdownStream<S> {
92    fn is_terminated(&self) -> bool {
93        self.shutdown.is_terminated() || self.stream.is_terminated()
94    }
95}