use crate::thr::wake::WakeRoot;
use core::{
future::Future,
iter::FusedIterator,
marker::PhantomData,
pin::Pin,
task::{Context, Poll},
};
use futures::stream::Stream;
pub trait FutureRootExt: Future {
fn root_wait(self) -> Self::Output;
}
pub trait StreamRootExt<'a>: Stream {
fn root_wait(self) -> StreamRootWait<'a, Self>
where
Self: Sized;
}
pub struct StreamRootWait<'a, T: Stream> {
stream: T,
exhausted: bool,
_marker: PhantomData<&'a &'a mut ()>,
}
impl<T: Future> FutureRootExt for T {
fn root_wait(mut self) -> Self::Output {
let waker = WakeRoot::new().to_waker();
let mut cx = Context::from_waker(&waker);
loop {
match unsafe { Pin::new_unchecked(&mut self) }.poll(&mut cx) {
Poll::Pending => WakeRoot::wait(),
Poll::Ready(value) => break value,
}
}
}
}
impl<'a, T: Stream> StreamRootExt<'a> for T {
#[inline]
fn root_wait(self) -> StreamRootWait<'a, Self>
where
Self: Sized,
{
StreamRootWait { stream: self, exhausted: false, _marker: PhantomData }
}
}
impl<'a, T: Stream> Iterator for StreamRootWait<'a, T> {
type Item = T::Item;
fn next(&mut self) -> Option<Self::Item> {
if self.exhausted {
return None;
}
let waker = WakeRoot::new().to_waker();
let mut cx = Context::from_waker(&waker);
loop {
match unsafe { Pin::new_unchecked(&mut self.stream) }.poll_next(&mut cx) {
Poll::Pending => WakeRoot::wait(),
Poll::Ready(Some(item)) => break Some(item),
Poll::Ready(None) => {
self.exhausted = true;
break None;
}
}
}
}
}
impl<'a, T: Stream> FusedIterator for StreamRootWait<'a, T> {}