use futures::Stream;
use std::pin::Pin;
use std::task::{Context, Poll};
use sugars_async_task::{AsyncTask, NotResult};
use sugars_collections::ZeroOneOrMany;
use tokio::sync::mpsc;
/// A stream of `T` values backed by an unbounded Tokio mpsc receiver.
///
/// Items are constrained to types implementing [`NotResult`]
/// (presumably to keep `Result` values out of the stream — confirm against
/// the `sugars_async_task` crate).
pub struct AsyncStream<T: NotResult> {
    /// Receiving half of the channel that feeds this stream.
    receiver: mpsc::UnboundedReceiver<T>,
}
impl<T> AsyncStream<T>
where
T: NotResult, {
pub fn new(receiver: mpsc::UnboundedReceiver<T>) -> Self {
Self { receiver }
}
pub fn from_stream<S>(stream: S) -> AsyncTask<Vec<T>>
where
S: Stream<Item = T> + Send + 'static,
T: Send + 'static,
{
let (tx, rx) = tokio::sync::oneshot::channel();
tokio::spawn(async move {
use futures::StreamExt;
let mut stream = std::pin::pin!(stream);
let mut items = Vec::new();
while let Some(item) = stream.next().await {
items.push(item);
}
let _ = tx.send(items);
});
AsyncTask::new(ZeroOneOrMany::one(rx))
}
pub fn collect_async(self) -> AsyncTask<Vec<T>>
where
T: Send + 'static,
{
let (tx, rx) = tokio::sync::oneshot::channel();
let mut receiver = self.receiver;
tokio::spawn(async move {
let mut items = Vec::new();
while let Some(item) = receiver.recv().await {
items.push(item);
}
let _ = tx.send(items);
});
AsyncTask::new(ZeroOneOrMany::one(rx))
}
}
/// Exposes the wrapped channel receiver through the `futures` [`Stream`] trait.
impl<T: NotResult> Stream for AsyncStream<T> {
    type Item = T;

    /// Polls the underlying receiver for the next item, forwarding its
    /// result unchanged.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Unwrap the pin explicitly to reach the receiver field.
        let this = self.get_mut();
        this.receiver.poll_recv(cx)
    }
}