transform_stream/
try_stream_impl.rs

1use crate::scope::enter_scope;
2use crate::{next_id, Yielder};
3
4use std::future::Future;
5use std::marker::PhantomData;
6use std::pin::Pin;
7use std::task::{Context, Poll};
8
9use futures_core::{FusedStream, Stream};
10
11/// Asynchronous stream of results
12#[derive(Debug)]
13pub struct AsyncTryStream<T, E, G> {
14    id: u64,
15    done: bool,
16    err: Option<E>,
17    gen: G,
18    _marker: PhantomData<Result<T, E>>,
19}
20
21impl<T, E, G> AsyncTryStream<T, E, G> {
22    /// Constructs an [`AsyncTryStream`] by a factory function which returns a future.
23    pub fn new<F>(f: F) -> Self
24    where
25        F: FnOnce(Yielder<Result<T, E>>) -> G,
26    {
27        let id = next_id();
28        let gen = f(Yielder::new(id));
29        Self {
30            id,
31            done: false,
32            err: None,
33            gen,
34            _marker: PhantomData,
35        }
36    }
37}
38
39impl<T, E, G> Stream for AsyncTryStream<T, E, G>
40where
41    G: Future<Output = Result<(), E>>,
42{
43    type Item = Result<T, E>;
44
45    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
46        let this = unsafe { self.get_unchecked_mut() };
47        if this.done {
48            return Poll::Ready(this.err.take().map(Err));
49        }
50
51        let mut place: Option<Result<T, E>> = None;
52        enter_scope(this.id, &mut place, || {
53            let gen = unsafe { Pin::new_unchecked(&mut this.gen) };
54            if let Poll::Ready(ret) = gen.poll(cx) {
55                this.done = true;
56                if let Err(e) = ret {
57                    this.err = Some(e)
58                }
59            }
60        });
61
62        if place.is_some() {
63            return Poll::Ready(place);
64        }
65
66        if this.done {
67            return Poll::Ready(this.err.take().map(Err));
68        }
69        Poll::Pending
70    }
71}
72
73impl<T, E, G> FusedStream for AsyncTryStream<T, E, G>
74where
75    G: Future<Output = Result<(), E>>,
76{
77    fn is_terminated(&self) -> bool {
78        self.done && self.err.is_none()
79    }
80}