parallel_stream/par_stream/
take.rs1use core::pin::Pin;
2use core::task::{Context, Poll};
3
4use async_std::task::ready;
5use pin_project_lite::pin_project;
6
7use crate::ParallelStream;
8
9pin_project! {
10 #[derive(Clone, Debug)]
18 pub struct Take<S> {
19 #[pin]
20 stream: S,
21 remaining: usize,
22 limit: Option<usize>,
23 }
24}
25
26impl<S: ParallelStream> Take<S> {
27 pub(super) fn new(stream: S, remaining: usize) -> Self {
28 Self {
29 limit: stream.get_limit(),
30 remaining,
31 stream,
32 }
33 }
34}
35
36impl<S: ParallelStream> ParallelStream for Take<S> {
37 type Item = S::Item;
38
39 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
40 let this = self.project();
41 if *this.remaining == 0 {
42 Poll::Ready(None)
43 } else {
44 let next = ready!(this.stream.poll_next(cx));
45 match next {
46 Some(_) => *this.remaining -= 1,
47 None => *this.remaining = 0,
48 }
49 Poll::Ready(next)
50 }
51 }
52
53 fn limit(mut self, limit: impl Into<Option<usize>>) -> Self {
54 self.limit = limit.into();
55 self
56 }
57
58 fn get_limit(&self) -> Option<usize> {
59 self.limit
60 }
61}