iota_sdk_graphql_client/
streams.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2025 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    future::Future,
7    pin::Pin,
8    task::{Context, Poll},
9};
10
11use futures::Stream;
12
13use crate::{
14    error,
15    pagination::{Direction, Page, PaginationFilter},
16    query_types::PageInfo,
17};
18
19/// A stream that yields items from a paginated query with support for
20/// bidirectional pagination.
21pub struct PageStream<T, F, Fut> {
22    query_fn: F,
23    direction: Direction,
24    current_page: Option<(PageInfo, std::vec::IntoIter<T>)>,
25    current_future: Option<Pin<Box<Fut>>>,
26    finished: bool,
27    is_first_page: bool,
28}
29
30impl<T, F, Fut> PageStream<T, F, Fut> {
31    pub fn new(query_fn: F, direction: Direction) -> Self {
32        Self {
33            query_fn,
34            direction,
35            current_page: None,
36            current_future: None,
37            finished: false,
38            is_first_page: true,
39        }
40    }
41}
42
43impl<T, F, Fut> Stream for PageStream<T, F, Fut>
44where
45    T: Clone + Unpin,
46    F: Fn(PaginationFilter) -> Fut,
47    F: Unpin,
48    Fut: Future<Output = Result<Page<T>, error::Error>>,
49{
50    type Item = Result<T, error::Error>;
51
52    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
53        if self.finished {
54            return Poll::Ready(None);
55        }
56
57        loop {
58            let direction = self.direction.clone();
59            // If we have a current page, return the next item
60            if let Some((page_info, iter)) = &mut self.current_page {
61                if let Some(item) = iter.next() {
62                    return Poll::Ready(Some(Ok(item)));
63                }
64
65                // For backward pagination, we check for previous page
66                // For the first page in backward pagination, we don't need to check
67                // has_previous_page
68                let should_continue = match direction {
69                    Direction::Forward => page_info.has_next_page,
70                    Direction::Backward => page_info.has_previous_page,
71                };
72                if !should_continue {
73                    self.finished = true;
74                    return Poll::Ready(None);
75                }
76            }
77
78            // Get cursor from current page
79            let current_cursor = self
80                .current_page
81                .as_ref()
82                .and_then(|(page_info, _iter)| {
83                    match self.direction {
84                        Direction::Forward => page_info
85                            .has_next_page
86                            .then(|| page_info.end_cursor.clone()),
87                        Direction::Backward => {
88                            // For the first page in backward pagination, we don't use a cursor
89                            // This ensures we start from the last page
90                            if self.is_first_page {
91                                None
92                            } else {
93                                page_info
94                                    .has_previous_page
95                                    .then(|| page_info.start_cursor.clone())
96                            }
97                        }
98                    }
99                })
100                .flatten();
101
102            // If there's no future yet, create one
103            if self.current_future.is_none() {
104                if self.is_first_page && current_cursor.is_some() {
105                    self.is_first_page = false;
106                }
107                let filter = PaginationFilter {
108                    direction: self.direction.clone(),
109                    cursor: current_cursor,
110                    limit: None,
111                };
112                let future = (self.query_fn)(filter);
113                self.current_future = Some(Box::pin(future));
114            }
115
116            // Poll the future
117            match self.current_future.as_mut().unwrap().as_mut().poll(cx) {
118                Poll::Ready(Ok(page)) => {
119                    self.current_future = None;
120
121                    if page.is_empty() {
122                        self.finished = true;
123                        return Poll::Ready(None);
124                    }
125
126                    let (page_info, data) = page.into_parts();
127                    // For backward pagination, we need to reverse the items
128                    let iter = match self.direction {
129                        Direction::Forward => data.into_iter(),
130                        Direction::Backward => {
131                            let mut vec = data;
132                            vec.reverse();
133                            vec.into_iter()
134                        }
135                    };
136                    self.current_page = Some((page_info, iter));
137
138                    if self.is_first_page {
139                        self.is_first_page = false;
140                    }
141                }
142                Poll::Ready(Err(e)) => {
143                    if self.is_first_page {
144                        self.is_first_page = false;
145                    }
146                    self.finished = true;
147                    self.current_future = None;
148                    return Poll::Ready(Some(Err(e)));
149                }
150                Poll::Pending => return Poll::Pending,
151            }
152        }
153    }
154}
155
156/// Creates a new `PageStream` for a paginated query.
157///
158/// ## Example
159///
160/// ```rust,ignore
161/// use futures::StreamExt;
162/// use iota_graphql_client::streams::stream_paginated_query;
163/// use iota_graphql_client::Client;
164/// use iota_graphql_client::PaginationFilter;
165/// use iota_graphql_client::Direction;
166///
167/// let client = Client::new_testnet();
168/// let stream = stream_paginated_query(|pagination_filter, Direction::Forward| {
169///    client.coins(owner, coin_type, pagination_filter)
170/// });
171///
172/// while let Some(result) = stream.next().await {
173///    match result {
174///        Ok(coin) => println!("Got coin: {:?}", coin),
175///        Err(e) => eprintln!("Error: {}", e),
176///    }
177/// }
178/// ```
179pub fn stream_paginated_query<T, F, Fut>(query_fn: F, direction: Direction) -> PageStream<T, F, Fut>
180where
181    F: Fn(PaginationFilter) -> Fut,
182    Fut: Future<Output = Result<Page<T>, error::Error>>,
183{
184    PageStream::new(query_fn, direction)
185}