completion/stream/adapters/
step_by.rs

1use core::pin::Pin;
2use core::task::{Context, Poll};
3
4use completion_core::CompletionStream;
5use futures_core::{ready, Stream};
6use pin_project_lite::pin_project;
7
8pin_project! {
9    /// Stream for [`CompletionStreamExt::step_by`](crate::CompletionStreamExt::step_by).
10    #[derive(Debug, Clone)]
11    pub struct StepBy<S> {
12        #[pin]
13        stream: S,
14        step: usize,
15        i: usize,
16    }
17}
18
19impl<S> StepBy<S> {
20    pub(crate) fn new(stream: S, step: usize) -> Self {
21        assert!(step != 0, "cannot step by zero");
22        Self { stream, step, i: 0 }
23    }
24}
25
26impl<S: CompletionStream> CompletionStream for StepBy<S> {
27    type Item = S::Item;
28
29    unsafe fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
30        let mut this = self.project();
31
32        loop {
33            match ready!(this.stream.as_mut().poll_next(cx)) {
34                Some(v) => {
35                    if *this.i == 0 {
36                        *this.i = *this.step - 1;
37                        break Poll::Ready(Some(v));
38                    }
39                    *this.i -= 1;
40                }
41                None => break Poll::Ready(None),
42            }
43        }
44    }
45    unsafe fn poll_cancel(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
46        self.project().stream.poll_cancel(cx)
47    }
48    fn size_hint(&self) -> (usize, Option<usize>) {
49        let (low, high) = self.stream.size_hint();
50        let f = |n: usize| {
51            n.saturating_sub(self.i)
52                .checked_sub(1)
53                .map_or(0, |n| n / self.step + 1)
54        };
55        (f(low), high.map(f))
56    }
57}
58
59impl<S> Stream for StepBy<S>
60where
61    S: CompletionStream + Stream<Item = <S as CompletionStream>::Item>,
62{
63    type Item = <S as CompletionStream>::Item;
64
65    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
66        unsafe { CompletionStream::poll_next(self, cx) }
67    }
68    fn size_hint(&self) -> (usize, Option<usize>) {
69        CompletionStream::size_hint(self)
70    }
71}