jstream_ext/
fuse_on_fail.rs1use crate::op_prelude::*;
2
3pin_project! {
4 #[must_use = "streams do nothing unless polled"]
6 pub struct FuseOnFail<S> {
7 #[pin]
8 src: S,
9 fused: bool,
10 failed: bool,
11 }
12}
13
14impl<S> Stream for FuseOnFail<S>
15where
16 S: TryStream,
17{
18 type Item = Result<S::Ok, S::Error>;
19
20 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
21 let this = self.project();
22 if *this.fused {
23 panic!("poll after fused!")
24 }
25
26 Poll::Ready({
27 if *this.failed {
28 *this.fused = true;
29 None
30 } else {
31 match ready!(this.src.try_poll_next(cx)) {
32 r @ Some(Ok(_)) => r,
33 None => {
34 *this.fused = true;
35 None
36 }
37 Some(Err(err)) => {
38 *this.failed = true;
39 Some(Err(err))
40 }
41 }
42 }
43 })
44 }
45
46 fn size_hint(&self) -> (usize, Option<usize>) {
47 self.src.size_hint()
48 }
49}
50
51impl<S> FusedStream for FuseOnFail<S> where S: TryStream {
52 fn is_terminated(&self) -> bool {
53 self.fused
54 }
55}
56
// Only compiled when the `sink` feature is enabled.
// NOTE(review): `delegate_sink!` is defined elsewhere in the crate; presumably
// it forwards the `Sink` methods to the pinned `src` field using `E` as the
// error type and `Item` as the item type — confirm against the macro.
#[cfg(feature = "sink")]
impl<S, Item, E> Sink<Item> for FuseOnFail<S>
where
    S: TryStream + Sink<Item, Error = E>,
{
    delegate_sink!(src, E, Item);
}
64
65impl<S> FuseOnFail<S>
66where
67 S: TryStream,
68{
69 pub(crate) fn new(src: S) -> Self {
70 Self { src, fused: false, failed: false }
71 }
72}
73
#[cfg(test)]
mod tests {
    use super::FuseOnFail;
    use futures::executor::block_on;
    use futures::TryStreamExt;

    /// An error is forwarded once, after which the stream ends even though
    /// the source still holds another item.
    #[test]
    fn test_fuse_on_fail() {
        let items: Vec<Result<&str, &str>> = vec![
            Ok("a"),
            Ok("b"),
            Err("oh no!"),
            Ok("shouldn't be there"),
        ];
        let mut stream = FuseOnFail::new(futures::stream::iter(items));

        assert_eq!(block_on(stream.try_next()), Ok(Some("a")));
        assert_eq!(block_on(stream.try_next()), Ok(Some("b")));
        assert_eq!(block_on(stream.try_next()), Err("oh no!"));
        assert_eq!(block_on(stream.try_next()), Ok(None));
    }

    /// Without any error every item passes through and the stream ends when
    /// the source does.
    #[test]
    fn test_fuse_on_none() {
        let items: Vec<Result<&str, ()>> = vec![Ok("a"), Ok("b"), Ok("hello")];
        let mut stream = FuseOnFail::new(futures::stream::iter(items));

        assert_eq!(block_on(stream.try_next()), Ok(Some("a")));
        assert_eq!(block_on(stream.try_next()), Ok(Some("b")));
        assert_eq!(block_on(stream.try_next()), Ok(Some("hello")));
        assert_eq!(block_on(stream.try_next()), Ok(None));
    }
}