transform_stream/
stream_impl.rs1use crate::scope::enter_scope;
2use crate::{next_id, Yielder};
3
4use std::future::Future;
5use std::marker::PhantomData;
6use std::pin::Pin;
7use std::task::{Context, Poll};
8
9use futures_core::{FusedStream, Stream};
10
/// A stream whose items are produced by driving an inner future `G`.
///
/// `T` is the item type yielded by the stream; `G` is the future that is
/// polled on every call to `poll_next`.
pub struct AsyncStream<T, G> {
    /// Identifier obtained from `next_id()` at construction; it is handed to
    /// the `Yielder` and to `enter_scope` on every poll.
    id: u64,
    /// Set once the inner future has returned `Poll::Ready(())`; after that
    /// the stream only reports `None`.
    done: bool,
    /// The inner future that drives the stream.
    gen: G,
    /// Records the item type without storing a `T`.
    _marker: PhantomData<T>,
}

// Hand-written instead of `#[derive(Debug)]`: the derive adds a `T: Debug`
// bound for every type parameter, even though no `T` is stored here and
// `PhantomData<T>` is `Debug` for any `T`. The output (field names and order)
// is the same as what the derive generates.
impl<T, G> std::fmt::Debug for AsyncStream<T, G>
where
    G: std::fmt::Debug,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("AsyncStream")
            .field("id", &self.id)
            .field("done", &self.done)
            .field("gen", &self.gen)
            .field("_marker", &self._marker)
            .finish()
    }
}
19
20impl<T, G> AsyncStream<T, G> {
21 pub fn new<F>(f: F) -> Self
23 where
24 F: FnOnce(Yielder<T>) -> G,
25 {
26 let id = next_id();
27 let gen = f(Yielder::new(id));
28 Self {
29 id,
30 done: false,
31 gen,
32 _marker: PhantomData,
33 }
34 }
35}
36
37impl<T, G> Stream for AsyncStream<T, G>
38where
39 G: Future<Output = ()>,
40{
41 type Item = T;
42
43 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
44 let this = unsafe { self.get_unchecked_mut() };
45 if this.done {
46 return Poll::Ready(None);
47 }
48
49 let mut place: Option<T> = None;
50 enter_scope(this.id, &mut place, || {
51 let gen = unsafe { Pin::new_unchecked(&mut this.gen) };
52 if let Poll::Ready(()) = gen.poll(cx) {
53 this.done = true;
54 }
55 });
56
57 if place.is_some() {
58 return Poll::Ready(place);
59 }
60
61 if this.done {
62 Poll::Ready(None)
63 } else {
64 Poll::Pending
65 }
66 }
67}
68
69impl<T, G> FusedStream for AsyncStream<T, G>
70where
71 G: Future<Output = ()>,
72{
73 fn is_terminated(&self) -> bool {
74 self.done
75 }
76}