1use async_trait::async_trait;
2use futures::{stream::BoxStream, StreamExt, TryStreamExt};
3use serde::{de::DeserializeOwned, Serialize};
4
5use crate::types::Pagination;
6
7use super::{
8 endpoint::Endpoint,
9 query::{AsyncQuery, Query},
10 utils::{build_paged_request, deserialize_response},
11 ApiError, AsyncClient, Client, RestClient,
12};
13
/// Marker trait for endpoints whose results can be requested page by page.
///
/// It declares no methods. Within this module it is used purely as a bound:
/// the paging types and the blanket `PagedEndpointExt` implementation are only
/// available for endpoints that opt in by implementing it.
pub trait Pageable {}
18
19pub trait PagedEndpointExt<'a, E> {
21 fn iter<T, C>(&'a self, client: &'a C) -> PagedIter<'a, E, C, T>
23 where
24 C: Client,
25 T: DeserializeOwned;
26
27 fn single_page(&'a self) -> SinglePageBuilder<'a, E>;
29
30 fn stream<T, C>(&'a self, client: &'a C) -> BoxStream<'a, Result<T, ApiError<C::Error>>>
32 where
33 T: DeserializeOwned + Send + 'static,
34 C: AsyncClient + Sync,
35 E: Sync + Send;
36}
37
38pub struct PagedIter<'a, E, C, T> {
42 client: &'a C,
43 state: SinglePage<'a, E>,
44 last_page: bool,
45 current_page: Vec<T>,
46}
47
/// Builder collecting the optional settings of a [`SinglePage`] request.
#[derive(Debug)]
pub struct SinglePageBuilder<'a, E> {
    /// Endpoint the page is requested from.
    inner: &'a E,
    /// Index of the first item to return; `None` means "start at 0" when built.
    offset: Option<usize>,
    /// Requested page size; `None` leaves the field unset in the built page.
    max: Option<usize>,
}
55
56#[derive(Debug, Serialize)]
58pub struct SinglePage<'a, E> {
59 #[serde(skip)]
60 pub(crate) inner: &'a E,
61 offset: usize,
62 max: Option<usize>,
63}
64
65impl<'a, E, C, T> PagedIter<'a, E, C, T>
66where
67 E: Endpoint + Pageable,
68{
69 pub(crate) fn new(paged: &'a E, client: &'a C) -> Self {
70 let state = SinglePage::<E>::builder(paged).offset(0).build();
71 Self {
72 client,
73 state,
74 last_page: false,
75 current_page: Vec::new(),
76 }
77 }
78}
79
80impl<'a, E> SinglePageBuilder<'a, E>
81where
82 E: Pageable + Endpoint,
83{
84 pub fn new(paged: &'a E) -> Self {
86 Self {
87 inner: paged,
88 offset: None,
89 max: None,
90 }
91 }
92
93 pub fn offset<T>(mut self, value: T) -> Self
95 where
96 T: Into<Option<usize>>,
97 {
98 self.offset = value.into();
99 self
100 }
101
102 pub fn page_size<T>(mut self, value: T) -> Self
104 where
105 T: Into<Option<usize>>,
106 {
107 self.max = value.into();
109 self
110 }
111
112 pub fn build(self) -> SinglePage<'a, E>
114 where
115 E: Pageable,
116 {
117 SinglePage {
118 inner: self.inner,
119 offset: self.offset.unwrap_or(0),
120 max: self.max,
121 }
122 }
123}
124
125impl<'a, E> SinglePage<'a, E>
126where
127 E: Endpoint + Pageable,
128{
129 pub fn builder(paged: &'a E) -> SinglePageBuilder<'a, E> {
131 SinglePageBuilder::new(paged)
132 }
133
134 pub(crate) fn page_url<C: RestClient>(
135 &self,
136 client: &C,
137 ) -> Result<url::Url, ApiError<C::Error>> {
138 let mut url = client.rest_endpoint(&self.inner.endpoint())?;
139
140 let mut params = self.inner.query_parameters()?;
141 params.extend_from(&self)?;
142 params.apply_to(&mut url);
143
144 Ok(url)
145 }
146}
147
148#[async_trait]
149impl<'a, T, C, E> AsyncQuery<(Vec<T>, Pagination), C> for SinglePage<'a, E>
150where
151 T: DeserializeOwned + Send + 'static,
152 C: AsyncClient + Sync,
153 E: Endpoint + Pageable + Sync,
154{
155 async fn query_async(&self, client: &C) -> Result<(Vec<T>, Pagination), ApiError<C::Error>> {
156 let (req, data) = build_paged_request(self, client)?;
157
158 let url = req.uri_ref().cloned().unwrap_or_default();
159
160 let rsp = client.rest_async(req, data).await?;
161
162 deserialize_response::<_>(rsp)
163 .map(|value| (value.data, value.pagination.unwrap_or_default()))
164 .map_err(|err| ApiError::from_http_response(err, url))
165 }
166}
167
168impl<'a, E, C, T> Iterator for PagedIter<'a, E, C, T>
169where
170 E: Endpoint + Pageable,
171 T: DeserializeOwned,
172 C: Client,
173{
174 type Item = Result<T, ApiError<C::Error>>;
175
176 fn next(&mut self) -> Option<Self::Item> {
177 if self.current_page.is_empty() {
178 if self.last_page {
179 return None;
180 }
181 self.current_page = match self.state.query(self.client) {
182 Ok((data, _pagination)) => data,
183 Err(err) => return Some(Err(err)),
184 };
185 self.state.offset += self.current_page.len();
186
187 if self.current_page.len() < self.state.max.unwrap_or(20) {
189 self.last_page = true;
190 }
191 self.current_page.reverse();
192 }
193
194 self.current_page.pop().map(Ok)
195 }
196}
197
198impl<'a, E> PagedEndpointExt<'a, E> for E
199where
200 E: Endpoint + Pageable,
201{
202 fn iter<T, C>(&'a self, client: &'a C) -> PagedIter<'a, E, C, T>
203 where
204 C: Client,
205 T: DeserializeOwned,
206 {
207 PagedIter::new(self, client)
208 }
209
210 fn single_page(&self) -> SinglePageBuilder<'_, E> {
211 SinglePageBuilder::new(self)
212 }
213
214 fn stream<T, C>(&'a self, client: &'a C) -> BoxStream<'_, Result<T, ApiError<C::Error>>>
215 where
216 T: DeserializeOwned + Send + 'static,
217 C: AsyncClient + Sync,
218 E: Sync + Send,
219 {
220 futures::stream::try_unfold(Some(0), move |state| async move {
221 let offset = if let Some(offset) = state {
222 offset
223 } else {
224 return Ok(None);
225 };
226 let page = SinglePageBuilder::new(self).offset(offset).build();
227 let (data, pagination) = page.query_async(client).await?;
228 if data.is_empty() {
229 Ok::<_, ApiError<C::Error>>(None)
230 } else {
231 let next_state = if data.len() < pagination.max {
232 None
233 } else {
234 Some(offset + pagination.max)
236 };
237 Ok(Some((
238 futures::stream::iter(data.into_iter().map(Ok)),
239 next_state,
240 )))
241 }
242 })
243 .try_flatten()
244 .boxed()
245 }
246}
247
248impl<'a, E, T, C> Query<(Vec<T>, Pagination), C> for SinglePage<'a, E>
249where
250 E: Endpoint + Pageable,
251 T: DeserializeOwned,
252 C: Client,
253{
254 fn query(&self, client: &C) -> Result<(Vec<T>, Pagination), ApiError<C::Error>> {
255 let (req, data) = build_paged_request(self, client)?;
256
257 let url = req.uri_ref().cloned().unwrap_or_default();
258
259 let rsp = client.rest(req, data)?;
260
261 deserialize_response::<_>(rsp)
262 .map(|value| (value.data, value.pagination.unwrap_or_default()))
263 .map_err(|err| ApiError::from_http_response(err, url))
264 }
265}