Skip to main content

atlassian_cli_api/
pagination.rs

1use crate::error::Result;
2use async_trait::async_trait;
3use futures::stream::{Stream, StreamExt};
4use serde::{Deserialize, Serialize};
5use std::pin::Pin;
6use tracing::debug;
7
8#[derive(Debug, Clone, Serialize, Deserialize)]
9pub struct PagedResponse<T> {
10    pub values: Vec<T>,
11    #[serde(rename = "startAt")]
12    pub start_at: Option<u32>,
13    #[serde(rename = "maxResults")]
14    pub max_results: Option<u32>,
15    pub total: Option<u32>,
16    #[serde(rename = "isLast")]
17    pub is_last: Option<bool>,
18}
19
20impl<T> PagedResponse<T> {
21    pub fn has_next(&self) -> bool {
22        if let Some(is_last) = self.is_last {
23            return !is_last;
24        }
25
26        if let (Some(start), Some(max), Some(total)) = (self.start_at, self.max_results, self.total)
27        {
28            return start + max < total;
29        }
30
31        false
32    }
33
34    pub fn next_start(&self) -> Option<u32> {
35        if !self.has_next() {
36            return None;
37        }
38
39        match (self.start_at, self.max_results) {
40            (Some(start), Some(max)) => Some(start + max),
41            _ => None,
42        }
43    }
44}
45
46#[async_trait]
47pub trait Paginator<T>: Sync {
48    async fn fetch_page(&self, start_at: u32, max_results: u32) -> Result<PagedResponse<T>>;
49
50    async fn fetch_all(&self, max_results: u32) -> Result<Vec<T>>
51    where
52        T: Send,
53    {
54        let mut all_items = Vec::new();
55        let mut start_at = 0;
56
57        loop {
58            debug!(start_at, max_results, "Fetching page");
59            let page = self.fetch_page(start_at, max_results).await?;
60            let item_count = page.values.len();
61            let has_next = page.has_next();
62            let next_start = page.next_start();
63
64            all_items.extend(page.values);
65
66            if !has_next || item_count == 0 {
67                debug!(total_items = all_items.len(), "Finished pagination");
68                break;
69            }
70
71            start_at = next_start.unwrap_or(start_at + max_results);
72        }
73
74        Ok(all_items)
75    }
76
77    fn stream<'a>(
78        &'a self,
79        max_results: u32,
80    ) -> Pin<Box<dyn Stream<Item = Result<Vec<T>>> + Send + 'a>>
81    where
82        T: Send + 'a,
83    {
84        Box::pin(async_stream::stream! {
85            let mut start_at = 0;
86
87            loop {
88                debug!(start_at, max_results, "Fetching page in stream");
89                let page = self.fetch_page(start_at, max_results).await;
90
91                match page {
92                    Ok(page) => {
93                        let item_count = page.values.len();
94                        let has_next = page.has_next();
95                        let next_start = page.next_start();
96
97                        yield Ok(page.values);
98
99                        if !has_next || item_count == 0 {
100                            break;
101                        }
102
103                        start_at = next_start.unwrap_or(start_at + max_results);
104                    }
105                    Err(err) => {
106                        yield Err(err);
107                        break;
108                    }
109                }
110            }
111        })
112    }
113}
114
115pub async fn collect_pages<T, P: Paginator<T>>(
116    paginator: &P,
117    max_results: u32,
118    limit: Option<usize>,
119) -> Result<Vec<T>>
120where
121    T: Send,
122{
123    let mut stream = paginator.stream(max_results);
124    let mut all_items = Vec::new();
125
126    while let Some(result) = stream.next().await {
127        let items = result?;
128        all_items.extend(items);
129
130        if let Some(limit) = limit {
131            if all_items.len() >= limit {
132                all_items.truncate(limit);
133                break;
134            }
135        }
136    }
137
138    Ok(all_items)
139}