iterstream 0.1.2

Converts Iterator into real asynchronous Stream
Documentation
//! This crate provides a trait that can convert an `Iterator` to a
//! `Stream`. It uses the `futures` crate to create an executor that
//! gets the values from the iterator in a separate thread pool
//!
//! It differs from the [`iter()`](futures::stream::iter) function
//! because iterator consumption is done in a separate thread. The
//! stream is then really asynchronous if the iterator is blocking
//!
//! The `ThreadPool` needed to execute tasks can be either create
//! automatically (when using
//! [`to_stream`](crate::IterStream::to_stream)), or explicitly
//! specified using
//! [`to_stream_with_pool`](crate::IterStream::to_stream_with_pool). The
//! later is more flexible (and efficient) as it allows to share the
//! same pool for multiple streams
//!
//! # Example
//! ```rust
//! # futures::executor::block_on(async {
//! use iterstream::IterStream;
//! use futures::stream::StreamExt;
//! use futures::executor::ThreadPool;
//!
//! let vals = vec![1, 2, 3, 4, 5];
//! let stream = vals.into_iter().to_stream_with_pool(10, ThreadPool::new().unwrap());
//! let c: Vec<_> = stream.collect().await;
//! assert_eq!(vec![1,2,3,4, 5], c);
//! # });
//! ```
//!

use futures::channel::mpsc::{channel,Receiver};
use futures::SinkExt;
use futures::executor::ThreadPool;
use core::iter::Iterator;


/// A trait that converts an Iterator to an asynchronous Stream that
/// polls the iterator in a background task
pub trait IterStream: Iterator
where
    Self: 'static + Sized + Send,
    Self::Item: Send

{
    /// Creates a stream from an iterator. The stream is actually a
    /// `mpsc::Receiver`
    ///
    /// The `buffer_size` parameter is directly used in the creation
    /// of the underlying `channel`
    ///
    /// A new `ThreadPool` is created for each call to this
    /// function. If you need multiple streams, you probably want to
    /// use `to_stream_with_pool` instead
    ///
    /// # Example
    /// ```rust
    /// # futures::executor::block_on(async {
    /// use iterstream::IterStream;
    /// use futures::stream::StreamExt;
    ///
    /// let vals = vec![1, 2, 3, 4, 5];
    /// let stream = vals.into_iter().to_stream(10).unwrap();
    /// let c: Vec<_> = stream.collect().await;
    /// assert_eq!(vec![1,2,3,4, 5], c);
    /// # });
    /// ```
    fn to_stream(self, buffer_size: usize) -> Result<Receiver<Self::Item>, futures::io::Error> {
        Ok(self.to_stream_with_pool(buffer_size, ThreadPool::new()?))
    }

    /// Creates a stream from an iterator. The stream is actually a
    /// `mpsc::Receiver`
    ///
    /// The `buffer_size` parameter is directly used in the creation
    /// of the underlying `channel`
    ///
    /// The task that will poll the iterator is created from the given ThreadPool
    ///
    /// # Example
    /// ```rust
    /// # futures::executor::block_on(async {
    /// use iterstream::IterStream;
    /// use futures::stream::StreamExt;
    /// use futures::executor::ThreadPool;
    ///
    /// let vals = vec![1, 2, 3, 4, 5];
    /// let stream = vals.into_iter().to_stream_with_pool(10, ThreadPool::new().unwrap());
    /// let c: Vec<_> = stream.collect().await;
    /// assert_eq!(vec![1,2,3,4, 5], c);
    /// # });
    /// ```
    fn to_stream_with_pool(self, buffer_size: usize, pool: ThreadPool) -> Receiver<Self::Item> {
        let (mut sender,receiver) = channel(buffer_size);
        
        pool.spawn_ok(async move {
            for value in self {
                sender.send(value).await.ok();
            }
        });
        receiver
    }
}


/// Implemenation of the IterStream trait for all Iterators
impl<I> IterStream for I
where
    I: 'static + Iterator + Send,
    I::Item: Send,
{}



#[cfg(test)]
mod tests {
    use super::IterStream;
    use futures::stream::StreamExt;
    #[test]
    fn it_works() {
        futures::executor::block_on(async move {
            let vals = vec![1, 2, 3, 4, 5];
            let stream = vals.into_iter().to_stream(10).unwrap();
            let c: Vec<_> = stream.collect().await;
            assert_eq!(vec![1,2,3,4, 5], c);
        });
    }
}