Skip to main content

infinite_stream/
try_from_future.rs

1use crate::internal_prelude::*;
2
3pub fn try_from_future<T, E, S: InfiniteStream<Item = Result<T, E>>, Fut: Future<Output = Result<S, E>>>(fut: Fut) -> TryFromFuture<S, Fut> {
4    assert_infinite_stream::<Result<T, E>, _>(TryFromFuture { state: TryFromFutureState::Init(fut) })
5}
6
7#[pin_project]
8#[must_use = "streams do nothing unless polled"]
9pub struct TryFromFuture<S: InfiniteStream, Fut> {
10    #[pin]
11    state: TryFromFutureState<S, Fut>,
12}
13
14#[pin_project(project = TryFromFutureStateProj)]
15enum TryFromFutureState<S, Fut> {
16    Init(#[pin] Fut),
17    Stream(#[pin] S),
18    Empty,
19}
20
21impl<T, E, S: InfiniteStream<Item = Result<T, E>>, Fut: Future<Output = Result<S, E>>> InfiniteStream for TryFromFuture<S, Fut> {
22    type Item = Result<T, E>;
23
24    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Item> {
25        let mut this = self.project();
26        let stream = match this.state.as_mut().project() {
27            TryFromFutureStateProj::Init(fut) => match ready!(fut.poll(cx)) {
28                Ok(stream) => {
29                    this.state.set(TryFromFutureState::Stream(stream));
30                    match this.state.as_mut().project() {
31                        TryFromFutureStateProj::Stream(stream) => stream,
32                        _ => unreachable!(),
33                    }
34                }
35                Err(e) => {
36                    this.state.set(TryFromFutureState::Empty);
37                    return Poll::Ready(Err(e))
38                }
39            },
40            TryFromFutureStateProj::Stream(stream) => stream,
41            TryFromFutureStateProj::Empty => return Poll::Pending,
42        };
43        stream.poll_next(cx)
44    }
45}