use futures::channel::mpsc::{channel,Receiver};
use futures::SinkExt;
use futures::executor::ThreadPool;
use core::iter::Iterator;
pub trait IterStream: Iterator
where
Self: 'static + Sized + Send,
Self::Item: Send
{
fn to_stream(self, buffer_size: usize) -> Result<Receiver<Self::Item>, futures::io::Error> {
Ok(self.to_stream_with_pool(buffer_size, ThreadPool::new()?))
}
fn to_stream_with_pool(self, buffer_size: usize, pool: ThreadPool) -> Receiver<Self::Item> {
let (mut sender,receiver) = channel(buffer_size);
pool.spawn_ok(async move {
for value in self {
sender.send(value).await.ok();
}
});
receiver
}
}
impl<I> IterStream for I
where
I: 'static + Iterator + Send,
I::Item: Send,
{}
#[cfg(test)]
mod tests {
use super::IterStream;
use futures::stream::StreamExt;
#[test]
fn it_works() {
futures::executor::block_on(async move {
let vals = vec![1, 2, 3, 4, 5];
let stream = vals.into_iter().to_stream(10).unwrap();
let c: Vec<_> = stream.collect().await;
assert_eq!(vec![1,2,3,4, 5], c);
});
}
}