iota_sdk_graphql_client/
streams.rs1use std::{
6 future::Future,
7 pin::Pin,
8 task::{Context, Poll},
9};
10
11use futures::Stream;
12
13use crate::{
14 error,
15 pagination::{Direction, Page, PaginationFilter},
16 query_types::PageInfo,
17};
18
19pub struct PageStream<T, F, Fut> {
22 query_fn: F,
23 direction: Direction,
24 current_page: Option<(PageInfo, std::vec::IntoIter<T>)>,
25 current_future: Option<Pin<Box<Fut>>>,
26 finished: bool,
27 is_first_page: bool,
28}
29
30impl<T, F, Fut> PageStream<T, F, Fut> {
31 pub fn new(query_fn: F, direction: Direction) -> Self {
32 Self {
33 query_fn,
34 direction,
35 current_page: None,
36 current_future: None,
37 finished: false,
38 is_first_page: true,
39 }
40 }
41}
42
43impl<T, F, Fut> Stream for PageStream<T, F, Fut>
44where
45 T: Clone + Unpin,
46 F: Fn(PaginationFilter) -> Fut,
47 F: Unpin,
48 Fut: Future<Output = Result<Page<T>, error::Error>>,
49{
50 type Item = Result<T, error::Error>;
51
52 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
53 if self.finished {
54 return Poll::Ready(None);
55 }
56
57 loop {
58 let direction = self.direction.clone();
59 if let Some((page_info, iter)) = &mut self.current_page {
61 if let Some(item) = iter.next() {
62 return Poll::Ready(Some(Ok(item)));
63 }
64
65 let should_continue = match direction {
69 Direction::Forward => page_info.has_next_page,
70 Direction::Backward => page_info.has_previous_page,
71 };
72 if !should_continue {
73 self.finished = true;
74 return Poll::Ready(None);
75 }
76 }
77
78 let current_cursor = self
80 .current_page
81 .as_ref()
82 .and_then(|(page_info, _iter)| {
83 match self.direction {
84 Direction::Forward => page_info
85 .has_next_page
86 .then(|| page_info.end_cursor.clone()),
87 Direction::Backward => {
88 if self.is_first_page {
91 None
92 } else {
93 page_info
94 .has_previous_page
95 .then(|| page_info.start_cursor.clone())
96 }
97 }
98 }
99 })
100 .flatten();
101
102 if self.current_future.is_none() {
104 if self.is_first_page && current_cursor.is_some() {
105 self.is_first_page = false;
106 }
107 let filter = PaginationFilter {
108 direction: self.direction.clone(),
109 cursor: current_cursor,
110 limit: None,
111 };
112 let future = (self.query_fn)(filter);
113 self.current_future = Some(Box::pin(future));
114 }
115
116 match self.current_future.as_mut().unwrap().as_mut().poll(cx) {
118 Poll::Ready(Ok(page)) => {
119 self.current_future = None;
120
121 if page.is_empty() {
122 self.finished = true;
123 return Poll::Ready(None);
124 }
125
126 let (page_info, data) = page.into_parts();
127 let iter = match self.direction {
129 Direction::Forward => data.into_iter(),
130 Direction::Backward => {
131 let mut vec = data;
132 vec.reverse();
133 vec.into_iter()
134 }
135 };
136 self.current_page = Some((page_info, iter));
137
138 if self.is_first_page {
139 self.is_first_page = false;
140 }
141 }
142 Poll::Ready(Err(e)) => {
143 if self.is_first_page {
144 self.is_first_page = false;
145 }
146 self.finished = true;
147 self.current_future = None;
148 return Poll::Ready(Some(Err(e)));
149 }
150 Poll::Pending => return Poll::Pending,
151 }
152 }
153 }
154}
155
156pub fn stream_paginated_query<T, F, Fut>(query_fn: F, direction: Direction) -> PageStream<T, F, Fut>
180where
181 F: Fn(PaginationFilter) -> Fut,
182 Fut: Future<Output = Result<Page<T>, error::Error>>,
183{
184 PageStream::new(query_fn, direction)
185}