maybe-borrow 0.1.2

Macros for conditionally returning borrowed data.
Documentation
use std::{
    pin::{pin, Pin},
    task::{ready, Context, Poll},
};

use futures::{executor::block_on, prelude::*};

use pin_project_lite::pin_project;

use maybe_borrow::prelude::*;

/// Per-lifetime building block of a lending stream.
///
/// An implementation fixes, for one particular borrow lifetime `'iter`, the
/// type of item a poll produces. Because the receiver is
/// `Pin<&'iter mut Self>`, that item is allowed to borrow from the stream.
///
/// The second type parameter, `_Bound`, defaults to `&'iter Self` and is not
/// referenced by any item of the trait.
/// NOTE(review): presumably it only exists to carry an implied `Self: 'iter`
/// bound into higher-ranked uses of the trait — confirm before removing it.
trait LendingStreamBase<'iter, _Bound = &'iter Self>: 'iter {
    /// Item produced by a successful poll; it must outlive `'iter`.
    type Item: 'iter;

    /// Polls for the next item while holding the stream borrowed for `'iter`.
    fn poll_next_base(
        self: Pin<&'iter mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>>;
}

/// Lifetime-indexed view of a lending stream.
///
/// For a given `'iter`, `ItemLt` names the item type obtained when the stream
/// is polled through a `Pin<&'iter mut Self>` receiver.
trait LendingStreamLt<'iter>: 'iter {
    /// Item produced for this particular borrow lifetime; it must outlive `'iter`.
    type ItemLt: 'iter;

    /// Polls for the next item while holding the stream borrowed for `'iter`.
    fn poll_next_lt(
        self: Pin<&'iter mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::ItemLt>>;
}

/// Blanket bridge: any type implementing [`LendingStreamBase`] for `'iter`
/// implements [`LendingStreamLt`] for that same lifetime.
impl<'iter, I> LendingStreamLt<'iter> for I
where
    I: ?Sized + LendingStreamBase<'iter>,
{
    /// Re-exports the base trait's item type unchanged.
    type ItemLt = I::Item;

    /// Forwards straight to `poll_next_base`.
    fn poll_next_lt(
        self: Pin<&'iter mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::ItemLt>> {
        I::poll_next_base(self, cx)
    }
}

/// User-facing lending-stream trait.
///
/// It requires [`LendingStreamLt`] for every lifetime and ties that trait's
/// `ItemLt` to the generic associated type `Item<'iter>`.
trait LendingStream: for<'iter> LendingStreamLt<'iter, ItemLt = Self::Item<'iter>> {
    /// Item yielded while the stream is borrowed for `'iter`.
    type Item<'iter>;

    /// Polls for the next item by delegating to `poll_next_lt`.
    ///
    /// NOTE(review): the return type is spelled through `LendingStreamLt`
    /// instead of `Self::Item<'iter>`; presumably deliberate — keep it as is
    /// unless verified otherwise.
    fn poll_next<'iter>(
        self: Pin<&'iter mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<<Self as LendingStreamLt<'iter>>::ItemLt>> {
        <Self as LendingStreamLt<'iter>>::poll_next_lt(self, cx)
    }
}

/// Blanket implementation: every type that implements [`LendingStreamLt`] for
/// all lifetimes is a [`LendingStream`].
impl<I> LendingStream for I
where
    I: ?Sized + for<'iter> LendingStreamLt<'iter>,
{
    /// The item for `'iter` is whatever `LendingStreamLt<'iter>` declares.
    type Item<'iter> = <I as LendingStreamLt<'iter>>::ItemLt;
}

pin_project! {
    /// Adapter state: an inner value `iter`, a buffer of `T`s and a window length.
    ///
    /// The `LendingStreamBase` implementation in this file uses it to hand out
    /// the buffer as a `&mut [T]` window over the items of the inner stream.
    struct Windows<T, I> {
        // Buffer that backs the window handed out to callers.
        items: Vec<T>,
        // Number of items a full window contains.
        size: usize,
        // Wrapped source; marked `#[pin]` so the projection exposes it as `Pin<&mut I>`.
        #[pin]
        iter: I,
    }
}

impl<T, I> Windows<T, I> {
    /// Creates a `Windows` over `iter` whose windows hold `size` items.
    ///
    /// The item buffer starts empty, with capacity for `size` elements
    /// reserved up front.
    pub fn new(iter: I, size: usize) -> Self {
        Self {
            items: Vec::with_capacity(size),
            size,
            iter,
        }
    }
}

/// Lends the internal buffer as a sliding window over the inner stream's items.
impl<'iter, T, I> LendingStreamBase<'iter> for Windows<T, I>
where
    I: Stream<Item = T>,
{
    type Item = &'iter mut [T];

    /// Pulls items from the inner stream until a window can be lent out.
    ///
    /// * While the buffer holds fewer than `size` items, new items are appended
    ///   and nothing is yielded until the buffer reaches `size`.
    /// * Once full, each new item overwrites index 0 and the buffer is rotated
    ///   left by one, so the oldest item is dropped and the newest ends up last.
    /// * `Poll::Pending` from the inner stream is propagated via `ready!`.
    /// * When the inner stream ends, the buffer is cleared and `None` is returned.
    ///
    /// If `size` is 0 the buffer stays empty and every inner item yields an
    /// empty slice.
    fn poll_next_base(
        self: Pin<&'iter mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<&'iter mut [T]>> {
        let projected = self.project();
        let mut inner = projected.iter;
        let window = projected.items;
        let width = *projected.size;

        loop {
            let Some(next) = ready!(inner.as_mut().poll_next(cx)) else {
                // Inner stream is exhausted: drop whatever is buffered.
                window.clear();
                return Poll::Ready(None);
            };

            if window.len() < width {
                window.push(next);

                // Still warming up: keep pulling until the first full window.
                if window.len() < width {
                    continue;
                }
            } else if let Some(oldest) = window.first_mut() {
                // Replace the oldest item, then rotate it to the back.
                *oldest = next;
                window.rotate_left(1);
            }

            return Poll::Ready(Some(window.as_mut_slice()));
        }
    }
}

/// Polls `iter` repeatedly until it produces an item accepted by `predicate`.
///
/// Each pass of the loop polls the lending stream once:
/// * a `Some(item)` for which `predicate(&item)` is `false` is discarded and
///   the loop polls again;
/// * any other result (an accepted `Some`, or `None`) is passed to
///   `return_borrowed!` wrapped in `Poll::Ready`.
///
/// `ready!` short-circuits when the stream reports `Poll::Pending`.
///
/// NOTE(review): `maybe_borrow!` / `return_borrowed!` are macros from the
/// `maybe_borrow` crate; presumably they let the item borrowed for `'x` be
/// returned as `I::Item<'iter>` while `iter` remains usable on the next pass,
/// and let the `ready!` early return reach this function's caller — confirm
/// against the crate documentation.
fn poll_next_filtered<'iter, I: LendingStream>(
    mut iter: Pin<&'iter mut I>,
    mut predicate: impl FnMut(&I::Item<'_>) -> bool,
    cx: &mut Context,
) -> Poll<Option<I::Item<'iter>>> {
    loop {
        maybe_borrow!(for<'x> |iter| -> Poll<Option<I::Item<'x>>> {
            match ready!(iter.poll_next(cx)) {
                // Rejected item: fall through and poll again on the next pass.
                Some(ref item) if !predicate(item) => {}
                // Accepted item or end of stream: hand it back to the caller.
                out => return_borrowed!(Poll::Ready(out)),
            }
        });
    }
}

/// Polls `iter` repeatedly until it produces an item accepted by `predicate`.
///
/// Each pass of the loop polls the lending stream once:
/// * a `Some(item)` for which `predicate(&item)` is `false` is discarded and
///   the loop polls again;
/// * any other result (an accepted `Some`, or `None`) is passed to
///   `return_borrowed!` wrapped in `Poll::Ready`.
///
/// `ready!` short-circuits when the stream reports `Poll::Pending`.
///
/// NOTE(review): as written, this body is identical to `poll_next_filtered`;
/// the `_with_try` suffix suggests a variant built on a try-style (`?`)
/// construct was intended — confirm and either differentiate or deduplicate.
fn poll_next_filtered_with_try<'iter, I: LendingStream>(
    mut iter: Pin<&'iter mut I>,
    mut predicate: impl FnMut(&I::Item<'_>) -> bool,
    cx: &mut Context,
) -> Poll<Option<I::Item<'iter>>> {
    loop {
        maybe_borrow!(for<'x> |iter| -> Poll<Option<I::Item<'x>>> {
            match ready!(iter.poll_next(cx)) {
                // Rejected item: fall through and poll again on the next pass.
                Some(ref item) if !predicate(item) => {}
                // Accepted item or end of stream: hand it back to the caller.
                out => return_borrowed!(Poll::Ready(out)),
            }
        });
    }
}

/// Size-3 windows over `1..=10`, keeping only those whose first element is odd.
#[test]
fn test_next_filtered() {
    block_on(async {
        let source = stream::iter([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
        let mut windows = pin!(Windows::new(source, 3));

        // Each lent window is copied into an owned `Vec` before the next poll.
        let odd_led = futures::stream::poll_fn(move |cx| {
            let polled = poll_next_filtered(windows.as_mut(), |window| window[0] % 2 != 0, cx);
            polled.map(|found| found.map(|window| window.to_vec()))
        });
        let collected: Vec<_> = odd_led.collect().await;

        let expected = [[1, 2, 3], [3, 4, 5], [5, 6, 7], [7, 8, 9]];
        assert_eq!(collected, expected)
    });
}

/// Same scenario as the plain filter test, driven through
/// `poll_next_filtered_with_try`.
#[test]
fn test_next_filtered_with_try() {
    block_on(async {
        let source = stream::iter([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
        let mut windows = pin!(Windows::new(source, 3));

        // Each lent window is copied into an owned `Vec` before the next poll.
        let odd_led = futures::stream::poll_fn(move |cx| {
            let polled =
                poll_next_filtered_with_try(windows.as_mut(), |window| window[0] % 2 != 0, cx);
            polled.map(|found| found.map(|window| window.to_vec()))
        });
        let collected: Vec<_> = odd_led.collect().await;

        let expected = [[1, 2, 3], [3, 4, 5], [5, 6, 7], [7, 8, 9]];
        assert_eq!(collected, expected)
    });
}