Skip to main content

gitlab/api/paged/
lazy.rs

1// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
2// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
3// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
4// option. This file may not be copied, modified, or distributed
5// except according to those terms.
6
7use std::num::NonZeroU64;
8use std::sync::RwLock;
9
10use async_trait::async_trait;
11use bytes::Bytes;
12use futures_util::Stream;
13use http::request::Builder as RequestBuilder;
14use http::{header, Request, Response};
15use query::AsyncQuery;
16use serde::de::DeserializeOwned;
17use url::Url;
18
19use crate::api::paged::link_header;
20use crate::api::{
21    query, ApiError, AsyncClient, Client, Endpoint, Pageable, Paged, Query, RestClient,
22};
23
24impl<E> Paged<E>
25where
26    E: Endpoint,
27    E: Pageable,
28{
29    /// Create an iterator over the results of paginated results for with a client.
30    pub fn iter<'a, C, T>(&'a self, client: &'a C) -> LazilyPagedIter<'a, &'a E, C, T> {
31        let borrowed = Paged::<&E> {
32            endpoint: &self.endpoint,
33            pagination: self.pagination,
34        };
35        LazilyPagedIter::new(borrowed, client)
36    }
37
38    /// Create an iterator over the results of paginated results for with a client.
39    pub fn into_iter<C, T>(self, client: &C) -> LazilyPagedIter<'_, E, C, T> {
40        LazilyPagedIter::new(self, client)
41    }
42}
43
44impl<E> Paged<E>
45where
46    E: Endpoint + Pageable + Sync,
47{
48    /// Create a stream over the results of paginated results for with a client.
49    pub fn iter_async<'a, C, T>(
50        &'a self,
51        client: &'a C,
52    ) -> impl Stream<Item = Result<T, ApiError<C::Error>>> + 'a
53    where
54        T: DeserializeOwned + 'static,
55        C: AsyncClient + Sync,
56    {
57        let borrowed = Paged::<&E> {
58            endpoint: &self.endpoint,
59            pagination: self.pagination,
60        };
61        let iter = LazilyPagedIter::new(borrowed, client);
62        futures_util::stream::unfold(iter, |mut iter| {
63            async move { iter.next_async().await.map(|item| (item, iter)) }
64        })
65    }
66
67    /// Create a stream over the results of paginated results for with a client.
68    pub fn into_iter_async<'a, C, T>(
69        self,
70        client: &'a C,
71    ) -> impl Stream<Item = Result<T, ApiError<C::Error>>> + 'a
72    where
73        E: 'a,
74        T: DeserializeOwned + 'static,
75        C: AsyncClient + Sync,
76    {
77        let iter = LazilyPagedIter::new(self, client);
78        futures_util::stream::unfold(iter, |mut iter| {
79            async move { iter.next_async().await.map(|item| (item, iter)) }
80        })
81    }
82}
83
84#[derive(Debug, Clone, PartialEq, Eq)]
85enum KeysetPage {
86    First,
87    Next(Url),
88}
89
90#[derive(Debug, Clone, PartialEq, Eq)]
91enum Page {
92    Number(u64),
93    Keyset(KeysetPage),
94    Done,
95}
96
97impl Page {
98    fn next_url(&self) -> Option<&Url> {
99        if let Self::Keyset(KeysetPage::Next(url)) = self {
100            Some(url)
101        } else {
102            None
103        }
104    }
105
106    fn next_page(&mut self, next_url: Option<Url>) {
107        let next_page = match *self {
108            Self::Number(page) => Self::Number(page + 1),
109            Self::Keyset(_) => {
110                if let Some(next_url) = next_url {
111                    Self::Keyset(KeysetPage::Next(next_url))
112                } else {
113                    Self::Done
114                }
115            },
116            Self::Done => Self::Done,
117        };
118
119        *self = next_page;
120    }
121
122    /// Returns true if setting the page number was successful
123    fn set_page(&mut self, page_num: NonZeroU64) -> bool {
124        if let Self::Number(ref mut page) = self {
125            *page = page_num.into();
126            true
127        } else {
128            false
129        }
130    }
131
132    fn apply_to(&self, pairs: &mut url::form_urlencoded::Serializer<url::UrlQuery>) {
133        match self {
134            Self::Number(page) => {
135                let page_str = page.to_string();
136                pairs.append_pair("page", &page_str);
137            },
138            Self::Keyset(_) => {
139                pairs.append_pair("pagination", "keyset");
140            },
141            Self::Done => {
142                unreachable!("The `Done` state should not be applied to any url")
143            },
144        }
145    }
146}
147
148struct PageState {
149    total_results: usize,
150    next_page: Page,
151}
152
153struct LazilyPagedState<E> {
154    paged: Paged<E>,
155    page_state: RwLock<PageState>,
156}
157
158impl<E> LazilyPagedState<E>
159where
160    E: Pageable,
161{
162    fn new(paged: Paged<E>) -> Self {
163        let next_page = if paged.endpoint.use_keyset_pagination() {
164            Page::Keyset(KeysetPage::First)
165        } else {
166            Page::Number(1)
167        };
168
169        let page_state = PageState {
170            total_results: 0,
171            next_page,
172        };
173
174        Self {
175            paged,
176            page_state: RwLock::new(page_state),
177        }
178    }
179}
180
181impl<E> LazilyPagedState<E> {
182    fn next_page(&self, last_page_size: usize, next_url: Option<Url>) {
183        let mut page_state = self.page_state.write().expect("poisoned next_page");
184        page_state.total_results += last_page_size;
185
186        // Gitlab used to have issues returning paginated results; these have been fixed since, but
187        // if it is needed, the bug manifests as Gitlab returning *all* results instead of just the
188        // requested results. This can cause an infinite loop here if the number of total results
189        // is exactly equal to `per_page`.
190        if self
191            .paged
192            .pagination
193            .is_last_page(last_page_size, page_state.total_results)
194        {
195            page_state.next_page = Page::Done;
196        } else {
197            page_state.next_page.next_page(next_url);
198        }
199    }
200
201    /// Jumps to the [`page_num`] page, if possible.
202    fn set_page(&self, page_num: NonZeroU64) {
203        let mut page_state = self.page_state.write().expect("poisoned next_page");
204        if page_state.next_page.set_page(page_num) {
205            page_state.total_results = 0;
206        }
207    }
208}
209
210impl<E> LazilyPagedState<E>
211where
212    E: Endpoint,
213{
214    fn page_url<C>(&self, client: &C) -> Result<Option<Url>, ApiError<C::Error>>
215    where
216        C: RestClient,
217    {
218        let page_state = self.page_state.read().expect("poisoned next_page");
219        let next_page = &page_state.next_page;
220
221        if *next_page == Page::Done {
222            return Ok(None);
223        }
224
225        let url = if let Some(next_url) = next_page.next_url() {
226            next_url.clone()
227        } else {
228            let mut url = self
229                .paged
230                .endpoint
231                .url_base()
232                .endpoint_for(client, &self.paged.endpoint.endpoint())?;
233            self.paged.endpoint.parameters().add_to_url(&mut url);
234
235            let per_page = self.paged.pagination.page_limit();
236            let per_page_str = per_page.to_string();
237
238            {
239                let mut pairs = url.query_pairs_mut();
240                pairs.append_pair("per_page", &per_page_str);
241
242                next_page.apply_to(&mut pairs);
243            }
244
245            url
246        };
247
248        Ok(Some(url))
249    }
250
251    fn build_request<C>(&self, url: Url) -> Result<(RequestBuilder, Vec<u8>), ApiError<C::Error>>
252    where
253        C: RestClient,
254    {
255        let body = self.paged.endpoint.body()?;
256
257        let req = Request::builder()
258            .method(self.paged.endpoint.method())
259            .uri(query::url_to_http_uri(url));
260        Ok(if let Some((mime, data)) = body.as_ref() {
261            let req = req.header(header::CONTENT_TYPE, *mime);
262            (req, data.clone())
263        } else {
264            (req, Vec::new())
265        })
266    }
267
268    fn process_response<C, T>(&self, rsp: Response<Bytes>) -> Result<Vec<T>, ApiError<C::Error>>
269    where
270        E: Pageable,
271        T: DeserializeOwned,
272        C: RestClient,
273    {
274        let status = rsp.status();
275
276        let next_url = if self.paged.endpoint.use_keyset_pagination() {
277            link_header::next_page_from_headers(rsp.headers())?
278        } else {
279            None
280        };
281
282        let v = if let Ok(v) = serde_json::from_slice(rsp.body()) {
283            v
284        } else {
285            return Err(ApiError::server_error(status, rsp.body()));
286        };
287        if !status.is_success() {
288            return Err(ApiError::from_gitlab_with_status(status, v));
289        } else if status == http::StatusCode::MOVED_PERMANENTLY {
290            return Err(ApiError::moved_permanently(
291                rsp.headers().get(http::header::LOCATION),
292            ));
293        }
294
295        let page = serde_json::from_value::<Vec<T>>(v).map_err(ApiError::data_type::<Vec<T>>)?;
296        self.next_page(page.len(), next_url);
297
298        Ok(page)
299    }
300}
301
302impl<E, T, C> Query<Vec<T>, C> for LazilyPagedState<E>
303where
304    E: Endpoint,
305    E: Pageable,
306    T: DeserializeOwned,
307    T: DeserializeOwned,
308    C: Client,
309{
310    fn query(&self, client: &C) -> Result<Vec<T>, ApiError<C::Error>> {
311        let url = if let Some(url) = self.page_url(client)? {
312            url
313        } else {
314            // Just return empty data.
315            // XXX: Return a new kind of PaginationError here?
316            return Ok(Vec::new());
317        };
318        let (req, data) = self.build_request::<C>(url)?;
319        let rsp = client.rest(req, data)?;
320        self.process_response::<C, _>(rsp)
321    }
322}
323
324#[async_trait]
325impl<E, T, C> AsyncQuery<Vec<T>, C> for LazilyPagedState<E>
326where
327    E: Endpoint + Pageable + Sync,
328    T: DeserializeOwned + 'static,
329    C: AsyncClient + Sync,
330{
331    async fn query_async(&self, client: &C) -> Result<Vec<T>, ApiError<C::Error>> {
332        let url = if let Some(url) = self.page_url(client)? {
333            url
334        } else {
335            // Just return empty data.
336            // XXX: Return a new kind of PaginationError here?
337            return Ok(Vec::new());
338        };
339        let (req, data) = self.build_request::<C>(url)?;
340        let rsp = client.rest_async(req, data).await?;
341        self.process_response::<C, _>(rsp)
342    }
343}
344
345/// An iterator which yields items from a paginated result.
346///
347/// The pages are fetched lazily, so endpoints not using keyset pagination may observe duplicate or
348/// missing items (depending on sorting) if new objects are created or removed while iterating.
349pub struct LazilyPagedIter<'a, E, C, T> {
350    client: &'a C,
351    state: LazilyPagedState<E>,
352    current_page: Vec<T>,
353}
354
355impl<'a, E, C, T> LazilyPagedIter<'a, E, C, T>
356where
357    E: Endpoint,
358    E: Pageable,
359{
360    fn new(paged: Paged<E>, client: &'a C) -> Self {
361        let state = LazilyPagedState::new(paged);
362
363        Self {
364            client,
365            state,
366            current_page: Vec::new(),
367        }
368    }
369
370    /// Invalidates the [`Page`] that is in memory and sets the page to fetch from to `page_number`.
371    ///
372    /// Note that page numbers are 1-indexed.
373    pub fn set_page_number(mut self, page_number: NonZeroU64) -> Self {
374        self.current_page.clear();
375        self.state.set_page(page_number);
376        self
377    }
378}
379
380impl<E, C, T> Iterator for LazilyPagedIter<'_, E, C, T>
381where
382    E: Endpoint,
383    E: Pageable,
384    T: DeserializeOwned,
385    C: Client,
386{
387    type Item = Result<T, ApiError<C::Error>>;
388
389    fn next(&mut self) -> Option<Self::Item> {
390        if self.current_page.is_empty() {
391            self.current_page = match self.state.query(self.client) {
392                Ok(data) => data,
393                Err(err) => return Some(Err(err)),
394            };
395
396            // Reverse the page order so that `.pop()` works.
397            self.current_page.reverse();
398        }
399
400        self.current_page.pop().map(Ok)
401    }
402}
403
404// Instead of implementing Stream directly, we implement this "async" next method and use it with
405// `stream::unfold` to return an anonymous Stream impl.
406impl<'a, E, C, T> LazilyPagedIter<'a, E, C, T>
407where
408    E: Endpoint + Pageable + Sync,
409    T: DeserializeOwned + 'static,
410    C: AsyncClient + Sync,
411{
412    async fn next_async(&mut self) -> Option<Result<T, ApiError<C::Error>>> {
413        if self.current_page.is_empty() {
414            self.current_page = match self.state.query_async(self.client).await {
415                Ok(data) => data,
416                Err(err) => return Some(Err(err)),
417            };
418
419            // Reverse the page order so that `.pop()` works.
420            self.current_page.reverse();
421        }
422
423        self.current_page.pop().map(Ok)
424    }
425
426    /// Converts a "normal iterator" into an async iterator
427    pub fn into_async(self) -> impl Stream<Item = Result<T, ApiError<C::Error>>> + 'a
428    where
429        E: 'a,
430    {
431        futures_util::stream::unfold(self, |mut iter| {
432            async move { iter.next_async().await.map(|item| (item, iter)) }
433        })
434    }
435}
436
437#[cfg(test)]
438mod tests {
439    use std::num::NonZeroU64;
440
441    use futures_util::TryStreamExt;
442    use http::StatusCode;
443    use serde::{Deserialize, Serialize};
444    use serde_json::json;
445    use url::Url;
446
447    use crate::api::endpoint_prelude::*;
448    use crate::api::{self, ApiError, Pagination};
449    use crate::test::client::{ExpectedUrl, PagedTestClient, SingleTestClient};
450
451    #[test]
452    fn test_page_next_url() {
453        use super::{KeysetPage, Page};
454
455        let url = Url::parse("https://example.com/1").unwrap();
456        let page_number = Page::Number(1);
457        let page_keyset_first = Page::Keyset(KeysetPage::First);
458        let page_keyset_next = Page::Keyset(KeysetPage::Next(url.clone()));
459        let page_done = Page::Done;
460
461        assert_eq!(page_number.next_url(), None);
462        assert_eq!(page_keyset_first.next_url(), None);
463        assert_eq!(page_keyset_next.next_url(), Some(&url));
464        assert_eq!(page_done.next_url(), None);
465    }
466
467    #[test]
468    fn test_page_next_page() {
469        use super::{KeysetPage, Page};
470
471        let page_number = Page::Number(1);
472        let page_number_next = Page::Number(2);
473        let url = Url::parse("https://example.com/1").unwrap();
474        let other_url = Url::parse("https://example.com/2").unwrap();
475        let page_keyset = Page::Keyset(KeysetPage::First);
476        let page_done = Page::Done;
477        let page_keyset_next_url = Page::Keyset(KeysetPage::Next(url.clone()));
478        let page_keyset_next_other_url = Page::Keyset(KeysetPage::Next(other_url.clone()));
479
480        let items = &[
481            (&page_number, None, &page_number_next),
482            (&page_number, Some(&url), &page_number_next),
483            (&page_keyset, None, &page_done),
484            (&page_keyset, Some(&url), &page_keyset_next_url),
485            (&page_keyset_next_url, None, &page_done),
486            (
487                &page_keyset_next_url,
488                Some(&other_url),
489                &page_keyset_next_other_url,
490            ),
491            (&page_done, None, &page_done),
492            (&page_done, Some(&url), &page_done),
493        ];
494
495        for (p, u, n) in items {
496            let mut page = (*p).clone();
497            page.next_page(u.cloned());
498            assert_eq!(&page, *n);
499        }
500    }
501
502    #[test]
503    fn test_page_apply_to() {
504        use super::{KeysetPage, Page};
505
506        let page_number = Page::Number(1);
507        let page_keyset = Page::Keyset(KeysetPage::First);
508        let url = Url::parse("https://example.com/1").unwrap();
509
510        {
511            let mut url = url.clone();
512            {
513                let pairs = url.query_pairs();
514                assert_eq!(pairs.count(), 0);
515            }
516            {
517                let mut pairs = url.query_pairs_mut();
518                page_number.apply_to(&mut pairs);
519            }
520            itertools::assert_equal(url.query_pairs(), [("page".into(), "1".into())]);
521        }
522
523        {
524            let mut url = url.clone();
525            {
526                let pairs = url.query_pairs();
527                assert_eq!(pairs.count(), 0);
528            }
529            {
530                let mut pairs = url.query_pairs_mut();
531                page_keyset.apply_to(&mut pairs);
532            }
533            itertools::assert_equal(url.query_pairs(), [("pagination".into(), "keyset".into())]);
534        }
535    }
536
537    fn test_lazily_paged_state_next_page_instance<E>(
538        endpoint: &E,
539        last_page_size: usize,
540        next_url: Option<&Url>,
541        expected: super::Page,
542    ) where
543        E: Endpoint,
544        E: Pageable,
545    {
546        use super::LazilyPagedState;
547
548        let paged = api::paged(endpoint, Pagination::All);
549        let state = LazilyPagedState::new(paged);
550
551        state.next_page(last_page_size, next_url.cloned());
552
553        let page_state = state.page_state.read().expect("poisoned next_page");
554        assert_eq!(page_state.total_results, last_page_size);
555        assert_eq!(page_state.next_page, expected);
556    }
557
558    #[test]
559    fn test_lazily_paged_state_next_page() {
560        use super::{KeysetPage, Page};
561
562        const MAX_PAGE_SIZE: usize = 100;
563        const SMALL_PAGE_SIZE: usize = 10;
564
565        let endpoint = api::deploy_keys::DeployKeys::builder().build().unwrap();
566        let endpoint_keyset = api::users::Users::builder().build().unwrap();
567        let url = Url::parse("https://example.com/1").unwrap();
568
569        test_lazily_paged_state_next_page_instance(&endpoint, MAX_PAGE_SIZE, None, Page::Number(2));
570        test_lazily_paged_state_next_page_instance(&endpoint, SMALL_PAGE_SIZE, None, Page::Done);
571        test_lazily_paged_state_next_page_instance(
572            &endpoint,
573            MAX_PAGE_SIZE,
574            Some(&url),
575            Page::Number(2),
576        );
577        test_lazily_paged_state_next_page_instance(
578            &endpoint,
579            SMALL_PAGE_SIZE,
580            Some(&url),
581            Page::Done,
582        );
583
584        test_lazily_paged_state_next_page_instance(
585            &endpoint_keyset,
586            MAX_PAGE_SIZE,
587            None,
588            Page::Done,
589        );
590        test_lazily_paged_state_next_page_instance(
591            &endpoint_keyset,
592            SMALL_PAGE_SIZE,
593            None,
594            Page::Done,
595        );
596        test_lazily_paged_state_next_page_instance(
597            &endpoint_keyset,
598            MAX_PAGE_SIZE,
599            Some(&url),
600            Page::Keyset(KeysetPage::Next(url.clone())),
601        );
602        test_lazily_paged_state_next_page_instance(
603            &endpoint_keyset,
604            SMALL_PAGE_SIZE,
605            Some(&url),
606            Page::Done,
607        );
608    }
609
610    #[derive(Debug, Default)]
611    struct Dummy {
612        with_keyset: bool,
613    }
614
615    impl Endpoint for Dummy {
616        fn method(&self) -> Method {
617            Method::GET
618        }
619
620        fn endpoint(&self) -> Cow<'static, str> {
621            "paged_dummy".into()
622        }
623    }
624
625    impl Pageable for Dummy {
626        fn use_keyset_pagination(&self) -> bool {
627            self.with_keyset
628        }
629    }
630
631    #[derive(Debug, Deserialize, Serialize)]
632    struct DummyResult {
633        value: u8,
634    }
635
636    #[test]
637    fn test_gitlab_non_json_response() {
638        let endpoint = ExpectedUrl::builder()
639            .endpoint("paged_dummy")
640            .add_query_params(&[("page", "1"), ("per_page", "100")])
641            .build()
642            .unwrap();
643        let client = SingleTestClient::new_raw(endpoint, "not json");
644        let endpoint = Dummy::default();
645
646        let res: Result<Vec<DummyResult>, _> = api::paged(endpoint, Pagination::All)
647            .iter(&client)
648            .collect();
649        let err = res.unwrap_err();
650        if let ApiError::GitlabService {
651            status, ..
652        } = err
653        {
654            assert_eq!(status, StatusCode::OK);
655        } else {
656            panic!("unexpected error: {}", err);
657        }
658    }
659
660    #[tokio::test]
661    async fn test_gitlab_non_json_response_async() {
662        let endpoint = ExpectedUrl::builder()
663            .endpoint("paged_dummy")
664            .add_query_params(&[("page", "1"), ("per_page", "100")])
665            .build()
666            .unwrap();
667        let client = SingleTestClient::new_raw(endpoint, "not json");
668        let endpoint = Dummy::default();
669
670        let res: Result<Vec<DummyResult>, _> = api::paged(endpoint, Pagination::All)
671            .iter_async(&client)
672            .try_collect()
673            .await;
674        let err = res.unwrap_err();
675        if let ApiError::GitlabService {
676            status, ..
677        } = err
678        {
679            assert_eq!(status, StatusCode::OK);
680        } else {
681            panic!("unexpected error: {}", err);
682        }
683    }
684
685    #[test]
686    fn test_gitlab_error_bad_json() {
687        let endpoint = ExpectedUrl::builder()
688            .endpoint("paged_dummy")
689            .add_query_params(&[("page", "1"), ("per_page", "100")])
690            .status(StatusCode::NOT_FOUND)
691            .build()
692            .unwrap();
693        let client = SingleTestClient::new_raw(endpoint, "");
694        let endpoint = Dummy::default();
695
696        let res: Result<Vec<DummyResult>, _> = api::paged(endpoint, Pagination::All)
697            .iter(&client)
698            .collect();
699        let err = res.unwrap_err();
700        if let ApiError::GitlabService {
701            status, ..
702        } = err
703        {
704            assert_eq!(status, StatusCode::NOT_FOUND);
705        } else {
706            panic!("unexpected error: {}", err);
707        }
708    }
709
710    #[tokio::test]
711    async fn test_gitlab_error_bad_json_async() {
712        let endpoint = ExpectedUrl::builder()
713            .endpoint("paged_dummy")
714            .add_query_params(&[("page", "1"), ("per_page", "100")])
715            .status(StatusCode::NOT_FOUND)
716            .build()
717            .unwrap();
718        let client = SingleTestClient::new_raw(endpoint, "");
719        let endpoint = Dummy::default();
720
721        let res: Result<Vec<DummyResult>, _> = api::paged(endpoint, Pagination::All)
722            .iter_async(&client)
723            .try_collect()
724            .await;
725        let err = res.unwrap_err();
726        if let ApiError::GitlabService {
727            status, ..
728        } = err
729        {
730            assert_eq!(status, StatusCode::NOT_FOUND);
731        } else {
732            panic!("unexpected error: {}", err);
733        }
734    }
735
736    #[test]
737    fn test_gitlab_error_detection() {
738        let endpoint = ExpectedUrl::builder()
739            .endpoint("paged_dummy")
740            .add_query_params(&[("page", "1"), ("per_page", "100")])
741            .status(StatusCode::NOT_FOUND)
742            .build()
743            .unwrap();
744        let client = SingleTestClient::new_json(
745            endpoint,
746            &json!({
747                "message": "dummy error message",
748            }),
749        );
750        let endpoint = Dummy::default();
751
752        let res: Result<Vec<DummyResult>, _> = api::paged(endpoint, Pagination::All)
753            .iter(&client)
754            .collect();
755        let err = res.unwrap_err();
756        if let ApiError::GitlabWithStatus {
757            status,
758            msg,
759        } = err
760        {
761            assert_eq!(status, StatusCode::NOT_FOUND);
762            assert_eq!(msg, "dummy error message");
763        } else {
764            panic!("unexpected error: {}", err);
765        }
766    }
767
768    #[tokio::test]
769    async fn test_gitlab_error_detection_async() {
770        let endpoint = ExpectedUrl::builder()
771            .endpoint("paged_dummy")
772            .add_query_params(&[("page", "1"), ("per_page", "100")])
773            .status(StatusCode::NOT_FOUND)
774            .build()
775            .unwrap();
776        let client = SingleTestClient::new_json(
777            endpoint,
778            &json!({
779                "message": "dummy error message",
780            }),
781        );
782        let endpoint = Dummy::default();
783
784        let res: Result<Vec<DummyResult>, _> = api::paged(endpoint, Pagination::All)
785            .iter_async(&client)
786            .try_collect()
787            .await;
788        let err = res.unwrap_err();
789        if let ApiError::GitlabWithStatus {
790            status,
791            msg,
792        } = err
793        {
794            assert_eq!(status, StatusCode::NOT_FOUND);
795            assert_eq!(msg, "dummy error message");
796        } else {
797            panic!("unexpected error: {}", err);
798        }
799    }
800
801    #[test]
802    fn test_gitlab_error_detection_legacy() {
803        let endpoint = ExpectedUrl::builder()
804            .endpoint("paged_dummy")
805            .add_query_params(&[("page", "1"), ("per_page", "100")])
806            .status(StatusCode::NOT_FOUND)
807            .build()
808            .unwrap();
809        let client = SingleTestClient::new_json(
810            endpoint,
811            &json!({
812                "error": "dummy error message",
813            }),
814        );
815        let endpoint = Dummy::default();
816
817        let res: Result<Vec<DummyResult>, _> = api::paged(endpoint, Pagination::All)
818            .iter(&client)
819            .collect();
820        let err = res.unwrap_err();
821        if let ApiError::GitlabWithStatus {
822            status,
823            msg,
824        } = err
825        {
826            assert_eq!(status, StatusCode::NOT_FOUND);
827            assert_eq!(msg, "dummy error message");
828        } else {
829            panic!("unexpected error: {}", err);
830        }
831    }
832
833    #[tokio::test]
834    async fn test_gitlab_error_detection_legacy_async() {
835        let endpoint = ExpectedUrl::builder()
836            .endpoint("paged_dummy")
837            .add_query_params(&[("page", "1"), ("per_page", "100")])
838            .status(StatusCode::NOT_FOUND)
839            .build()
840            .unwrap();
841        let client = SingleTestClient::new_json(
842            endpoint,
843            &json!({
844                "error": "dummy error message",
845            }),
846        );
847        let endpoint = Dummy::default();
848
849        let res: Result<Vec<DummyResult>, _> = api::paged(endpoint, Pagination::All)
850            .iter_async(&client)
851            .try_collect()
852            .await;
853        let err = res.unwrap_err();
854        if let ApiError::GitlabWithStatus {
855            status,
856            msg,
857        } = err
858        {
859            assert_eq!(status, StatusCode::NOT_FOUND);
860            assert_eq!(msg, "dummy error message");
861        } else {
862            panic!("unexpected error: {}", err);
863        }
864    }
865
866    #[test]
867    fn test_gitlab_error_detection_unknown() {
868        let endpoint = ExpectedUrl::builder()
869            .endpoint("paged_dummy")
870            .add_query_params(&[("page", "1"), ("per_page", "100")])
871            .status(StatusCode::NOT_FOUND)
872            .build()
873            .unwrap();
874        let err_obj = json!({
875            "bogus": "dummy error message",
876        });
877        let client = SingleTestClient::new_json(endpoint, &err_obj);
878        let endpoint = Dummy::default();
879
880        let res: Result<Vec<DummyResult>, _> = api::paged(endpoint, Pagination::All)
881            .iter(&client)
882            .collect();
883        let err = res.unwrap_err();
884        if let ApiError::GitlabUnrecognizedWithStatus {
885            status,
886            obj,
887        } = err
888        {
889            assert_eq!(status, StatusCode::NOT_FOUND);
890            assert_eq!(obj, err_obj);
891        } else {
892            panic!("unexpected error: {}", err);
893        }
894    }
895
896    #[tokio::test]
897    async fn test_gitlab_error_detection_unknown_async() {
898        let endpoint = ExpectedUrl::builder()
899            .endpoint("paged_dummy")
900            .add_query_params(&[("page", "1"), ("per_page", "100")])
901            .status(StatusCode::NOT_FOUND)
902            .build()
903            .unwrap();
904        let err_obj = json!({
905            "bogus": "dummy error message",
906        });
907        let client = SingleTestClient::new_json(endpoint, &err_obj);
908        let endpoint = Dummy::default();
909
910        let res: Result<Vec<DummyResult>, _> = api::paged(endpoint, Pagination::All)
911            .iter_async(&client)
912            .try_collect()
913            .await;
914        let err = res.unwrap_err();
915        if let ApiError::GitlabUnrecognizedWithStatus {
916            status,
917            obj,
918        } = err
919        {
920            assert_eq!(status, StatusCode::NOT_FOUND);
921            assert_eq!(obj, err_obj);
922        } else {
923            panic!("unexpected error: {}", err);
924        }
925    }
926
927    #[test]
928    fn test_pagination_limit() {
929        let endpoint = ExpectedUrl::builder()
930            .endpoint("paged_dummy")
931            .paginated(true)
932            .build()
933            .unwrap();
934        let client = PagedTestClient::new_raw(
935            endpoint,
936            (0..=255).map(|value| {
937                DummyResult {
938                    value,
939                }
940            }),
941        );
942        let query = Dummy {
943            with_keyset: false,
944        };
945
946        let res: Vec<DummyResult> = api::paged(query, Pagination::Limit(25))
947            .iter(&client)
948            .collect::<Result<Vec<_>, _>>()
949            .unwrap();
950        assert_eq!(res.len(), 25);
951        for (i, value) in res.iter().enumerate() {
952            assert_eq!(value.value, i as u8);
953        }
954    }
955
956    #[tokio::test]
957    async fn test_pagination_limit_async() {
958        let endpoint = ExpectedUrl::builder()
959            .endpoint("paged_dummy")
960            .paginated(true)
961            .build()
962            .unwrap();
963        let client = PagedTestClient::new_raw(
964            endpoint,
965            (0..=255).map(|value| {
966                DummyResult {
967                    value,
968                }
969            }),
970        );
971        let query = Dummy {
972            with_keyset: false,
973        };
974
975        let res: Vec<DummyResult> = api::paged(query, Pagination::Limit(25))
976            .iter_async(&client)
977            .try_collect()
978            .await
979            .unwrap();
980        assert_eq!(res.len(), 25);
981        for (i, value) in res.iter().enumerate() {
982            assert_eq!(value.value, i as u8);
983        }
984    }
985
986    #[test]
987    fn test_pagination_set_page() {
988        let endpoint = ExpectedUrl::builder()
989            .endpoint("paged_dummy")
990            .paginated(true)
991            .build()
992            .unwrap();
993        let client = PagedTestClient::new_raw(
994            endpoint,
995            (0..=255).map(|value| {
996                DummyResult {
997                    value,
998                }
999            }),
1000        );
1001        let query = Dummy {
1002            with_keyset: false,
1003        };
1004
1005        let res: Vec<DummyResult> = api::paged(query, Pagination::Limit(25))
1006            .iter(&client)
1007            .set_page_number(NonZeroU64::new(4).unwrap())
1008            .collect::<Result<Vec<_>, _>>()
1009            .unwrap();
1010        assert_eq!(res.len(), 25);
1011        for (i, value) in res.iter().enumerate() {
1012            assert_eq!(value.value, (i + 75) as u8);
1013        }
1014    }
1015
1016    #[tokio::test]
1017    async fn test_pagination_set_page_async() {
1018        let endpoint = ExpectedUrl::builder()
1019            .endpoint("paged_dummy")
1020            .paginated(true)
1021            .build()
1022            .unwrap();
1023        let client = PagedTestClient::new_raw(
1024            endpoint,
1025            (0..=255).map(|value| {
1026                DummyResult {
1027                    value,
1028                }
1029            }),
1030        );
1031        let query = Dummy {
1032            with_keyset: false,
1033        };
1034
1035        let res: Vec<DummyResult> = api::paged(query, Pagination::Limit(25))
1036            .iter(&client)
1037            .set_page_number(NonZeroU64::new(4).unwrap())
1038            .into_async()
1039            .try_collect()
1040            .await
1041            .unwrap();
1042        assert_eq!(res.len(), 25);
1043        for (i, value) in res.iter().enumerate() {
1044            assert_eq!(value.value, (i + 75) as u8);
1045        }
1046    }
1047
1048    #[test]
1049    fn test_pagination_all() {
1050        let endpoint = ExpectedUrl::builder()
1051            .endpoint("paged_dummy")
1052            .paginated(true)
1053            .build()
1054            .unwrap();
1055        let client = PagedTestClient::new_raw(
1056            endpoint,
1057            (0..=255).map(|value| {
1058                DummyResult {
1059                    value,
1060                }
1061            }),
1062        );
1063        let query = Dummy::default();
1064
1065        let res: Vec<DummyResult> = api::paged(query, Pagination::All)
1066            .iter(&client)
1067            .collect::<Result<Vec<_>, _>>()
1068            .unwrap();
1069        assert_eq!(res.len(), 256);
1070        for (i, value) in res.iter().enumerate() {
1071            assert_eq!(value.value, i as u8);
1072        }
1073    }
1074
1075    #[tokio::test]
1076    async fn test_pagination_all_async() {
1077        let endpoint = ExpectedUrl::builder()
1078            .endpoint("paged_dummy")
1079            .paginated(true)
1080            .build()
1081            .unwrap();
1082        let client = PagedTestClient::new_raw(
1083            endpoint,
1084            (0..=255).map(|value| {
1085                DummyResult {
1086                    value,
1087                }
1088            }),
1089        );
1090        let query = Dummy::default();
1091
1092        let res: Vec<DummyResult> = api::paged(query, Pagination::All)
1093            .iter_async(&client)
1094            .try_collect()
1095            .await
1096            .unwrap();
1097        assert_eq!(res.len(), 256);
1098        for (i, value) in res.iter().enumerate() {
1099            assert_eq!(value.value, i as u8);
1100        }
1101    }
1102
1103    #[test]
1104    fn test_keyset_pagination_limit() {
1105        let endpoint = ExpectedUrl::builder()
1106            .endpoint("paged_dummy")
1107            .paginated(true)
1108            .build()
1109            .unwrap();
1110        let client = PagedTestClient::new_raw(
1111            endpoint,
1112            (0..=255).map(|value| {
1113                DummyResult {
1114                    value,
1115                }
1116            }),
1117        );
1118        let query = Dummy {
1119            with_keyset: true,
1120        };
1121
1122        let res: Vec<DummyResult> = api::paged(query, Pagination::Limit(25))
1123            .iter(&client)
1124            .collect::<Result<Vec<_>, _>>()
1125            .unwrap();
1126        assert_eq!(res.len(), 25);
1127        for (i, value) in res.iter().enumerate() {
1128            assert_eq!(value.value, i as u8);
1129        }
1130    }
1131
1132    #[tokio::test]
1133    async fn test_keyset_pagination_limit_async() {
1134        let endpoint = ExpectedUrl::builder()
1135            .endpoint("paged_dummy")
1136            .paginated(true)
1137            .build()
1138            .unwrap();
1139        let client = PagedTestClient::new_raw(
1140            endpoint,
1141            (0..=255).map(|value| {
1142                DummyResult {
1143                    value,
1144                }
1145            }),
1146        );
1147        let query = Dummy {
1148            with_keyset: true,
1149        };
1150
1151        let res: Vec<DummyResult> = api::paged(query, Pagination::Limit(25))
1152            .iter_async(&client)
1153            .try_collect()
1154            .await
1155            .unwrap();
1156        assert_eq!(res.len(), 25);
1157        for (i, value) in res.iter().enumerate() {
1158            assert_eq!(value.value, i as u8);
1159        }
1160    }
1161
1162    #[test]
1163    fn test_keyset_pagination_all() {
1164        let endpoint = ExpectedUrl::builder()
1165            .endpoint("paged_dummy")
1166            .paginated(true)
1167            .build()
1168            .unwrap();
1169        let client = PagedTestClient::new_raw(
1170            endpoint,
1171            (0..=255).map(|value| {
1172                DummyResult {
1173                    value,
1174                }
1175            }),
1176        );
1177        let query = Dummy {
1178            with_keyset: true,
1179        };
1180
1181        let res: Vec<DummyResult> = api::paged(query, Pagination::All)
1182            .iter(&client)
1183            .collect::<Result<Vec<_>, _>>()
1184            .unwrap();
1185        assert_eq!(res.len(), 256);
1186        for (i, value) in res.iter().enumerate() {
1187            assert_eq!(value.value, i as u8);
1188        }
1189    }
1190
1191    #[tokio::test]
1192    async fn test_keyset_pagination_all_async() {
1193        let endpoint = ExpectedUrl::builder()
1194            .endpoint("paged_dummy")
1195            .paginated(true)
1196            .build()
1197            .unwrap();
1198        let client = PagedTestClient::new_raw(
1199            endpoint,
1200            (0..=255).map(|value| {
1201                DummyResult {
1202                    value,
1203                }
1204            }),
1205        );
1206        let query = Dummy {
1207            with_keyset: true,
1208        };
1209
1210        let res: Vec<DummyResult> = api::paged(query, Pagination::All)
1211            .iter_async(&client)
1212            .try_collect::<Vec<_>>()
1213            .await
1214            .unwrap();
1215        assert_eq!(res.len(), 256);
1216        for (i, value) in res.iter().enumerate() {
1217            assert_eq!(value.value, i as u8);
1218        }
1219    }
1220
1221    #[test]
1222    fn test_keyset_pagination_missing_header() {
1223        let endpoint = ExpectedUrl::builder()
1224            .endpoint("paged_dummy")
1225            .add_query_params(&[("pagination", "keyset"), ("per_page", "100")])
1226            .build()
1227            .unwrap();
1228        let data: Vec<_> = (0..=255)
1229            .map(|value| {
1230                DummyResult {
1231                    value,
1232                }
1233            })
1234            .collect();
1235        let client = SingleTestClient::new_raw(endpoint, serde_json::to_vec(&data).unwrap());
1236        let query = Dummy {
1237            with_keyset: true,
1238        };
1239
1240        let res: Vec<DummyResult> = api::paged(query, Pagination::Limit(300))
1241            .iter(&client)
1242            .collect::<Result<Vec<_>, _>>()
1243            .unwrap();
1244        assert_eq!(res.len(), 256);
1245        for (i, value) in res.iter().enumerate() {
1246            assert_eq!(value.value, i as u8);
1247        }
1248    }
1249
1250    #[tokio::test]
1251    async fn test_keyset_pagination_missing_header_async() {
1252        let endpoint = ExpectedUrl::builder()
1253            .endpoint("paged_dummy")
1254            .add_query_params(&[("pagination", "keyset"), ("per_page", "100")])
1255            .build()
1256            .unwrap();
1257        let data: Vec<_> = (0..=255)
1258            .map(|value| {
1259                DummyResult {
1260                    value,
1261                }
1262            })
1263            .collect();
1264        let client = SingleTestClient::new_raw(endpoint, serde_json::to_vec(&data).unwrap());
1265        let query = Dummy {
1266            with_keyset: true,
1267        };
1268
1269        let res: Vec<DummyResult> = api::paged(query, Pagination::Limit(300))
1270            .iter_async(&client)
1271            .try_collect()
1272            .await
1273            .unwrap();
1274        assert_eq!(res.len(), 256);
1275        for (i, value) in res.iter().enumerate() {
1276            assert_eq!(value.value, i as u8);
1277        }
1278    }
1279}