1use crate::op_prelude::*;
2use std::ops::SubAssign;
3
4pin_project! {
5 #[must_use = "streams do nothing unless polled"]
11 pub struct TryStreamNth<S> {
12 #[pin]
13 src: S,
14 fused: bool,
15 remaining: usize,
16 }
17}
18
19impl<S> Future for TryStreamNth<S>
20where
21 S: TryStream,
22{
23 type Output = Result<Option<S::Ok>, S::Error>;
24
25 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
26 let mut this = self.project();
27 if *this.fused {
28 panic!("poll() called after future was already completed...")
29 }
30
31 let remaining: &mut usize = this.remaining;
32
33 Poll::Ready(loop {
34 match ready!(this.src.as_mut().try_poll_next(cx)) {
35 Some(Ok(value)) => {
36 if *remaining == 0 {
37 *this.fused = true;
38 break Ok(Some(value));
39 } else {
40 remaining.sub_assign(1);
41 }
42 },
43 Some(Err(err)) => break Err(err),
44 None => break Ok(None),
45 }
46 })
47 }
48}
49
// Sink implementation for the adapter, compiled only when the `sink` feature is
// enabled.
//
// `E` names the sink's error type explicitly: `S` is bound by both `TryStream`
// and `Sink`, and each has an associated `Error` type, so a bare `S::Error`
// would be ambiguous in this impl.
#[cfg(feature="sink")]
impl<S, Item, E> Sink<Item> for TryStreamNth<S>
where
    S: TryStream + Sink<Item, Error=E>,
{
    // NOTE(review): `delegate_sink!` is defined outside this file; presumably it
    // forwards the `Sink` methods to the `src` field using `E` as the error
    // type - confirm against the macro definition.
    delegate_sink!(src, E, Item);
}
57
58impl<S> TryStreamNth<S>
59where
60 S: TryStream,
61{
62 pub(crate) fn first(src: S) -> Self {
63 Self::new(src, 0)
64 }
65
66 pub(crate) fn new(src: S, remaining: usize) -> Self {
67 Self { src, fused: false, remaining }
68 }
69}
70
71pin_project! {
72 #[must_use = "streams do nothing unless polled"]
78 pub struct StreamNth<S> {
79 #[pin]
80 src: S,
81 fused: bool,
82 remaining: usize,
83 }
84}
85
86impl<S> Future for StreamNth<S>
87where
88 S: Stream
89{
90 type Output = S::Item;
91
92 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
93 let mut this = self.project();
94 Poll::Ready(loop {
95 if let Some(next) = ready!(this.src.as_mut().poll_next(cx)) {
96 if *this.remaining == 0 {
97 break next;
98 } else {
99 this.remaining.sub_assign(1);
100 }
101 }
102 })
103 }
104}
105
// Sink implementation for the adapter, compiled only when the `sink` feature is
// enabled. The wrapped source's own `Sink` error type (`S::Error`) is passed to
// the delegation macro.
#[cfg(feature="sink")]
impl<S, Item> Sink<Item> for StreamNth<S>
where
    S: Stream + Sink<Item>,
{
    // NOTE(review): `delegate_sink!` is defined outside this file; presumably it
    // forwards the `Sink` methods to the `src` field using `S::Error` as the
    // error type - confirm against the macro definition.
    delegate_sink!(src, S::Error, Item);
}
113
114impl<S> StreamNth<S>
115where
116 S: Stream
117{
118 pub(crate) fn first(src: S) -> Self {
119 Self::new(src, 0)
120 }
121
122 pub(crate) fn new(src: S, remaining: usize) -> Self {
123 Self { src, fused: false, remaining }
124 }
125}
126
#[cfg(test)]
mod tests {
    //! Unit tests for the nth-item futures, driven with in-memory streams.

    use super::{StreamNth, TryStreamNth};
    use futures::executor::block_on;

    /// `first` resolves with the first `Ok` item and ignores the rest.
    #[test]
    fn test_try_stream_first() {
        let items: Vec<Result<&str, ()>> = vec![Ok("hello!"), Ok("should not show up")];
        let src = futures::stream::iter(items);
        let raised = TryStreamNth::first(src);
        assert_eq!(block_on(raised), Ok(Some("hello!")));
    }

    /// An empty source resolves to `Ok(None)`.
    #[test]
    fn test_try_stream_nothing() {
        let src = futures::stream::empty::<Result<(), ()>>();
        let raised = TryStreamNth::first(src);
        assert_eq!(block_on(raised), Ok(None));
    }

    /// A non-zero skip count drops that many leading `Ok` items.
    #[test]
    fn test_try_stream_nth_skips_leading_items() {
        let items: Vec<Result<u32, ()>> = vec![Ok(1), Ok(2), Ok(3)];
        let src = futures::stream::iter(items);
        let raised = TryStreamNth::new(src, 1);
        assert_eq!(block_on(raised), Ok(Some(2)));
    }

    /// A source that ends while items are still being skipped resolves to `Ok(None)`.
    #[test]
    fn test_try_stream_nth_past_end() {
        let items: Vec<Result<u32, ()>> = vec![Ok(1), Ok(2)];
        let src = futures::stream::iter(items);
        let raised = TryStreamNth::new(src, 5);
        assert_eq!(block_on(raised), Ok(None));
    }

    /// An error seen before the requested item is returned as-is.
    #[test]
    fn test_try_stream_error_is_propagated() {
        let items: Vec<Result<u32, &str>> = vec![Ok(1), Err("boom"), Ok(3)];
        let src = futures::stream::iter(items);
        let raised = TryStreamNth::new(src, 2);
        assert_eq!(block_on(raised), Err("boom"));
    }

    /// `StreamNth::first` resolves with the first item of the source.
    #[test]
    fn test_stream_first() {
        let src = futures::stream::iter(vec![10, 20, 30]);
        let raised = StreamNth::first(src);
        assert_eq!(block_on(raised), 10);
    }

    /// `StreamNth::new` skips the requested number of leading items.
    #[test]
    fn test_stream_nth_skips_leading_items() {
        let src = futures::stream::iter(vec![10, 20, 30]);
        let raised = StreamNth::new(src, 2);
        assert_eq!(block_on(raised), 30);
    }
}