Skip to main content

circles_rpc/
paged_query.rs

1use crate::error::Result;
2use circles_types::{
3    Conjunction, Cursor, CursorColumn, Filter, FilterPredicate, FilterType, PagedQueryParams,
4    PagedResult, SortOrder,
5};
6use futures::{Stream, StreamExt};
7use serde::Serialize;
8use serde::de::DeserializeOwned;
9use serde_json::Value;
10use std::future::Future;
11use std::pin::Pin;
12use std::sync::Arc;
13
14/// Represents a page of results returned by [`PagedQuery`].
15#[derive(Debug, Clone)]
16pub struct Page<T> {
17    /// Items contained in this page.
18    pub items: Vec<T>,
19    /// Cursor pointing to the first row.
20    pub first_cursor: Option<Cursor>,
21    /// Cursor pointing to the last row.
22    pub last_cursor: Option<Cursor>,
23    /// Whether more results are available after this page.
24    pub has_more: bool,
25}
26
27pub type PagedFetch<TRow> = Arc<
28    dyn Fn(PagedQueryParams) -> Pin<Box<dyn Future<Output = Result<PagedResult<TRow>>> + Send>>
29        + Send
30        + Sync,
31>;
32
33/// Generic paginator that wraps the `circles_query` RPC method.
34/// The fetcher function is responsible for honoring `current_cursor` if desired.
35// TODO: table-aware default ordering + helper to apply cursor to params.
36pub struct PagedQuery<TRow: Clone + Serialize> {
37    fetch: PagedFetch<TRow>,
38    /// Base params to reuse across calls.
39    pub params: PagedQueryParams,
40    /// Last cursor that was seen (advanced after each page).
41    pub current_cursor: Option<Cursor>,
42}
43
44impl<TRow> PagedQuery<TRow>
45where
46    TRow: Clone + Serialize + DeserializeOwned + Send + 'static,
47{
48    pub fn new(fetch: PagedFetch<TRow>, params: PagedQueryParams) -> Self {
49        Self {
50            fetch,
51            params,
52            current_cursor: None,
53        }
54    }
55
56    /// Fetch the next page. Consumers can track `current_cursor` to drive cursor-based filters.
57    pub async fn next_page(&mut self) -> Result<Option<Page<TRow>>> {
58        let mut params = self.params.clone();
59
60        if let Some(cursor) = &self.current_cursor {
61            let cursor_filter = build_cursor_filter(cursor, &params.resolved_cursor_columns());
62            params.filter = combine_filters(params.filter.take(), cursor_filter);
63        }
64
65        let result = (self.fetch)(params).await?;
66
67        if result.results.is_empty() {
68            return Ok(None);
69        }
70
71        // Advance cursor to the last item returned.
72        self.current_cursor = result.last_cursor.clone();
73
74        Ok(Some(Page {
75            items: result.results,
76            first_cursor: result.first_cursor,
77            last_cursor: result.last_cursor,
78            has_more: result.has_more,
79        }))
80    }
81
82    /// Convert this paginator into a stream of rows.
83    pub fn into_stream(self) -> impl Stream<Item = Result<TRow>> {
84        futures::stream::unfold(self, |mut state| async move {
85            match state.next_page().await {
86                Ok(Some(page)) => {
87                    let has_more = page.has_more;
88                    let items = page.items;
89                    if has_more {
90                        Some((Ok(items), state))
91                    } else {
92                        None
93                    }
94                }
95                Ok(None) => None,
96                Err(e) => Some((Err(e), state)),
97            }
98        })
99        .flat_map(|res| match res {
100            Ok(vec) => futures::stream::iter(vec.into_iter().map(Ok)).boxed(),
101            Err(err) => futures::stream::iter(vec![Err(err)]).boxed(),
102        })
103    }
104}
105
106fn build_cursor_filter(cursor: &Cursor, cursor_columns: &[CursorColumn]) -> Vec<Filter> {
107    let mut or_predicates = Vec::new();
108
109    for level in 0..cursor_columns.len() {
110        let current_column = &cursor_columns[level];
111        let Some(cursor_value) = cursor_column_value(cursor, &current_column.name) else {
112            continue;
113        };
114
115        if level == 0 {
116            or_predicates.push(comparison_predicate(current_column, cursor_value));
117            continue;
118        }
119
120        let mut and_predicates = Vec::new();
121        for previous_column in cursor_columns.iter().take(level) {
122            let Some(previous_value) = cursor_column_value(cursor, &previous_column.name) else {
123                continue;
124            };
125            and_predicates
126                .push(FilterPredicate::equals(previous_column.name.clone(), previous_value).into());
127        }
128        and_predicates.push(comparison_predicate(current_column, cursor_value));
129        or_predicates.push(Conjunction::and(and_predicates).into());
130    }
131
132    if or_predicates.is_empty() {
133        Vec::new()
134    } else {
135        vec![Conjunction::or(or_predicates).into()]
136    }
137}
138
139fn combine_filters(
140    base_filters: Option<Vec<Filter>>,
141    cursor_filter: Vec<Filter>,
142) -> Option<Vec<Filter>> {
143    match (base_filters, cursor_filter.is_empty()) {
144        (None, true) => None,
145        (Some(filters), true) => Some(filters),
146        (None, false) => Some(cursor_filter),
147        (Some(base_filters), false) => {
148            let mut predicates = base_filters;
149            predicates.extend(cursor_filter);
150            Some(vec![Conjunction::and(predicates).into()])
151        }
152    }
153}
154
155fn comparison_predicate(column: &CursorColumn, value: Value) -> Filter {
156    let filter_type = match column.sort_order {
157        SortOrder::ASC => FilterType::GreaterThan,
158        SortOrder::DESC => FilterType::LessThan,
159    };
160    FilterPredicate::new(filter_type, column.name.clone(), value).into()
161}
162
163fn cursor_column_value(cursor: &Cursor, column: &str) -> Option<Value> {
164    match column {
165        "blockNumber" => Some(
166            cursor
167                .value(column)
168                .cloned()
169                .unwrap_or_else(|| Value::from(cursor.block_number)),
170        ),
171        "transactionIndex" => Some(
172            cursor
173                .value(column)
174                .cloned()
175                .unwrap_or_else(|| Value::from(cursor.transaction_index)),
176        ),
177        "logIndex" => Some(
178            cursor
179                .value(column)
180                .cloned()
181                .unwrap_or_else(|| Value::from(cursor.log_index)),
182        ),
183        "batchIndex" => cursor
184            .value(column)
185            .cloned()
186            .or_else(|| cursor.batch_index.map(Value::from)),
187        "timestamp" => cursor
188            .value(column)
189            .cloned()
190            .or_else(|| cursor.timestamp.map(Value::from)),
191        _ => cursor.value(column).cloned(),
192    }
193}
194
#[cfg(test)]
mod tests {
    use super::*;
    use serde::{Deserialize, Serialize};
    use serde_json::json;
    use std::sync::Mutex;

    /// Group address used by the base filter and the returned row.
    const GROUP: &str = "0x1111111111111111111111111111111111111111";
    /// Holder address used by the returned row and its cursor.
    const HOLDER: &str = "0x2222222222222222222222222222222222222222";

    /// Row type matching the columns requested in the test query.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    struct HolderRow {
        group: String,
        holder: String,
        #[serde(rename = "totalBalance")]
        total_balance: String,
        #[serde(rename = "demurragedTotalBalance")]
        demurraged_total_balance: String,
        #[serde(rename = "fractionOwnership")]
        fraction_ownership: f64,
    }

    /// The second request must carry the base filter AND-ed with an OR cursor
    /// filter whose first alternative is `holder > <cursor value>`.
    #[tokio::test]
    async fn next_page_builds_ts_style_custom_cursor_filter() {
        // Every set of params handed to the fetcher is recorded here.
        let calls = Arc::new(Mutex::new(Vec::<PagedQueryParams>::new()));
        let calls_for_fetch = Arc::clone(&calls);

        let fetch: PagedFetch<HolderRow> = Arc::new(move |params: PagedQueryParams| {
            let calls = Arc::clone(&calls_for_fetch);
            Box::pin(async move {
                let call_number = {
                    let mut log = calls.lock().expect("lock params");
                    log.push(params.clone());
                    log.len()
                };

                match call_number {
                    // First call: one row plus a cursor keyed on `holder`.
                    1 => {
                        let mut cursor = Cursor::default();
                        cursor.insert_value("holder".to_string(), json!(HOLDER));

                        Ok(PagedResult {
                            limit: params.limit,
                            size: 1,
                            first_cursor: Some(cursor.clone()),
                            last_cursor: Some(cursor),
                            sort_order: params.sort_order,
                            has_more: true,
                            results: vec![HolderRow {
                                group: GROUP.into(),
                                holder: HOLDER.into(),
                                total_balance: "100".into(),
                                demurraged_total_balance: "100".into(),
                                fraction_ownership: 0.5,
                            }],
                        })
                    }
                    // Any later call: an empty page.
                    _ => Ok(PagedResult {
                        limit: params.limit,
                        size: 0,
                        first_cursor: None,
                        last_cursor: None,
                        sort_order: params.sort_order,
                        has_more: false,
                        results: Vec::new(),
                    }),
                }
            })
        });

        let base_params = PagedQueryParams {
            namespace: "V_CrcV2".into(),
            table: "GroupTokenHoldersBalance".into(),
            sort_order: SortOrder::DESC,
            columns: vec![
                "group".into(),
                "holder".into(),
                "totalBalance".into(),
                "demurragedTotalBalance".into(),
                "fractionOwnership".into(),
            ],
            filter: Some(vec![FilterPredicate::equals("group".into(), GROUP).into()]),
            cursor_columns: Some(vec![CursorColumn::asc("holder".into())]),
            order_columns: Some(vec![
                circles_types::OrderBy::desc("totalBalance".into()),
                circles_types::OrderBy::asc("holder".into()),
            ]),
            limit: 50,
        };
        let mut query = PagedQuery::new(fetch, base_params);

        let first = query.next_page().await.expect("first page");
        assert!(first.is_some());
        let second = query.next_page().await.expect("second page");
        assert!(second.is_none());

        let recorded = calls.lock().expect("lock params");
        assert_eq!(recorded.len(), 2);

        let second_filter = serde_json::to_value(recorded[1].filter.clone().expect("filter"))
            .expect("serialize filter");
        let combined = &second_filter[0];
        assert_eq!(combined["Type"], json!("Conjunction"));
        assert_eq!(combined["ConjunctionType"], json!("And"));

        let cursor_clause = &combined["Predicates"][1];
        assert_eq!(cursor_clause["ConjunctionType"], json!("Or"));

        let first_alternative = &cursor_clause["Predicates"][0];
        assert_eq!(first_alternative["Column"], json!("holder"));
        assert_eq!(first_alternative["FilterType"], json!("GreaterThan"));
    }
}
314}