parallel_stream/
from_stream.rs

1use core::pin::Pin;
2
3use async_std::stream::{IntoStream, Stream};
4use async_std::task::{Context, Poll};
5use pin_project_lite::pin_project;
6
7use crate::ParallelStream;
8
9pin_project! {
10    /// A parallel stream that was created from sequential stream.
11    ///
12    /// This stream is created by the [`from_stream`] function.
13    /// See it documentation for more.
14    ///
15    /// [`from_stream`]: fn.from_stream.html
16    #[derive(Clone, Debug)]
17    pub struct FromStream<S> {
18        #[pin]
19        stream: S,
20        limit: Option<usize>,
21    }
22}
23
24/// Converts a stream into a parallel stream.
25pub fn from_stream<S: IntoStream>(stream: S) -> FromStream<S::IntoStream>
26where
27    S: Send + Sync,
28{
29    FromStream {
30        limit: None,
31        stream: stream.into_stream(),
32    }
33}
34
35impl<S: Stream + Send + Sync + Unpin + 'static> ParallelStream for FromStream<S>
36where
37    S::Item: Send,
38{
39    type Item = S::Item;
40
41    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
42        let this = self.project();
43        this.stream.poll_next(cx)
44    }
45
46    fn limit(mut self, limit: impl Into<Option<usize>>) -> Self {
47        self.limit = limit.into();
48        self
49    }
50
51    fn get_limit(&self) -> Option<usize> {
52        self.limit
53    }
54}