1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77
use futures::stream::Stream; use futures::task::waker; use pin_project::pin_project; use std::pin::Pin; use std::sync::Arc; use std::task; use crate::{Effect, Effectful, Event}; #[derive(Debug)] pub struct Item<T>(T); impl<T> Item<T> { pub fn new(v: T) -> Self { Self(v) } pub fn into_inner(self) -> T { self.0 } } #[pin_project] #[derive(Debug)] pub struct IntoStream<C>(#[pin] C); impl<C> Stream for IntoStream<C> where C: Effectful<Output = ()>, { type Item = C::Effect; fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context) -> task::Poll<Option<Self::Item>> { let notify = Arc::new(super::FutureNotify { waker: cx.waker().clone(), }); let cx = crate::Context::from_notify(notify); let comp = self.project().0; match comp.poll(&cx) { crate::Poll::Event(Event::Complete(())) => task::Poll::Ready(None), crate::Poll::Event(Event::Effect(e)) => task::Poll::Ready(Some(e)), crate::Poll::Pending => task::Poll::Pending, } } } impl<T> Effect for Item<T> { type Output = (); } #[pin_project] #[derive(Debug)] pub struct FromStream<S>(#[pin] S); impl<S> Effectful for FromStream<S> where S: Stream, { type Output = (); type Effect = Item<S::Item>; fn poll(self: Pin<&mut Self>, cx: &crate::Context) -> crate::Poll<Self::Output, Self::Effect> { let waker = waker(Arc::new(cx.clone())); let mut cx = task::Context::from_waker(&waker); let stream = self.project().0; match stream.poll_next(&mut cx) { task::Poll::Ready(Some(v)) => crate::Poll::effect(Item(v)), task::Poll::Ready(None) => crate::Poll::complete(()), task::Poll::Pending => crate::Poll::Pending, } } } pub fn from_stream<S>(s: S) -> FromStream<S> { FromStream(s) }