1use crate::error::Result;
2use circles_types::{
3 Conjunction, Cursor, CursorColumn, Filter, FilterPredicate, FilterType, PagedQueryParams,
4 PagedResult, SortOrder,
5};
6use futures::{Stream, StreamExt};
7use serde::Serialize;
8use serde::de::DeserializeOwned;
9use serde_json::Value;
10use std::future::Future;
11use std::pin::Pin;
12use std::sync::Arc;
13
14#[derive(Debug, Clone)]
16pub struct Page<T> {
17 pub items: Vec<T>,
19 pub first_cursor: Option<Cursor>,
21 pub last_cursor: Option<Cursor>,
23 pub has_more: bool,
25}
26
27pub type PagedFetch<TRow> = Arc<
28 dyn Fn(PagedQueryParams) -> Pin<Box<dyn Future<Output = Result<PagedResult<TRow>>> + Send>>
29 + Send
30 + Sync,
31>;
32
33pub struct PagedQuery<TRow: Clone + Serialize> {
37 fetch: PagedFetch<TRow>,
38 pub params: PagedQueryParams,
40 pub current_cursor: Option<Cursor>,
42}
43
44impl<TRow> PagedQuery<TRow>
45where
46 TRow: Clone + Serialize + DeserializeOwned + Send + 'static,
47{
48 pub fn new(fetch: PagedFetch<TRow>, params: PagedQueryParams) -> Self {
49 Self {
50 fetch,
51 params,
52 current_cursor: None,
53 }
54 }
55
56 pub async fn next_page(&mut self) -> Result<Option<Page<TRow>>> {
58 let mut params = self.params.clone();
59
60 if let Some(cursor) = &self.current_cursor {
61 let cursor_filter = build_cursor_filter(cursor, ¶ms.resolved_cursor_columns());
62 params.filter = combine_filters(params.filter.take(), cursor_filter);
63 }
64
65 let result = (self.fetch)(params).await?;
66
67 if result.results.is_empty() {
68 return Ok(None);
69 }
70
71 self.current_cursor = result.last_cursor.clone();
73
74 Ok(Some(Page {
75 items: result.results,
76 first_cursor: result.first_cursor,
77 last_cursor: result.last_cursor,
78 has_more: result.has_more,
79 }))
80 }
81
82 pub fn into_stream(self) -> impl Stream<Item = Result<TRow>> {
84 futures::stream::unfold(self, |mut state| async move {
85 match state.next_page().await {
86 Ok(Some(page)) => {
87 let has_more = page.has_more;
88 let items = page.items;
89 if has_more {
90 Some((Ok(items), state))
91 } else {
92 None
93 }
94 }
95 Ok(None) => None,
96 Err(e) => Some((Err(e), state)),
97 }
98 })
99 .flat_map(|res| match res {
100 Ok(vec) => futures::stream::iter(vec.into_iter().map(Ok)).boxed(),
101 Err(err) => futures::stream::iter(vec![Err(err)]).boxed(),
102 })
103 }
104}
105
106fn build_cursor_filter(cursor: &Cursor, cursor_columns: &[CursorColumn]) -> Vec<Filter> {
107 let mut or_predicates = Vec::new();
108
109 for level in 0..cursor_columns.len() {
110 let current_column = &cursor_columns[level];
111 let Some(cursor_value) = cursor_column_value(cursor, ¤t_column.name) else {
112 continue;
113 };
114
115 if level == 0 {
116 or_predicates.push(comparison_predicate(current_column, cursor_value));
117 continue;
118 }
119
120 let mut and_predicates = Vec::new();
121 for previous_column in cursor_columns.iter().take(level) {
122 let Some(previous_value) = cursor_column_value(cursor, &previous_column.name) else {
123 continue;
124 };
125 and_predicates
126 .push(FilterPredicate::equals(previous_column.name.clone(), previous_value).into());
127 }
128 and_predicates.push(comparison_predicate(current_column, cursor_value));
129 or_predicates.push(Conjunction::and(and_predicates).into());
130 }
131
132 if or_predicates.is_empty() {
133 Vec::new()
134 } else {
135 vec![Conjunction::or(or_predicates).into()]
136 }
137}
138
139fn combine_filters(
140 base_filters: Option<Vec<Filter>>,
141 cursor_filter: Vec<Filter>,
142) -> Option<Vec<Filter>> {
143 match (base_filters, cursor_filter.is_empty()) {
144 (None, true) => None,
145 (Some(filters), true) => Some(filters),
146 (None, false) => Some(cursor_filter),
147 (Some(base_filters), false) => {
148 let mut predicates = base_filters;
149 predicates.extend(cursor_filter);
150 Some(vec![Conjunction::and(predicates).into()])
151 }
152 }
153}
154
155fn comparison_predicate(column: &CursorColumn, value: Value) -> Filter {
156 let filter_type = match column.sort_order {
157 SortOrder::ASC => FilterType::GreaterThan,
158 SortOrder::DESC => FilterType::LessThan,
159 };
160 FilterPredicate::new(filter_type, column.name.clone(), value).into()
161}
162
163fn cursor_column_value(cursor: &Cursor, column: &str) -> Option<Value> {
164 match column {
165 "blockNumber" => Some(
166 cursor
167 .value(column)
168 .cloned()
169 .unwrap_or_else(|| Value::from(cursor.block_number)),
170 ),
171 "transactionIndex" => Some(
172 cursor
173 .value(column)
174 .cloned()
175 .unwrap_or_else(|| Value::from(cursor.transaction_index)),
176 ),
177 "logIndex" => Some(
178 cursor
179 .value(column)
180 .cloned()
181 .unwrap_or_else(|| Value::from(cursor.log_index)),
182 ),
183 "batchIndex" => cursor
184 .value(column)
185 .cloned()
186 .or_else(|| cursor.batch_index.map(Value::from)),
187 "timestamp" => cursor
188 .value(column)
189 .cloned()
190 .or_else(|| cursor.timestamp.map(Value::from)),
191 _ => cursor.value(column).cloned(),
192 }
193}
194
#[cfg(test)]
mod tests {
    use super::*;
    use serde::{Deserialize, Serialize};
    use serde_json::json;
    use std::sync::Mutex;

    /// Row shape returned by the mocked fetch.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    struct HolderRow {
        group: String,
        holder: String,
        #[serde(rename = "totalBalance")]
        total_balance: String,
        #[serde(rename = "demurragedTotalBalance")]
        demurraged_total_balance: String,
        #[serde(rename = "fractionOwnership")]
        fraction_ownership: f64,
    }

    /// Page with a single holder row whose cursor only carries a custom
    /// `holder` value.
    fn single_holder_page(params: PagedQueryParams) -> PagedResult<HolderRow> {
        let mut cursor = Cursor::default();
        cursor.insert_value(
            "holder".to_string(),
            json!("0x2222222222222222222222222222222222222222"),
        );

        let row = HolderRow {
            group: "0x1111111111111111111111111111111111111111".into(),
            holder: "0x2222222222222222222222222222222222222222".into(),
            total_balance: "100".into(),
            demurraged_total_balance: "100".into(),
            fraction_ownership: 0.5,
        };

        PagedResult {
            limit: params.limit,
            size: 1,
            first_cursor: Some(cursor.clone()),
            last_cursor: Some(cursor),
            sort_order: params.sort_order,
            has_more: true,
            results: vec![row],
        }
    }

    /// Page without rows or cursors, signalling the end of the data.
    fn empty_page(params: PagedQueryParams) -> PagedResult<HolderRow> {
        PagedResult {
            limit: params.limit,
            size: 0,
            first_cursor: None,
            last_cursor: None,
            sort_order: params.sort_order,
            has_more: false,
            results: Vec::new(),
        }
    }

    #[tokio::test]
    async fn next_page_builds_ts_style_custom_cursor_filter() {
        // Every request's parameters are recorded so the second one can be inspected.
        let seen_params = Arc::new(Mutex::new(Vec::<PagedQueryParams>::new()));
        let seen_params_fetch = Arc::clone(&seen_params);
        let fetch: PagedFetch<HolderRow> = Arc::new(move |params: PagedQueryParams| {
            let seen_params = Arc::clone(&seen_params_fetch);
            Box::pin(async move {
                let is_first_call = {
                    let mut calls = seen_params.lock().expect("lock params");
                    calls.push(params.clone());
                    calls.len() == 1
                };

                let page = if is_first_call {
                    single_holder_page(params)
                } else {
                    empty_page(params)
                };
                Ok(page)
            })
        });

        let base_filter = vec![
            FilterPredicate::equals(
                "group".into(),
                "0x1111111111111111111111111111111111111111",
            )
            .into(),
        ];
        let params = PagedQueryParams {
            namespace: "V_CrcV2".into(),
            table: "GroupTokenHoldersBalance".into(),
            sort_order: SortOrder::DESC,
            columns: vec![
                "group".into(),
                "holder".into(),
                "totalBalance".into(),
                "demurragedTotalBalance".into(),
                "fractionOwnership".into(),
            ],
            filter: Some(base_filter),
            cursor_columns: Some(vec![CursorColumn::asc("holder".into())]),
            order_columns: Some(vec![
                circles_types::OrderBy::desc("totalBalance".into()),
                circles_types::OrderBy::asc("holder".into()),
            ]),
            limit: 50,
        };
        let mut query = PagedQuery::new(fetch, params);

        let first = query.next_page().await.expect("first page");
        assert!(first.is_some());
        let second = query.next_page().await.expect("second page");
        assert!(second.is_none());

        let recorded = seen_params.lock().expect("lock params");
        assert_eq!(recorded.len(), 2);

        let second_filter = serde_json::to_value(recorded[1].filter.clone().expect("filter"))
            .expect("serialize filter");

        // Outer node: base filter AND cursor filter.
        let combined = &second_filter[0];
        assert_eq!(combined["Type"], json!("Conjunction"));
        assert_eq!(combined["ConjunctionType"], json!("And"));

        // Second operand: the OR-shaped cursor filter.
        let cursor_branch = &combined["Predicates"][1];
        assert_eq!(cursor_branch["ConjunctionType"], json!("Or"));

        // Its only level compares the ascending `holder` column.
        let holder_predicate = &cursor_branch["Predicates"][0];
        assert_eq!(holder_predicate["Column"], json!("holder"));
        assert_eq!(holder_predicate["FilterType"], json!("GreaterThan"));
    }
}