1use std::future::Future;
4
5use futures::Stream;
6use futures::TryStreamExt;
7use futures::stream;
8use serde::Deserialize;
9
10use crate::error::Error;
11
12#[derive(Debug, Clone)]
17pub struct Page<T> {
18 pub items: Vec<T>,
20 pub has_next_page: bool,
22 pub end_cursor: Option<String>,
24 pub has_previous_page: bool,
26 pub start_cursor: Option<String>,
28}
29
30impl<T> Default for Page<T> {
31 fn default() -> Self {
32 Self {
33 items: Vec::new(),
34 has_next_page: false,
35 end_cursor: None,
36 has_previous_page: false,
37 start_cursor: None,
38 }
39 }
40}
41
42#[derive(Debug, Clone, Default, Deserialize)]
47#[serde(rename_all = "camelCase", default)]
48pub struct PageInfo {
49 pub has_next_page: bool,
51 pub end_cursor: Option<String>,
53 pub has_previous_page: bool,
55 pub start_cursor: Option<String>,
57}
58
59pub fn paginate<T, F, Fut>(fetch_page: F) -> impl Stream<Item = Result<T, Error>>
80where
81 T: 'static,
82 F: Fn(Option<String>) -> Fut + Clone + 'static,
83 Fut: Future<Output = Result<Page<T>, Error>>,
84{
85 stream::unfold(
87 (None, true, fetch_page), |(cursor, has_next, fetch)| async move {
89 if !has_next {
90 return None;
91 }
92
93 match fetch(cursor).await {
94 Ok(page) => {
95 let items = stream::iter(page.items.into_iter().map(Ok));
96 Some((Ok(items), (page.end_cursor, page.has_next_page, fetch)))
97 }
98 Err(e) => {
99 Some((Err(e), (None, false, fetch)))
101 }
102 }
103 },
104 )
105 .try_flatten()
107}
108
109pub fn paginate_backward<T, F, Fut>(fetch_page: F) -> impl Stream<Item = Result<T, Error>>
153where
154 T: 'static,
155 F: Fn(Option<String>) -> Fut + Clone + 'static,
156 Fut: Future<Output = Result<Page<T>, Error>>,
157{
158 stream::unfold(
159 (None, true, fetch_page),
160 |(cursor, has_prev, fetch)| async move {
161 if !has_prev {
162 return None;
163 }
164
165 match fetch(cursor).await {
166 Ok(page) => {
167 let items = stream::iter(page.items.into_iter().map(Ok));
168 Some((
169 Ok(items),
170 (page.start_cursor, page.has_previous_page, fetch),
171 ))
172 }
173 Err(e) => Some((Err(e), (None, false, fetch))),
174 }
175 },
176 )
177 .try_flatten()
178}
179
180#[cfg(test)]
181mod tests {
182 use super::*;
183 use futures::StreamExt;
184 use std::sync::Arc;
185 use std::sync::atomic::AtomicUsize;
186 use std::sync::atomic::Ordering;
187
188 #[test]
189 fn test_page_info_deserialization() {
190 let json = r#"{"hasNextPage": true, "endCursor": "abc123"}"#;
191 let page_info: PageInfo = serde_json::from_str(json).unwrap();
192 assert!(page_info.has_next_page);
193 assert_eq!(page_info.end_cursor, Some("abc123".to_string()));
194 }
195
196 #[test]
197 fn test_page_info_deserialization_no_cursor() {
198 let json = r#"{"hasNextPage": false, "endCursor": null}"#;
199 let page_info: PageInfo = serde_json::from_str(json).unwrap();
200 assert!(!page_info.has_next_page);
201 assert_eq!(page_info.end_cursor, None);
202 }
203
204 #[tokio::test]
205 async fn test_paginate_single_page() {
206 let stream = paginate(|_cursor| async {
207 Ok(Page {
208 items: vec![1, 2, 3],
209 ..Default::default()
210 })
211 });
212
213 let results: Vec<_> = stream.collect().await;
214 assert_eq!(results.len(), 3);
215 assert_eq!(results[0].as_ref().unwrap(), &1);
216 assert_eq!(results[1].as_ref().unwrap(), &2);
217 assert_eq!(results[2].as_ref().unwrap(), &3);
218 }
219
220 #[tokio::test]
221 async fn test_paginate_multiple_pages() {
222 let page_count = Arc::new(AtomicUsize::new(0));
223
224 let stream = paginate({
225 let page_count = page_count.clone();
226 move |cursor| {
227 let page_count = page_count.clone();
228 async move {
229 let page_num = page_count.fetch_add(1, Ordering::SeqCst);
230 match page_num {
231 0 => {
232 assert!(cursor.is_none());
233 Ok(Page {
234 items: vec![1, 2],
235 has_next_page: true,
236 end_cursor: Some("cursor1".to_string()),
237 ..Default::default()
238 })
239 }
240 1 => {
241 assert_eq!(cursor, Some("cursor1".to_string()));
242 Ok(Page {
243 items: vec![3, 4],
244 has_next_page: true,
245 end_cursor: Some("cursor2".to_string()),
246 ..Default::default()
247 })
248 }
249 2 => {
250 assert_eq!(cursor, Some("cursor2".to_string()));
251 Ok(Page {
252 items: vec![5],
253 ..Default::default()
254 })
255 }
256 _ => panic!("unexpected page request"),
257 }
258 }
259 }
260 });
261
262 let results: Vec<i32> = stream.map(|r| r.unwrap()).collect().await;
263 assert_eq!(results, vec![1, 2, 3, 4, 5]);
264 assert_eq!(page_count.load(Ordering::SeqCst), 3);
265 }
266
267 #[tokio::test]
268 async fn test_paginate_empty_page() {
269 let stream = paginate(|_cursor| async { Ok(Page::<i32>::default()) });
270
271 let results: Vec<_> = stream.collect().await;
272 assert!(results.is_empty());
273 }
274
275 #[tokio::test]
276 async fn test_paginate_backward_single_page() {
277 let stream = paginate_backward(|_cursor| async {
278 Ok(Page {
279 items: vec![1, 2, 3],
280 ..Default::default()
281 })
282 });
283
284 let results: Vec<_> = stream.collect().await;
285 assert_eq!(results.len(), 3);
286 assert_eq!(results[0].as_ref().unwrap(), &1);
287 assert_eq!(results[1].as_ref().unwrap(), &2);
288 assert_eq!(results[2].as_ref().unwrap(), &3);
289 }
290
291 #[tokio::test]
292 async fn test_paginate_backward_multiple_pages() {
293 let page_count = Arc::new(AtomicUsize::new(0));
294
295 let stream = paginate_backward({
296 let page_count = page_count.clone();
297 move |cursor| {
298 let page_count = page_count.clone();
299 async move {
300 let page_num = page_count.fetch_add(1, Ordering::SeqCst);
301 match page_num {
302 0 => {
303 assert!(cursor.is_none());
304 Ok(Page {
305 items: vec![5, 4],
306 has_previous_page: true,
307 start_cursor: Some("cursor1".to_string()),
308 ..Default::default()
309 })
310 }
311 1 => {
312 assert_eq!(cursor, Some("cursor1".to_string()));
313 Ok(Page {
314 items: vec![3, 2],
315 has_previous_page: true,
316 start_cursor: Some("cursor2".to_string()),
317 ..Default::default()
318 })
319 }
320 2 => {
321 assert_eq!(cursor, Some("cursor2".to_string()));
322 Ok(Page {
323 items: vec![1],
324 ..Default::default()
325 })
326 }
327 _ => panic!("unexpected page request"),
328 }
329 }
330 }
331 });
332
333 let results: Vec<i32> = stream.map(|r| r.unwrap()).collect().await;
334 assert_eq!(results, vec![5, 4, 3, 2, 1]);
335 assert_eq!(page_count.load(Ordering::SeqCst), 3);
336 }
337
338 #[tokio::test]
339 async fn test_paginate_backward_empty_page() {
340 let stream = paginate_backward(|_cursor| async { Ok(Page::<i32>::default()) });
341
342 let results: Vec<_> = stream.collect().await;
343 assert!(results.is_empty());
344 }
345}