// hypersync_client/lib.rs

1#![deny(missing_docs)]
2//! Hypersync client library for interacting with hypersync server.
3use std::{num::NonZeroU64, sync::Arc, time::Duration};
4
5use anyhow::{anyhow, Context, Result};
6use hypersync_net_types::{ArchiveHeight, ChainId, Query};
7use polars_arrow::{array::Array, record_batch::RecordBatchT as Chunk};
8use reqwest::Method;
9
10mod column_mapping;
11mod config;
12mod decode;
13mod decode_call;
14mod from_arrow;
15mod parquet_out;
16mod parse_response;
17pub mod preset_query;
18mod rayon_async;
19pub mod simple_types;
20mod stream;
21#[cfg(feature = "ethers")]
22pub mod to_ethers;
23mod types;
24mod util;
25
26pub use from_arrow::FromArrow;
27pub use hypersync_format as format;
28pub use hypersync_net_types as net_types;
29pub use hypersync_schema as schema;
30
31use parse_response::parse_query_response;
32use simple_types::Event;
33use tokio::sync::mpsc;
34use types::{EventResponse, ResponseData};
35use url::Url;
36
37pub use column_mapping::{ColumnMapping, DataType};
38pub use config::HexOutput;
39pub use config::{ClientConfig, StreamConfig};
40pub use decode::Decoder;
41pub use decode_call::CallDecoder;
42pub use types::{ArrowBatch, ArrowResponse, ArrowResponseData, QueryResponse};
43
/// Convenience alias for the polars-arrow chunk type used throughout this crate.
type ArrowChunk = Chunk<Box<dyn Array>>;
45
/// Internal client to handle http requests and retries.
///
/// Construct with [`Client::new`]; retry behavior is controlled by the
/// `max_num_retries` / `retry_*` fields set from `ClientConfig`.
#[derive(Clone, Debug)]
pub struct Client {
    /// Initialized reqwest instance for client url.
    http_client: reqwest::Client,
    /// HyperSync server URL.
    url: Url,
    /// HyperSync server bearer token, attached as `Authorization: Bearer <token>` when set.
    bearer_token: Option<String>,
    /// Number of retries to attempt before returning error.
    max_num_retries: usize,
    /// Milliseconds that would be used for retry backoff increasing.
    retry_backoff_ms: u64,
    /// Initial wait time for request backoff.
    retry_base_ms: u64,
    /// Ceiling time for request backoff.
    retry_ceiling_ms: u64,
}
64
65impl Client {
66    /// Creates a new client with the given configuration.
67    pub fn new(cfg: ClientConfig) -> Result<Self> {
68        let timeout = cfg
69            .http_req_timeout_millis
70            .unwrap_or(NonZeroU64::new(30_000).unwrap());
71
72        let http_client = reqwest::Client::builder()
73            .no_gzip()
74            .timeout(Duration::from_millis(timeout.get()))
75            .build()
76            .unwrap();
77
78        Ok(Self {
79            http_client,
80            url: cfg
81                .url
82                .unwrap_or("https://eth.hypersync.xyz".parse().context("parse url")?),
83            bearer_token: cfg.bearer_token,
84            max_num_retries: cfg.max_num_retries.unwrap_or(12),
85            retry_backoff_ms: cfg.retry_backoff_ms.unwrap_or(500),
86            retry_base_ms: cfg.retry_base_ms.unwrap_or(200),
87            retry_ceiling_ms: cfg.retry_ceiling_ms.unwrap_or(5_000),
88        })
89    }
90
91    /// Retrieves blocks, transactions, traces, and logs through a stream using the provided
92    /// query and stream configuration.
93    ///
94    /// ### Implementation
95    /// Runs multiple queries simultaneously based on config.concurrency.
96    ///
97    /// Each query runs until it reaches query.to, server height, any max_num_* query param,
98    /// or execution timed out by server.
99    pub async fn collect(
100        self: Arc<Self>,
101        query: Query,
102        config: StreamConfig,
103    ) -> Result<QueryResponse> {
104        check_simple_stream_params(&config)?;
105
106        let mut recv = stream::stream_arrow(self, query, config)
107            .await
108            .context("start stream")?;
109
110        let mut data = ResponseData::default();
111        let mut archive_height = None;
112        let mut next_block = 0;
113        let mut total_execution_time = 0;
114
115        while let Some(res) = recv.recv().await {
116            let res = res.context("get response")?;
117            let res: QueryResponse = QueryResponse::from(&res);
118
119            for batch in res.data.blocks {
120                data.blocks.push(batch);
121            }
122            for batch in res.data.transactions {
123                data.transactions.push(batch);
124            }
125            for batch in res.data.logs {
126                data.logs.push(batch);
127            }
128            for batch in res.data.traces {
129                data.traces.push(batch);
130            }
131
132            archive_height = res.archive_height;
133            next_block = res.next_block;
134            total_execution_time += res.total_execution_time
135        }
136
137        Ok(QueryResponse {
138            archive_height,
139            next_block,
140            total_execution_time,
141            data,
142            rollback_guard: None,
143        })
144    }
145
146    /// Retrieves events through a stream using the provided query and stream configuration.
147    pub async fn collect_events(
148        self: Arc<Self>,
149        mut query: Query,
150        config: StreamConfig,
151    ) -> Result<EventResponse> {
152        check_simple_stream_params(&config)?;
153
154        add_event_join_fields_to_selection(&mut query);
155
156        let mut recv = stream::stream_arrow(self, query, config)
157            .await
158            .context("start stream")?;
159
160        let mut data = Vec::new();
161        let mut archive_height = None;
162        let mut next_block = 0;
163        let mut total_execution_time = 0;
164
165        while let Some(res) = recv.recv().await {
166            let res = res.context("get response")?;
167            let res: QueryResponse = QueryResponse::from(&res);
168            let events: Vec<Event> = res.data.into();
169
170            data.push(events);
171
172            archive_height = res.archive_height;
173            next_block = res.next_block;
174            total_execution_time += res.total_execution_time
175        }
176
177        Ok(EventResponse {
178            archive_height,
179            next_block,
180            total_execution_time,
181            data,
182            rollback_guard: None,
183        })
184    }
185
186    /// Retrieves blocks, transactions, traces, and logs in Arrow format through a stream using
187    /// the provided query and stream configuration.
188    pub async fn collect_arrow(
189        self: Arc<Self>,
190        query: Query,
191        config: StreamConfig,
192    ) -> Result<ArrowResponse> {
193        let mut recv = stream::stream_arrow(self, query, config)
194            .await
195            .context("start stream")?;
196
197        let mut data = ArrowResponseData::default();
198        let mut archive_height = None;
199        let mut next_block = 0;
200        let mut total_execution_time = 0;
201
202        while let Some(res) = recv.recv().await {
203            let res = res.context("get response")?;
204
205            for batch in res.data.blocks {
206                data.blocks.push(batch);
207            }
208            for batch in res.data.transactions {
209                data.transactions.push(batch);
210            }
211            for batch in res.data.logs {
212                data.logs.push(batch);
213            }
214            for batch in res.data.traces {
215                data.traces.push(batch);
216            }
217            for batch in res.data.decoded_logs {
218                data.decoded_logs.push(batch);
219            }
220
221            archive_height = res.archive_height;
222            next_block = res.next_block;
223            total_execution_time += res.total_execution_time
224        }
225
226        Ok(ArrowResponse {
227            archive_height,
228            next_block,
229            total_execution_time,
230            data,
231            rollback_guard: None,
232        })
233    }
234
    /// Writes parquet file getting data through a stream using the provided path, query,
    /// and stream configuration.
    ///
    /// Delegates entirely to `parquet_out::collect_parquet`; the interpretation
    /// of `path` (file vs. directory layout) is defined there.
    pub async fn collect_parquet(
        self: Arc<Self>,
        path: &str,
        query: Query,
        config: StreamConfig,
    ) -> Result<()> {
        parquet_out::collect_parquet(self, path, query, config).await
    }
245
246    /// Internal implementation of getting chain_id from server
247    async fn get_chain_id_impl(&self) -> Result<u64> {
248        let mut url = self.url.clone();
249        let mut segments = url.path_segments_mut().ok().context("get path segments")?;
250        segments.push("chain_id");
251        std::mem::drop(segments);
252        let mut req = self.http_client.request(Method::GET, url);
253
254        if let Some(bearer_token) = &self.bearer_token {
255            req = req.bearer_auth(bearer_token);
256        }
257
258        let res = req.send().await.context("execute http req")?;
259
260        let status = res.status();
261        if !status.is_success() {
262            return Err(anyhow!("http response status code {}", status));
263        }
264
265        let chain_id: ChainId = res.json().await.context("read response body json")?;
266
267        Ok(chain_id.chain_id)
268    }
269
270    /// Internal implementation of getting height from server
271    async fn get_height_impl(&self, http_timeout_override: Option<Duration>) -> Result<u64> {
272        let mut url = self.url.clone();
273        let mut segments = url.path_segments_mut().ok().context("get path segments")?;
274        segments.push("height");
275        std::mem::drop(segments);
276        let mut req = self.http_client.request(Method::GET, url);
277
278        if let Some(bearer_token) = &self.bearer_token {
279            req = req.bearer_auth(bearer_token);
280        }
281
282        if let Some(http_timeout_override) = http_timeout_override {
283            req = req.timeout(http_timeout_override);
284        }
285
286        let res = req.send().await.context("execute http req")?;
287
288        let status = res.status();
289        if !status.is_success() {
290            return Err(anyhow!("http response status code {}", status));
291        }
292
293        let height: ArchiveHeight = res.json().await.context("read response body json")?;
294
295        Ok(height.height.unwrap_or(0))
296    }
297
298    /// Get the chain_id from the server with retries.
299    pub async fn get_chain_id(&self) -> Result<u64> {
300        let mut base = self.retry_base_ms;
301
302        let mut err = anyhow!("");
303
304        for _ in 0..self.max_num_retries + 1 {
305            match self.get_chain_id_impl().await {
306                Ok(res) => return Ok(res),
307                Err(e) => {
308                    log::error!(
309                        "failed to get chain_id from server, retrying... The error was: {:?}",
310                        e
311                    );
312                    err = err.context(format!("{:?}", e));
313                }
314            }
315
316            let base_ms = Duration::from_millis(base);
317            let jitter = Duration::from_millis(fastrange_rs::fastrange_64(
318                rand::random(),
319                self.retry_backoff_ms,
320            ));
321
322            tokio::time::sleep(base_ms + jitter).await;
323
324            base = std::cmp::min(base + self.retry_backoff_ms, self.retry_ceiling_ms);
325        }
326
327        Err(err)
328    }
329
330    /// Get the height of from server with retries.
331    pub async fn get_height(&self) -> Result<u64> {
332        let mut base = self.retry_base_ms;
333
334        let mut err = anyhow!("");
335
336        for _ in 0..self.max_num_retries + 1 {
337            match self.get_height_impl(None).await {
338                Ok(res) => return Ok(res),
339                Err(e) => {
340                    log::error!(
341                        "failed to get height from server, retrying... The error was: {:?}",
342                        e
343                    );
344                    err = err.context(format!("{:?}", e));
345                }
346            }
347
348            let base_ms = Duration::from_millis(base);
349            let jitter = Duration::from_millis(fastrange_rs::fastrange_64(
350                rand::random(),
351                self.retry_backoff_ms,
352            ));
353
354            tokio::time::sleep(base_ms + jitter).await;
355
356            base = std::cmp::min(base + self.retry_backoff_ms, self.retry_ceiling_ms);
357        }
358
359        Err(err)
360    }
361
    /// Get the height of the Client instance for health checks.
    /// Doesn't do any retries and the `http_req_timeout` parameter will override the http timeout config set when creating the client.
    ///
    /// # Errors
    ///
    /// Returns an error if the single height request fails or times out.
    pub async fn health_check(&self, http_req_timeout: Option<Duration>) -> Result<u64> {
        self.get_height_impl(http_req_timeout).await
    }
367
    /// Executes query with retries and returns the response.
    ///
    /// Fetches the data in arrow format via [`Client::get_arrow`] and converts
    /// it into simple (native Rust) types.
    pub async fn get(&self, query: &Query) -> Result<QueryResponse> {
        let arrow_response = self.get_arrow(query).await.context("get data")?;
        Ok(QueryResponse::from(&arrow_response))
    }
373
    /// Add block, transaction and log fields selection to the query, executes it with retries
    /// and returns the response.
    ///
    /// The join fields are inserted so the returned tables can be combined
    /// into event values.
    pub async fn get_events(&self, mut query: Query) -> Result<EventResponse> {
        add_event_join_fields_to_selection(&mut query);
        let arrow_response = self.get_arrow(&query).await.context("get data")?;
        Ok(EventResponse::from(&arrow_response))
    }
381
382    /// Executes query once and returns the result in (Arrow, size) format.
383    async fn get_arrow_impl(&self, query: &Query) -> Result<(ArrowResponse, u64)> {
384        let mut url = self.url.clone();
385        let mut segments = url.path_segments_mut().ok().context("get path segments")?;
386        segments.push("query");
387        segments.push("arrow-ipc");
388        std::mem::drop(segments);
389        let mut req = self.http_client.request(Method::POST, url);
390
391        if let Some(bearer_token) = &self.bearer_token {
392            req = req.bearer_auth(bearer_token);
393        }
394
395        let res = req.json(&query).send().await.context("execute http req")?;
396
397        let status = res.status();
398        if !status.is_success() {
399            let text = res.text().await.context("read text to see error")?;
400
401            return Err(anyhow!(
402                "http response status code {}, err body: {}",
403                status,
404                text
405            ));
406        }
407
408        let bytes = res.bytes().await.context("read response body bytes")?;
409
410        let res = tokio::task::block_in_place(|| {
411            parse_query_response(&bytes).context("parse query response")
412        })?;
413
414        Ok((res, bytes.len().try_into().unwrap()))
415    }
416
    /// Executes query with retries and returns the response in Arrow format.
    ///
    /// Discards the response byte size reported by the sized variant.
    pub async fn get_arrow(&self, query: &Query) -> Result<ArrowResponse> {
        self.get_arrow_with_size(query).await.map(|res| res.0)
    }
421
422    /// Internal implementation for get_arrow.
423    async fn get_arrow_with_size(&self, query: &Query) -> Result<(ArrowResponse, u64)> {
424        let mut base = self.retry_base_ms;
425
426        let mut err = anyhow!("");
427
428        for _ in 0..self.max_num_retries + 1 {
429            match self.get_arrow_impl(query).await {
430                Ok(res) => return Ok(res),
431                Err(e) => {
432                    log::error!(
433                        "failed to get arrow data from server, retrying... The error was: {:?}",
434                        e
435                    );
436                    err = err.context(format!("{:?}", e));
437                }
438            }
439
440            let base_ms = Duration::from_millis(base);
441            let jitter = Duration::from_millis(fastrange_rs::fastrange_64(
442                rand::random(),
443                self.retry_backoff_ms,
444            ));
445
446            tokio::time::sleep(base_ms + jitter).await;
447
448            base = std::cmp::min(base + self.retry_backoff_ms, self.retry_ceiling_ms);
449        }
450
451        Err(err)
452    }
453
454    /// Spawns task to execute query and return data via a channel.
455    pub async fn stream(
456        self: Arc<Self>,
457        query: Query,
458        config: StreamConfig,
459    ) -> Result<mpsc::Receiver<Result<QueryResponse>>> {
460        check_simple_stream_params(&config)?;
461
462        let (tx, rx): (_, mpsc::Receiver<Result<QueryResponse>>) =
463            mpsc::channel(config.concurrency.unwrap_or(10));
464
465        let mut inner_rx = self
466            .stream_arrow(query, config)
467            .await
468            .context("start inner stream")?;
469
470        tokio::spawn(async move {
471            while let Some(resp) = inner_rx.recv().await {
472                let is_err = resp.is_err();
473                if tx
474                    .send(resp.map(|r| QueryResponse::from(&r)))
475                    .await
476                    .is_err()
477                    || is_err
478                {
479                    return;
480                }
481            }
482        });
483
484        Ok(rx)
485    }
486
487    /// Add block, transaction and log fields selection to the query and spawns task to execute it,
488    /// returning data via a channel.
489    pub async fn stream_events(
490        self: Arc<Self>,
491        mut query: Query,
492        config: StreamConfig,
493    ) -> Result<mpsc::Receiver<Result<EventResponse>>> {
494        check_simple_stream_params(&config)?;
495
496        add_event_join_fields_to_selection(&mut query);
497
498        let (tx, rx): (_, mpsc::Receiver<Result<EventResponse>>) =
499            mpsc::channel(config.concurrency.unwrap_or(10));
500
501        let mut inner_rx = self
502            .stream_arrow(query, config)
503            .await
504            .context("start inner stream")?;
505
506        tokio::spawn(async move {
507            while let Some(resp) = inner_rx.recv().await {
508                let is_err = resp.is_err();
509                if tx
510                    .send(resp.map(|r| EventResponse::from(&r)))
511                    .await
512                    .is_err()
513                    || is_err
514                {
515                    return;
516                }
517            }
518        });
519
520        Ok(rx)
521    }
522
    /// Spawns task to execute query and return data via a channel in Arrow format.
    ///
    /// Thin wrapper over `stream::stream_arrow`; see the `stream` module for
    /// the actual concurrency behavior.
    pub async fn stream_arrow(
        self: Arc<Self>,
        query: Query,
        config: StreamConfig,
    ) -> Result<mpsc::Receiver<Result<ArrowResponse>>> {
        stream::stream_arrow(self, query, config).await
    }
531
    /// Getter for url field.
    ///
    /// Returns the HyperSync server URL this client was configured with.
    pub fn url(&self) -> &Url {
        &self.url
    }
536}
537
538fn check_simple_stream_params(config: &StreamConfig) -> Result<()> {
539    if config.event_signature.is_some() {
540        return Err(anyhow!(
541            "config.event_signature can't be passed to simple type function. User is expected to \
542             decode the logs using Decoder."
543        ));
544    }
545    if config.column_mapping.is_some() {
546        return Err(anyhow!(
547            "config.column_mapping can't be passed to single type function. User is expected to \
548             map values manually."
549        ));
550    }
551
552    Ok(())
553}
554
555fn add_event_join_fields_to_selection(query: &mut Query) {
556    // Field lists for implementing event based API, these fields are used for joining
557    // so they should always be added to the field selection.
558    const BLOCK_JOIN_FIELDS: &[&str] = &["number"];
559    const TX_JOIN_FIELDS: &[&str] = &["hash"];
560    const LOG_JOIN_FIELDS: &[&str] = &["transaction_hash", "block_number"];
561
562    if !query.field_selection.block.is_empty() {
563        for field in BLOCK_JOIN_FIELDS.iter() {
564            query.field_selection.block.insert(field.to_string());
565        }
566    }
567
568    if !query.field_selection.transaction.is_empty() {
569        for field in TX_JOIN_FIELDS.iter() {
570            query.field_selection.transaction.insert(field.to_string());
571        }
572    }
573
574    if !query.field_selection.log.is_empty() {
575        for field in LOG_JOIN_FIELDS.iter() {
576            query.field_selection.log.insert(field.to_string());
577        }
578    }
579}