hypersync_client/
lib.rs

1#![deny(missing_docs)]
2//! Hypersync client library for interacting with hypersync server.
3use std::{num::NonZeroU64, sync::Arc, time::Duration};
4
5use anyhow::{anyhow, Context, Result};
6use hypersync_net_types::{ArchiveHeight, ChainId, Query};
7use polars_arrow::{array::Array, record_batch::RecordBatchT as Chunk};
8use reqwest::Method;
9
10mod column_mapping;
11mod config;
12mod decode;
13mod decode_call;
14mod from_arrow;
15mod parquet_out;
16mod parse_response;
17pub mod preset_query;
18mod rayon_async;
19pub mod simple_types;
20mod stream;
21#[cfg(feature = "ethers")]
22pub mod to_ethers;
23mod types;
24mod util;
25
26pub use from_arrow::FromArrow;
27pub use hypersync_format as format;
28pub use hypersync_net_types as net_types;
29pub use hypersync_schema as schema;
30
31use parse_response::parse_query_response;
32use tokio::sync::mpsc;
33use types::{EventResponse, ResponseData};
34use url::Url;
35
36pub use column_mapping::{ColumnMapping, DataType};
37pub use config::HexOutput;
38pub use config::{ClientConfig, StreamConfig};
39pub use decode::Decoder;
40pub use decode_call::CallDecoder;
41pub use types::{ArrowBatch, ArrowResponse, ArrowResponseData, QueryResponse};
42
43use crate::simple_types::InternalEventJoinStrategy;
44
45type ArrowChunk = Chunk<Box<dyn Array>>;
46
47/// Internal client to handle http requests and retries.
48#[derive(Clone, Debug)]
49pub struct Client {
50    /// Initialized reqwest instance for client url.
51    http_client: reqwest::Client,
52    /// HyperSync server URL.
53    url: Url,
54    /// HyperSync server bearer token.
55    bearer_token: Option<String>,
56    /// Number of retries to attempt before returning error.
57    max_num_retries: usize,
58    /// Milliseconds that would be used for retry backoff increasing.
59    retry_backoff_ms: u64,
60    /// Initial wait time for request backoff.
61    retry_base_ms: u64,
62    /// Ceiling time for request backoff.
63    retry_ceiling_ms: u64,
64}
65
66impl Client {
67    /// Creates a new client with the given configuration.
68    pub fn new(cfg: ClientConfig) -> Result<Self> {
69        let timeout = cfg
70            .http_req_timeout_millis
71            .unwrap_or(NonZeroU64::new(30_000).unwrap());
72
73        let http_client = reqwest::Client::builder()
74            .no_gzip()
75            .timeout(Duration::from_millis(timeout.get()))
76            .build()
77            .unwrap();
78
79        Ok(Self {
80            http_client,
81            url: cfg
82                .url
83                .unwrap_or("https://eth.hypersync.xyz".parse().context("parse url")?),
84            bearer_token: cfg.bearer_token,
85            max_num_retries: cfg.max_num_retries.unwrap_or(12),
86            retry_backoff_ms: cfg.retry_backoff_ms.unwrap_or(500),
87            retry_base_ms: cfg.retry_base_ms.unwrap_or(200),
88            retry_ceiling_ms: cfg.retry_ceiling_ms.unwrap_or(5_000),
89        })
90    }
91
92    /// Retrieves blocks, transactions, traces, and logs through a stream using the provided
93    /// query and stream configuration.
94    ///
95    /// ### Implementation
96    /// Runs multiple queries simultaneously based on config.concurrency.
97    ///
98    /// Each query runs until it reaches query.to, server height, any max_num_* query param,
99    /// or execution timed out by server.
100    pub async fn collect(
101        self: Arc<Self>,
102        query: Query,
103        config: StreamConfig,
104    ) -> Result<QueryResponse> {
105        check_simple_stream_params(&config)?;
106
107        let mut recv = stream::stream_arrow(self, query, config)
108            .await
109            .context("start stream")?;
110
111        let mut data = ResponseData::default();
112        let mut archive_height = None;
113        let mut next_block = 0;
114        let mut total_execution_time = 0;
115
116        while let Some(res) = recv.recv().await {
117            let res = res.context("get response")?;
118            let res: QueryResponse = QueryResponse::from(&res);
119
120            for batch in res.data.blocks {
121                data.blocks.push(batch);
122            }
123            for batch in res.data.transactions {
124                data.transactions.push(batch);
125            }
126            for batch in res.data.logs {
127                data.logs.push(batch);
128            }
129            for batch in res.data.traces {
130                data.traces.push(batch);
131            }
132
133            archive_height = res.archive_height;
134            next_block = res.next_block;
135            total_execution_time += res.total_execution_time
136        }
137
138        Ok(QueryResponse {
139            archive_height,
140            next_block,
141            total_execution_time,
142            data,
143            rollback_guard: None,
144        })
145    }
146
147    /// Retrieves events through a stream using the provided query and stream configuration.
148    pub async fn collect_events(
149        self: Arc<Self>,
150        mut query: Query,
151        config: StreamConfig,
152    ) -> Result<EventResponse> {
153        check_simple_stream_params(&config)?;
154
155        let event_join_strategy = InternalEventJoinStrategy::from(&query.field_selection);
156        event_join_strategy.add_join_fields_to_selection(&mut query.field_selection);
157
158        let mut recv = stream::stream_arrow(self, query, config)
159            .await
160            .context("start stream")?;
161
162        let mut data = Vec::new();
163        let mut archive_height = None;
164        let mut next_block = 0;
165        let mut total_execution_time = 0;
166
167        while let Some(res) = recv.recv().await {
168            let res = res.context("get response")?;
169            let res: QueryResponse = QueryResponse::from(&res);
170            let events = event_join_strategy.join_from_response_data(res.data);
171
172            data.extend(events);
173
174            archive_height = res.archive_height;
175            next_block = res.next_block;
176            total_execution_time += res.total_execution_time
177        }
178
179        Ok(EventResponse {
180            archive_height,
181            next_block,
182            total_execution_time,
183            data,
184            rollback_guard: None,
185        })
186    }
187
188    /// Retrieves blocks, transactions, traces, and logs in Arrow format through a stream using
189    /// the provided query and stream configuration.
190    pub async fn collect_arrow(
191        self: Arc<Self>,
192        query: Query,
193        config: StreamConfig,
194    ) -> Result<ArrowResponse> {
195        let mut recv = stream::stream_arrow(self, query, config)
196            .await
197            .context("start stream")?;
198
199        let mut data = ArrowResponseData::default();
200        let mut archive_height = None;
201        let mut next_block = 0;
202        let mut total_execution_time = 0;
203
204        while let Some(res) = recv.recv().await {
205            let res = res.context("get response")?;
206
207            for batch in res.data.blocks {
208                data.blocks.push(batch);
209            }
210            for batch in res.data.transactions {
211                data.transactions.push(batch);
212            }
213            for batch in res.data.logs {
214                data.logs.push(batch);
215            }
216            for batch in res.data.traces {
217                data.traces.push(batch);
218            }
219            for batch in res.data.decoded_logs {
220                data.decoded_logs.push(batch);
221            }
222
223            archive_height = res.archive_height;
224            next_block = res.next_block;
225            total_execution_time += res.total_execution_time
226        }
227
228        Ok(ArrowResponse {
229            archive_height,
230            next_block,
231            total_execution_time,
232            data,
233            rollback_guard: None,
234        })
235    }
236
237    /// Writes parquet file getting data through a stream using the provided path, query,
238    /// and stream configuration.
239    pub async fn collect_parquet(
240        self: Arc<Self>,
241        path: &str,
242        query: Query,
243        config: StreamConfig,
244    ) -> Result<()> {
245        parquet_out::collect_parquet(self, path, query, config).await
246    }
247
248    /// Internal implementation of getting chain_id from server
249    async fn get_chain_id_impl(&self) -> Result<u64> {
250        let mut url = self.url.clone();
251        let mut segments = url.path_segments_mut().ok().context("get path segments")?;
252        segments.push("chain_id");
253        std::mem::drop(segments);
254        let mut req = self.http_client.request(Method::GET, url);
255
256        if let Some(bearer_token) = &self.bearer_token {
257            req = req.bearer_auth(bearer_token);
258        }
259
260        let res = req.send().await.context("execute http req")?;
261
262        let status = res.status();
263        if !status.is_success() {
264            return Err(anyhow!("http response status code {}", status));
265        }
266
267        let chain_id: ChainId = res.json().await.context("read response body json")?;
268
269        Ok(chain_id.chain_id)
270    }
271
272    /// Internal implementation of getting height from server
273    async fn get_height_impl(&self, http_timeout_override: Option<Duration>) -> Result<u64> {
274        let mut url = self.url.clone();
275        let mut segments = url.path_segments_mut().ok().context("get path segments")?;
276        segments.push("height");
277        std::mem::drop(segments);
278        let mut req = self.http_client.request(Method::GET, url);
279
280        if let Some(bearer_token) = &self.bearer_token {
281            req = req.bearer_auth(bearer_token);
282        }
283
284        if let Some(http_timeout_override) = http_timeout_override {
285            req = req.timeout(http_timeout_override);
286        }
287
288        let res = req.send().await.context("execute http req")?;
289
290        let status = res.status();
291        if !status.is_success() {
292            return Err(anyhow!("http response status code {}", status));
293        }
294
295        let height: ArchiveHeight = res.json().await.context("read response body json")?;
296
297        Ok(height.height.unwrap_or(0))
298    }
299
300    /// Get the chain_id from the server with retries.
301    pub async fn get_chain_id(&self) -> Result<u64> {
302        let mut base = self.retry_base_ms;
303
304        let mut err = anyhow!("");
305
306        for _ in 0..self.max_num_retries + 1 {
307            match self.get_chain_id_impl().await {
308                Ok(res) => return Ok(res),
309                Err(e) => {
310                    log::error!(
311                        "failed to get chain_id from server, retrying... The error was: {e:?}"
312                    );
313                    err = err.context(format!("{e:?}"));
314                }
315            }
316
317            let base_ms = Duration::from_millis(base);
318            let jitter = Duration::from_millis(fastrange_rs::fastrange_64(
319                rand::random(),
320                self.retry_backoff_ms,
321            ));
322
323            tokio::time::sleep(base_ms + jitter).await;
324
325            base = std::cmp::min(base + self.retry_backoff_ms, self.retry_ceiling_ms);
326        }
327
328        Err(err)
329    }
330
331    /// Get the height of from server with retries.
332    pub async fn get_height(&self) -> Result<u64> {
333        let mut base = self.retry_base_ms;
334
335        let mut err = anyhow!("");
336
337        for _ in 0..self.max_num_retries + 1 {
338            match self.get_height_impl(None).await {
339                Ok(res) => return Ok(res),
340                Err(e) => {
341                    log::error!(
342                        "failed to get height from server, retrying... The error was: {e:?}"
343                    );
344                    err = err.context(format!("{e:?}"));
345                }
346            }
347
348            let base_ms = Duration::from_millis(base);
349            let jitter = Duration::from_millis(fastrange_rs::fastrange_64(
350                rand::random(),
351                self.retry_backoff_ms,
352            ));
353
354            tokio::time::sleep(base_ms + jitter).await;
355
356            base = std::cmp::min(base + self.retry_backoff_ms, self.retry_ceiling_ms);
357        }
358
359        Err(err)
360    }
361
362    /// Get the height of the Client instance for health checks.
363    /// Doesn't do any retries and the `http_req_timeout` parameter will override the http timeout config set when creating the client.
364    pub async fn health_check(&self, http_req_timeout: Option<Duration>) -> Result<u64> {
365        self.get_height_impl(http_req_timeout).await
366    }
367
368    /// Executes query with retries and returns the response.
369    pub async fn get(&self, query: &Query) -> Result<QueryResponse> {
370        let arrow_response = self.get_arrow(query).await.context("get data")?;
371        Ok(QueryResponse::from(&arrow_response))
372    }
373
374    /// Add block, transaction and log fields selection to the query, executes it with retries
375    /// and returns the response.
376    pub async fn get_events(&self, mut query: Query) -> Result<EventResponse> {
377        let event_join_strategy = InternalEventJoinStrategy::from(&query.field_selection);
378        event_join_strategy.add_join_fields_to_selection(&mut query.field_selection);
379        let arrow_response = self.get_arrow(&query).await.context("get data")?;
380        Ok(EventResponse::from_arrow_response(
381            &arrow_response,
382            &event_join_strategy,
383        ))
384    }
385
386    /// Executes query once and returns the result in (Arrow, size) format.
387    async fn get_arrow_impl(&self, query: &Query) -> Result<(ArrowResponse, u64)> {
388        let mut url = self.url.clone();
389        let mut segments = url.path_segments_mut().ok().context("get path segments")?;
390        segments.push("query");
391        segments.push("arrow-ipc");
392        std::mem::drop(segments);
393        let mut req = self.http_client.request(Method::POST, url);
394
395        if let Some(bearer_token) = &self.bearer_token {
396            req = req.bearer_auth(bearer_token);
397        }
398
399        let res = req.json(&query).send().await.context("execute http req")?;
400
401        let status = res.status();
402        if !status.is_success() {
403            let text = res.text().await.context("read text to see error")?;
404
405            return Err(anyhow!(
406                "http response status code {}, err body: {}",
407                status,
408                text
409            ));
410        }
411
412        let bytes = res.bytes().await.context("read response body bytes")?;
413
414        let res = tokio::task::block_in_place(|| {
415            parse_query_response(&bytes).context("parse query response")
416        })?;
417
418        Ok((res, bytes.len().try_into().unwrap()))
419    }
420
421    /// Executes query with retries and returns the response in Arrow format.
422    pub async fn get_arrow(&self, query: &Query) -> Result<ArrowResponse> {
423        self.get_arrow_with_size(query).await.map(|res| res.0)
424    }
425
426    /// Internal implementation for get_arrow.
427    async fn get_arrow_with_size(&self, query: &Query) -> Result<(ArrowResponse, u64)> {
428        let mut base = self.retry_base_ms;
429
430        let mut err = anyhow!("");
431
432        for _ in 0..self.max_num_retries + 1 {
433            match self.get_arrow_impl(query).await {
434                Ok(res) => return Ok(res),
435                Err(e) => {
436                    log::error!(
437                        "failed to get arrow data from server, retrying... The error was: {e:?}"
438                    );
439                    err = err.context(format!("{e:?}"));
440                }
441            }
442
443            let base_ms = Duration::from_millis(base);
444            let jitter = Duration::from_millis(fastrange_rs::fastrange_64(
445                rand::random(),
446                self.retry_backoff_ms,
447            ));
448
449            tokio::time::sleep(base_ms + jitter).await;
450
451            base = std::cmp::min(base + self.retry_backoff_ms, self.retry_ceiling_ms);
452        }
453
454        Err(err)
455    }
456
457    /// Spawns task to execute query and return data via a channel.
458    pub async fn stream(
459        self: Arc<Self>,
460        query: Query,
461        config: StreamConfig,
462    ) -> Result<mpsc::Receiver<Result<QueryResponse>>> {
463        check_simple_stream_params(&config)?;
464
465        let (tx, rx): (_, mpsc::Receiver<Result<QueryResponse>>) =
466            mpsc::channel(config.concurrency.unwrap_or(10));
467
468        let mut inner_rx = self
469            .stream_arrow(query, config)
470            .await
471            .context("start inner stream")?;
472
473        tokio::spawn(async move {
474            while let Some(resp) = inner_rx.recv().await {
475                let is_err = resp.is_err();
476                if tx
477                    .send(resp.map(|r| QueryResponse::from(&r)))
478                    .await
479                    .is_err()
480                    || is_err
481                {
482                    return;
483                }
484            }
485        });
486
487        Ok(rx)
488    }
489
490    /// Add block, transaction and log fields selection to the query and spawns task to execute it,
491    /// returning data via a channel.
492    pub async fn stream_events(
493        self: Arc<Self>,
494        mut query: Query,
495        config: StreamConfig,
496    ) -> Result<mpsc::Receiver<Result<EventResponse>>> {
497        check_simple_stream_params(&config)?;
498
499        let event_join_strategy = InternalEventJoinStrategy::from(&query.field_selection);
500
501        event_join_strategy.add_join_fields_to_selection(&mut query.field_selection);
502
503        let (tx, rx): (_, mpsc::Receiver<Result<EventResponse>>) =
504            mpsc::channel(config.concurrency.unwrap_or(10));
505
506        let mut inner_rx = self
507            .stream_arrow(query, config)
508            .await
509            .context("start inner stream")?;
510
511        tokio::spawn(async move {
512            while let Some(resp) = inner_rx.recv().await {
513                let is_err = resp.is_err();
514                if tx
515                    .send(
516                        resp.map(|r| EventResponse::from_arrow_response(&r, &event_join_strategy)),
517                    )
518                    .await
519                    .is_err()
520                    || is_err
521                {
522                    return;
523                }
524            }
525        });
526
527        Ok(rx)
528    }
529
530    /// Spawns task to execute query and return data via a channel in Arrow format.
531    pub async fn stream_arrow(
532        self: Arc<Self>,
533        query: Query,
534        config: StreamConfig,
535    ) -> Result<mpsc::Receiver<Result<ArrowResponse>>> {
536        stream::stream_arrow(self, query, config).await
537    }
538
539    /// Getter for url field.
540    pub fn url(&self) -> &Url {
541        &self.url
542    }
543}
544
545fn check_simple_stream_params(config: &StreamConfig) -> Result<()> {
546    if config.event_signature.is_some() {
547        return Err(anyhow!(
548            "config.event_signature can't be passed to simple type function. User is expected to \
549             decode the logs using Decoder."
550        ));
551    }
552    if config.column_mapping.is_some() {
553        return Err(anyhow!(
554            "config.column_mapping can't be passed to single type function. User is expected to \
555             map values manually."
556        ));
557    }
558
559    Ok(())
560}