jstream_ext/
fuse_on_fail.rs

1use crate::op_prelude::*;
2
3pin_project! {
4    /// Stream for the [`fuse_on_fail`](super::JTryStreamExt::fuse_on_fail) method
5    #[must_use = "streams do nothing unless polled"]
6    pub struct FuseOnFail<S> {
7        #[pin]
8        src: S,
9        fused: bool,
10        failed: bool,
11    }
12}
13
14impl<S> Stream for FuseOnFail<S>
15where
16    S: TryStream,
17{
18    type Item = Result<S::Ok, S::Error>;
19
20    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
21        let this = self.project();
22        if *this.fused {
23            panic!("poll after fused!")
24        }
25
26        Poll::Ready({
27            if *this.failed {
28                *this.fused = true;
29                None
30            } else {
31                match ready!(this.src.try_poll_next(cx)) {
32                    r @ Some(Ok(_)) => r,
33                    None => {
34                        *this.fused = true;
35                        None
36                    }
37                    Some(Err(err)) => {
38                        *this.failed = true;
39                        Some(Err(err))
40                    }
41                }
42            }
43        })
44    }
45
46    fn size_hint(&self) -> (usize, Option<usize>) {
47        self.src.size_hint()
48    }
49}
50
51impl<S> FusedStream for FuseOnFail<S> where S: TryStream {
52    fn is_terminated(&self) -> bool {
53        self.fused
54    }
55}
56
57#[cfg(feature="sink")]
58impl<S, Item, E> Sink<Item> for FuseOnFail<S>
59where
60    S: TryStream + Sink<Item, Error=E>
61{
62    delegate_sink!(src, E, Item);
63}
64
65impl<S> FuseOnFail<S>
66where
67    S: TryStream,
68{
69    pub(crate) fn new(src: S) -> Self {
70        Self { src, fused: false, failed: false }
71    }
72}
73
74#[cfg(test)]
75mod tests {
76    use super::FuseOnFail;
77    use futures::executor::block_on;
78    use futures::TryStreamExt;
79
80    #[test]
81    fn test_fuse_on_fail() {
82        let src = futures::stream::iter(vec![
83            Ok("a"),
84            Ok("b"),
85            Err("oh no!"),
86            Ok("shouldn't be there"),
87        ]);
88
89        let mut lifted = FuseOnFail::new(src);
90
91        assert_eq!(block_on(lifted.try_next()), Ok(Some("a")));
92        assert_eq!(block_on(lifted.try_next()), Ok(Some("b")));
93        assert_eq!(block_on(lifted.try_next()), Err("oh no!"));
94        assert_eq!(block_on(lifted.try_next()), Ok(None));
95    }
96
97    #[test]
98    fn test_fuse_on_none() {
99        let items: Vec<Result<&str, ()>> = vec![
100            Ok("a"),
101            Ok("b"),
102            Ok("hello"),
103        ];
104
105        let src = futures::stream::iter(items);
106
107        let mut lifted = FuseOnFail::new(src);
108
109        assert_eq!(block_on(lifted.try_next()), Ok(Some("a")));
110        assert_eq!(block_on(lifted.try_next()), Ok(Some("b")));
111        assert_eq!(block_on(lifted.try_next()), Ok(Some("hello")));
112        assert_eq!(block_on(lifted.try_next()), Ok(None));
113    }
114}