bee_runtime/shutdown_stream.rs
1// Copyright 2020-2021 IOTA Stiftung
2// SPDX-License-Identifier: Apache-2.0
3
4//! A module to simplify selecting between a shutdown signal and a stream.
5//!
6//! The `ShutdownStream` type can be used to replace this pattern:
7//! ```ignore
8//! loop {
9//! select! {
10//! _ = shutdown => break,
11//! item = stream.next() => { /* actual logic */ },
12//! }
13//! }
14//! ```
15//! by this one:
16//! ```ignore
17//! let mut shutdown_stream = ShutdownStream::new(shutdown, stream);
18//!
19//! while let Some(item) = shutdown_stream.next().await {
20//! /* actual logic */
21//! }
22//! ```
23
24use std::{marker::Unpin, pin::Pin};
25
26use futures::{
27 channel::oneshot,
28 future::{self, FusedFuture},
29 stream::{self, FusedStream},
30 task::{Context, Poll},
31 FutureExt, Stream, StreamExt,
32};
33
34type Shutdown = oneshot::Receiver<()>;
35type FusedShutdown = future::Fuse<Shutdown>;
36
37/// A stream with a shutdown.
38///
39/// This type wraps a shutdown receiver and a stream to produce a new stream that ends when the
40/// shutdown receiver is triggered or when the stream ends.
41pub struct ShutdownStream<S> {
42 shutdown: FusedShutdown,
43 stream: S,
44}
45
46impl<S: Stream> ShutdownStream<stream::Fuse<S>> {
47 /// Create a new `ShutdownStream` from a shutdown receiver and an unfused stream.
48 ///
49 /// This method receives the stream to be wrapped and a `oneshot::Receiver` for the shutdown.
50 /// Both the stream and the shutdown receiver are fused to avoid polling already completed
51 /// futures.
52 pub fn new(shutdown: Shutdown, stream: S) -> Self {
53 Self {
54 shutdown: shutdown.fuse(),
55 stream: stream.fuse(),
56 }
57 }
58
59 /// Create a new `ShutdownStream` from a fused shutdown receiver and a fused stream.
60 ///
61 /// This method receives the fused stream to be wrapped and a fused `oneshot::Receiver` for the shutdown.
62 pub fn from_fused(shutdown: FusedShutdown, stream: stream::Fuse<S>) -> Self {
63 Self { shutdown, stream }
64 }
65
66 /// Consume and split the `ShutdownStream` into its shutdown receiver and stream.
67 pub fn split(self) -> (FusedShutdown, stream::Fuse<S>) {
68 (self.shutdown, self.stream)
69 }
70}
71
72impl<S: Stream<Item = T> + FusedStream + Unpin, T> Stream for ShutdownStream<S> {
73 type Item = T;
74 /// The shutdown receiver is polled first, if it is not ready, the stream is polled. This
75 /// guarantees that checking for shutdown always happens first.
76 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
77 if !self.shutdown.is_terminated() {
78 if self.shutdown.poll_unpin(cx).is_ready() {
79 return Poll::Ready(None);
80 }
81
82 if !self.stream.is_terminated() {
83 return self.stream.poll_next_unpin(cx);
84 }
85 }
86
87 Poll::Ready(None)
88 }
89}
90
91impl<S: Stream<Item = T> + FusedStream + Unpin, T> FusedStream for ShutdownStream<S> {
92 fn is_terminated(&self) -> bool {
93 self.shutdown.is_terminated() || self.stream.is_terminated()
94 }
95}