use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use futures_core::future::BoxFuture;
use futures_core::stream::Stream;
use futures_core::FusedFuture;
use futures_util::future::Fuse;
use futures_util::FutureExt;
use crate::error::Error;
/// A `Stream` of `Result<T, Error>` items produced by a boxed generator future.
///
/// The generator hands items over through a shared [`Yielder`]; the stream picks them up
/// each time the generator suspends.
pub struct TryAsyncStream<'a, T> {
    /// The stream's handle to the slot the generator writes yielded items into.
    yielder: Yielder<T>,
    /// The generator itself, boxed and fused so its completion can be checked afterwards.
    future: Fuse<BoxFuture<'a, Result<(), Error>>>,
}

impl<'a, T> TryAsyncStream<'a, T> {
    /// Creates a stream from a closure that builds the generator future.
    ///
    /// `f` is called immediately with a second handle to the same [`Yielder`] slot that the
    /// stream keeps for itself. The future it returns is boxed and fused; nothing in this
    /// constructor polls it.
    pub fn new<F, Fut>(f: F) -> Self
    where
        F: FnOnce(Yielder<T>) -> Fut + Send,
        Fut: 'a + Future<Output = Result<(), Error>> + Send,
        T: 'a + Send,
    {
        let shared = Yielder::new();
        let generator = f(shared.duplicate());

        Self {
            yielder: shared,
            future: generator.boxed().fuse(),
        }
    }
}
/// Single-slot hand-off used by a generator future to pass items to the `TryAsyncStream`
/// that polls it.
///
/// Every handle created through `duplicate` refers to the same slot, which holds at most
/// one item at a time.
pub struct Yielder<T> {
    // `Arc<Mutex<..>>` rather than a single-threaded cell: `TryAsyncStream::new` requires the
    // generator future (which owns one of these handles) to be `Send`.
    value: Arc<Mutex<Option<T>>>,
}

impl<T> Yielder<T> {
    /// Creates a handle with an empty slot.
    fn new() -> Self {
        Yielder {
            value: Arc::new(Mutex::new(None)),
        }
    }

    /// Returns another handle to the same slot.
    ///
    /// This is an inherent, private method instead of a `Clone` impl, so code outside this
    /// module cannot create additional handles.
    fn duplicate(&self) -> Self {
        Yielder {
            value: Arc::clone(&self.value),
        }
    }

    /// Stores `val` in the shared slot and suspends the calling future exactly once.
    ///
    /// The first poll places the value and returns `Poll::Pending`; the next poll completes.
    ///
    /// No wake-up is registered for the suspension, so the returned future only makes
    /// progress if whoever received `Pending` polls again on its own (as `TryAsyncStream`
    /// does once it has taken the value).
    ///
    /// # Panics
    ///
    /// Panics if the slot's mutex is poisoned. In debug builds it also panics if the
    /// previously yielded value has not been taken yet.
    pub async fn r#yield(&self, val: T) {
        // The guard is a temporary of this statement, so the lock is released before the
        // suspension point below.
        let replaced = self
            .value
            .lock()
            .expect("BUG: panicked while holding a lock")
            .replace(val);

        debug_assert!(
            replaced.is_none(),
            "BUG: previously yielded value not taken"
        );

        let mut suspended = false;

        // `poll_fn` from the standard library: report `Pending` on the first poll only.
        // The context is deliberately ignored, i.e. no waker is stored or notified.
        std::future::poll_fn(|_cx| {
            if suspended {
                Poll::Ready(())
            } else {
                suspended = true;
                Poll::Pending
            }
        })
        .await
    }

    /// Removes and returns the item currently in the slot, leaving it empty.
    ///
    /// # Panics
    ///
    /// Panics if the slot's mutex is poisoned.
    fn take(&self) -> Option<T> {
        self.value
            .lock()
            .expect("BUG: panicked while holding a lock")
            .take()
    }
}
impl<'a, T> Stream for TryAsyncStream<'a, T> {
    type Item = Result<T, Error>;

    /// Drives the generator future and converts its progress into stream items.
    ///
    /// * generator already finished: the stream is over (`None`);
    /// * generator suspended with an item in the slot: that item, as `Ok`;
    /// * generator suspended with an empty slot: `Pending`;
    /// * generator finished with `Err(e)`: `Some(Err(e))`;
    /// * generator finished with `Ok(())`: `None`.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // The fused future must not be polled again once it has completed.
        if self.future.is_terminated() {
            return Poll::Ready(None);
        }

        let finished = match self.future.poll_unpin(cx) {
            Poll::Ready(finished) => finished,
            Poll::Pending => {
                // A suspension either carries a freshly yielded item or is a genuine wait.
                return match self.yielder.take() {
                    Some(item) => Poll::Ready(Some(Ok(item))),
                    None => Poll::Pending,
                };
            }
        };

        match finished {
            Err(e) => Poll::Ready(Some(Err(e))),
            // Completion is reported as end-of-stream without looking at the slot.
            Ok(()) => Poll::Ready(None),
        }
    }
}
/// Builds a `TryAsyncStream` from a block of async code.
///
/// The tokens passed to the macro become the tail of an `async move` block, which is wrapped
/// with `tracing::Instrument::in_current_span` and handed to `TryAsyncStream::new`. Since
/// that constructor expects a future with `Output = Result<(), Error>`, the block has to
/// evaluate to such a `Result`.
///
/// Inside the block, `r#yield!(expr)` expands to `yielder.r#yield(expr).await`, which emits
/// `expr` as the next stream item.
#[macro_export]
macro_rules! try_stream {
($($block:tt)*) => {
$crate::ext::async_stream::TryAsyncStream::new(move |yielder| ::tracing::Instrument::in_current_span(async move {
// Shadow the owned yielder with a shared reference: the `async move` block keeps
// ownership, while everything below only works through the borrow.
// NOTE(review): presumably this is meant to keep the yielder from being moved out of
// this future (e.g. into another task) — confirm.
let yielder = &yielder;
// Local helper macro so the user-supplied block can write `r#yield!(value)`.
macro_rules! r#yield {
($v:expr) => {{
yielder.r#yield($v).await;
}}
}
// The caller's tokens, pasted verbatim as the rest of the async block.
$($block)*
}))
}
}