completion/stream/adapters/
step_by.rs1use core::pin::Pin;
2use core::task::{Context, Poll};
3
4use completion_core::CompletionStream;
5use futures_core::{ready, Stream};
6use pin_project_lite::pin_project;
7
8pin_project! {
9 #[derive(Debug, Clone)]
11 pub struct StepBy<S> {
12 #[pin]
13 stream: S,
14 step: usize,
15 i: usize,
16 }
17}
18
19impl<S> StepBy<S> {
20 pub(crate) fn new(stream: S, step: usize) -> Self {
21 assert!(step != 0, "cannot step by zero");
22 Self { stream, step, i: 0 }
23 }
24}
25
26impl<S: CompletionStream> CompletionStream for StepBy<S> {
27 type Item = S::Item;
28
29 unsafe fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
30 let mut this = self.project();
31
32 loop {
33 match ready!(this.stream.as_mut().poll_next(cx)) {
34 Some(v) => {
35 if *this.i == 0 {
36 *this.i = *this.step - 1;
37 break Poll::Ready(Some(v));
38 }
39 *this.i -= 1;
40 }
41 None => break Poll::Ready(None),
42 }
43 }
44 }
45 unsafe fn poll_cancel(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
46 self.project().stream.poll_cancel(cx)
47 }
48 fn size_hint(&self) -> (usize, Option<usize>) {
49 let (low, high) = self.stream.size_hint();
50 let f = |n: usize| {
51 n.saturating_sub(self.i)
52 .checked_sub(1)
53 .map_or(0, |n| n / self.step + 1)
54 };
55 (f(low), high.map(f))
56 }
57}
58
59impl<S> Stream for StepBy<S>
60where
61 S: CompletionStream + Stream<Item = <S as CompletionStream>::Item>,
62{
63 type Item = <S as CompletionStream>::Item;
64
65 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
66 unsafe { CompletionStream::poll_next(self, cx) }
67 }
68 fn size_hint(&self) -> (usize, Option<usize>) {
69 CompletionStream::size_hint(self)
70 }
71}