use async_channel;
use futures::Stream;
use std::pin::Pin;
use std::task::{Context, Poll};
use sugars_async_task::NotResult;
/// A stream of `T` items fed by the receiving half of an `async_channel` channel.
///
/// Items must implement `NotResult` (imported from `sugars_async_task`).
pub struct AsyncStream<T: NotResult> {
    /// Receiving half of the channel that supplies this stream's items.
    receiver: async_channel::Receiver<T>,
}
impl<T> AsyncStream<T>
where
T: NotResult, {
pub fn new(receiver: channel::Receiver<T>) -> Self {
Self {
receiver,
waker: Arc::new(Mutex::new(None)),
}
}
pub fn from_stream<S>(stream: S) -> super::AsyncTask<Vec<T>>
where
S: Stream<Item = T> + Send + 'static,
T: Send + 'static,
{
let (tx, rx) = channel::unbounded();
std::thread::spawn(move || {
use futures::StreamExt;
let rt = tokio::runtime::Handle::current();
rt.block_on(async move {
let mut stream = std::pin::pin!(stream);
while let Some(item) = stream.next().await {
if tx.send(item).is_err() {
break;
}
}
});
});
AsyncStream::new(rx).collect_async()
}
pub fn collect_async(self) -> super::AsyncTask<Vec<T>>
where
T: Send + 'static,
{
super::AsyncTask::new(async move {
let mut items = Vec::new();
let receiver = self.receiver;
while let Ok(item) = receiver.recv() {
items.push(item);
}
items
})
}
}
impl<T> Stream for AsyncStream<T>
where
    T: NotResult,
{
    type Item = T;

    /// Polls the underlying channel for the next item.
    ///
    /// Delegates to the `Stream` implementation of `async_channel::Receiver`,
    /// so a `Poll::Pending` result always comes with the task's waker
    /// registered on the channel. The stream ends (`Poll::Ready(None)`) once
    /// the channel is closed and empty.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // The only field is the receiver, so unwrapping the pin relies on
        // `async_channel::Receiver` being `Unpin`; the compiler rejects this
        // line if that ever stops holding.
        let this = self.get_mut();
        Pin::new(&mut this.receiver).poll_next(cx)
    }
}