atlassian_cli_api/
pagination.rs1use crate::error::Result;
2use async_trait::async_trait;
3use futures::stream::{Stream, StreamExt};
4use serde::{Deserialize, Serialize};
5use std::pin::Pin;
6use tracing::debug;
7
8#[derive(Debug, Clone, Serialize, Deserialize)]
9pub struct PagedResponse<T> {
10 pub values: Vec<T>,
11 #[serde(rename = "startAt")]
12 pub start_at: Option<u32>,
13 #[serde(rename = "maxResults")]
14 pub max_results: Option<u32>,
15 pub total: Option<u32>,
16 #[serde(rename = "isLast")]
17 pub is_last: Option<bool>,
18}
19
20impl<T> PagedResponse<T> {
21 pub fn has_next(&self) -> bool {
22 if let Some(is_last) = self.is_last {
23 return !is_last;
24 }
25
26 if let (Some(start), Some(max), Some(total)) = (self.start_at, self.max_results, self.total)
27 {
28 return start + max < total;
29 }
30
31 false
32 }
33
34 pub fn next_start(&self) -> Option<u32> {
35 if !self.has_next() {
36 return None;
37 }
38
39 match (self.start_at, self.max_results) {
40 (Some(start), Some(max)) => Some(start + max),
41 _ => None,
42 }
43 }
44}
45
46#[async_trait]
47pub trait Paginator<T>: Sync {
48 async fn fetch_page(&self, start_at: u32, max_results: u32) -> Result<PagedResponse<T>>;
49
50 async fn fetch_all(&self, max_results: u32) -> Result<Vec<T>>
51 where
52 T: Send,
53 {
54 let mut all_items = Vec::new();
55 let mut start_at = 0;
56
57 loop {
58 debug!(start_at, max_results, "Fetching page");
59 let page = self.fetch_page(start_at, max_results).await?;
60 let item_count = page.values.len();
61 let has_next = page.has_next();
62 let next_start = page.next_start();
63
64 all_items.extend(page.values);
65
66 if !has_next || item_count == 0 {
67 debug!(total_items = all_items.len(), "Finished pagination");
68 break;
69 }
70
71 start_at = next_start.unwrap_or(start_at + max_results);
72 }
73
74 Ok(all_items)
75 }
76
77 fn stream<'a>(
78 &'a self,
79 max_results: u32,
80 ) -> Pin<Box<dyn Stream<Item = Result<Vec<T>>> + Send + 'a>>
81 where
82 T: Send + 'a,
83 {
84 Box::pin(async_stream::stream! {
85 let mut start_at = 0;
86
87 loop {
88 debug!(start_at, max_results, "Fetching page in stream");
89 let page = self.fetch_page(start_at, max_results).await;
90
91 match page {
92 Ok(page) => {
93 let item_count = page.values.len();
94 let has_next = page.has_next();
95 let next_start = page.next_start();
96
97 yield Ok(page.values);
98
99 if !has_next || item_count == 0 {
100 break;
101 }
102
103 start_at = next_start.unwrap_or(start_at + max_results);
104 }
105 Err(err) => {
106 yield Err(err);
107 break;
108 }
109 }
110 }
111 })
112 }
113}
114
115pub async fn collect_pages<T, P: Paginator<T>>(
116 paginator: &P,
117 max_results: u32,
118 limit: Option<usize>,
119) -> Result<Vec<T>>
120where
121 T: Send,
122{
123 let mut stream = paginator.stream(max_results);
124 let mut all_items = Vec::new();
125
126 while let Some(result) = stream.next().await {
127 let items = result?;
128 all_items.extend(items);
129
130 if let Some(limit) = limit {
131 if all_items.len() >= limit {
132 all_items.truncate(limit);
133 break;
134 }
135 }
136 }
137
138 Ok(all_items)
139}