1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
use futures::{
channel::oneshot,
future::{self, FusedFuture},
stream::{self, FusedStream},
task::{Context, Poll},
FutureExt, Stream, StreamExt,
};
use std::{marker::Unpin, pin::Pin};
type Shutdown = oneshot::Receiver<()>;
type FusedShutdown = future::Fuse<Shutdown>;
pub struct ShutdownStream<S> {
shutdown: FusedShutdown,
stream: S,
}
impl<S: Stream> ShutdownStream<stream::Fuse<S>> {
pub fn new(shutdown: Shutdown, stream: S) -> Self {
Self {
shutdown: shutdown.fuse(),
stream: stream.fuse(),
}
}
pub fn from_fused(shutdown: FusedShutdown, stream: stream::Fuse<S>) -> Self {
Self { shutdown, stream }
}
pub fn split(self) -> (FusedShutdown, stream::Fuse<S>) {
(self.shutdown, self.stream)
}
}
impl<S: Stream<Item = T> + FusedStream + Unpin, T> Stream for ShutdownStream<S> {
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
if !self.shutdown.is_terminated() {
if self.shutdown.poll_unpin(cx).is_ready() {
return Poll::Ready(None);
}
if !self.stream.is_terminated() {
return self.stream.poll_next_unpin(cx);
}
}
Poll::Ready(None)
}
}
impl<S: Stream<Item = T> + FusedStream + Unpin, T> FusedStream for ShutdownStream<S> {
fn is_terminated(&self) -> bool {
self.shutdown.is_terminated() || self.stream.is_terminated()
}
}