// hyperfuel_client/lib.rs

1#![deny(missing_docs)]
//! Hyperfuel client library for interacting with a hypersync (hyperfuel) server.
3
4use std::{num::NonZeroU64, sync::Arc, time::Duration};
5
6use anyhow::{anyhow, Context, Result};
7use hyperfuel_net_types::{ArchiveHeight, ChainId, Query};
8use polars_arrow::{array::Array, record_batch::RecordBatchT as Chunk};
9use reqwest::Method;
10
11mod column_mapping;
12mod config;
13mod from_arrow;
14mod parquet_out;
15mod parse_response;
16mod rayon_async;
17// pub mod simple_types;
18mod stream;
19mod types;
20mod util;
21
22pub use from_arrow::FromArrow;
23pub use hyperfuel_format as format;
24pub use hyperfuel_net_types as net_types;
25pub use hyperfuel_schema as schema;
26
27use parse_response::parse_query_response;
28// use simple_types::Event;
29use tokio::sync::mpsc;
30use url::Url;
31
32pub use column_mapping::{ColumnMapping, DataType};
33pub use config::HexOutput;
34pub use config::{ClientConfig, StreamConfig};
35pub use types::{ArrowBatch, ArrowResponse, ArrowResponseData, QueryResponse};
36
/// A single Arrow record batch: one boxed, dynamically-typed Arrow array per column.
pub type ArrowChunk = Chunk<Box<dyn Array>>;
39
/// Client for talking to a hypersync server over HTTP, with built-in
/// retry and exponential-backoff behavior.
#[derive(Clone, Debug)]
pub struct Client {
    /// Reusable reqwest HTTP client, preconfigured with the request timeout.
    http_client: reqwest::Client,
    /// Base URL of the HyperSync server; endpoint path segments are appended per request.
    url: Url,
    /// Optional bearer token attached to every request for authentication.
    bearer_token: Option<String>,
    /// Number of retries to attempt before returning the accumulated error.
    max_num_retries: usize,
    /// Milliseconds added to the backoff base after each failed attempt;
    /// also the exclusive upper bound of the random jitter added to each sleep.
    retry_backoff_ms: u64,
    /// Initial backoff wait time (milliseconds) before the first retry.
    retry_base_ms: u64,
    /// Ceiling (milliseconds) that the growing backoff base is capped at.
    retry_ceiling_ms: u64,
}
58
59impl Client {
60    /// Creates a new client with the given configuration.
61    pub fn new(cfg: ClientConfig) -> Result<Self> {
62        let timeout = cfg
63            .http_req_timeout_millis
64            .unwrap_or(NonZeroU64::new(30_000).unwrap());
65
66        let http_client = reqwest::Client::builder()
67            .no_gzip()
68            .timeout(Duration::from_millis(timeout.get()))
69            .build()
70            .unwrap();
71
72        Ok(Self {
73            http_client,
74            url: cfg
75                .url
76                .unwrap_or("https://eth.hypersync.xyz".parse().context("parse url")?),
77            bearer_token: cfg.bearer_token,
78            max_num_retries: cfg.max_num_retries.unwrap_or(12),
79            retry_backoff_ms: cfg.retry_backoff_ms.unwrap_or(500),
80            retry_base_ms: cfg.retry_base_ms.unwrap_or(200),
81            retry_ceiling_ms: cfg.retry_ceiling_ms.unwrap_or(5_000),
82        })
83    }
84
85    /*
86    /// Retrieves blocks, transactions, traces, and logs through a stream using the provided
87    /// query and stream configuration.
88    ///
89    /// ### Implementation
90    /// Runs multiple queries simultaneously based on config.concurrency.
91    ///
92    /// Each query runs until it reaches query.to, server height, any max_num_* query param,
93    /// or execution timed out by server.
94    pub async fn collect(
95        self: Arc<Self>,
96        query: Query,
97        config: StreamConfig,
98    ) -> Result<QueryResponse> {
99        check_simple_stream_params(&config)?;
100
101        let mut recv = stream::stream_arrow(self, query, config)
102            .await
103            .context("start stream")?;
104
105        let mut data = ResponseData::default();
106        let mut archive_height = None;
107        let mut next_block = 0;
108        let mut total_execution_time = 0;
109
110        while let Some(res) = recv.recv().await {
111            let res = res.context("get response")?;
112            let res: QueryResponse = QueryResponse::from(&res);
113
114            for batch in res.data.blocks {
115                data.blocks.push(batch);
116            }
117            for batch in res.data.transactions {
118                data.transactions.push(batch);
119            }
120            for batch in res.data.logs {
121                data.logs.push(batch);
122            }
123            for batch in res.data.traces {
124                data.traces.push(batch);
125            }
126
127            archive_height = res.archive_height;
128            next_block = res.next_block;
129            total_execution_time += res.total_execution_time
130        }
131
132        Ok(QueryResponse {
133            archive_height,
134            next_block,
135            total_execution_time,
136            data,
137            rollback_guard: None,
138        })
139    } */
140
141    /*
142    /// Retrieves events through a stream using the provided query and stream configuration.
143    pub async fn collect_events(
144        self: Arc<Self>,
145        mut query: Query,
146        config: StreamConfig,
147    ) -> Result<EventResponse> {
148        check_simple_stream_params(&config)?;
149
150        add_event_join_fields_to_selection(&mut query);
151
152        let mut recv = stream::stream_arrow(self, query, config)
153            .await
154            .context("start stream")?;
155
156        let mut data = Vec::new();
157        let mut archive_height = None;
158        let mut next_block = 0;
159        let mut total_execution_time = 0;
160
161        while let Some(res) = recv.recv().await {
162            let res = res.context("get response")?;
163            let res: QueryResponse = QueryResponse::from(&res);
164            let events: Vec<Event> = res.data.into();
165
166            data.push(events);
167
168            archive_height = res.archive_height;
169            next_block = res.next_block;
170            total_execution_time += res.total_execution_time
171        }
172
173        Ok(EventResponse {
174            archive_height,
175            next_block,
176            total_execution_time,
177            data,
178            rollback_guard: None,
179        })
180    } */
181
182    /// Retrieves blocks, transactions, traces, and logs in Arrow format through a stream using
183    /// the provided query and stream configuration.
184    pub async fn collect_arrow(
185        self: Arc<Self>,
186        query: Query,
187        config: StreamConfig,
188    ) -> Result<ArrowResponse> {
189        let mut recv = stream::stream_arrow(self, query, config)
190            .await
191            .context("start stream")?;
192
193        let mut data = ArrowResponseData::default();
194        let mut archive_height = None;
195        let mut next_block = 0;
196        let mut total_execution_time = 0;
197
198        while let Some(res) = recv.recv().await {
199            let res = res.context("get response")?;
200
201            for batch in res.data.blocks {
202                data.blocks.push(batch);
203            }
204            for batch in res.data.transactions {
205                data.transactions.push(batch);
206            }
207            for batch in res.data.receipts {
208                data.receipts.push(batch);
209            }
210            for batch in res.data.inputs {
211                data.inputs.push(batch);
212            }
213            for batch in res.data.outputs {
214                data.outputs.push(batch);
215            }
216
217            archive_height = res.archive_height;
218            next_block = res.next_block;
219            total_execution_time += res.total_execution_time
220        }
221
222        Ok(ArrowResponse {
223            archive_height,
224            next_block,
225            total_execution_time,
226            data,
227        })
228    }
229
230    /// Writes parquet file getting data through a stream using the provided path, query,
231    /// and stream configuration.
232    pub async fn collect_parquet(
233        self: Arc<Self>,
234        path: &str,
235        query: Query,
236        config: StreamConfig,
237    ) -> Result<()> {
238        parquet_out::collect_parquet(self, path, query, config).await
239    }
240
241    /// Internal implementation of getting chain_id from server
242    async fn get_chain_id_impl(&self) -> Result<u64> {
243        let mut url = self.url.clone();
244        let mut segments = url.path_segments_mut().ok().context("get path segments")?;
245        segments.push("chain_id");
246        std::mem::drop(segments);
247        let mut req = self.http_client.request(Method::GET, url);
248
249        if let Some(bearer_token) = &self.bearer_token {
250            req = req.bearer_auth(bearer_token);
251        }
252
253        let res = req.send().await.context("execute http req")?;
254
255        let status = res.status();
256        if !status.is_success() {
257            return Err(anyhow!("http response status code {}", status));
258        }
259
260        let chain_id: ChainId = res.json().await.context("read response body json")?;
261
262        Ok(chain_id.chain_id)
263    }
264
265    /// Internal implementation of getting height from server
266    async fn get_height_impl(&self, http_timeout_override: Option<Duration>) -> Result<u64> {
267        let mut url = self.url.clone();
268        let mut segments = url.path_segments_mut().ok().context("get path segments")?;
269        segments.push("height");
270        std::mem::drop(segments);
271        let mut req = self.http_client.request(Method::GET, url);
272
273        if let Some(bearer_token) = &self.bearer_token {
274            req = req.bearer_auth(bearer_token);
275        }
276
277        if let Some(http_timeout_override) = http_timeout_override {
278            req = req.timeout(http_timeout_override);
279        }
280
281        let res = req.send().await.context("execute http req")?;
282
283        let status = res.status();
284        if !status.is_success() {
285            return Err(anyhow!("http response status code {}", status));
286        }
287
288        let height: ArchiveHeight = res.json().await.context("read response body json")?;
289
290        Ok(height.height.unwrap_or(0))
291    }
292
293    /// Get the chain_id from the server with retries.
294    pub async fn get_chain_id(&self) -> Result<u64> {
295        let mut base = self.retry_base_ms;
296
297        let mut err = anyhow!("");
298
299        for _ in 0..self.max_num_retries + 1 {
300            match self.get_chain_id_impl().await {
301                Ok(res) => return Ok(res),
302                Err(e) => {
303                    log::error!(
304                        "failed to get chain_id from server, retrying... The error was: {:?}",
305                        e
306                    );
307                    err = err.context(format!("{:?}", e));
308                }
309            }
310
311            let base_ms = Duration::from_millis(base);
312            let jitter = Duration::from_millis(fastrange_rs::fastrange_64(
313                rand::random(),
314                self.retry_backoff_ms,
315            ));
316
317            tokio::time::sleep(base_ms + jitter).await;
318
319            base = std::cmp::min(base + self.retry_backoff_ms, self.retry_ceiling_ms);
320        }
321
322        Err(err)
323    }
324
325    /// Get the height of from server with retries.
326    pub async fn get_height(&self) -> Result<u64> {
327        let mut base = self.retry_base_ms;
328
329        let mut err = anyhow!("");
330
331        for _ in 0..self.max_num_retries + 1 {
332            match self.get_height_impl(None).await {
333                Ok(res) => return Ok(res),
334                Err(e) => {
335                    log::error!(
336                        "failed to get height from server, retrying... The error was: {:?}",
337                        e
338                    );
339                    err = err.context(format!("{:?}", e));
340                }
341            }
342
343            let base_ms = Duration::from_millis(base);
344            let jitter = Duration::from_millis(fastrange_rs::fastrange_64(
345                rand::random(),
346                self.retry_backoff_ms,
347            ));
348
349            tokio::time::sleep(base_ms + jitter).await;
350
351            base = std::cmp::min(base + self.retry_backoff_ms, self.retry_ceiling_ms);
352        }
353
354        Err(err)
355    }
356
357    /// Get the height of the Client instance for health checks.
358    /// Doesn't do any retries and the `http_req_timeout` parameter will override the http timeout config set when creating the client.
359    pub async fn health_check(&self, http_req_timeout: Option<Duration>) -> Result<u64> {
360        self.get_height_impl(http_req_timeout).await
361    }
362
363    /// Executes query with retries and returns the response.
364    pub async fn get(&self, query: &Query) -> Result<QueryResponse> {
365        let arrow_response = self.get_arrow(query).await.context("get data")?;
366        Ok(QueryResponse::from(&arrow_response))
367    }
368    /*
369       /// Add block, transaction and log fields selection to the query, executes it with retries
370       /// and returns the response.
371       pub async fn get_events(&self, mut query: Query) -> Result<EventResponse> {
372           add_event_join_fields_to_selection(&mut query);
373           let arrow_response = self.get_arrow(&query).await.context("get data")?;
374           Ok(EventResponse::from(&arrow_response))
375       }
376    */
377    /// Executes query once and returns the result in (Arrow, size) format.
378    async fn get_arrow_impl(&self, query: &Query) -> Result<(ArrowResponse, u64)> {
379        let mut url = self.url.clone();
380        let mut segments = url.path_segments_mut().ok().context("get path segments")?;
381        segments.push("query");
382        segments.push("arrow-ipc");
383        std::mem::drop(segments);
384        let mut req = self.http_client.request(Method::POST, url);
385
386        if let Some(bearer_token) = &self.bearer_token {
387            req = req.bearer_auth(bearer_token);
388        }
389
390        let res = req.json(&query).send().await.context("execute http req")?;
391
392        let status = res.status();
393        if !status.is_success() {
394            let text = res.text().await.context("read text to see error")?;
395
396            return Err(anyhow!(
397                "http response status code {}, err body: {}",
398                status,
399                text
400            ));
401        }
402
403        let bytes = res.bytes().await.context("read response body bytes")?;
404
405        let res = tokio::task::block_in_place(|| {
406            parse_query_response(&bytes).context("parse query response")
407        })?;
408
409        Ok((res, bytes.len().try_into().unwrap()))
410    }
411
412    /// Executes query with retries and returns the response in Arrow format.
413    pub async fn get_arrow(&self, query: &Query) -> Result<ArrowResponse> {
414        self.get_arrow_with_size(query).await.map(|res| res.0)
415    }
416
417    /// Internal implementation for get_arrow.
418    async fn get_arrow_with_size(&self, query: &Query) -> Result<(ArrowResponse, u64)> {
419        let mut base = self.retry_base_ms;
420
421        let mut err = anyhow!("");
422
423        for _ in 0..self.max_num_retries + 1 {
424            match self.get_arrow_impl(query).await {
425                Ok(res) => return Ok(res),
426                Err(e) => {
427                    log::error!(
428                        "failed to get arrow data from server, retrying... The error was: {:?}",
429                        e
430                    );
431                    err = err.context(format!("{:?}", e));
432                }
433            }
434
435            let base_ms = Duration::from_millis(base);
436            let jitter = Duration::from_millis(fastrange_rs::fastrange_64(
437                rand::random(),
438                self.retry_backoff_ms,
439            ));
440
441            tokio::time::sleep(base_ms + jitter).await;
442
443            base = std::cmp::min(base + self.retry_backoff_ms, self.retry_ceiling_ms);
444        }
445
446        Err(err)
447    }
448
449    // /// Spawns task to execute query and return data via a channel.
450    // pub async fn stream(
451    //     self: Arc<Self>,
452    //     query: Query,
453    //     config: StreamConfig,
454    // ) -> Result<mpsc::Receiver<Result<QueryResponse>>> {
455    //     check_simple_stream_params(&config)?;
456
457    //     let (tx, rx): (_, mpsc::Receiver<Result<QueryResponse>>) =
458    //         mpsc::channel(config.concurrency.unwrap_or(10));
459
460    //     let mut inner_rx = self
461    //         .stream_arrow(query, config)
462    //         .await
463    //         .context("start inner stream")?;
464
465    //     tokio::spawn(async move {
466    //         while let Some(resp) = inner_rx.recv().await {
467    //             let is_err = resp.is_err();
468    //             if tx
469    //                 .send(resp.map(|r| QueryResponse::from(&r)))
470    //                 .await
471    //                 .is_err()
472    //                 || is_err
473    //             {
474    //                 return;
475    //             }
476    //         }
477    //     });
478
479    //     Ok(rx)
480    // }
481
482    /*
483    /// Add block, transaction and log fields selection to the query and spawns task to execute it,
484    /// returning data via a channel.
485    pub async fn stream_events(
486        self: Arc<Self>,
487        mut query: Query,
488        config: StreamConfig,
489    ) -> Result<mpsc::Receiver<Result<EventResponse>>> {
490        check_simple_stream_params(&config)?;
491
492        add_event_join_fields_to_selection(&mut query);
493
494        let (tx, rx): (_, mpsc::Receiver<Result<EventResponse>>) =
495            mpsc::channel(config.concurrency.unwrap_or(10));
496
497        let mut inner_rx = self
498            .stream_arrow(query, config)
499            .await
500            .context("start inner stream")?;
501
502        tokio::spawn(async move {
503            while let Some(resp) = inner_rx.recv().await {
504                let is_err = resp.is_err();
505                if tx
506                    .send(resp.map(|r| EventResponse::from(&r)))
507                    .await
508                    .is_err()
509                    || is_err
510                {
511                    return;
512                }
513            }
514        });
515
516        Ok(rx)
517    }
518    */
519
520    /// Spawns task to execute query and return data via a channel in Arrow format.
521    pub async fn stream_arrow(
522        self: Arc<Self>,
523        query: Query,
524        config: StreamConfig,
525    ) -> Result<mpsc::Receiver<Result<ArrowResponse>>> {
526        stream::stream_arrow(self, query, config).await
527    }
528
529    /// Getter for url field.
530    pub fn url(&self) -> &Url {
531        &self.url
532    }
533}
534
535#[allow(dead_code)]
536fn check_simple_stream_params(config: &StreamConfig) -> Result<()> {
537    if config.column_mapping.is_some() {
538        return Err(anyhow!("config.column_mapping can't be passed to single type function. User is expected to map values manually."));
539    }
540
541    Ok(())
542}
543
544#[allow(dead_code)]
545fn add_event_join_fields_to_selection(query: &mut Query) {
546    // Field lists for implementing event based API, these fields are used for joining
547    // so they should always be added to the field selection.
548    const BLOCK_JOIN_FIELDS: &[&str] = &["height"]; // Or is it "block_height"?
549    const TX_JOIN_FIELDS: &[&str] = &["id"];
550    const RECEIPT_JOIN_FIELDS: &[&str] = &["tx_id", "block_height"];
551    const INPUT_JOIN_FIELDS: &[&str] = &["tx_id", "block_height"];
552    const OUTPUT_JOIN_FIELDS: &[&str] = &["tx_id", "block_height"];
553
554    if !query.field_selection.block.is_empty() {
555        for field in BLOCK_JOIN_FIELDS.iter() {
556            query.field_selection.block.insert(field.to_string());
557        }
558    }
559
560    if !query.field_selection.transaction.is_empty() {
561        for field in TX_JOIN_FIELDS.iter() {
562            query.field_selection.transaction.insert(field.to_string());
563        }
564    }
565
566    if !query.field_selection.receipt.is_empty() {
567        for field in RECEIPT_JOIN_FIELDS.iter() {
568            query.field_selection.receipt.insert(field.to_string());
569        }
570    }
571
572    if !query.field_selection.input.is_empty() {
573        for field in INPUT_JOIN_FIELDS.iter() {
574            query.field_selection.input.insert(field.to_string());
575        }
576    }
577
578    if !query.field_selection.output.is_empty() {
579        for field in OUTPUT_JOIN_FIELDS.iter() {
580            query.field_selection.output.insert(field.to_string());
581        }
582    }
583}