transform_stream/
stream_impl.rs

1use crate::scope::enter_scope;
2use crate::{next_id, Yielder};
3
4use std::future::Future;
5use std::marker::PhantomData;
6use std::pin::Pin;
7use std::task::{Context, Poll};
8
9use futures_core::{FusedStream, Stream};
10
11/// Asynchronous stream of items
12#[derive(Debug)]
13pub struct AsyncStream<T, G> {
14    id: u64,
15    done: bool,
16    gen: G,
17    _marker: PhantomData<T>,
18}
19
20impl<T, G> AsyncStream<T, G> {
21    /// Constructs an [`AsyncStream`] by a factory function which returns a future.
22    pub fn new<F>(f: F) -> Self
23    where
24        F: FnOnce(Yielder<T>) -> G,
25    {
26        let id = next_id();
27        let gen = f(Yielder::new(id));
28        Self {
29            id,
30            done: false,
31            gen,
32            _marker: PhantomData,
33        }
34    }
35}
36
37impl<T, G> Stream for AsyncStream<T, G>
38where
39    G: Future<Output = ()>,
40{
41    type Item = T;
42
43    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
44        let this = unsafe { self.get_unchecked_mut() };
45        if this.done {
46            return Poll::Ready(None);
47        }
48
49        let mut place: Option<T> = None;
50        enter_scope(this.id, &mut place, || {
51            let gen = unsafe { Pin::new_unchecked(&mut this.gen) };
52            if let Poll::Ready(()) = gen.poll(cx) {
53                this.done = true;
54            }
55        });
56
57        if place.is_some() {
58            return Poll::Ready(place);
59        }
60
61        if this.done {
62            Poll::Ready(None)
63        } else {
64            Poll::Pending
65        }
66    }
67}
68
69impl<T, G> FusedStream for AsyncStream<T, G>
70where
71    G: Future<Output = ()>,
72{
73    fn is_terminated(&self) -> bool {
74        self.done
75    }
76}