skar_client/
lib.rs

1use std::{cmp, collections::BTreeSet, error::Error, time::Duration};
2
3use anyhow::{anyhow, Context, Result};
4use arrayvec::ArrayVec;
5use format::{Address, LogArgument};
6use futures::StreamExt;
7use polars_arrow::{array::Array, record_batch::RecordBatch as Chunk};
8use reqwest::Method;
9use skar_net_types::{
10    skar_net_types_capnp, ArchiveHeight, FieldSelection, LogSelection, Query, RollbackGuard,
11    TransactionSelection,
12};
13
14mod column_mapping;
15pub mod config;
16mod decode;
17mod parquet_out;
18mod rayon_async;
19mod transport_format;
20mod types;
21
22pub use column_mapping::{ColumnMapping, DataType};
23pub use config::Config;
24pub use decode::Decoder;
25pub use parquet_out::map_batch_to_binary_view;
26pub use skar_format as format;
27use tokio::sync::mpsc;
28pub use transport_format::{ArrowIpc, TransportFormat};
29pub use types::{ArrowBatch, ParquetConfig, QueryResponse, QueryResponseData, StreamConfig};
30
31pub type ArrowChunk = Chunk<Box<dyn Array>>;
32
33#[derive(Clone)]
34pub struct Client {
35    http_client: reqwest::Client,
36    cfg: Config,
37}
38
39impl Client {
40    /// Create a new client with given config
41    pub fn new(cfg: Config) -> Result<Self> {
42        let http_client = reqwest::Client::builder()
43            .no_gzip()
44            .timeout(Duration::from_millis(cfg.http_req_timeout_millis.get()))
45            .pool_max_idle_per_host(0)
46            .build()
47            .unwrap();
48
49        Ok(Self { http_client, cfg })
50    }
51
52    /// Create a parquet file by executing a query.
53    ///
54    /// Path should point to a folder that will contain the parquet files in the end.
55    pub async fn create_parquet_folder(&self, query: Query, config: ParquetConfig) -> Result<()> {
56        parquet_out::create_parquet_folder(self, query, config).await
57    }
58
59    /// Get the height of the source hypersync instance
60    pub async fn get_height(&self) -> Result<u64> {
61        let mut url = self.cfg.url.clone();
62        let mut segments = url.path_segments_mut().ok().context("get path segments")?;
63        segments.push("height");
64        std::mem::drop(segments);
65        let mut req = self.http_client.request(Method::GET, url);
66
67        if let Some(bearer_token) = &self.cfg.bearer_token {
68            req = req.bearer_auth(bearer_token);
69        }
70
71        let res = req.send().await.context("execute http req")?;
72
73        let status = res.status();
74        if !status.is_success() {
75            return Err(anyhow!("http response status code {}", status));
76        }
77
78        let height: ArchiveHeight = res.json().await.context("read response body json")?;
79
80        Ok(height.height.unwrap_or(0))
81    }
82
83    /// Get the height of the source hypersync instance
84    /// Internally calls get_height.
85    /// On an error from the source hypersync instance, sleeps for
86    /// 1 second (increasing by 1 each failure up to max of 5 seconds)
87    /// and retries query until success.
88    pub async fn get_height_with_retry(&self) -> Result<u64> {
89        let mut base = 1;
90
91        loop {
92            match self.get_height().await {
93                Ok(res) => return Ok(res),
94                Err(e) => {
95                    log::error!("failed to send request to skar server: {:?}", e);
96                }
97            }
98
99            let secs = Duration::from_secs(base);
100            let millis = Duration::from_millis(fastrange_rs::fastrange_64(rand::random(), 1000));
101
102            tokio::time::sleep(secs + millis).await;
103
104            base = std::cmp::min(base + 1, 5);
105        }
106    }
107
108    pub async fn stream<Format: TransportFormat>(
109        &self,
110        query: Query,
111        config: StreamConfig,
112    ) -> Result<mpsc::Receiver<Result<QueryResponse>>> {
113        let (tx, rx) = mpsc::channel(config.concurrency);
114
115        let to_block = match query.to_block {
116            Some(to_block) => to_block,
117            None => {
118                if config.retry {
119                    self.get_height_with_retry().await.context("get height")?
120                } else {
121                    self.get_height().await.context("get height")?
122                }
123            }
124        };
125
126        let client = self.clone();
127        let step = usize::try_from(config.batch_size).unwrap();
128        tokio::spawn(async move {
129            let mut query = query;
130            let initial_res = if config.retry {
131                client
132                    .send_with_retry::<crate::ArrowIpc>(&query)
133                    .await
134                    .context("run initial query")
135            } else {
136                client
137                    .send::<crate::ArrowIpc>(&query)
138                    .await
139                    .context("run initial query")
140            };
141            match initial_res {
142                Ok(res) => {
143                    query.from_block = res.next_block;
144                    if tx.send(Ok(res)).await.is_err() {
145                        return;
146                    }
147                }
148                Err(e) => {
149                    tx.send(Err(e)).await.ok();
150                    return;
151                }
152            }
153
154            let futs = (query.from_block..to_block)
155                .step_by(step)
156                .map(move |start| {
157                    let end = cmp::min(start + config.batch_size, to_block);
158                    let mut query = query.clone();
159                    query.from_block = start;
160                    query.to_block = Some(end);
161
162                    Self::run_query_to_end(client.clone(), query, config.retry)
163                });
164
165            let mut stream = futures::stream::iter(futs).buffered(config.concurrency);
166
167            while let Some(resps) = stream.next().await {
168                let resps = match resps {
169                    Ok(resps) => resps,
170                    Err(e) => {
171                        tx.send(Err(e)).await.ok();
172                        return;
173                    }
174                };
175
176                for resp in resps {
177                    if tx.send(Ok(resp)).await.is_err() {
178                        return;
179                    }
180                }
181            }
182        });
183
184        Ok(rx)
185    }
186
187    async fn run_query_to_end(self, query: Query, retry: bool) -> Result<Vec<QueryResponse>> {
188        let mut resps = Vec::new();
189
190        let to_block = query.to_block.unwrap();
191
192        let mut query = query;
193
194        loop {
195            let resp = if retry {
196                self.send_with_retry::<crate::ArrowIpc>(&query)
197                    .await
198                    .context("send query")?
199            } else {
200                self.send::<crate::ArrowIpc>(&query)
201                    .await
202                    .context("send query")?
203            };
204
205            let next_block = resp.next_block;
206
207            resps.push(resp);
208
209            if next_block >= to_block {
210                break;
211            } else {
212                query.from_block = next_block;
213            }
214        }
215
216        Ok(resps)
217    }
218
219    /// Send a query request to the source hypersync instance.
220    ///
221    /// Returns a query response which contains block, tx and log data.
222    /// Format can be ArrowIpc or Parquet.
223    pub async fn send<Format: TransportFormat>(&self, query: &Query) -> Result<QueryResponse> {
224        let mut url = self.cfg.url.clone();
225        let mut segments = url.path_segments_mut().ok().context("get path segments")?;
226        segments.push("query");
227        segments.push(Format::path());
228        std::mem::drop(segments);
229        let mut req = self.http_client.request(Method::POST, url);
230
231        if let Some(bearer_token) = &self.cfg.bearer_token {
232            req = req.bearer_auth(bearer_token);
233        }
234
235        let res = req.json(&query).send().await.context("execute http req")?;
236
237        let status = res.status();
238        if !status.is_success() {
239            let text = res.text().await.context("read text to see error")?;
240
241            return Err(anyhow!(
242                "http response status code {}, err body: {}",
243                status,
244                text
245            ));
246        }
247
248        let bytes = res.bytes().await.context("read response body bytes")?;
249
250        let res = tokio::task::block_in_place(|| {
251            Self::parse_query_response::<Format>(&bytes).context("parse query response")
252        })?;
253
254        Ok(res)
255    }
256
257    /// Send a query request to the source hypersync instance.
258    /// Internally calls send.
259    /// On an error from the source hypersync instance, sleeps for
260    /// 1 second (increasing by 1 each failure up to max of 5 seconds)
261    /// and retries query until success.
262    ///
263    /// Returns a query response which contains block, tx and log data.
264    /// Format can be ArrowIpc or Parquet.
265    pub async fn send_with_retry<Format: TransportFormat>(
266        &self,
267        query: &Query,
268    ) -> Result<QueryResponse> {
269        let mut base = 1;
270
271        loop {
272            match self.send::<Format>(query).await {
273                Ok(res) => return Ok(res),
274                Err(e) => {
275                    log::error!("failed to send request to skar server: {:?}", e);
276                }
277            }
278
279            let secs = Duration::from_secs(base);
280            let millis = Duration::from_millis(fastrange_rs::fastrange_64(rand::random(), 1000));
281
282            tokio::time::sleep(secs + millis).await;
283
284            base = std::cmp::min(base + 1, 5);
285        }
286    }
287
288    fn parse_query_response<Format: TransportFormat>(bytes: &[u8]) -> Result<QueryResponse> {
289        let mut opts = capnp::message::ReaderOptions::new();
290        opts.nesting_limit(i32::MAX).traversal_limit_in_words(None);
291        let message_reader =
292            capnp::serialize_packed::read_message(bytes, opts).context("create message reader")?;
293
294        let query_response = message_reader
295            .get_root::<skar_net_types_capnp::query_response::Reader>()
296            .context("get root")?;
297
298        let archive_height = match query_response.get_archive_height() {
299            -1 => None,
300            h => Some(
301                h.try_into()
302                    .context("invalid archive height returned from server")?,
303            ),
304        };
305
306        let rollback_guard = if query_response.has_rollback_guard() {
307            let rg = query_response
308                .get_rollback_guard()
309                .context("get rollback guard")?;
310
311            Some(RollbackGuard {
312                block_number: rg.get_block_number(),
313                timestamp: rg.get_timestamp(),
314                hash: rg
315                    .get_hash()
316                    .context("get rollback guard hash")?
317                    .try_into()
318                    .context("hash size")?,
319                first_block_number: rg.get_first_block_number(),
320                first_parent_hash: rg
321                    .get_first_parent_hash()
322                    .context("get rollback guard first parent hash")?
323                    .try_into()
324                    .context("hash size")?,
325            })
326        } else {
327            None
328        };
329
330        let data = query_response.get_data().context("read data")?;
331
332        let blocks = Format::read_chunks(data.get_blocks().context("get data")?)
333            .context("parse block data")?;
334        let transactions = Format::read_chunks(data.get_transactions().context("get data")?)
335            .context("parse tx data")?;
336        let logs =
337            Format::read_chunks(data.get_logs().context("get data")?).context("parse log data")?;
338        let traces = if data.has_traces() {
339            Format::read_chunks(data.get_traces().context("get data")?)
340                .context("parse traces data")?
341        } else {
342            Vec::new()
343        };
344
345        Ok(QueryResponse {
346            archive_height,
347            next_block: query_response.get_next_block(),
348            total_execution_time: query_response.get_total_execution_time(),
349            data: QueryResponseData {
350                blocks,
351                transactions,
352                logs,
353                traces,
354            },
355            rollback_guard,
356        })
357    }
358
359    /// Returns a query for all Blocks and Transactions within the block range (from_block, to_block]
360    /// If to_block is None then query runs to the head of the chain.
361    /// Note: this is only for quickstart purposes.  For the best performance, create a custom query
362    /// that only includes the fields you'll use in `field_selection`.
363    pub fn preset_query_blocks_and_transactions(from_block: u64, to_block: Option<u64>) -> Query {
364        let all_block_fields: BTreeSet<String> = skar_schema::block_header()
365            .fields
366            .iter()
367            .map(|x| x.name.clone())
368            .collect();
369
370        let all_tx_fields: BTreeSet<String> = skar_schema::transaction()
371            .fields
372            .iter()
373            .map(|x| x.name.clone())
374            .collect();
375
376        Query {
377            from_block,
378            to_block,
379            include_all_blocks: true,
380            transactions: vec![TransactionSelection::default()],
381            field_selection: FieldSelection {
382                block: all_block_fields,
383                transaction: all_tx_fields,
384                ..Default::default()
385            },
386            ..Default::default()
387        }
388    }
389
390    /// Returns a query object for all Blocks and hashes of the Transactions within the block range
391    /// (from_block, to_block].  Also returns the block_hash and block_number fields on each Transaction
392    /// so it can be mapped to a block.  If to_block is None then query runs to the head of the chain.
393    /// Note: this is only for quickstart purposes.  For the best performance, create a custom query
394    /// that only includes the fields you'll use in `field_selection`.
395    pub fn preset_query_blocks_and_transaction_hashes(
396        from_block: u64,
397        to_block: Option<u64>,
398    ) -> Query {
399        let mut tx_field_selection = BTreeSet::new();
400        tx_field_selection.insert("block_hash".to_owned());
401        tx_field_selection.insert("block_number".to_owned());
402        tx_field_selection.insert("hash".to_owned());
403
404        let all_block_fields: BTreeSet<String> = skar_schema::block_header()
405            .fields
406            .iter()
407            .map(|x| x.name.clone())
408            .collect();
409
410        Query {
411            from_block,
412            to_block,
413            include_all_blocks: true,
414            transactions: vec![TransactionSelection::default()],
415            field_selection: FieldSelection {
416                block: all_block_fields,
417                transaction: tx_field_selection,
418                ..Default::default()
419            },
420            ..Default::default()
421        }
422    }
423
424    /// Returns a query object for all Logs within the block range (from_block, to_block] from
425    /// the given address.  If to_block is None then query runs to the head of the chain.
426    /// Note: this is only for quickstart purposes.  For the best performance, create a custom query
427    /// that only includes the fields you'll use in `field_selection`.
428    pub fn preset_query_logs<A>(from_block: u64, to_block: Option<u64>, address: A) -> Result<Query>
429    where
430        A: TryInto<Address>,
431        <A as TryInto<Address>>::Error: Error + Send + Sync + 'static,
432    {
433        let address = address.try_into().context("convert Address type")?;
434
435        let all_log_fields: BTreeSet<String> = skar_schema::log()
436            .fields
437            .iter()
438            .map(|x| x.name.clone())
439            .collect();
440
441        Ok(Query {
442            from_block,
443            to_block,
444            logs: vec![LogSelection {
445                address: vec![address],
446                ..Default::default()
447            }],
448            field_selection: FieldSelection {
449                log: all_log_fields,
450                ..Default::default()
451            },
452            ..Default::default()
453        })
454    }
455
456    /// Returns a query for all Logs within the block range (from_block, to_block] from the
457    /// given address with a matching topic0 event signature.  Topic0 is the keccak256 hash
458    /// of the event signature.  If to_block is None then query runs to the head of the chain.
459    /// Note: this is only for quickstart purposes.  For the best performance, create a custom query
460    /// that only includes the fields you'll use in `field_selection`.
461    pub fn preset_query_logs_of_event<A, T>(
462        from_block: u64,
463        to_block: Option<u64>,
464        topic0: T,
465        address: A,
466    ) -> Result<Query>
467    where
468        A: TryInto<Address>,
469        <A as TryInto<Address>>::Error: Error + Send + Sync + 'static,
470        T: TryInto<LogArgument>,
471        <T as TryInto<LogArgument>>::Error: Error + Send + Sync + 'static,
472    {
473        let topic0 = topic0.try_into().context("convert Topic0 type")?;
474        let mut topics = ArrayVec::<Vec<LogArgument>, 4>::new();
475        topics.insert(0, vec![topic0]);
476
477        let address = address.try_into().context("convert Address type")?;
478
479        let all_log_fields: BTreeSet<String> = skar_schema::log()
480            .fields
481            .iter()
482            .map(|x| x.name.clone())
483            .collect();
484
485        Ok(Query {
486            from_block,
487            to_block,
488            logs: vec![LogSelection {
489                address: vec![address],
490                topics,
491            }],
492            field_selection: FieldSelection {
493                log: all_log_fields,
494                ..Default::default()
495            },
496            ..Default::default()
497        })
498    }
499
500    /// Returns a query object for all transactions within the block range (from_block, to_block].
501    /// If to_block is None then query runs to the head of the chain.
502    /// Note: this is only for quickstart purposes.  For the best performance, create a custom query
503    /// that only includes the fields you'll use in `field_selection`.
504    pub fn preset_query_transactions(from_block: u64, to_block: Option<u64>) -> Query {
505        let all_txn_fields: BTreeSet<String> = skar_schema::transaction()
506            .fields
507            .iter()
508            .map(|x| x.name.clone())
509            .collect();
510
511        Query {
512            from_block,
513            to_block,
514            transactions: vec![TransactionSelection::default()],
515            field_selection: FieldSelection {
516                transaction: all_txn_fields,
517                ..Default::default()
518            },
519            ..Default::default()
520        }
521    }
522
523    /// Returns a query object for all transactions from an address within the block range
524    /// (from_block, to_block].  If to_block is None then query runs to the head of the chain.
525    /// Note: this is only for quickstart purposes.  For the best performance, create a custom query
526    /// that only includes the fields you'll use in `field_selection`.
527    pub fn preset_query_transactions_from_address<A>(
528        from_block: u64,
529        to_block: Option<u64>,
530        address: A,
531    ) -> Result<Query>
532    where
533        A: TryInto<Address>,
534        <A as TryInto<Address>>::Error: Error + Send + Sync + 'static,
535    {
536        let address = address.try_into().context("convert Address type")?;
537
538        let all_txn_fields: BTreeSet<String> = skar_schema::transaction()
539            .fields
540            .iter()
541            .map(|x| x.name.clone())
542            .collect();
543
544        Ok(Query {
545            from_block,
546            to_block,
547            transactions: vec![TransactionSelection {
548                from: vec![address],
549                ..Default::default()
550            }],
551            field_selection: FieldSelection {
552                transaction: all_txn_fields,
553                ..Default::default()
554            },
555            ..Default::default()
556        })
557    }
558}