infinite_stream/
try_from_future.rs1use crate::internal_prelude::*;
2
3pub fn try_from_future<T, E, S: InfiniteStream<Item = Result<T, E>>, Fut: Future<Output = Result<S, E>>>(fut: Fut) -> TryFromFuture<S, Fut> {
4 assert_infinite_stream::<Result<T, E>, _>(TryFromFuture { state: TryFromFutureState::Init(fut) })
5}
6
7#[pin_project]
8#[must_use = "streams do nothing unless polled"]
9pub struct TryFromFuture<S: InfiniteStream, Fut> {
10 #[pin]
11 state: TryFromFutureState<S, Fut>,
12}
13
14#[pin_project(project = TryFromFutureStateProj)]
15enum TryFromFutureState<S, Fut> {
16 Init(#[pin] Fut),
17 Stream(#[pin] S),
18 Empty,
19}
20
21impl<T, E, S: InfiniteStream<Item = Result<T, E>>, Fut: Future<Output = Result<S, E>>> InfiniteStream for TryFromFuture<S, Fut> {
22 type Item = Result<T, E>;
23
24 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Item> {
25 let mut this = self.project();
26 let stream = match this.state.as_mut().project() {
27 TryFromFutureStateProj::Init(fut) => match ready!(fut.poll(cx)) {
28 Ok(stream) => {
29 this.state.set(TryFromFutureState::Stream(stream));
30 match this.state.as_mut().project() {
31 TryFromFutureStateProj::Stream(stream) => stream,
32 _ => unreachable!(),
33 }
34 }
35 Err(e) => {
36 this.state.set(TryFromFutureState::Empty);
37 return Poll::Ready(Err(e))
38 }
39 },
40 TryFromFutureStateProj::Stream(stream) => stream,
41 TryFromFutureStateProj::Empty => return Poll::Pending,
42 };
43 stream.poll_next(cx)
44 }
45}