Skip to main content

sui_graphql/
pagination.rs

1//! Pagination utilities for GraphQL connections.
2
3use std::future::Future;
4
5use futures::Stream;
6use futures::TryStreamExt;
7use futures::stream;
8use serde::Deserialize;
9
10use crate::error::Error;
11
/// One page of items returned by a paginated GraphQL query.
///
/// A page carries the cursor information for both traversal directions:
/// `has_next_page` / `end_cursor` describe what follows this page, while
/// `has_previous_page` / `start_cursor` describe what precedes it.
#[derive(Clone, Debug)]
pub struct Page<T> {
    /// Items contained in this page, in the order they were provided.
    pub items: Vec<T>,
    /// `true` when further pages exist after this one (forward pagination).
    pub has_next_page: bool,
    /// Cursor of the last item in this page; used to request the following page.
    pub end_cursor: Option<String>,
    /// `true` when further pages exist before this one (backward pagination).
    pub has_previous_page: bool,
    /// Cursor of the first item in this page; used to request the preceding page.
    pub start_cursor: Option<String>,
}
29
30impl<T> Default for Page<T> {
31    fn default() -> Self {
32        Self {
33            items: Vec::new(),
34            has_next_page: false,
35            end_cursor: None,
36            has_previous_page: false,
37            start_cursor: None,
38        }
39    }
40}
41
42/// GraphQL PageInfo from Connection responses.
43///
44/// Supports both forward pagination (`has_next_page`, `end_cursor`) and
45/// backward pagination (`has_previous_page`, `start_cursor`).
46#[derive(Debug, Clone, Default, Deserialize)]
47#[serde(rename_all = "camelCase", default)]
48pub struct PageInfo {
49    /// Whether there are more pages after this one (forward pagination).
50    pub has_next_page: bool,
51    /// Cursor pointing to the last item in this page (forward pagination).
52    pub end_cursor: Option<String>,
53    /// Whether there are more pages before this one (backward pagination).
54    pub has_previous_page: bool,
55    /// Cursor pointing to the first item in this page (backward pagination).
56    pub start_cursor: Option<String>,
57}
58
59/// Creates a paginated stream from a fetch function.
60///
61/// This helper handles the common pattern of:
62/// 1. Fetching a page of results
63/// 2. Yielding items one by one
64/// 3. Fetching the next page when the current batch is exhausted
65/// 4. Stopping when there are no more pages
66///
67/// The closure should capture any state it needs (e.g., client, owner address).
68///
69/// # Example
70///
71/// ```ignore
72/// let stream = paginate(move |cursor| {
73///     let client = client.clone();
74///     async move {
75///         client.fetch_page(cursor.as_deref()).await
76///     }
77/// });
78/// ```
79pub fn paginate<T, F, Fut>(fetch_page: F) -> impl Stream<Item = Result<T, Error>>
80where
81    T: 'static,
82    F: Fn(Option<String>) -> Fut + Clone + 'static,
83    Fut: Future<Output = Result<Page<T>, Error>>,
84{
85    // Step 1: unfold produces a stream of pages (each page is a stream of items)
86    stream::unfold(
87        (None, true, fetch_page), // cursor, has_next_page, fetch function
88        |(cursor, has_next, fetch)| async move {
89            if !has_next {
90                return None;
91            }
92
93            match fetch(cursor).await {
94                Ok(page) => {
95                    let items = stream::iter(page.items.into_iter().map(Ok));
96                    Some((Ok(items), (page.end_cursor, page.has_next_page, fetch)))
97                }
98                Err(e) => {
99                    // Yield error and stop pagination
100                    Some((Err(e), (None, false, fetch)))
101                }
102            }
103        },
104    )
105    // Step 2: try_flatten converts stream of pages into stream of items
106    .try_flatten()
107}
108
109/// Creates a backward paginated stream from a fetch function.
110///
111/// Similar to [`paginate`], but iterates backward through pages using
112/// `has_previous_page` and `start_cursor` instead of `has_next_page` and `end_cursor`.
113///
114/// Note: Items within each page are yielded in the order returned by the server.
115/// The backward pagination only affects which pages are fetched, not item order within pages.
116///
117/// # Requirements
118///
119/// The fetch function must return a [`Page`] with `has_previous_page` and `start_cursor`
120/// populated. The GraphQL query should:
121///
122/// 1. Use `last` and `before` parameters (instead of `first` and `after` for forward pagination)
123/// 2. Query `hasPreviousPage` and `startCursor` in `pageInfo`
124///
125/// # Page Size
126///
127/// The `last` parameter specifies how many items to fetch per page. If `last` exceeds the
128/// server's maximum page size, the request will fail. Query `serviceConfig` to discover
129/// the page size limits:
130///
131/// ```graphql
132/// query {
133///   serviceConfig {
134///     defaultPageSize(type: "Query", field: "objects")
135///     maxPageSize(type: "Query", field: "objects")
136///   }
137/// }
138/// ```
139///
140/// # Example
141///
142/// ```ignore
143/// let stream = paginate_backward(move |cursor| {
144///     let client = client.clone();
145///     async move {
146///         // Query with `last` and `before` parameters
147///         // and fetch `hasPreviousPage` and `startCursor` in pageInfo
148///         client.fetch_page_backward(cursor.as_deref()).await
149///     }
150/// });
151/// ```
152pub fn paginate_backward<T, F, Fut>(fetch_page: F) -> impl Stream<Item = Result<T, Error>>
153where
154    T: 'static,
155    F: Fn(Option<String>) -> Fut + Clone + 'static,
156    Fut: Future<Output = Result<Page<T>, Error>>,
157{
158    stream::unfold(
159        (None, true, fetch_page),
160        |(cursor, has_prev, fetch)| async move {
161            if !has_prev {
162                return None;
163            }
164
165            match fetch(cursor).await {
166                Ok(page) => {
167                    let items = stream::iter(page.items.into_iter().map(Ok));
168                    Some((
169                        Ok(items),
170                        (page.start_cursor, page.has_previous_page, fetch),
171                    ))
172                }
173                Err(e) => Some((Err(e), (None, false, fetch))),
174            }
175        },
176    )
177    .try_flatten()
178}
179
#[cfg(test)]
mod tests {
    use super::*;
    use futures::StreamExt;
    use std::sync::Arc;
    use std::sync::atomic::AtomicUsize;
    use std::sync::atomic::Ordering;

    /// camelCase keys are mapped onto the snake_case fields of `PageInfo`.
    #[test]
    fn test_page_info_deserialization() {
        let parsed: PageInfo =
            serde_json::from_str(r#"{"hasNextPage": true, "endCursor": "abc123"}"#).unwrap();

        assert!(parsed.has_next_page);
        assert_eq!(parsed.end_cursor, Some("abc123".to_string()));
    }

    /// A JSON `null` cursor deserializes to `None`.
    #[test]
    fn test_page_info_deserialization_no_cursor() {
        let parsed: PageInfo =
            serde_json::from_str(r#"{"hasNextPage": false, "endCursor": null}"#).unwrap();

        assert!(!parsed.has_next_page);
        assert_eq!(parsed.end_cursor, None);
    }

    /// A single page without a successor yields exactly its own items.
    #[tokio::test]
    async fn test_paginate_single_page() {
        let collected: Vec<_> = paginate(|_cursor| async {
            Ok(Page {
                items: vec![1, 2, 3],
                ..Default::default()
            })
        })
        .collect()
        .await;

        assert_eq!(collected.len(), 3);
        let values: Vec<&i32> = collected.iter().map(|r| r.as_ref().unwrap()).collect();
        assert_eq!(values, [&1, &2, &3]);
    }

    /// Forward pagination passes each page's `end_cursor` to the next request
    /// and stops after the page that reports no successor.
    #[tokio::test]
    async fn test_paginate_multiple_pages() {
        let calls = Arc::new(AtomicUsize::new(0));

        let stream = paginate({
            let calls = Arc::clone(&calls);
            move |cursor| {
                let calls = Arc::clone(&calls);
                async move {
                    // Each call maps to the cursor it must receive and the page it returns.
                    let (expected_cursor, page) = match calls.fetch_add(1, Ordering::SeqCst) {
                        0 => (
                            None,
                            Page {
                                items: vec![1, 2],
                                has_next_page: true,
                                end_cursor: Some("cursor1".to_string()),
                                ..Default::default()
                            },
                        ),
                        1 => (
                            Some("cursor1".to_string()),
                            Page {
                                items: vec![3, 4],
                                has_next_page: true,
                                end_cursor: Some("cursor2".to_string()),
                                ..Default::default()
                            },
                        ),
                        2 => (
                            Some("cursor2".to_string()),
                            Page {
                                items: vec![5],
                                ..Default::default()
                            },
                        ),
                        _ => panic!("unexpected page request"),
                    };
                    assert_eq!(cursor, expected_cursor);
                    Ok(page)
                }
            }
        });

        let values: Vec<i32> = stream.map(|r| r.unwrap()).collect().await;
        assert_eq!(values, vec![1, 2, 3, 4, 5]);
        assert_eq!(calls.load(Ordering::SeqCst), 3);
    }

    /// An empty first page with no successor produces an empty stream.
    #[tokio::test]
    async fn test_paginate_empty_page() {
        let collected: Vec<_> = paginate(|_cursor| async { Ok(Page::<i32>::default()) })
            .collect()
            .await;

        assert!(collected.is_empty());
    }

    /// A single page without a predecessor yields exactly its own items.
    #[tokio::test]
    async fn test_paginate_backward_single_page() {
        let collected: Vec<_> = paginate_backward(|_cursor| async {
            Ok(Page {
                items: vec![1, 2, 3],
                ..Default::default()
            })
        })
        .collect()
        .await;

        assert_eq!(collected.len(), 3);
        let values: Vec<&i32> = collected.iter().map(|r| r.as_ref().unwrap()).collect();
        assert_eq!(values, [&1, &2, &3]);
    }

    /// Backward pagination passes each page's `start_cursor` to the next request
    /// and stops after the page that reports no predecessor.
    #[tokio::test]
    async fn test_paginate_backward_multiple_pages() {
        let calls = Arc::new(AtomicUsize::new(0));

        let stream = paginate_backward({
            let calls = Arc::clone(&calls);
            move |cursor| {
                let calls = Arc::clone(&calls);
                async move {
                    // Each call maps to the cursor it must receive and the page it returns.
                    let (expected_cursor, page) = match calls.fetch_add(1, Ordering::SeqCst) {
                        0 => (
                            None,
                            Page {
                                items: vec![5, 4],
                                has_previous_page: true,
                                start_cursor: Some("cursor1".to_string()),
                                ..Default::default()
                            },
                        ),
                        1 => (
                            Some("cursor1".to_string()),
                            Page {
                                items: vec![3, 2],
                                has_previous_page: true,
                                start_cursor: Some("cursor2".to_string()),
                                ..Default::default()
                            },
                        ),
                        2 => (
                            Some("cursor2".to_string()),
                            Page {
                                items: vec![1],
                                ..Default::default()
                            },
                        ),
                        _ => panic!("unexpected page request"),
                    };
                    assert_eq!(cursor, expected_cursor);
                    Ok(page)
                }
            }
        });

        let values: Vec<i32> = stream.map(|r| r.unwrap()).collect().await;
        assert_eq!(values, vec![5, 4, 3, 2, 1]);
        assert_eq!(calls.load(Ordering::SeqCst), 3);
    }

    /// An empty first page with no predecessor produces an empty stream.
    #[tokio::test]
    async fn test_paginate_backward_empty_page() {
        let collected: Vec<_> =
            paginate_backward(|_cursor| async { Ok(Page::<i32>::default()) })
                .collect()
                .await;

        assert!(collected.is_empty());
    }
}