1use std::{collections::VecDeque, task::Poll};
2
3use futures::{
4 future::{BoxFuture, OptionFuture},
5 Future,
6};
7
8use super::WasmerClient;
9
10type PaginationFuture<I, P> = BoxFuture<'static, Result<(Vec<I>, Option<P>), anyhow::Error>>;
11
12pub trait PaginatedQuery {
13 type Vars;
14 type Paginator;
15 type Item;
16
17 fn query(
18 &self,
19 client: WasmerClient,
20 paginator: Option<Self::Paginator>,
21 ) -> PaginationFuture<Self::Item, Self::Paginator>;
22}
23
24pin_project_lite::pin_project! {
25 pub struct QueryStream<Q: PaginatedQuery> {
26 query: Q,
27
28 client: WasmerClient,
29 page: usize,
30 paginator: Option<Q::Paginator>,
31 finished: bool,
32 items: VecDeque<Q::Item>,
33
34 #[pin]
35 fut: OptionFuture<PaginationFuture<Q::Item, Q::Paginator>>,
36 }
37}
38
39impl<Q: PaginatedQuery> QueryStream<Q> {
40 pub fn new(query: Q, client: WasmerClient) -> Self {
41 Self {
42 query,
43 client,
44 page: 0,
45 finished: false,
46 paginator: None,
47 items: VecDeque::new(),
48 fut: None.into(),
49 }
50 }
51}
52
53impl<Q: PaginatedQuery> futures::Stream for QueryStream<Q> {
54 type Item = Result<Q::Item, anyhow::Error>;
55
56 fn poll_next(
57 self: std::pin::Pin<&mut Self>,
58 cx: &mut std::task::Context<'_>,
59 ) -> Poll<Option<Self::Item>> {
60 let mut this = self.project();
61
62 if let Some(item) = this.items.pop_front() {
63 return Poll::Ready(Some(Ok(item)));
64 }
65
66 match this.fut.as_mut().poll(cx) {
67 Poll::Ready(None) => {}
68 Poll::Ready(Some(Ok((items, paginator)))) => {
69 *this.paginator = paginator;
70 *this.page += 1;
71 this.items.extend(items);
73 this.fut.set(None.into());
74
75 if let Some(item) = this.items.pop_front() {
76 return Poll::Ready(Some(Ok(item)));
77 }
78 }
79 Poll::Ready(Some(Err(err))) => {
80 return Poll::Ready(Some(Err(err)));
81 }
82 Poll::Pending => {
83 return Poll::Pending;
84 }
85 };
86
87 let pager = match this.paginator.take() {
88 Some(p) => Some(p),
89 None if *this.page == 0 => None,
90 None => {
91 return Poll::Ready(None);
92 }
93 };
94
95 let f = this.query.query(this.client.clone(), pager);
96 this.fut.set(Some(f).into());
97
98 match this.fut.as_mut().poll(cx) {
99 Poll::Ready(None) => {
100 unreachable!()
101 }
102 Poll::Ready(Some(Ok((items, paginator)))) => {
103 *this.paginator = paginator;
104 *this.page += 1;
105 this.items.extend(items);
107 this.fut.set(None.into());
108
109 if let Some(item) = this.items.pop_front() {
110 Poll::Ready(Some(Ok(item)))
111 } else {
112 Poll::Ready(None)
113 }
114 }
115 Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err))),
116 Poll::Pending => Poll::Pending,
117 }
118 }
119}