async_std/stream/stream/
step_by.rs1use core::pin::Pin;
2
3use pin_project_lite::pin_project;
4
5use crate::stream::Stream;
6use crate::task::{Context, Poll};
7
8pin_project! {
9 #[derive(Debug)]
17 pub struct StepBy<S> {
18 #[pin]
19 stream: S,
20 step: usize,
21 i: usize,
22 }
23}
24
25impl<S> StepBy<S> {
26 pub(crate) fn new(stream: S, step: usize) -> Self {
27 Self {
28 stream,
29 step: step.checked_sub(1).unwrap(),
30 i: 0,
31 }
32 }
33}
34
35impl<S> Stream for StepBy<S>
36where
37 S: Stream,
38{
39 type Item = S::Item;
40
41 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
42 let mut this = self.project();
43 loop {
44 let next = futures_core::ready!(this.stream.as_mut().poll_next(cx));
45
46 match next {
47 Some(v) => match this.i {
48 0 => {
49 *this.i = *this.step;
50 return Poll::Ready(Some(v));
51 }
52 _ => *this.i -= 1,
53 },
54 None => return Poll::Ready(None),
55 }
56 }
57 }
58}