jstream_ext/
nth.rs

1use crate::op_prelude::*;
2use std::ops::SubAssign;
3
4pin_project! {
5    ///
6    /// Stream for the [`try_nth`](super::JTryStreamExt::try_nth) method
7    ///
8    /// Also supports the [`try_first`](super::JTryStreamExt::try_first) method.
9    ///
10    #[must_use = "streams do nothing unless polled"]
11    pub struct TryStreamNth<S> {
12        #[pin]
13        src: S,
14        fused: bool,
15        remaining: usize,
16    }
17}
18
19impl<S> Future for TryStreamNth<S>
20where
21    S: TryStream,
22{
23    type Output = Result<Option<S::Ok>, S::Error>;
24
25    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
26        let mut this = self.project();
27        if *this.fused {
28            panic!("poll() called after future was already completed...")
29        }
30
31        let remaining: &mut usize = this.remaining;
32
33        Poll::Ready(loop {
34            match ready!(this.src.as_mut().try_poll_next(cx)) {
35                Some(Ok(value)) => {
36                    if *remaining == 0 {
37                        *this.fused = true;
38                        break Ok(Some(value));
39                    } else {
40                        remaining.sub_assign(1);
41                    }
42                },
43                Some(Err(err)) => break Err(err),
44                None => break Ok(None),
45            }
46        })
47    }
48}
49
/// Sink pass-through, available with the `sink` feature: when the wrapped
/// source is also a `Sink`, the `delegate_sink!` macro generates the `Sink`
/// implementation from the `src` field.
///
/// The sink error type gets its own parameter here because `S` is bound by
/// both `TryStream` and `Sink`, each of which has an `Error` associated type.
#[cfg(feature = "sink")]
impl<S, T, SinkErr> Sink<T> for TryStreamNth<S>
where
    S: TryStream,
    S: Sink<T, Error = SinkErr>,
{
    delegate_sink!(src, SinkErr, T);
}
57
58impl<S> TryStreamNth<S>
59where
60    S: TryStream,
61{
62    pub(crate) fn first(src: S) -> Self {
63        Self::new(src, 0)
64    }
65
66    pub(crate) fn new(src: S, remaining: usize) -> Self {
67        Self { src, fused: false, remaining }
68    }
69}
70
71pin_project! {
72    ///
73    /// Stream for the [`nth`](super::JStreamExt::nth) method
74    ///
75    /// Also supports the [`first`](super::JStreamExt::first) method.
76    ///
77    #[must_use = "streams do nothing unless polled"]
78    pub struct StreamNth<S> {
79        #[pin]
80        src: S,
81        fused: bool,
82        remaining: usize,
83    }
84}
85
86impl<S> Future for StreamNth<S>
87where
88    S: Stream
89{
90    type Output = S::Item;
91
92    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
93        let mut this = self.project();
94        Poll::Ready(loop {
95            if let Some(next) = ready!(this.src.as_mut().poll_next(cx)) {
96                if *this.remaining == 0 {
97                    break next;
98                } else {
99                    this.remaining.sub_assign(1);
100                }
101            }
102        })
103    }
104}
105
/// Sink pass-through, available with the `sink` feature: when the wrapped
/// source is also a `Sink`, the `delegate_sink!` macro generates the `Sink`
/// implementation from the `src` field, reusing the source's own sink error
/// type.
#[cfg(feature = "sink")]
impl<S, T> Sink<T> for StreamNth<S>
where
    S: Stream,
    S: Sink<T>,
{
    delegate_sink!(src, S::Error, T);
}
113
114impl<S> StreamNth<S>
115where
116    S: Stream
117{
118    pub(crate) fn first(src: S) -> Self {
119        Self::new(src, 0)
120    }
121
122    pub(crate) fn new(src: S, remaining: usize) -> Self {
123        Self { src, fused: false, remaining }
124    }
125}
126
#[cfg(test)]
mod tests {
    use super::{StreamNth, TryStreamNth};
    use futures::executor::block_on;
    use futures::stream;

    // ---- TryStreamNth -----------------------------------------------------

    #[test]
    fn test_try_stream_first() {
        let items: Vec<Result<&str, ()>> = vec![Ok("hello!"), Ok("should not show up")];
        let src = stream::iter(items);
        let raised = TryStreamNth::first(src);
        assert_eq!(block_on(raised), Ok(Some("hello!")));
    }

    #[test]
    fn test_try_stream_nothing() {
        let src = stream::empty::<Result<(), ()>>();
        let raised = TryStreamNth::first(src);
        assert_eq!(block_on(raised), Ok(None));
    }

    #[test]
    fn test_try_stream_nth() {
        // Index 1 skips exactly one successful item.
        let items: Vec<Result<u32, ()>> = vec![Ok(10), Ok(20), Ok(30)];
        let raised = TryStreamNth::new(stream::iter(items), 1);
        assert_eq!(block_on(raised), Ok(Some(20)));
    }

    #[test]
    fn test_try_stream_nth_past_end() {
        // The stream ends before the requested index is reached.
        let items: Vec<Result<u32, ()>> = vec![Ok(10)];
        let raised = TryStreamNth::new(stream::iter(items), 3);
        assert_eq!(block_on(raised), Ok(None));
    }

    #[test]
    fn test_try_stream_error_before_nth() {
        // An error seen while skipping items is returned immediately.
        let items: Vec<Result<u32, &str>> = vec![Ok(1), Err("boom"), Ok(3)];
        let raised = TryStreamNth::new(stream::iter(items), 2);
        assert_eq!(block_on(raised), Err("boom"));
    }

    // ---- StreamNth --------------------------------------------------------

    #[test]
    fn test_stream_first() {
        let src = stream::iter(vec!["hello!", "should not show up"]);
        assert_eq!(block_on(StreamNth::first(src)), "hello!");
    }

    #[test]
    fn test_stream_nth() {
        let src = stream::iter(vec![1, 2, 3]);
        assert_eq!(block_on(StreamNth::new(src, 2)), 3);
    }
}
146}