skar_client_fuel/
lib.rs

1use std::{
2    collections::{BTreeSet, HashSet},
3    time::Duration,
4};
5
6use anyhow::{anyhow, Context, Result};
7use arrow2::{array::Array, chunk::Chunk};
8
9use filter::filter_out_unselected_data;
10use format::{Transaction, TransactionStatus};
11use from_arrow::{receipts_from_arrow_data, typed_data_from_arrow_data, FromArrow};
12use reqwest::Method;
13use skar_format::Hash;
14use skar_net_types::{
15    skar_net_types_capnp, ArchiveHeight, FieldSelection, Query, ReceiptSelection,
16};
17
18pub mod config;
19mod filter;
20mod from_arrow;
21mod parquet_out;
22mod transport_format;
23mod types;
24
25pub use config::Config;
26pub use skar_format as format;
27pub use transport_format::{ArrowIpc, TransportFormat};
28pub use types::{
29    ArrowBatch, LogContext, LogResponse, QueryResponse, QueryResponseData, QueryResponseDataTyped,
30    QueryResponseTyped,
31};
32
33pub type ArrowChunk = Chunk<Box<dyn Array>>;
34
35pub struct Client {
36    http_client: reqwest::Client,
37    cfg: Config,
38}
39
40impl Client {
41    /// Create a new client with given config
42    pub fn new(cfg: Config) -> Result<Self> {
43        let http_client = reqwest::Client::builder()
44            .no_gzip()
45            .http1_only()
46            .timeout(Duration::from_millis(cfg.http_req_timeout_millis.get()))
47            .tcp_keepalive(Duration::from_secs(7200))
48            .connect_timeout(Duration::from_millis(cfg.http_req_timeout_millis.get()))
49            .build()
50            .unwrap();
51
52        Ok(Self { http_client, cfg })
53    }
54
55    /// Create a parquet file by executing a query.
56    ///
57    /// If the query can't be finished in a single request, this function will
58    /// keep on making requests using the pagination mechanism (next_block) until
59    /// it reaches the end. It will stream data into the parquet file as it comes from
60    /// the server.
61    ///
62    /// Path should point to a folder that will contain the parquet files in the end.
63    pub async fn create_parquet_folder(&self, query: Query, path: String) -> Result<()> {
64        parquet_out::create_parquet_folder(self, query, path).await
65    }
66
67    /// Get the height of the source hypersync instance
68    pub async fn get_height(&self) -> Result<u64> {
69        let mut url = self.cfg.url.clone();
70        let mut segments = url.path_segments_mut().ok().context("get path segments")?;
71        segments.push("height");
72        std::mem::drop(segments);
73        let mut req = self.http_client.request(Method::GET, url);
74
75        if let Some(bearer_token) = &self.cfg.bearer_token {
76            req = req.bearer_auth(bearer_token);
77        }
78
79        let res = req.send().await.context("execute http req")?;
80
81        let status = res.status();
82        if !status.is_success() {
83            return Err(anyhow!("http response status code {}", status));
84        }
85
86        let height: ArchiveHeight = res.json().await.context("read response body json")?;
87
88        Ok(height.height.unwrap_or(0))
89    }
90
91    /// Get the height of the source hypersync instance
92    /// Internally calls get_height.
93    /// On an error from the source hypersync instance, sleeps for
94    /// 1 second (increasing by 1 each failure up to max of 5 seconds)
95    /// and retries query until success.
96    pub async fn get_height_with_retry(&self) -> Result<u64> {
97        let mut base = 1;
98
99        loop {
100            match self.get_height().await {
101                Ok(res) => return Ok(res),
102                Err(e) => {
103                    log::error!("failed to send request to skar server: {:?}", e);
104                }
105            }
106
107            let secs = Duration::from_secs(base);
108            let millis = Duration::from_millis(fastrange_rs::fastrange_64(rand::random(), 1000));
109
110            tokio::time::sleep(secs + millis).await;
111
112            base = std::cmp::min(base + 1, 5);
113        }
114    }
115
116    /// Send a query request to the source hypersync instance.
117    ///
118    /// Returns a query response which contains typed data.
119    ///
120    /// NOTE: this query returns loads all transactions that your match your receipt, input, or output selections
121    /// and applies the field selection to all these loaded transactions.  So your query will return the data you
122    /// want plus additional data from the loaded transactions.  This functionality is in case you want to associate
123    /// receipts, inputs, or outputs with eachother.
124    pub async fn get_data(&self, query: &Query) -> Result<QueryResponseTyped> {
125        let res = self.get_arrow_data(query).await.context("get arrow data")?;
126
127        let typed_data =
128            typed_data_from_arrow_data(res.data).context("convert arrow data to typed response")?;
129
130        Ok(QueryResponseTyped {
131            archive_height: res.archive_height,
132            next_block: res.next_block,
133            total_execution_time: res.total_execution_time,
134            data: typed_data,
135        })
136    }
137
138    /// Send a query request to the source hypersync instance.
139    ///
140    /// Returns a query response that which contains structured data that doesn't include any inputs, outputs,
141    /// and receipts that don't exactly match the query's input, outout, or receipt selection.
142    pub async fn get_selected_data(&self, query: &Query) -> Result<QueryResponseTyped> {
143        let query = add_selections_to_field_selection(&mut query.clone());
144
145        let res = self
146            .get_arrow_data(&query)
147            .await
148            .context("get arrow data")?;
149
150        let filtered_data =
151            filter_out_unselected_data(res.data, &query).context("filter out unselected data")?;
152
153        let typed_data = typed_data_from_arrow_data(filtered_data)
154            .context("convert arrow data to typed response")?;
155
156        Ok(QueryResponseTyped {
157            archive_height: res.archive_height,
158            next_block: res.next_block,
159            total_execution_time: res.total_execution_time,
160            data: typed_data,
161        })
162    }
163
164    /// Send a query request to the source hypersync instance.
165    ///
166    /// Returns all log and logdata receipts of logs emitted by any of the specified contracts
167    /// within the block range.
168    /// If no 'to_block' is specified, query will run to the head of the chain.
169    /// Returned data contains all the data needed to decode Fuel Log or LogData
170    /// receipts as well as some extra data for context.  This query doesn't return any logs that
171    /// were a part of a failed transaction.
172    ///
173    /// NOTE: this function is experimental and might be removed in future versions.
174    pub async fn preset_query_get_logs<H: Into<Hash>>(
175        &self,
176        emitting_contracts: Vec<H>,
177        from_block: u64,
178        to_block: Option<u64>,
179    ) -> Result<LogResponse> {
180        let mut transaction_field_selection = BTreeSet::new();
181        transaction_field_selection.insert("id".to_owned());
182        transaction_field_selection.insert("status".to_owned());
183
184        let mut receipt_field_selection = BTreeSet::new();
185        receipt_field_selection.insert("block_height".to_owned());
186        receipt_field_selection.insert("tx_id".to_owned());
187        receipt_field_selection.insert("receipt_index".to_owned());
188        receipt_field_selection.insert("receipt_type".to_owned());
189        receipt_field_selection.insert("contract_id".to_owned());
190        receipt_field_selection.insert("root_contract_id".to_owned());
191        receipt_field_selection.insert("ra".to_owned());
192        receipt_field_selection.insert("rb".to_owned());
193        receipt_field_selection.insert("rc".to_owned());
194        receipt_field_selection.insert("rd".to_owned());
195        receipt_field_selection.insert("pc".to_owned());
196        receipt_field_selection.insert("is".to_owned());
197        receipt_field_selection.insert("ptr".to_owned());
198        receipt_field_selection.insert("len".to_owned());
199        receipt_field_selection.insert("digest".to_owned());
200        receipt_field_selection.insert("data".to_owned());
201
202        let emitting_contracts: Vec<Hash> =
203            emitting_contracts.into_iter().map(|c| c.into()).collect();
204        let query = Query {
205            from_block,
206            to_block,
207            receipts: vec![
208                ReceiptSelection {
209                    root_contract_id: emitting_contracts.clone(),
210                    receipt_type: vec![5, 6],
211                    ..Default::default()
212                },
213                ReceiptSelection {
214                    contract_id: emitting_contracts,
215                    receipt_type: vec![5, 6],
216                    ..Default::default()
217                },
218            ],
219            field_selection: FieldSelection {
220                transaction: transaction_field_selection,
221                receipt: receipt_field_selection,
222                ..Default::default()
223            },
224            ..Default::default()
225        };
226
227        let res = self
228            .get_arrow_data(&query)
229            .await
230            .context("get arrow data")?;
231
232        let filtered_data = filter_out_unselected_data(res.data, &query)
233            .context("filter out unselected receipts")?;
234
235        let typed_receipts = receipts_from_arrow_data(&filtered_data.receipts)
236            .context("convert arrow data to receipt response")?;
237
238        let mut failed_txns = HashSet::new();
239        for batch in filtered_data.transactions.iter() {
240            let data = Transaction::from_arrow(batch).context("transaction from arrow")?;
241            for transaction in data {
242                if transaction.status == TransactionStatus::Failure {
243                    failed_txns.insert(transaction.id);
244                }
245            }
246        }
247
248        let successful_logs: Vec<LogContext> = typed_receipts
249            .into_iter()
250            .filter_map(|receipt| {
251                if !failed_txns.contains(&receipt.tx_id) {
252                    Some(receipt.into())
253                } else {
254                    None
255                }
256            })
257            .collect();
258
259        Ok(LogResponse {
260            archive_height: res.archive_height,
261            next_block: res.next_block,
262            total_execution_time: res.total_execution_time,
263            data: successful_logs,
264        })
265    }
266
267    /// Send a query request to the source hypersync instance.
268    ///
269    /// Returns a query response which contains arrow data.
270    ///
271    /// NOTE: this query returns loads all transactions that your match your receipt, input, or output selections
272    /// and applies the field selection to all these loaded transactions.  So your query will return the data you
273    /// want plus additional data from the loaded transactions.  This functionality is in case you want to associate
274    /// receipts, inputs, or outputs with eachother.
275    pub async fn get_arrow_data(&self, query: &Query) -> Result<QueryResponse> {
276        let mut url = self.cfg.url.clone();
277        let mut segments = url.path_segments_mut().ok().context("get path segments")?;
278        segments.push("query");
279        segments.push(ArrowIpc::path());
280        std::mem::drop(segments);
281        let mut req = self.http_client.request(Method::POST, url);
282
283        if let Some(bearer_token) = &self.cfg.bearer_token {
284            req = req.bearer_auth(bearer_token);
285        }
286
287        log::trace!("sending req to skar");
288        let res = req.json(&query).send().await.context("execute http req")?;
289        log::trace!("got req response");
290
291        let status = res.status();
292        if !status.is_success() {
293            let text = res.text().await.context("read text to see error")?;
294
295            return Err(anyhow!(
296                "http response status code {}, err body: {}",
297                status,
298                text
299            ));
300        }
301
302        log::trace!("starting to get response body bytes");
303
304        let bytes = res.bytes().await.context("read response body bytes")?;
305
306        log::trace!("starting to parse query response");
307
308        let res = tokio::task::block_in_place(|| {
309            self.parse_query_response::<ArrowIpc>(&bytes)
310                .context("parse query response")
311        })?;
312
313        log::trace!("got data from skar");
314
315        Ok(res)
316    }
317
318    /// Send a query request to the source hypersync instance.
319    /// Internally calls send.
320    /// On an error from the source hypersync instance, sleeps for
321    /// 1 second (increasing by 1 each failure up to max of 5 seconds)
322    /// and retries query until success.
323    ///
324    /// Returns a query response which contains arrow data.
325    ///
326    /// NOTE: this query returns loads all transactions that your match your receipt, input, or output selections
327    /// and applies the field selection to all these loaded transactions.  So your query will return the data you
328    /// want plus additional data from the loaded transactions.  This functionality is in case you want to associate
329    /// receipts, inputs, or outputs with eachother.
330    /// Format can be ArrowIpc.
331    pub async fn get_arrow_data_with_retry(&self, query: &Query) -> Result<QueryResponse> {
332        let mut base = 1;
333
334        loop {
335            match self.get_arrow_data(query).await {
336                Ok(res) => return Ok(res),
337                Err(e) => {
338                    log::error!("failed to send request to skar server: {:?}", e);
339                }
340            }
341
342            let secs = Duration::from_secs(base);
343            let millis = Duration::from_millis(fastrange_rs::fastrange_64(rand::random(), 1000));
344
345            tokio::time::sleep(secs + millis).await;
346
347            base = std::cmp::min(base + 1, 5);
348        }
349    }
350
351    fn parse_query_response<Format: TransportFormat>(&self, bytes: &[u8]) -> Result<QueryResponse> {
352        let mut opts = capnp::message::ReaderOptions::new();
353        opts.nesting_limit(i32::MAX).traversal_limit_in_words(None);
354        let message_reader =
355            capnp::serialize_packed::read_message(bytes, opts).context("create message reader")?;
356
357        let query_response = message_reader
358            .get_root::<skar_net_types_capnp::query_response::Reader>()
359            .context("get root")?;
360
361        let archive_height = match query_response.get_archive_height() {
362            -1 => None,
363            h => Some(
364                h.try_into()
365                    .context("invalid archive height returned from server")?,
366            ),
367        };
368
369        let data = query_response.get_data().context("read data")?;
370
371        let blocks = Format::read_chunks(data.get_blocks().context("get data")?)
372            .context("parse block data")?;
373        let transactions = Format::read_chunks(data.get_transactions().context("get data")?)
374            .context("parse tx data")?;
375        let receipts = Format::read_chunks(data.get_receipts().context("get data")?)
376            .context("parse receipt data")?;
377        let inputs = Format::read_chunks(data.get_inputs().context("get data")?)
378            .context("parse input data")?;
379        let outputs = Format::read_chunks(data.get_outputs().context("get data")?)
380            .context("parse output data")?;
381
382        Ok(QueryResponse {
383            archive_height,
384            next_block: query_response.get_next_block(),
385            total_execution_time: query_response.get_total_execution_time(),
386            data: QueryResponseData {
387                blocks,
388                transactions,
389                receipts,
390                inputs,
391                outputs,
392            },
393        })
394    }
395}
396
397// receipt, input, and output selections must have the associated query fields in
398// field_selection or else we can't do client-side filtering via comparison
399fn add_selections_to_field_selection(query: &mut Query) -> Query {
400    query.receipts.iter_mut().for_each(|selection| {
401        if !selection.root_contract_id.is_empty() {
402            query
403                .field_selection
404                .receipt
405                .insert("root_contract_id".into());
406        }
407        if !selection.to_address.is_empty() {
408            query.field_selection.receipt.insert("to_address".into());
409        }
410        if !selection.asset_id.is_empty() {
411            query.field_selection.receipt.insert("asset_id".into());
412        }
413        if !selection.receipt_type.is_empty() {
414            query.field_selection.receipt.insert("receipt_type".into());
415        }
416        if !selection.sender.is_empty() {
417            query.field_selection.receipt.insert("sender".into());
418        }
419        if !selection.recipient.is_empty() {
420            query.field_selection.receipt.insert("recipient".into());
421        }
422        if !selection.contract_id.is_empty() {
423            query.field_selection.receipt.insert("contract_id".into());
424        }
425        if !selection.ra.is_empty() {
426            query.field_selection.receipt.insert("ra".into());
427        }
428        if !selection.rb.is_empty() {
429            query.field_selection.receipt.insert("rb".into());
430        }
431        if !selection.rc.is_empty() {
432            query.field_selection.receipt.insert("rc".into());
433        }
434        if !selection.rd.is_empty() {
435            query.field_selection.receipt.insert("rd".into());
436        }
437    });
438
439    query.inputs.iter_mut().for_each(|selection| {
440        if !selection.owner.is_empty() {
441            query.field_selection.input.insert("owner".into());
442        }
443        if !selection.asset_id.is_empty() {
444            query.field_selection.input.insert("asset_id".into());
445        }
446        if !selection.contract.is_empty() {
447            query.field_selection.input.insert("contract".into());
448        }
449        if !selection.sender.is_empty() {
450            query.field_selection.input.insert("sender".into());
451        }
452        if !selection.recipient.is_empty() {
453            query.field_selection.input.insert("recipient".into());
454        }
455        if !selection.input_type.is_empty() {
456            query.field_selection.input.insert("input_type".into());
457        }
458    });
459
460    query.outputs.iter_mut().for_each(|selection| {
461        if !selection.to.is_empty() {
462            query.field_selection.output.insert("to".into());
463        }
464        if !selection.asset_id.is_empty() {
465            query.field_selection.output.insert("asset_id".into());
466        }
467        if !selection.contract.is_empty() {
468            query.field_selection.output.insert("contract".into());
469        }
470        if !selection.output_type.is_empty() {
471            query.field_selection.output.insert("output_type".into());
472        }
473    });
474
475    query.clone()
476}