1use std::{
2 collections::{BTreeSet, HashSet},
3 time::Duration,
4};
5
6use anyhow::{anyhow, Context, Result};
7use arrow2::{array::Array, chunk::Chunk};
8
9use filter::filter_out_unselected_data;
10use format::{Transaction, TransactionStatus};
11use from_arrow::{receipts_from_arrow_data, typed_data_from_arrow_data, FromArrow};
12use reqwest::Method;
13use skar_format::Hash;
14use skar_net_types::{
15 skar_net_types_capnp, ArchiveHeight, FieldSelection, Query, ReceiptSelection,
16};
17
18pub mod config;
19mod filter;
20mod from_arrow;
21mod parquet_out;
22mod transport_format;
23mod types;
24
25pub use config::Config;
26pub use skar_format as format;
27pub use transport_format::{ArrowIpc, TransportFormat};
28pub use types::{
29 ArrowBatch, LogContext, LogResponse, QueryResponse, QueryResponseData, QueryResponseDataTyped,
30 QueryResponseTyped,
31};
32
33pub type ArrowChunk = Chunk<Box<dyn Array>>;
34
35pub struct Client {
36 http_client: reqwest::Client,
37 cfg: Config,
38}
39
40impl Client {
41 pub fn new(cfg: Config) -> Result<Self> {
43 let http_client = reqwest::Client::builder()
44 .no_gzip()
45 .http1_only()
46 .timeout(Duration::from_millis(cfg.http_req_timeout_millis.get()))
47 .tcp_keepalive(Duration::from_secs(7200))
48 .connect_timeout(Duration::from_millis(cfg.http_req_timeout_millis.get()))
49 .build()
50 .unwrap();
51
52 Ok(Self { http_client, cfg })
53 }
54
55 pub async fn create_parquet_folder(&self, query: Query, path: String) -> Result<()> {
64 parquet_out::create_parquet_folder(self, query, path).await
65 }
66
67 pub async fn get_height(&self) -> Result<u64> {
69 let mut url = self.cfg.url.clone();
70 let mut segments = url.path_segments_mut().ok().context("get path segments")?;
71 segments.push("height");
72 std::mem::drop(segments);
73 let mut req = self.http_client.request(Method::GET, url);
74
75 if let Some(bearer_token) = &self.cfg.bearer_token {
76 req = req.bearer_auth(bearer_token);
77 }
78
79 let res = req.send().await.context("execute http req")?;
80
81 let status = res.status();
82 if !status.is_success() {
83 return Err(anyhow!("http response status code {}", status));
84 }
85
86 let height: ArchiveHeight = res.json().await.context("read response body json")?;
87
88 Ok(height.height.unwrap_or(0))
89 }
90
91 pub async fn get_height_with_retry(&self) -> Result<u64> {
97 let mut base = 1;
98
99 loop {
100 match self.get_height().await {
101 Ok(res) => return Ok(res),
102 Err(e) => {
103 log::error!("failed to send request to skar server: {:?}", e);
104 }
105 }
106
107 let secs = Duration::from_secs(base);
108 let millis = Duration::from_millis(fastrange_rs::fastrange_64(rand::random(), 1000));
109
110 tokio::time::sleep(secs + millis).await;
111
112 base = std::cmp::min(base + 1, 5);
113 }
114 }
115
116 pub async fn get_data(&self, query: &Query) -> Result<QueryResponseTyped> {
125 let res = self.get_arrow_data(query).await.context("get arrow data")?;
126
127 let typed_data =
128 typed_data_from_arrow_data(res.data).context("convert arrow data to typed response")?;
129
130 Ok(QueryResponseTyped {
131 archive_height: res.archive_height,
132 next_block: res.next_block,
133 total_execution_time: res.total_execution_time,
134 data: typed_data,
135 })
136 }
137
138 pub async fn get_selected_data(&self, query: &Query) -> Result<QueryResponseTyped> {
143 let query = add_selections_to_field_selection(&mut query.clone());
144
145 let res = self
146 .get_arrow_data(&query)
147 .await
148 .context("get arrow data")?;
149
150 let filtered_data =
151 filter_out_unselected_data(res.data, &query).context("filter out unselected data")?;
152
153 let typed_data = typed_data_from_arrow_data(filtered_data)
154 .context("convert arrow data to typed response")?;
155
156 Ok(QueryResponseTyped {
157 archive_height: res.archive_height,
158 next_block: res.next_block,
159 total_execution_time: res.total_execution_time,
160 data: typed_data,
161 })
162 }
163
164 pub async fn preset_query_get_logs<H: Into<Hash>>(
175 &self,
176 emitting_contracts: Vec<H>,
177 from_block: u64,
178 to_block: Option<u64>,
179 ) -> Result<LogResponse> {
180 let mut transaction_field_selection = BTreeSet::new();
181 transaction_field_selection.insert("id".to_owned());
182 transaction_field_selection.insert("status".to_owned());
183
184 let mut receipt_field_selection = BTreeSet::new();
185 receipt_field_selection.insert("block_height".to_owned());
186 receipt_field_selection.insert("tx_id".to_owned());
187 receipt_field_selection.insert("receipt_index".to_owned());
188 receipt_field_selection.insert("receipt_type".to_owned());
189 receipt_field_selection.insert("contract_id".to_owned());
190 receipt_field_selection.insert("root_contract_id".to_owned());
191 receipt_field_selection.insert("ra".to_owned());
192 receipt_field_selection.insert("rb".to_owned());
193 receipt_field_selection.insert("rc".to_owned());
194 receipt_field_selection.insert("rd".to_owned());
195 receipt_field_selection.insert("pc".to_owned());
196 receipt_field_selection.insert("is".to_owned());
197 receipt_field_selection.insert("ptr".to_owned());
198 receipt_field_selection.insert("len".to_owned());
199 receipt_field_selection.insert("digest".to_owned());
200 receipt_field_selection.insert("data".to_owned());
201
202 let emitting_contracts: Vec<Hash> =
203 emitting_contracts.into_iter().map(|c| c.into()).collect();
204 let query = Query {
205 from_block,
206 to_block,
207 receipts: vec![
208 ReceiptSelection {
209 root_contract_id: emitting_contracts.clone(),
210 receipt_type: vec![5, 6],
211 ..Default::default()
212 },
213 ReceiptSelection {
214 contract_id: emitting_contracts,
215 receipt_type: vec![5, 6],
216 ..Default::default()
217 },
218 ],
219 field_selection: FieldSelection {
220 transaction: transaction_field_selection,
221 receipt: receipt_field_selection,
222 ..Default::default()
223 },
224 ..Default::default()
225 };
226
227 let res = self
228 .get_arrow_data(&query)
229 .await
230 .context("get arrow data")?;
231
232 let filtered_data = filter_out_unselected_data(res.data, &query)
233 .context("filter out unselected receipts")?;
234
235 let typed_receipts = receipts_from_arrow_data(&filtered_data.receipts)
236 .context("convert arrow data to receipt response")?;
237
238 let mut failed_txns = HashSet::new();
239 for batch in filtered_data.transactions.iter() {
240 let data = Transaction::from_arrow(batch).context("transaction from arrow")?;
241 for transaction in data {
242 if transaction.status == TransactionStatus::Failure {
243 failed_txns.insert(transaction.id);
244 }
245 }
246 }
247
248 let successful_logs: Vec<LogContext> = typed_receipts
249 .into_iter()
250 .filter_map(|receipt| {
251 if !failed_txns.contains(&receipt.tx_id) {
252 Some(receipt.into())
253 } else {
254 None
255 }
256 })
257 .collect();
258
259 Ok(LogResponse {
260 archive_height: res.archive_height,
261 next_block: res.next_block,
262 total_execution_time: res.total_execution_time,
263 data: successful_logs,
264 })
265 }
266
267 pub async fn get_arrow_data(&self, query: &Query) -> Result<QueryResponse> {
276 let mut url = self.cfg.url.clone();
277 let mut segments = url.path_segments_mut().ok().context("get path segments")?;
278 segments.push("query");
279 segments.push(ArrowIpc::path());
280 std::mem::drop(segments);
281 let mut req = self.http_client.request(Method::POST, url);
282
283 if let Some(bearer_token) = &self.cfg.bearer_token {
284 req = req.bearer_auth(bearer_token);
285 }
286
287 log::trace!("sending req to skar");
288 let res = req.json(&query).send().await.context("execute http req")?;
289 log::trace!("got req response");
290
291 let status = res.status();
292 if !status.is_success() {
293 let text = res.text().await.context("read text to see error")?;
294
295 return Err(anyhow!(
296 "http response status code {}, err body: {}",
297 status,
298 text
299 ));
300 }
301
302 log::trace!("starting to get response body bytes");
303
304 let bytes = res.bytes().await.context("read response body bytes")?;
305
306 log::trace!("starting to parse query response");
307
308 let res = tokio::task::block_in_place(|| {
309 self.parse_query_response::<ArrowIpc>(&bytes)
310 .context("parse query response")
311 })?;
312
313 log::trace!("got data from skar");
314
315 Ok(res)
316 }
317
318 pub async fn get_arrow_data_with_retry(&self, query: &Query) -> Result<QueryResponse> {
332 let mut base = 1;
333
334 loop {
335 match self.get_arrow_data(query).await {
336 Ok(res) => return Ok(res),
337 Err(e) => {
338 log::error!("failed to send request to skar server: {:?}", e);
339 }
340 }
341
342 let secs = Duration::from_secs(base);
343 let millis = Duration::from_millis(fastrange_rs::fastrange_64(rand::random(), 1000));
344
345 tokio::time::sleep(secs + millis).await;
346
347 base = std::cmp::min(base + 1, 5);
348 }
349 }
350
351 fn parse_query_response<Format: TransportFormat>(&self, bytes: &[u8]) -> Result<QueryResponse> {
352 let mut opts = capnp::message::ReaderOptions::new();
353 opts.nesting_limit(i32::MAX).traversal_limit_in_words(None);
354 let message_reader =
355 capnp::serialize_packed::read_message(bytes, opts).context("create message reader")?;
356
357 let query_response = message_reader
358 .get_root::<skar_net_types_capnp::query_response::Reader>()
359 .context("get root")?;
360
361 let archive_height = match query_response.get_archive_height() {
362 -1 => None,
363 h => Some(
364 h.try_into()
365 .context("invalid archive height returned from server")?,
366 ),
367 };
368
369 let data = query_response.get_data().context("read data")?;
370
371 let blocks = Format::read_chunks(data.get_blocks().context("get data")?)
372 .context("parse block data")?;
373 let transactions = Format::read_chunks(data.get_transactions().context("get data")?)
374 .context("parse tx data")?;
375 let receipts = Format::read_chunks(data.get_receipts().context("get data")?)
376 .context("parse receipt data")?;
377 let inputs = Format::read_chunks(data.get_inputs().context("get data")?)
378 .context("parse input data")?;
379 let outputs = Format::read_chunks(data.get_outputs().context("get data")?)
380 .context("parse output data")?;
381
382 Ok(QueryResponse {
383 archive_height,
384 next_block: query_response.get_next_block(),
385 total_execution_time: query_response.get_total_execution_time(),
386 data: QueryResponseData {
387 blocks,
388 transactions,
389 receipts,
390 inputs,
391 outputs,
392 },
393 })
394 }
395}
396
397fn add_selections_to_field_selection(query: &mut Query) -> Query {
400 query.receipts.iter_mut().for_each(|selection| {
401 if !selection.root_contract_id.is_empty() {
402 query
403 .field_selection
404 .receipt
405 .insert("root_contract_id".into());
406 }
407 if !selection.to_address.is_empty() {
408 query.field_selection.receipt.insert("to_address".into());
409 }
410 if !selection.asset_id.is_empty() {
411 query.field_selection.receipt.insert("asset_id".into());
412 }
413 if !selection.receipt_type.is_empty() {
414 query.field_selection.receipt.insert("receipt_type".into());
415 }
416 if !selection.sender.is_empty() {
417 query.field_selection.receipt.insert("sender".into());
418 }
419 if !selection.recipient.is_empty() {
420 query.field_selection.receipt.insert("recipient".into());
421 }
422 if !selection.contract_id.is_empty() {
423 query.field_selection.receipt.insert("contract_id".into());
424 }
425 if !selection.ra.is_empty() {
426 query.field_selection.receipt.insert("ra".into());
427 }
428 if !selection.rb.is_empty() {
429 query.field_selection.receipt.insert("rb".into());
430 }
431 if !selection.rc.is_empty() {
432 query.field_selection.receipt.insert("rc".into());
433 }
434 if !selection.rd.is_empty() {
435 query.field_selection.receipt.insert("rd".into());
436 }
437 });
438
439 query.inputs.iter_mut().for_each(|selection| {
440 if !selection.owner.is_empty() {
441 query.field_selection.input.insert("owner".into());
442 }
443 if !selection.asset_id.is_empty() {
444 query.field_selection.input.insert("asset_id".into());
445 }
446 if !selection.contract.is_empty() {
447 query.field_selection.input.insert("contract".into());
448 }
449 if !selection.sender.is_empty() {
450 query.field_selection.input.insert("sender".into());
451 }
452 if !selection.recipient.is_empty() {
453 query.field_selection.input.insert("recipient".into());
454 }
455 if !selection.input_type.is_empty() {
456 query.field_selection.input.insert("input_type".into());
457 }
458 });
459
460 query.outputs.iter_mut().for_each(|selection| {
461 if !selection.to.is_empty() {
462 query.field_selection.output.insert("to".into());
463 }
464 if !selection.asset_id.is_empty() {
465 query.field_selection.output.insert("asset_id".into());
466 }
467 if !selection.contract.is_empty() {
468 query.field_selection.output.insert("contract".into());
469 }
470 if !selection.output_type.is_empty() {
471 query.field_selection.output.insert("output_type".into());
472 }
473 });
474
475 query.clone()
476}