parallel_stream/
from_stream.rs1use core::pin::Pin;
2
3use async_std::stream::{IntoStream, Stream};
4use async_std::task::{Context, Poll};
5use pin_project_lite::pin_project;
6
7use crate::ParallelStream;
8
9pin_project! {
10 #[derive(Clone, Debug)]
17 pub struct FromStream<S> {
18 #[pin]
19 stream: S,
20 limit: Option<usize>,
21 }
22}
23
24pub fn from_stream<S: IntoStream>(stream: S) -> FromStream<S::IntoStream>
26where
27 S: Send + Sync,
28{
29 FromStream {
30 limit: None,
31 stream: stream.into_stream(),
32 }
33}
34
35impl<S: Stream + Send + Sync + Unpin + 'static> ParallelStream for FromStream<S>
36where
37 S::Item: Send,
38{
39 type Item = S::Item;
40
41 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
42 let this = self.project();
43 this.stream.poll_next(cx)
44 }
45
46 fn limit(mut self, limit: impl Into<Option<usize>>) -> Self {
47 self.limit = limit.into();
48 self
49 }
50
51 fn get_limit(&self) -> Option<usize> {
52 self.limit
53 }
54}