tokio_stream_util/try_stream/ext/
and_then.rs1use super::{FusedStream, TryStream};
2#[cfg(feature = "sink")]
3use async_sink::Sink;
4use core::fmt;
5use core::pin::Pin;
6use core::task::{Context, Poll};
7use futures_core::future::TryFuture;
8use tokio_stream::Stream;
9
10#[must_use = "streams do nothing unless polled"]
12pub struct AndThen<St, Fut, F> {
13 stream: St,
14 future: Option<Fut>,
15 f: F,
16}
17
18impl<St, Fut, F> fmt::Debug for AndThen<St, Fut, F>
19where
20 St: fmt::Debug,
21 Fut: fmt::Debug,
22{
23 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
24 f.debug_struct("AndThen")
25 .field("stream", &self.stream)
26 .field("future", &self.future)
27 .finish()
28 }
29}
30
31impl<St, Fut, F> AndThen<St, Fut, F>
32where
33 St: TryStream,
34 F: FnMut(St::Ok) -> Fut,
35 Fut: TryFuture<Error = St::Error>,
36{
37 pub(super) fn new(stream: St, f: F) -> Self {
38 Self {
39 stream,
40 future: None,
41 f,
42 }
43 }
44}
45
46impl<St, Fut, F> AndThen<St, Fut, F> {
47 pub fn get_ref(&self) -> &St {
50 &self.stream
51 }
52
53 pub fn get_mut(&mut self) -> &mut St {
59 &mut self.stream
60 }
61
62 pub fn into_inner(self) -> St {
67 self.stream
68 }
69}
70
71impl<St, Fut, F> Stream for AndThen<St, Fut, F>
72where
73 St: TryStream,
74 F: FnMut(St::Ok) -> Fut,
75 Fut: TryFuture<Error = St::Error>,
76{
77 type Item = Result<Fut::Ok, St::Error>;
78
79 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
80 let this = unsafe { self.get_unchecked_mut() };
81 let mut stream = unsafe { Pin::new_unchecked(&mut this.stream) };
82 let mut future = unsafe { Pin::new_unchecked(&mut this.future) };
83 let f = &mut this.f;
84
85 loop {
86 if let Some(fut) = future.as_mut().as_pin_mut() {
87 let item = match fut.try_poll(cx) {
88 Poll::Ready(result) => result,
89 Poll::Pending => return Poll::Pending,
90 };
91 future.set(None);
92 return Poll::Ready(Some(item));
93 }
94
95 let next_item_res = match stream.as_mut().try_poll_next(cx) {
96 Poll::Ready(res) => res,
97 Poll::Pending => return Poll::Pending,
98 };
99
100 match next_item_res {
101 Some(Ok(item)) => {
102 future.set(Some(f(item)));
103 }
104 Some(Err(e)) => return Poll::Ready(Some(Err(e))),
105 None => return Poll::Ready(None),
106 }
107 }
108 }
109
110 fn size_hint(&self) -> (usize, Option<usize>) {
111 let future_len = if self.future.is_some() { 1 } else { 0 };
112 let (lower, upper) = self.stream.size_hint();
113 let lower = lower.saturating_add(future_len);
114 let upper = match upper {
115 Some(x) => x.checked_add(future_len),
116 None => None,
117 };
118 (lower, upper)
119 }
120}
121
122impl<St, Fut, F> FusedStream for AndThen<St, Fut, F>
123where
124 St: TryStream + FusedStream,
125 F: FnMut(St::Ok) -> Fut,
126 Fut: TryFuture<Error = St::Error>,
127{
128 fn is_terminated(&self) -> bool {
129 self.future.is_none() && self.stream.is_terminated()
130 }
131}
132
133#[cfg(feature = "sink")]
135impl<St, Fut, F, Item> Sink<Item> for AndThen<St, Fut, F>
136where
137 St: Sink<Item>,
138{
139 type Error = St::Error;
140
141 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
142 unsafe { Pin::new_unchecked(&mut self.get_unchecked_mut().stream) }.poll_ready(cx)
143 }
144
145 fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
146 unsafe { Pin::new_unchecked(&mut self.get_unchecked_mut().stream) }.start_send(item)
147 }
148
149 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
150 unsafe { Pin::new_unchecked(&mut self.get_unchecked_mut().stream) }.poll_flush(cx)
151 }
152
153 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
154 unsafe { Pin::new_unchecked(&mut self.get_unchecked_mut().stream) }.poll_close(cx)
155 }
156}