circles_rpc/methods/
query.rs1use crate::client::RpcClient;
2use crate::error::{CirclesRpcError, Result};
3use circles_types::{
4 CirclesQueryResponse, Cursor, CursorColumn, OrderBy, PagedQueryParams, PagedResult, QueryParams,
5};
6use serde_json::Value;
7
8#[derive(Clone, Debug)]
13pub struct QueryMethods {
14 client: RpcClient,
15}
16
17impl QueryMethods {
18 pub fn new(client: RpcClient) -> Self {
20 Self { client }
21 }
22
23 pub async fn circles_query<TRow>(&self, params: QueryParams) -> Result<Vec<TRow>>
25 where
26 TRow: serde::de::DeserializeOwned + Send + Sync + std::fmt::Debug + Unpin + 'static,
27 {
28 let result: CirclesQueryResponse = self.client.call("circles_query", (params,)).await?;
29 self.decode_rows::<TRow>(result.columns, result.rows)
30 }
31
32 pub async fn paged_query<TRow>(&self, params: PagedQueryParams) -> Result<PagedResult<TRow>>
36 where
37 TRow: serde::de::DeserializeOwned
38 + serde::Serialize
39 + Clone
40 + Send
41 + Sync
42 + std::fmt::Debug
43 + Unpin
44 + 'static,
45 {
46 let PagedQueryParams {
47 namespace,
48 table,
49 sort_order,
50 columns,
51 filter,
52 cursor_columns,
53 order_columns,
54 limit,
55 } = params.clone();
56
57 let order: Vec<OrderBy> = if let Some(order_columns) = order_columns.clone() {
58 if order_columns.is_empty() {
59 params.resolved_order_columns()
60 } else {
61 order_columns
62 }
63 } else {
64 params.resolved_order_columns()
65 };
66 let cursor_columns = if let Some(cursor_columns) = cursor_columns {
67 if cursor_columns.is_empty() {
68 params.resolved_cursor_columns()
69 } else {
70 cursor_columns
71 }
72 } else {
73 params.resolved_cursor_columns()
74 };
75
76 let query_params = QueryParams {
77 namespace,
78 table,
79 columns,
80 filter: filter.unwrap_or_default(),
81 order,
82 limit: Some(limit),
83 };
84
85 let result: CirclesQueryResponse =
86 self.client.call("circles_query", (query_params,)).await?;
87
88 let rows = self.decode_rows::<TRow>(result.columns.clone(), result.rows.clone())?;
89 let cursors = self.extract_cursors(&result.columns, &result.rows, &cursor_columns);
90 let first_cursor = cursors.first().cloned();
91 let last_cursor = cursors.last().cloned();
92 let size = rows.len() as u32;
93 let has_more = size == limit;
94
95 Ok(PagedResult {
96 limit,
97 size,
98 first_cursor,
99 last_cursor,
100 sort_order,
101 has_more,
102 results: rows,
103 })
104 }
105
106 pub fn decode_rows<TRow>(
107 &self,
108 columns: Vec<String>,
109 rows: Vec<Vec<Value>>,
110 ) -> Result<Vec<TRow>>
111 where
112 TRow: serde::de::DeserializeOwned,
113 {
114 rows.into_iter()
115 .map(|row| self.decode_row::<TRow>(&columns, row))
116 .collect()
117 }
118
119 pub fn decode_row<TRow>(&self, columns: &[String], row: Vec<Value>) -> Result<TRow>
120 where
121 TRow: serde::de::DeserializeOwned,
122 {
123 if columns.len() != row.len() {
124 return Err(CirclesRpcError::InvalidResponse {
125 message: "circles_query row length mismatch".to_string(),
126 });
127 }
128 let mut map = serde_json::Map::new();
129 for (col, val) in columns.iter().cloned().zip(row.into_iter()) {
130 map.insert(col, val);
131 }
132 serde_json::from_value(Value::Object(map)).map_err(|e| CirclesRpcError::InvalidResponse {
133 message: e.to_string(),
134 })
135 }
136
137 pub fn extract_cursors(
138 &self,
139 columns: &[String],
140 rows: &[Vec<Value>],
141 cursor_columns: &[CursorColumn],
142 ) -> Vec<Cursor> {
143 rows.iter()
144 .filter_map(|row| self.extract_cursor(columns, row, cursor_columns))
145 .collect()
146 }
147
148 fn extract_cursor(
149 &self,
150 columns: &[String],
151 row: &[Value],
152 cursor_columns: &[CursorColumn],
153 ) -> Option<Cursor> {
154 let mut block_number: Option<u64> = None;
155 let mut tx_index: Option<u32> = None;
156 let mut log_index: Option<u32> = None;
157 let mut batch_index: Option<u32> = None;
158 let mut timestamp: Option<u64> = None;
159 let mut cursor = Cursor::default();
160 let mut has_cursor_values = false;
161
162 for (col, val) in columns.iter().zip(row.iter()) {
163 match col.as_str() {
164 "blockNumber" => block_number = Self::as_u64(val),
165 "transactionIndex" => tx_index = Self::as_u32(val),
166 "logIndex" => log_index = Self::as_u32(val),
167 "batchIndex" => batch_index = Self::as_u32(val),
168 "timestamp" => timestamp = Self::as_u64(val),
169 _ => {}
170 }
171
172 if cursor_columns.iter().any(|column| column.name == *col) {
173 cursor.insert_value(col.clone(), val.clone());
174 has_cursor_values = true;
175 }
176 }
177
178 if let (Some(b), Some(tx), Some(log)) = (block_number, tx_index, log_index) {
179 cursor.block_number = b;
180 cursor.transaction_index = tx;
181 cursor.log_index = log;
182 cursor.batch_index = batch_index;
183 cursor.timestamp = timestamp;
184 return Some(cursor);
185 }
186
187 if has_cursor_values {
188 return Some(cursor);
189 }
190
191 None
192 }
193
194 fn as_u64(val: &Value) -> Option<u64> {
195 match val {
196 Value::Number(n) => n.as_u64(),
197 Value::String(s) => s.parse().ok(),
198 _ => None,
199 }
200 }
201
202 fn as_u32(val: &Value) -> Option<u32> {
203 Self::as_u64(val).map(|v| v as u32)
204 }
205}