// wasmer_api/stream.rs

1use std::{collections::VecDeque, task::Poll};
2
3use futures::{
4    future::{BoxFuture, OptionFuture},
5    Future,
6};
7
8use super::WasmerClient;
9
10type PaginationFuture<I, P> = BoxFuture<'static, Result<(Vec<I>, Option<P>), anyhow::Error>>;
11
12pub trait PaginatedQuery {
13    type Vars;
14    type Paginator;
15    type Item;
16
17    fn query(
18        &self,
19        client: WasmerClient,
20        paginator: Option<Self::Paginator>,
21    ) -> PaginationFuture<Self::Item, Self::Paginator>;
22}
23
24pin_project_lite::pin_project! {
25    pub struct QueryStream<Q: PaginatedQuery> {
26        query: Q,
27
28        client: WasmerClient,
29        page: usize,
30        paginator: Option<Q::Paginator>,
31        finished: bool,
32        items: VecDeque<Q::Item>,
33
34        #[pin]
35        fut: OptionFuture<PaginationFuture<Q::Item, Q::Paginator>>,
36    }
37}
38
39impl<Q: PaginatedQuery> QueryStream<Q> {
40    pub fn new(query: Q, client: WasmerClient) -> Self {
41        Self {
42            query,
43            client,
44            page: 0,
45            finished: false,
46            paginator: None,
47            items: VecDeque::new(),
48            fut: None.into(),
49        }
50    }
51}
52
53impl<Q: PaginatedQuery> futures::Stream for QueryStream<Q> {
54    type Item = Result<Q::Item, anyhow::Error>;
55
56    fn poll_next(
57        self: std::pin::Pin<&mut Self>,
58        cx: &mut std::task::Context<'_>,
59    ) -> Poll<Option<Self::Item>> {
60        let mut this = self.project();
61
62        if let Some(item) = this.items.pop_front() {
63            return Poll::Ready(Some(Ok(item)));
64        }
65
66        match this.fut.as_mut().poll(cx) {
67            Poll::Ready(None) => {}
68            Poll::Ready(Some(Ok((items, paginator)))) => {
69                *this.paginator = paginator;
70                *this.page += 1;
71                // *this.fut = None.into();
72                this.items.extend(items);
73                this.fut.set(None.into());
74
75                if let Some(item) = this.items.pop_front() {
76                    return Poll::Ready(Some(Ok(item)));
77                }
78            }
79            Poll::Ready(Some(Err(err))) => {
80                return Poll::Ready(Some(Err(err)));
81            }
82            Poll::Pending => {
83                return Poll::Pending;
84            }
85        };
86
87        let pager = match this.paginator.take() {
88            Some(p) => Some(p),
89            None if *this.page == 0 => None,
90            None => {
91                return Poll::Ready(None);
92            }
93        };
94
95        let f = this.query.query(this.client.clone(), pager);
96        this.fut.set(Some(f).into());
97
98        match this.fut.as_mut().poll(cx) {
99            Poll::Ready(None) => {
100                unreachable!()
101            }
102            Poll::Ready(Some(Ok((items, paginator)))) => {
103                *this.paginator = paginator;
104                *this.page += 1;
105                // *this.fut = None.into();
106                this.items.extend(items);
107                this.fut.set(None.into());
108
109                if let Some(item) = this.items.pop_front() {
110                    Poll::Ready(Some(Ok(item)))
111                } else {
112                    Poll::Ready(None)
113                }
114            }
115            Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err))),
116            Poll::Pending => Poll::Pending,
117        }
118    }
119}