Skip to main content

circles_rpc/methods/
query.rs

1use crate::client::RpcClient;
2use crate::error::{CirclesRpcError, Result};
3use circles_types::{
4    CirclesQueryResponse, Cursor, CursorColumn, OrderBy, PagedQueryParams, PagedResult, QueryParams,
5};
6use serde_json::Value;
7
8/// Methods for issuing `circles_query` requests and decoding the tabular response.
9///
10/// Includes a pager that adds stable ordering (block/tx/log/timestamp) and
11/// extracts cursors for streaming.
12#[derive(Clone, Debug)]
13pub struct QueryMethods {
14    client: RpcClient,
15}
16
17impl QueryMethods {
18    /// Create a new accessor for `circles_query` RPCs.
19    pub fn new(client: RpcClient) -> Self {
20        Self { client }
21    }
22
23    /// Direct `circles_query` invocation returning decoded rows.
24    pub async fn circles_query<TRow>(&self, params: QueryParams) -> Result<Vec<TRow>>
25    where
26        TRow: serde::de::DeserializeOwned + Send + Sync + std::fmt::Debug + Unpin + 'static,
27    {
28        let result: CirclesQueryResponse = self.client.call("circles_query", (params,)).await?;
29        self.decode_rows::<TRow>(result.columns, result.rows)
30    }
31
32    /// Convenience wrapper for paged queries using the `circles_query` method.
33    /// Note: The underlying backend expects `QueryParams`; we translate from the
34    /// higher-level `PagedQueryParams` struct.
35    pub async fn paged_query<TRow>(&self, params: PagedQueryParams) -> Result<PagedResult<TRow>>
36    where
37        TRow: serde::de::DeserializeOwned
38            + serde::Serialize
39            + Clone
40            + Send
41            + Sync
42            + std::fmt::Debug
43            + Unpin
44            + 'static,
45    {
46        let PagedQueryParams {
47            namespace,
48            table,
49            sort_order,
50            columns,
51            filter,
52            cursor_columns,
53            order_columns,
54            limit,
55        } = params.clone();
56
57        let order: Vec<OrderBy> = if let Some(order_columns) = order_columns.clone() {
58            if order_columns.is_empty() {
59                params.resolved_order_columns()
60            } else {
61                order_columns
62            }
63        } else {
64            params.resolved_order_columns()
65        };
66        let cursor_columns = if let Some(cursor_columns) = cursor_columns {
67            if cursor_columns.is_empty() {
68                params.resolved_cursor_columns()
69            } else {
70                cursor_columns
71            }
72        } else {
73            params.resolved_cursor_columns()
74        };
75
76        let query_params = QueryParams {
77            namespace,
78            table,
79            columns,
80            filter: filter.unwrap_or_default(),
81            order,
82            limit: Some(limit),
83        };
84
85        let result: CirclesQueryResponse =
86            self.client.call("circles_query", (query_params,)).await?;
87
88        let rows = self.decode_rows::<TRow>(result.columns.clone(), result.rows.clone())?;
89        let cursors = self.extract_cursors(&result.columns, &result.rows, &cursor_columns);
90        let first_cursor = cursors.first().cloned();
91        let last_cursor = cursors.last().cloned();
92        let size = rows.len() as u32;
93        let has_more = size == limit;
94
95        Ok(PagedResult {
96            limit,
97            size,
98            first_cursor,
99            last_cursor,
100            sort_order,
101            has_more,
102            results: rows,
103        })
104    }
105
106    pub fn decode_rows<TRow>(
107        &self,
108        columns: Vec<String>,
109        rows: Vec<Vec<Value>>,
110    ) -> Result<Vec<TRow>>
111    where
112        TRow: serde::de::DeserializeOwned,
113    {
114        rows.into_iter()
115            .map(|row| self.decode_row::<TRow>(&columns, row))
116            .collect()
117    }
118
119    pub fn decode_row<TRow>(&self, columns: &[String], row: Vec<Value>) -> Result<TRow>
120    where
121        TRow: serde::de::DeserializeOwned,
122    {
123        if columns.len() != row.len() {
124            return Err(CirclesRpcError::InvalidResponse {
125                message: "circles_query row length mismatch".to_string(),
126            });
127        }
128        let mut map = serde_json::Map::new();
129        for (col, val) in columns.iter().cloned().zip(row.into_iter()) {
130            map.insert(col, val);
131        }
132        serde_json::from_value(Value::Object(map)).map_err(|e| CirclesRpcError::InvalidResponse {
133            message: e.to_string(),
134        })
135    }
136
137    pub fn extract_cursors(
138        &self,
139        columns: &[String],
140        rows: &[Vec<Value>],
141        cursor_columns: &[CursorColumn],
142    ) -> Vec<Cursor> {
143        rows.iter()
144            .filter_map(|row| self.extract_cursor(columns, row, cursor_columns))
145            .collect()
146    }
147
148    fn extract_cursor(
149        &self,
150        columns: &[String],
151        row: &[Value],
152        cursor_columns: &[CursorColumn],
153    ) -> Option<Cursor> {
154        let mut block_number: Option<u64> = None;
155        let mut tx_index: Option<u32> = None;
156        let mut log_index: Option<u32> = None;
157        let mut batch_index: Option<u32> = None;
158        let mut timestamp: Option<u64> = None;
159        let mut cursor = Cursor::default();
160        let mut has_cursor_values = false;
161
162        for (col, val) in columns.iter().zip(row.iter()) {
163            match col.as_str() {
164                "blockNumber" => block_number = Self::as_u64(val),
165                "transactionIndex" => tx_index = Self::as_u32(val),
166                "logIndex" => log_index = Self::as_u32(val),
167                "batchIndex" => batch_index = Self::as_u32(val),
168                "timestamp" => timestamp = Self::as_u64(val),
169                _ => {}
170            }
171
172            if cursor_columns.iter().any(|column| column.name == *col) {
173                cursor.insert_value(col.clone(), val.clone());
174                has_cursor_values = true;
175            }
176        }
177
178        if let (Some(b), Some(tx), Some(log)) = (block_number, tx_index, log_index) {
179            cursor.block_number = b;
180            cursor.transaction_index = tx;
181            cursor.log_index = log;
182            cursor.batch_index = batch_index;
183            cursor.timestamp = timestamp;
184            return Some(cursor);
185        }
186
187        if has_cursor_values {
188            return Some(cursor);
189        }
190
191        None
192    }
193
194    fn as_u64(val: &Value) -> Option<u64> {
195        match val {
196            Value::Number(n) => n.as_u64(),
197            Value::String(s) => s.parse().ok(),
198            _ => None,
199        }
200    }
201
202    fn as_u32(val: &Value) -> Option<u32> {
203        Self::as_u64(val).map(|v| v as u32)
204    }
205}