speedrun_api/api/
pagination.rs

1use async_trait::async_trait;
2use futures::{stream::BoxStream, StreamExt, TryStreamExt};
3use serde::{de::DeserializeOwned, Serialize};
4
5use crate::types::Pagination;
6
7use super::{
8    endpoint::Endpoint,
9    query::{AsyncQuery, Query},
10    utils::{build_paged_request, deserialize_response},
11    ApiError, AsyncClient, Client, RestClient,
12};
13
14// TODO: Use provided "next" link for pagination
15
16/// Marker trait to indicate that an endpoint is pageable.
17pub trait Pageable {}
18
19/// Adapters specific to [`Pageable`] endpoints.
20pub trait PagedEndpointExt<'a, E> {
21    /// Create an Iterator over the results of the paginated endpoint.
22    fn iter<T, C>(&'a self, client: &'a C) -> PagedIter<'a, E, C, T>
23    where
24        C: Client,
25        T: DeserializeOwned;
26
27    /// Retrieves a single page of results for the paginated endpoint.
28    fn single_page(&'a self) -> SinglePageBuilder<'a, E>;
29
30    /// Create an async Stream over the results of the paginated endpoint.
31    fn stream<T, C>(&'a self, client: &'a C) -> BoxStream<'a, Result<T, ApiError<C::Error>>>
32    where
33        T: DeserializeOwned + Send + 'static,
34        C: AsyncClient + Sync,
35        E: Sync + Send;
36}
37
38/// Iterator type for the [`iter`] method on [`PagedEndpointExt`].
39///
40/// [`iter`]: PagedEndpointExt::iter
41pub struct PagedIter<'a, E, C, T> {
42    client: &'a C,
43    state: SinglePage<'a, E>,
44    last_page: bool,
45    current_page: Vec<T>,
46}
47
48/// Builder for the [`SinglePage`] endpoint
49#[derive(Debug)]
50pub struct SinglePageBuilder<'a, E> {
51    inner: &'a E,
52    offset: Option<usize>,
53    max: Option<usize>,
54}
55
56/// Represents a single page of elements.
57#[derive(Debug, Serialize)]
58pub struct SinglePage<'a, E> {
59    #[serde(skip)]
60    pub(crate) inner: &'a E,
61    offset: usize,
62    max: Option<usize>,
63}
64
65impl<'a, E, C, T> PagedIter<'a, E, C, T>
66where
67    E: Endpoint + Pageable,
68{
69    pub(crate) fn new(paged: &'a E, client: &'a C) -> Self {
70        let state = SinglePage::<E>::builder(paged).offset(0).build();
71        Self {
72            client,
73            state,
74            last_page: false,
75            current_page: Vec::new(),
76        }
77    }
78}
79
80impl<'a, E> SinglePageBuilder<'a, E>
81where
82    E: Pageable + Endpoint,
83{
84    /// Create a new [`SinglePageBuilder`].
85    pub fn new(paged: &'a E) -> Self {
86        Self {
87            inner: paged,
88            offset: None,
89            max: None,
90        }
91    }
92
93    /// Request set of elements beginning at `offset`
94    pub fn offset<T>(mut self, value: T) -> Self
95    where
96        T: Into<Option<usize>>,
97    {
98        self.offset = value.into();
99        self
100    }
101
102    /// Number of elements per request. Valid values are between 1 and 200.
103    pub fn page_size<T>(mut self, value: T) -> Self
104    where
105        T: Into<Option<usize>>,
106    {
107        // TODO: Validate that value is between 1 and 200.
108        self.max = value.into();
109        self
110    }
111
112    /// Returns a [`SinglePage`] that can be querired for a set of elements.
113    pub fn build(self) -> SinglePage<'a, E>
114    where
115        E: Pageable,
116    {
117        SinglePage {
118            inner: self.inner,
119            offset: self.offset.unwrap_or(0),
120            max: self.max,
121        }
122    }
123}
124
125impl<'a, E> SinglePage<'a, E>
126where
127    E: Endpoint + Pageable,
128{
129    /// Create a builder for a [`SinglePage`]
130    pub fn builder(paged: &'a E) -> SinglePageBuilder<'a, E> {
131        SinglePageBuilder::new(paged)
132    }
133
134    pub(crate) fn page_url<C: RestClient>(
135        &self,
136        client: &C,
137    ) -> Result<url::Url, ApiError<C::Error>> {
138        let mut url = client.rest_endpoint(&self.inner.endpoint())?;
139
140        let mut params = self.inner.query_parameters()?;
141        params.extend_from(&self)?;
142        params.apply_to(&mut url);
143
144        Ok(url)
145    }
146}
147
148#[async_trait]
149impl<'a, T, C, E> AsyncQuery<(Vec<T>, Pagination), C> for SinglePage<'a, E>
150where
151    T: DeserializeOwned + Send + 'static,
152    C: AsyncClient + Sync,
153    E: Endpoint + Pageable + Sync,
154{
155    async fn query_async(&self, client: &C) -> Result<(Vec<T>, Pagination), ApiError<C::Error>> {
156        let (req, data) = build_paged_request(self, client)?;
157
158        let url = req.uri_ref().cloned().unwrap_or_default();
159
160        let rsp = client.rest_async(req, data).await?;
161
162        deserialize_response::<_>(rsp)
163            .map(|value| (value.data, value.pagination.unwrap_or_default()))
164            .map_err(|err| ApiError::from_http_response(err, url))
165    }
166}
167
168impl<'a, E, C, T> Iterator for PagedIter<'a, E, C, T>
169where
170    E: Endpoint + Pageable,
171    T: DeserializeOwned,
172    C: Client,
173{
174    type Item = Result<T, ApiError<C::Error>>;
175
176    fn next(&mut self) -> Option<Self::Item> {
177        if self.current_page.is_empty() {
178            if self.last_page {
179                return None;
180            }
181            self.current_page = match self.state.query(self.client) {
182                Ok((data, _pagination)) => data,
183                Err(err) => return Some(Err(err)),
184            };
185            self.state.offset += self.current_page.len();
186
187            // FIXME: 20 may not always be correct.
188            if self.current_page.len() < self.state.max.unwrap_or(20) {
189                self.last_page = true;
190            }
191            self.current_page.reverse();
192        }
193
194        self.current_page.pop().map(Ok)
195    }
196}
197
198impl<'a, E> PagedEndpointExt<'a, E> for E
199where
200    E: Endpoint + Pageable,
201{
202    fn iter<T, C>(&'a self, client: &'a C) -> PagedIter<'a, E, C, T>
203    where
204        C: Client,
205        T: DeserializeOwned,
206    {
207        PagedIter::new(self, client)
208    }
209
210    fn single_page(&self) -> SinglePageBuilder<'_, E> {
211        SinglePageBuilder::new(self)
212    }
213
214    fn stream<T, C>(&'a self, client: &'a C) -> BoxStream<'_, Result<T, ApiError<C::Error>>>
215    where
216        T: DeserializeOwned + Send + 'static,
217        C: AsyncClient + Sync,
218        E: Sync + Send,
219    {
220        futures::stream::try_unfold(Some(0), move |state| async move {
221            let offset = if let Some(offset) = state {
222                offset
223            } else {
224                return Ok(None);
225            };
226            let page = SinglePageBuilder::new(self).offset(offset).build();
227            let (data, pagination) = page.query_async(client).await?;
228            if data.is_empty() {
229                Ok::<_, ApiError<C::Error>>(None)
230            } else {
231                let next_state = if data.len() < pagination.max {
232                    None
233                } else {
234                    // TODO: Dynamic page size
235                    Some(offset + pagination.max)
236                };
237                Ok(Some((
238                    futures::stream::iter(data.into_iter().map(Ok)),
239                    next_state,
240                )))
241            }
242        })
243        .try_flatten()
244        .boxed()
245    }
246}
247
248impl<'a, E, T, C> Query<(Vec<T>, Pagination), C> for SinglePage<'a, E>
249where
250    E: Endpoint + Pageable,
251    T: DeserializeOwned,
252    C: Client,
253{
254    fn query(&self, client: &C) -> Result<(Vec<T>, Pagination), ApiError<C::Error>> {
255        let (req, data) = build_paged_request(self, client)?;
256
257        let url = req.uri_ref().cloned().unwrap_or_default();
258
259        let rsp = client.rest(req, data)?;
260
261        deserialize_response::<_>(rsp)
262            .map(|value| (value.data, value.pagination.unwrap_or_default()))
263            .map_err(|err| ApiError::from_http_response(err, url))
264    }
265}