parallel_stream/par_stream/
take.rs

1use core::pin::Pin;
2use core::task::{Context, Poll};
3
4use async_std::task::ready;
5use pin_project_lite::pin_project;
6
7use crate::ParallelStream;
8
9pin_project! {
10    /// A stream that yields the first `n` items of another stream.
11    ///
12    /// This `struct` is created by the [`take`] method on [`ParallelStream`]. See its
13    /// documentation for more.
14    ///
15    /// [`take`]: trait.ParallelStream.html#method.take
16    /// [`ParallelStream`]: trait.ParallelStream.html
17    #[derive(Clone, Debug)]
18    pub struct Take<S> {
19        #[pin]
20        stream: S,
21        remaining: usize,
22        limit: Option<usize>,
23    }
24}
25
26impl<S: ParallelStream> Take<S> {
27    pub(super) fn new(stream: S, remaining: usize) -> Self {
28        Self {
29            limit: stream.get_limit(),
30            remaining,
31            stream,
32        }
33    }
34}
35
36impl<S: ParallelStream> ParallelStream for Take<S> {
37    type Item = S::Item;
38
39    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
40        let this = self.project();
41        if *this.remaining == 0 {
42            Poll::Ready(None)
43        } else {
44            let next = ready!(this.stream.poll_next(cx));
45            match next {
46                Some(_) => *this.remaining -= 1,
47                None => *this.remaining = 0,
48            }
49            Poll::Ready(next)
50        }
51    }
52
53    fn limit(mut self, limit: impl Into<Option<usize>>) -> Self {
54        self.limit = limit.into();
55        self
56    }
57
58    fn get_limit(&self) -> Option<usize> {
59        self.limit
60    }
61}