1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95
// Copyright 2020-2021 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0
//! A module to simplify selecting between a shutdown signal and a stream.
//!
//! The `ShutdownStream` type can be used to replace this pattern:
//! ```ignore
//! loop {
//!     select! {
//!         _ = shutdown => break,
//!         item = stream.next() => { /* actual logic */ },
//!     }
//! }
//! ```
//! by this one:
//! ```ignore
//! let mut shutdown_stream = ShutdownStream::new(shutdown, stream);
//!
//! while let Some(item) = shutdown_stream.next().await {
//!     /* actual logic */
//! }
//! ```
use std::{marker::Unpin, pin::Pin};
use futures::{
channel::oneshot,
future::{self, FusedFuture},
stream::{self, FusedStream},
task::{Context, Poll},
FutureExt, Stream, StreamExt,
};
/// Receiving half of a `oneshot` channel carrying `()`, used as the shutdown signal.
type Shutdown = oneshot::Receiver<()>;
/// A [`Shutdown`] receiver wrapped in `future::Fuse`; this is the form stored inside
/// `ShutdownStream`, which queries it through `is_terminated` before polling it.
type FusedShutdown = future::Fuse<Shutdown>;
/// A stream with a shutdown.
///
/// This type wraps a shutdown receiver and a stream to produce a new stream that ends when the
/// shutdown receiver is triggered or when the stream ends.
///
/// On every poll the shutdown receiver is checked before the wrapped stream, so a shutdown
/// always takes precedence over items that are still pending in the stream.
pub struct ShutdownStream<S> {
/// Fused shutdown receiver; once it has completed, the stream yields no further items.
shutdown: FusedShutdown,
/// The wrapped stream that supplies the items.
stream: S,
}
impl<S: Stream> ShutdownStream<stream::Fuse<S>> {
/// Create a new `ShutdownStream` from a shutdown receiver and an unfused stream.
///
/// This method receives the stream to be wrapped and a `oneshot::Receiver` for the shutdown.
/// Both the stream and the shutdown receiver are fused to avoid polling already completed
/// futures.
pub fn new(shutdown: Shutdown, stream: S) -> Self {
Self {
shutdown: shutdown.fuse(),
stream: stream.fuse(),
}
}
/// Create a new `ShutdownStream` from a fused shutdown receiver and a fused stream.
///
/// This method receives the fused stream to be wrapped and a fused `oneshot::Receiver` for the shutdown.
pub fn from_fused(shutdown: FusedShutdown, stream: stream::Fuse<S>) -> Self {
Self { shutdown, stream }
}
/// Consume and split the `ShutdownStream` into its shutdown receiver and stream.
pub fn split(self) -> (FusedShutdown, stream::Fuse<S>) {
(self.shutdown, self.stream)
}
}
impl<S, T> Stream for ShutdownStream<S>
where
    S: Stream<Item = T> + FusedStream + Unpin,
{
    type Item = T;

    /// Polls the shutdown receiver before the wrapped stream, so the shutdown check always
    /// comes first.
    ///
    /// Returns `Poll::Ready(None)` when the shutdown receiver had already terminated, when it
    /// completes during this poll, or when the wrapped stream reports itself as terminated.
    /// Otherwise the result of polling the wrapped stream is returned unchanged.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        let this = &mut *self;

        // The shutdown future already completed on an earlier poll: stay finished without
        // polling anything.
        if this.shutdown.is_terminated() {
            return Poll::Ready(None);
        }

        match this.shutdown.poll_unpin(cx) {
            // Any completion of the receiver ends the stream; its output is not inspected.
            Poll::Ready(_) => Poll::Ready(None),
            // No shutdown yet, but the wrapped stream is exhausted.
            Poll::Pending if this.stream.is_terminated() => Poll::Ready(None),
            // No shutdown yet: forward whatever the wrapped stream produces.
            Poll::Pending => this.stream.poll_next_unpin(cx),
        }
    }
}
impl<S, T> FusedStream for ShutdownStream<S>
where
    S: Stream<Item = T> + FusedStream + Unpin,
{
    /// Reports whether this stream is finished, which is the case as soon as either the
    /// shutdown receiver or the wrapped stream has terminated.
    fn is_terminated(&self) -> bool {
        // The shutdown is checked first; the wrapped stream is only consulted when it has not
        // terminated.
        if self.shutdown.is_terminated() {
            return true;
        }
        self.stream.is_terminated()
    }
}