tokio_stream_util/try_stream/ext/
and_then.rs

1use super::{FusedStream, TryStream};
2#[cfg(feature = "sink")]
3use async_sink::Sink;
4use core::fmt;
5use core::pin::Pin;
6use core::task::{Context, Poll};
7use futures_core::future::TryFuture;
8use tokio_stream::Stream;
9
10/// Stream for the [`and_then`](super::TryStreamExt::and_then) method.
11#[must_use = "streams do nothing unless polled"]
12pub struct AndThen<St, Fut, F> {
13    stream: St,
14    future: Option<Fut>,
15    f: F,
16}
17
18impl<St, Fut, F> fmt::Debug for AndThen<St, Fut, F>
19where
20    St: fmt::Debug,
21    Fut: fmt::Debug,
22{
23    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
24        f.debug_struct("AndThen")
25            .field("stream", &self.stream)
26            .field("future", &self.future)
27            .finish()
28    }
29}
30
31impl<St, Fut, F> AndThen<St, Fut, F>
32where
33    St: TryStream,
34    F: FnMut(St::Ok) -> Fut,
35    Fut: TryFuture<Error = St::Error>,
36{
37    pub(super) fn new(stream: St, f: F) -> Self {
38        Self {
39            stream,
40            future: None,
41            f,
42        }
43    }
44}
45
46impl<St, Fut, F> AndThen<St, Fut, F> {
47    /// Acquires a reference to the underlying stream that this combinator is
48    /// pulling from.
49    pub fn get_ref(&self) -> &St {
50        &self.stream
51    }
52
53    /// Acquires a mutable reference to the underlying stream that this
54    /// combinator is pulling from.
55    ///
56    /// Note that care must be taken to avoid tampering with the state of the
57    /// stream which may otherwise confuse this combinator.
58    pub fn get_mut(&mut self) -> &mut St {
59        &mut self.stream
60    }
61
62    /// Consumes this combinator, returning the underlying stream.
63    ///
64    /// Note that this may discard intermediate state of this combinator, so
65    /// care should be taken to avoid losing resources when this is called.
66    pub fn into_inner(self) -> St {
67        self.stream
68    }
69}
70
71impl<St, Fut, F> Stream for AndThen<St, Fut, F>
72where
73    St: TryStream,
74    F: FnMut(St::Ok) -> Fut,
75    Fut: TryFuture<Error = St::Error>,
76{
77    type Item = Result<Fut::Ok, St::Error>;
78
79    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
80        let this = unsafe { self.get_unchecked_mut() };
81        let mut stream = unsafe { Pin::new_unchecked(&mut this.stream) };
82        let mut future = unsafe { Pin::new_unchecked(&mut this.future) };
83        let f = &mut this.f;
84
85        loop {
86            if let Some(fut) = future.as_mut().as_pin_mut() {
87                let item = match fut.try_poll(cx) {
88                    Poll::Ready(result) => result,
89                    Poll::Pending => return Poll::Pending,
90                };
91                future.set(None);
92                return Poll::Ready(Some(item));
93            }
94
95            let next_item_res = match stream.as_mut().try_poll_next(cx) {
96                Poll::Ready(res) => res,
97                Poll::Pending => return Poll::Pending,
98            };
99
100            match next_item_res {
101                Some(Ok(item)) => {
102                    future.set(Some(f(item)));
103                }
104                Some(Err(e)) => return Poll::Ready(Some(Err(e))),
105                None => return Poll::Ready(None),
106            }
107        }
108    }
109
110    fn size_hint(&self) -> (usize, Option<usize>) {
111        let future_len = if self.future.is_some() { 1 } else { 0 };
112        let (lower, upper) = self.stream.size_hint();
113        let lower = lower.saturating_add(future_len);
114        let upper = match upper {
115            Some(x) => x.checked_add(future_len),
116            None => None,
117        };
118        (lower, upper)
119    }
120}
121
122impl<St, Fut, F> FusedStream for AndThen<St, Fut, F>
123where
124    St: TryStream + FusedStream,
125    F: FnMut(St::Ok) -> Fut,
126    Fut: TryFuture<Error = St::Error>,
127{
128    fn is_terminated(&self) -> bool {
129        self.future.is_none() && self.stream.is_terminated()
130    }
131}
132
133// Forwarding impl of Sink from the underlying stream
134#[cfg(feature = "sink")]
135impl<St, Fut, F, Item> Sink<Item> for AndThen<St, Fut, F>
136where
137    St: Sink<Item>,
138{
139    type Error = St::Error;
140
141    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
142        unsafe { Pin::new_unchecked(&mut self.get_unchecked_mut().stream) }.poll_ready(cx)
143    }
144
145    fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
146        unsafe { Pin::new_unchecked(&mut self.get_unchecked_mut().stream) }.start_send(item)
147    }
148
149    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
150        unsafe { Pin::new_unchecked(&mut self.get_unchecked_mut().stream) }.poll_flush(cx)
151    }
152
153    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
154        unsafe { Pin::new_unchecked(&mut self.get_unchecked_mut().stream) }.poll_close(cx)
155    }
156}