1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
use std::{collections::VecDeque, task::Poll};

use futures::{
    future::{BoxFuture, OptionFuture},
    Future,
};

use super::WasmerClient;

/// Boxed, `'static` future returned by [`PaginatedQuery::query`].
///
/// On success it resolves to the fetched batch (`Vec<I>`) together with an
/// optional paginator (`Option<P>`); on failure it resolves to an
/// `anyhow::Error`.
type PaginationFuture<I, P> = BoxFuture<'static, Result<(Vec<I>, Option<P>), anyhow::Error>>;

/// A query whose results are fetched batch by batch.
///
/// Each call to [`PaginatedQuery::query`] yields one batch of items plus an
/// optional paginator value that can be handed to a later call.
pub trait PaginatedQuery {
    // NOTE(review): `Vars` is not referenced by any code visible in this
    // file; presumably the query's variables type — confirm with implementors.
    type Vars;
    /// Value returned alongside a batch and accepted by `query` as its
    /// `paginator` argument.
    type Paginator;
    /// Element type of a returned batch.
    type Item;

    /// Starts fetching one batch and returns the future that resolves to it.
    ///
    /// `client` is taken by value. `paginator` is whatever the caller wants to
    /// continue from; `QueryStream` passes `None` for its first request and
    /// afterwards the paginator returned by the previous batch.
    fn query(
        &self,
        client: WasmerClient,
        paginator: Option<Self::Paginator>,
    ) -> PaginationFuture<Self::Item, Self::Paginator>;
}

pin_project_lite::pin_project! {
    /// Stream adapter that drives a [`PaginatedQuery`] and yields its items
    /// one at a time.
    pub struct QueryStream<Q: PaginatedQuery> {
        // The query used to request each batch.
        query: Q,

        // Client cloned into every `query` call.
        client: WasmerClient,
        // Number of batches received so far; starts at 0.
        page: usize,
        // Paginator returned with the most recent batch, consumed when the
        // next batch is requested.
        paginator: Option<Q::Paginator>,
        // Termination flag; starts out `false`.
        finished: bool,
        // Items already fetched but not yet handed to the consumer.
        items: VecDeque<Q::Item>,

        // The in-flight request, or an empty `OptionFuture` when idle.
        // Pinned because the stream polls it in place.
        #[pin]
        fut: OptionFuture<PaginationFuture<Q::Item, Q::Paginator>>,
    }
}

impl<Q: PaginatedQuery> QueryStream<Q> {
    pub fn new(query: Q, client: WasmerClient) -> Self {
        Self {
            query,
            client,
            page: 0,
            finished: false,
            paginator: None,
            items: VecDeque::new(),
            fut: None.into(),
        }
    }
}

impl<Q: PaginatedQuery> futures::Stream for QueryStream<Q> {
    type Item = Result<Q::Item, anyhow::Error>;

    /// Yields the next item, requesting further batches as needed.
    ///
    /// Behaviour:
    /// * Buffered items are returned first.
    /// * When the buffer is empty, the in-flight request (if any) is polled;
    ///   otherwise a new request is started with the paginator from the
    ///   previous batch (`None` for the very first request).
    /// * A batch that is empty but carries a paginator does not end the
    ///   stream; the next batch is requested instead.
    /// * The stream ends (`Poll::Ready(None)`) once a batch arrives without a
    ///   paginator and the buffer is drained.
    /// * A failed request is reported once as `Some(Err(_))`; after that the
    ///   stream is finished and keeps returning `None`. The paginator for the
    ///   failed batch has already been consumed, so it cannot be retried.
    fn poll_next(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        let mut this = self.project();

        loop {
            if let Some(item) = this.items.pop_front() {
                return Poll::Ready(Some(Ok(item)));
            }

            if *this.finished {
                return Poll::Ready(None);
            }

            match this.fut.as_mut().poll(cx) {
                Poll::Pending => return Poll::Pending,
                Poll::Ready(Some(Ok((items, paginator)))) => {
                    // Drop the completed future right away so it can never be
                    // polled a second time.
                    this.fut.set(None.into());
                    *this.paginator = paginator;
                    *this.page += 1;
                    this.items.extend(items);
                    // Go round again: either hand out the first new item or,
                    // for an empty batch, decide whether to fetch another.
                    continue;
                }
                Poll::Ready(Some(Err(err))) => {
                    // Clear the completed future and latch the terminal state
                    // so later polls return `None` instead of re-polling it.
                    this.fut.set(None.into());
                    *this.finished = true;
                    return Poll::Ready(Some(Err(err)));
                }
                // No request in flight: fall through and start one.
                Poll::Ready(None) => {}
            }

            let pager = match this.paginator.take() {
                Some(p) => Some(p),
                // Nothing has been fetched yet: issue the initial request.
                None if *this.page == 0 => None,
                // At least one batch was received and it had no paginator.
                None => {
                    *this.finished = true;
                    return Poll::Ready(None);
                }
            };

            let request = this.query.query(this.client.clone(), pager);
            this.fut.set(Some(request).into());
            // The next iteration polls the freshly created future.
        }
    }
}