Skip to main content

hypersync_client/
lib.rs

1#![deny(missing_docs)]
2// README.md is a symlink (ln -s ../README.md README.md) to the root workspace
3// README. This is needed so that both `cargo doc` and `cargo package` work:
4// - `cargo doc`: include_str! resolves the symlink to the root README.
5// - `cargo package`: cargo copies the symlink target into the tarball, so
6//   include_str! finds README.md at the package root during verification.
7#![doc = include_str!("../README.md")]
8use std::time::Instant;
9use std::{sync::Arc, time::Duration};
10
11use anyhow::{anyhow, Context, Result};
12use futures::StreamExt;
13use hypersync_net_types::{hypersync_net_types_capnp, ArchiveHeight, ChainId, Query};
14use reqwest::{Method, StatusCode};
15use reqwest_eventsource::retry::ExponentialBackoff;
16use reqwest_eventsource::{Event, EventSource};
17
18pub mod arrow_reader;
19mod column_mapping;
20mod config;
21mod decode;
22mod decode_call;
23mod from_arrow;
24pub mod metrics;
25mod parquet_out;
26mod parse_response;
27pub mod preset_query;
28mod rate_limit;
29mod rayon_async;
30pub mod simple_types;
31mod stream;
32mod types;
33mod util;
34
35pub use hypersync_format as format;
36pub use hypersync_net_types as net_types;
37pub use hypersync_schema as schema;
38
39use parse_response::parse_query_response;
40use tokio::sync::mpsc;
41use types::ResponseData;
42use url::Url;
43
44pub use column_mapping::{ColumnMapping, DataType};
45pub use config::HexOutput;
46pub use config::{ClientConfig, SerializationFormat, StreamConfig};
47pub use decode::Decoder;
48pub use decode_call::CallDecoder;
49pub use metrics::{
50    RequestKind, RequestStats, StreamMetrics, StreamObserver, StreamSummary, NUM_SIZE_BUCKETS,
51    SIZE_BUCKET_LABELS,
52};
53pub use rate_limit::RateLimitInfo;
54pub use types::{
55    ArrowResponse, ArrowResponseData, EventResponse, QueryResponse, RateLimitResponse,
56};
57
58use crate::parse_response::read_query_response;
59use crate::simple_types::InternalEventJoinStrategy;
60
/// Wraps a `reqwest::Client` together with the API token and timeout applied to
/// requests, and rebuilds the underlying client once it is older than
/// `max_connection_age`.
#[derive(Debug)]
struct HttpClientWrapper {
    /// Mutable state that needs to be refreshed periodically: the current client
    /// and the instant it was built.
    inner: std::sync::Mutex<HttpClientWrapperInner>,
    /// HyperSync server api token, sent as a bearer token on every request.
    api_token: String,
    /// Standard timeout for http requests (applied by `request`, not by
    /// `request_no_timeout`).
    timeout: Duration,
    /// Maximum age of the client before it is rebuilt.
    ///
    /// The client polls frequently, so pooled connections are never idle. As a
    /// result they never time out and DNS is never looked up again, which is a
    /// problem when the underlying IP address changes during a HyperSync
    /// failover. reqwest has no built-in maximum connection age, so the whole
    /// client is recreated once it is older than this.
    max_connection_age: Duration,
    /// User agent used when (re)building the client.
    user_agent: String,
}
78
/// Mutex-guarded state of [`HttpClientWrapper`]: the current client and its age.
#[derive(Debug)]
struct HttpClientWrapperInner {
    /// Initialized reqwest instance for client url.
    client: reqwest::Client,
    /// Timestamp when the client was created; compared against
    /// `HttpClientWrapper::max_connection_age` to decide when to rebuild.
    created_at: Instant,
}
86
87impl HttpClientWrapper {
88    fn new(user_agent: String, api_token: String, timeout: Duration) -> Self {
89        let client = reqwest::Client::builder()
90            .no_gzip()
91            .user_agent(&user_agent)
92            .build()
93            .unwrap();
94
95        Self {
96            inner: std::sync::Mutex::new(HttpClientWrapperInner {
97                client,
98                created_at: Instant::now(),
99            }),
100            api_token,
101            timeout,
102            max_connection_age: Duration::from_secs(60),
103            user_agent,
104        }
105    }
106
107    fn get_client(&self) -> reqwest::Client {
108        let mut inner = self.inner.lock().expect("HttpClientWrapper mutex poisoned");
109
110        // Check if client needs refresh due to age
111        if inner.created_at.elapsed() > self.max_connection_age {
112            // Recreate client to force new DNS lookup for failover scenarios
113            inner.client = reqwest::Client::builder()
114                .no_gzip()
115                .user_agent(&self.user_agent)
116                .build()
117                .unwrap();
118            inner.created_at = Instant::now();
119        }
120
121        // Clone is cheap for reqwest::Client (it's Arc-wrapped internally)
122        inner.client.clone()
123    }
124
125    fn request(&self, method: Method, url: Url) -> reqwest::RequestBuilder {
126        let client = self.get_client();
127        client
128            .request(method, url)
129            .timeout(self.timeout)
130            .bearer_auth(&self.api_token)
131    }
132
133    fn request_no_timeout(&self, method: Method, url: Url) -> reqwest::RequestBuilder {
134        let client = self.get_client();
135        client.request(method, url).bearer_auth(&self.api_token)
136    }
137}
138
/// Internal client state to handle http requests and retries.
///
/// Shared between all clones of [`Client`] through an `Arc`.
#[derive(Debug)]
struct ClientInner {
    /// HTTP client wrapper that carries the api token, timeout and client refresh logic.
    http_client: HttpClientWrapper,
    /// HyperSync server URL; endpoint paths are appended to it as path segments.
    url: Url,
    /// Number of retries to attempt before returning error (total attempts = retries + 1).
    max_num_retries: usize,
    /// Milliseconds added to the backoff delay after each failed attempt. In
    /// `get_chain_id`/`get_height` it is also the upper bound of the random jitter
    /// added to each delay.
    retry_backoff_ms: u64,
    /// Initial backoff delay in milliseconds (before jitter).
    retry_base_ms: u64,
    /// Ceiling in milliseconds for the backoff delay (before jitter).
    retry_ceiling_ms: u64,
    /// Query serialization format to use for HTTP requests.
    serialization_format: SerializationFormat,
    /// Most recently observed rate limit info from the server, paired with the
    /// instant it was captured so elapsed time can be subtracted from `reset_secs`.
    rate_limit_state: std::sync::Mutex<Option<(RateLimitInfo, Instant)>>,
    /// Whether to proactively sleep when the rate limit is exhausted.
    proactive_rate_limit_sleep: bool,
}
161
/// Client to handle http requests and retries.
///
/// Cloning is cheap: all clones share the same reference-counted inner state
/// (`Arc<ClientInner>`), including the HTTP client and rate limit state.
#[derive(Clone, Debug)]
pub struct Client {
    inner: Arc<ClientInner>,
}
167
168impl Client {
169    /// Creates a new client with the given configuration.
170    ///
171    /// Configuration must include the `url` and `api_token` fields.
172    /// # Example
173    /// ```
174    /// use hypersync_client::{Client, ClientConfig};
175    ///
176    /// let config = ClientConfig {
177    ///     url: "https://eth.hypersync.xyz".to_string(),
178    ///     api_token: std::env::var("ENVIO_API_TOKEN")?,
179    ///     ..Default::default()
180    /// };
181    /// let client = Client::new(config)?;
182    /// # Ok::<(), anyhow::Error>(())
183    /// ```
184    ///
185    /// # Errors
186    /// This method fails if the config is invalid.
187    pub fn new(cfg: ClientConfig) -> Result<Self> {
188        // hscr stands for hypersync client rust
189        cfg.validate().context("invalid ClientConfig")?;
190        let user_agent = format!("hscr/{}", env!("CARGO_PKG_VERSION"));
191        Self::new_internal(cfg, user_agent)
192    }
193
194    /// Creates a new client builder.
195    ///
196    /// # Example
197    /// ```
198    /// use hypersync_client::Client;
199    ///
200    /// let client = Client::builder()
201    ///     .chain_id(1)
202    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
203    ///     .build()
204    ///     .unwrap();
205    /// ```
206    pub fn builder() -> ClientBuilder {
207        ClientBuilder::new()
208    }
209
210    #[doc(hidden)]
211    pub fn new_with_agent(cfg: ClientConfig, user_agent: impl Into<String>) -> Result<Self> {
212        // Creates a new client with the given configuration and custom user agent.
213        // This is intended for use by language bindings (Python, Node.js) and HyperIndex.
214        Self::new_internal(cfg, user_agent.into())
215    }
216
217    /// Internal constructor that takes both config and user agent.
218    fn new_internal(cfg: ClientConfig, user_agent: String) -> Result<Self> {
219        let http_client = HttpClientWrapper::new(
220            user_agent,
221            cfg.api_token,
222            Duration::from_millis(cfg.http_req_timeout_millis),
223        );
224
225        let url = Url::parse(&cfg.url).context("url is malformed")?;
226
227        Ok(Self {
228            inner: Arc::new(ClientInner {
229                http_client,
230                url,
231                max_num_retries: cfg.max_num_retries,
232                retry_backoff_ms: cfg.retry_backoff_ms,
233                retry_base_ms: cfg.retry_base_ms,
234                retry_ceiling_ms: cfg.retry_ceiling_ms,
235                serialization_format: cfg.serialization_format,
236                rate_limit_state: std::sync::Mutex::new(None),
237                proactive_rate_limit_sleep: cfg.proactive_rate_limit_sleep,
238            }),
239        })
240    }
241
242    /// Retrieves blocks, transactions, traces, and logs through a stream using the provided
243    /// query and stream configuration.
244    ///
245    /// ### Implementation
246    /// Runs multiple queries simultaneously based on config.concurrency.
247    ///
248    /// Each query runs until it reaches query.to, server height, any max_num_* query param,
249    /// or execution timed out by server.
250    ///
251    /// # ⚠️ Important Warning
252    ///
253    /// This method will continue executing until the query has run to completion from beginning
254    /// to the end of the block range defined in the query. For heavy queries with large block
255    /// ranges or high data volumes, consider:
256    ///
257    /// - Use [`stream()`](Self::stream) to interact with each streamed chunk individually
258    /// - Use [`get()`](Self::get) which returns a `next_block` that can be paginated for the next query
259    /// - Break large queries into smaller block ranges
260    ///
261    /// # Example
262    /// ```no_run
263    /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField}, StreamConfig};
264    ///
265    /// # async fn example() -> anyhow::Result<()> {
266    /// let client = Client::builder()
267    ///     .chain_id(1)
268    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
269    ///     .build()?;
270    ///
271    /// // Query ERC20 transfer events
272    /// let query = Query::new()
273    ///     .from_block(19000000)
274    ///     .to_block_excl(19000010)
275    ///     .where_logs(
276    ///         LogFilter::all()
277    ///             .and_topic0(["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"])?
278    ///     )
279    ///     .select_log_fields([LogField::Address, LogField::Data]);
280    /// let response = client.collect(query, StreamConfig::default()).await?;
281    ///
282    /// println!("Collected {} events", response.data.logs.len());
283    /// # Ok(())
284    /// # }
285    /// ```
286    pub async fn collect(&self, query: Query, config: StreamConfig) -> Result<QueryResponse> {
287        check_simple_stream_params(&config)?;
288
289        let mut recv = stream::stream_arrow(self, query, config)
290            .await
291            .context("start stream")?;
292
293        let mut data = ResponseData::default();
294        let mut archive_height = None;
295        let mut next_block = 0;
296        let mut total_execution_time = 0;
297
298        while let Some(res) = recv.recv().await {
299            let res = res.context("get response")?;
300            let res: QueryResponse =
301                QueryResponse::try_from(&res).context("convert arrow response")?;
302
303            for batch in res.data.blocks {
304                data.blocks.push(batch);
305            }
306            for batch in res.data.transactions {
307                data.transactions.push(batch);
308            }
309            for batch in res.data.logs {
310                data.logs.push(batch);
311            }
312            for batch in res.data.traces {
313                data.traces.push(batch);
314            }
315
316            archive_height = res.archive_height;
317            next_block = res.next_block;
318            total_execution_time += res.total_execution_time
319        }
320
321        Ok(QueryResponse {
322            archive_height,
323            next_block,
324            total_execution_time,
325            data,
326            rollback_guard: None,
327        })
328    }
329
330    /// Retrieves events through a stream using the provided query and stream configuration.
331    ///
332    /// # ⚠️ Important Warning
333    ///
334    /// This method will continue executing until the query has run to completion from beginning
335    /// to the end of the block range defined in the query. For heavy queries with large block
336    /// ranges or high data volumes, consider:
337    ///
338    /// - Use [`stream_events()`](Self::stream_events) to interact with each streamed chunk individually
339    /// - Use [`get_events()`](Self::get_events) which returns a `next_block` that can be paginated for the next query
340    /// - Break large queries into smaller block ranges
341    ///
342    /// # Example
343    /// ```no_run
344    /// use hypersync_client::{Client, net_types::{Query, TransactionFilter, TransactionField}, StreamConfig};
345    ///
346    /// # async fn example() -> anyhow::Result<()> {
347    /// let client = Client::builder()
348    ///     .chain_id(1)
349    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
350    ///     .build()?;
351    ///
352    /// // Query transactions to a specific address
353    /// let query = Query::new()
354    ///     .from_block(19000000)
355    ///     .to_block_excl(19000100)
356    ///     .where_transactions(
357    ///         TransactionFilter::all()
358    ///             .and_to(["0xA0b86a33E6411b87Fd9D3DF822C8698FC06BBe4c"])?
359    ///     )
360    ///     .select_transaction_fields([TransactionField::Hash, TransactionField::From, TransactionField::Value]);
361    /// let response = client.collect_events(query, StreamConfig::default()).await?;
362    ///
363    /// println!("Collected {} events", response.data.len());
364    /// # Ok(())
365    /// # }
366    /// ```
367    pub async fn collect_events(
368        &self,
369        mut query: Query,
370        config: StreamConfig,
371    ) -> Result<EventResponse> {
372        check_simple_stream_params(&config)?;
373
374        let event_join_strategy = InternalEventJoinStrategy::from(&query.field_selection);
375        event_join_strategy.add_join_fields_to_selection(&mut query.field_selection);
376
377        let mut recv = stream::stream_arrow(self, query, config)
378            .await
379            .context("start stream")?;
380
381        let mut data = Vec::new();
382        let mut archive_height = None;
383        let mut next_block = 0;
384        let mut total_execution_time = 0;
385
386        while let Some(res) = recv.recv().await {
387            let res = res.context("get response")?;
388            let res: QueryResponse =
389                QueryResponse::try_from(&res).context("convert arrow response")?;
390            let events = event_join_strategy.join_from_response_data(res.data);
391
392            data.extend(events);
393
394            archive_height = res.archive_height;
395            next_block = res.next_block;
396            total_execution_time += res.total_execution_time
397        }
398
399        Ok(EventResponse {
400            archive_height,
401            next_block,
402            total_execution_time,
403            data,
404            rollback_guard: None,
405        })
406    }
407
408    /// Retrieves blocks, transactions, traces, and logs in Arrow format through a stream using
409    /// the provided query and stream configuration.
410    ///
411    /// Returns data in Apache Arrow format for high-performance columnar processing.
412    /// Useful for analytics workloads or when working with Arrow-compatible tools.
413    ///
414    /// # ⚠️ Important Warning
415    ///
416    /// This method will continue executing until the query has run to completion from beginning
417    /// to the end of the block range defined in the query. For heavy queries with large block
418    /// ranges or high data volumes, consider:
419    ///
420    /// - Use [`stream_arrow()`](Self::stream_arrow) to interact with each streamed chunk individually
421    /// - Use [`get_arrow()`](Self::get_arrow) which returns a `next_block` that can be paginated for the next query
422    /// - Break large queries into smaller block ranges
423    ///
424    /// # Example
425    /// ```no_run
426    /// use hypersync_client::{Client, net_types::{Query, BlockFilter, BlockField}, StreamConfig};
427    ///
428    /// # async fn example() -> anyhow::Result<()> {
429    /// let client = Client::builder()
430    ///     .chain_id(1)
431    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
432    ///     .build()?;
433    ///
434    /// // Get block data in Arrow format for analytics
435    /// let query = Query::new()
436    ///     .from_block(19000000)
437    ///     .to_block_excl(19000100)
438    ///     .include_all_blocks()
439    ///     .select_block_fields([BlockField::Number, BlockField::Timestamp, BlockField::GasUsed]);
440    /// let response = client.collect_arrow(query, StreamConfig::default()).await?;
441    ///
442    /// println!("Retrieved {} Arrow batches for blocks", response.data.blocks.len());
443    /// # Ok(())
444    /// # }
445    /// ```
446    pub async fn collect_arrow(&self, query: Query, config: StreamConfig) -> Result<ArrowResponse> {
447        let mut recv = stream::stream_arrow(self, query, config)
448            .await
449            .context("start stream")?;
450
451        let mut data = ArrowResponseData::default();
452        let mut archive_height = None;
453        let mut next_block = 0;
454        let mut total_execution_time = 0;
455
456        while let Some(res) = recv.recv().await {
457            let res = res.context("get response")?;
458
459            for batch in res.data.blocks {
460                data.blocks.push(batch);
461            }
462            for batch in res.data.transactions {
463                data.transactions.push(batch);
464            }
465            for batch in res.data.logs {
466                data.logs.push(batch);
467            }
468            for batch in res.data.traces {
469                data.traces.push(batch);
470            }
471            for batch in res.data.decoded_logs {
472                data.decoded_logs.push(batch);
473            }
474
475            archive_height = res.archive_height;
476            next_block = res.next_block;
477            total_execution_time += res.total_execution_time
478        }
479
480        Ok(ArrowResponse {
481            archive_height,
482            next_block,
483            total_execution_time,
484            data,
485            rollback_guard: None,
486        })
487    }
488
489    /// Writes parquet file getting data through a stream using the provided path, query,
490    /// and stream configuration.
491    ///
492    /// Streams data directly to a Parquet file for efficient storage and later analysis.
493    /// Perfect for data exports or ETL pipelines.
494    ///
495    /// # ⚠️ Important Warning
496    ///
497    /// This method will continue executing until the query has run to completion from beginning
498    /// to the end of the block range defined in the query. For heavy queries with large block
499    /// ranges or high data volumes, consider:
500    ///
501    /// - Use [`stream_arrow()`](Self::stream_arrow) and write to Parquet incrementally
502    /// - Use [`get_arrow()`](Self::get_arrow) with pagination and append to Parquet files
503    /// - Break large queries into smaller block ranges
504    ///
505    /// # Example
506    /// ```no_run
507    /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField}, StreamConfig};
508    ///
509    /// # async fn example() -> anyhow::Result<()> {
510    /// let client = Client::builder()
511    ///     .chain_id(1)
512    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
513    ///     .build()?;
514    ///
515    /// // Export all DEX trades to Parquet for analysis
516    /// let query = Query::new()
517    ///     .from_block(19000000)
518    ///     .to_block_excl(19010000)
519    ///     .where_logs(
520    ///         LogFilter::all()
521    ///             .and_topic0(["0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822"])?
522    ///     )
523    ///     .select_log_fields([LogField::Address, LogField::Data, LogField::BlockNumber]);
524    /// client.collect_parquet("./trades.parquet", query, StreamConfig::default()).await?;
525    ///
526    /// println!("Trade data exported to trades.parquet");
527    /// # Ok(())
528    /// # }
529    /// ```
530    pub async fn collect_parquet(
531        &self,
532        path: &str,
533        query: Query,
534        config: StreamConfig,
535    ) -> Result<()> {
536        parquet_out::collect_parquet(self, path, query, config).await
537    }
538
539    /// Internal implementation of getting chain_id from server
540    async fn get_chain_id_impl(&self) -> Result<u64> {
541        let mut url = self.inner.url.clone();
542        let mut segments = url.path_segments_mut().ok().context("get path segments")?;
543        segments.push("chain_id");
544        std::mem::drop(segments);
545        let req = self.inner.http_client.request(Method::GET, url);
546
547        let res = req.send().await.context("execute http req")?;
548
549        let status = res.status();
550        if !status.is_success() {
551            return Err(anyhow!("http response status code {}", status));
552        }
553
554        let chain_id: ChainId = res.json().await.context("read response body json")?;
555
556        Ok(chain_id.chain_id)
557    }
558
559    /// Internal implementation of getting height from server
560    async fn get_height_impl(&self, http_timeout_override: Option<Duration>) -> Result<u64> {
561        let mut url = self.inner.url.clone();
562        let mut segments = url.path_segments_mut().ok().context("get path segments")?;
563        segments.push("height");
564        std::mem::drop(segments);
565        let mut req = self.inner.http_client.request(Method::GET, url);
566        if let Some(http_timeout_override) = http_timeout_override {
567            req = req.timeout(http_timeout_override);
568        }
569
570        let res = req.send().await.context("execute http req")?;
571
572        let status = res.status();
573        if !status.is_success() {
574            return Err(anyhow!("http response status code {}", status));
575        }
576
577        let height: ArchiveHeight = res.json().await.context("read response body json")?;
578
579        Ok(height.height.unwrap_or(0))
580    }
581
582    /// Get the chain_id from the server with retries.
583    ///
584    /// # Example
585    /// ```no_run
586    /// use hypersync_client::Client;
587    ///
588    /// # async fn example() -> anyhow::Result<()> {
589    /// let client = Client::builder()
590    ///     .chain_id(1)
591    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
592    ///     .build()?;
593    ///
594    /// let chain_id = client.get_chain_id().await?;
595    /// println!("Connected to chain ID: {}", chain_id);
596    /// # Ok(())
597    /// # }
598    /// ```
599    pub async fn get_chain_id(&self) -> Result<u64> {
600        let mut base = self.inner.retry_base_ms;
601
602        let mut err = anyhow!("");
603
604        for _ in 0..self.inner.max_num_retries + 1 {
605            match self.get_chain_id_impl().await {
606                Ok(res) => return Ok(res),
607                Err(e) => {
608                    log::error!(
609                        "failed to get chain_id from server, retrying... The error was: {e:?}"
610                    );
611                    err = err.context(format!("{e:?}"));
612                }
613            }
614
615            let base_ms = Duration::from_millis(base);
616            let jitter = Duration::from_millis(fastrange_rs::fastrange_64(
617                rand::random(),
618                self.inner.retry_backoff_ms,
619            ));
620
621            tokio::time::sleep(base_ms + jitter).await;
622
623            base = std::cmp::min(
624                base + self.inner.retry_backoff_ms,
625                self.inner.retry_ceiling_ms,
626            );
627        }
628
629        Err(err)
630    }
631
632    /// Get the height of from server with retries.
633    ///
634    /// # Example
635    /// ```no_run
636    /// use hypersync_client::Client;
637    ///
638    /// # async fn example() -> anyhow::Result<()> {
639    /// let client = Client::builder()
640    ///     .chain_id(1)
641    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
642    ///     .build()?;
643    ///
644    /// let height = client.get_height().await?;
645    /// println!("Current block height: {}", height);
646    /// # Ok(())
647    /// # }
648    /// ```
649    pub async fn get_height(&self) -> Result<u64> {
650        let mut base = self.inner.retry_base_ms;
651
652        let mut err = anyhow!("");
653
654        for _ in 0..self.inner.max_num_retries + 1 {
655            match self.get_height_impl(None).await {
656                Ok(res) => return Ok(res),
657                Err(e) => {
658                    log::error!(
659                        "failed to get height from server, retrying... The error was: {e:?}"
660                    );
661                    err = err.context(format!("{e:?}"));
662                }
663            }
664
665            let base_ms = Duration::from_millis(base);
666            let jitter = Duration::from_millis(fastrange_rs::fastrange_64(
667                rand::random(),
668                self.inner.retry_backoff_ms,
669            ));
670
671            tokio::time::sleep(base_ms + jitter).await;
672
673            base = std::cmp::min(
674                base + self.inner.retry_backoff_ms,
675                self.inner.retry_ceiling_ms,
676            );
677        }
678
679        Err(err)
680    }
681
682    /// Get the height of the Client instance for health checks.
683    ///
684    /// Doesn't do any retries and the `http_req_timeout` parameter will override the http timeout config set when creating the client.
685    ///
686    /// # Example
687    /// ```no_run
688    /// use hypersync_client::Client;
689    /// use std::time::Duration;
690    ///
691    /// # async fn example() -> anyhow::Result<()> {
692    /// let client = Client::builder()
693    ///     .chain_id(1)
694    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
695    ///     .build()?;
696    ///
697    /// // Quick health check with 5 second timeout
698    /// let height = client.health_check(Some(Duration::from_secs(5))).await?;
699    /// println!("Server is healthy at block: {}", height);
700    /// # Ok(())
701    /// # }
702    /// ```
703    pub async fn health_check(&self, http_req_timeout: Option<Duration>) -> Result<u64> {
704        self.get_height_impl(http_req_timeout).await
705    }
706
707    /// Executes query with retries and returns the response.
708    ///
709    /// # Example
710    /// ```no_run
711    /// use hypersync_client::{Client, net_types::{Query, BlockFilter, BlockField}};
712    ///
713    /// # async fn example() -> anyhow::Result<()> {
714    /// let client = Client::builder()
715    ///     .chain_id(1)
716    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
717    ///     .build()?;
718    ///
719    /// // Query all blocks from a specific range
720    /// let query = Query::new()
721    ///     .from_block(19000000)
722    ///     .to_block_excl(19000010)
723    ///     .include_all_blocks()
724    ///     .select_block_fields([BlockField::Number, BlockField::Hash, BlockField::Timestamp]);
725    /// let response = client.get(&query).await?;
726    ///
727    /// println!("Retrieved {} blocks", response.data.blocks.len());
728    /// # Ok(())
729    /// # }
730    /// ```
731    pub async fn get(&self, query: &Query) -> Result<QueryResponse> {
732        let arrow_response = self.get_arrow(query).await.context("get data")?;
733        let converted =
734            QueryResponse::try_from(&arrow_response).context("convert arrow response")?;
735        Ok(converted)
736    }
737
738    /// Add block, transaction and log fields selection to the query, executes it with retries
739    /// and returns the response.
740    ///
741    /// This method automatically joins blocks, transactions, and logs into unified events,
742    /// making it easier to work with related blockchain data.
743    ///
744    /// # Example
745    /// ```no_run
746    /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField, TransactionField}};
747    ///
748    /// # async fn example() -> anyhow::Result<()> {
749    /// let client = Client::builder()
750    ///     .chain_id(1)
751    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
752    ///     .build()?;
753    ///
754    /// // Query ERC20 transfers with transaction context
755    /// let query = Query::new()
756    ///     .from_block(19000000)
757    ///     .to_block_excl(19000010)
758    ///     .where_logs(
759    ///         LogFilter::all()
760    ///             .and_topic0(["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"])?
761    ///     )
762    ///     .select_log_fields([LogField::Address, LogField::Data])
763    ///     .select_transaction_fields([TransactionField::Hash, TransactionField::From]);
764    /// let response = client.get_events(query).await?;
765    ///
766    /// println!("Retrieved {} joined events", response.data.len());
767    /// # Ok(())
768    /// # }
769    /// ```
770    pub async fn get_events(&self, mut query: Query) -> Result<EventResponse> {
771        let event_join_strategy = InternalEventJoinStrategy::from(&query.field_selection);
772        event_join_strategy.add_join_fields_to_selection(&mut query.field_selection);
773        let arrow_response = self.get_arrow(&query).await.context("get data")?;
774        EventResponse::try_from_arrow_response(&arrow_response, &event_join_strategy)
775    }
776
777    /// Executes query once and returns the result using JSON serialization.
778    async fn get_arrow_impl_json(
779        &self,
780        query: &Query,
781    ) -> std::result::Result<ArrowImplResponse, HyperSyncResponseError> {
782        let mut url = self.inner.url.clone();
783        let mut segments = url.path_segments_mut().ok().context("get path segments")?;
784        segments.push("query");
785        segments.push("arrow-ipc");
786        std::mem::drop(segments);
787        let req = self.inner.http_client.request(Method::POST, url);
788
789        let res = req.json(&query).send().await.context("execute http req")?;
790
791        let status = res.status();
792        let rate_limit = RateLimitInfo::from_response(&res);
793
794        if status == StatusCode::PAYLOAD_TOO_LARGE {
795            return Err(HyperSyncResponseError::PayloadTooLarge);
796        }
797        if status == StatusCode::TOO_MANY_REQUESTS {
798            return Err(HyperSyncResponseError::RateLimited { rate_limit });
799        }
800        if !status.is_success() {
801            let text = res.text().await.context("read text to see error")?;
802
803            return Err(HyperSyncResponseError::Other(anyhow!(
804                "http response status code {}, err body: {}",
805                status,
806                text
807            )));
808        }
809
810        let bytes = res.bytes().await.context("read response body bytes")?;
811
812        let res = tokio::task::block_in_place(|| {
813            parse_query_response(&bytes).context("parse query response")
814        })?;
815
816        Ok(ArrowImplResponse {
817            response: res,
818            response_bytes: bytes.len().try_into().unwrap(),
819            rate_limit,
820        })
821    }
822
823    fn should_cache_queries(&self) -> bool {
824        matches!(
825            self.inner.serialization_format,
826            SerializationFormat::CapnProto {
827                should_cache_queries: true
828            }
829        )
830    }
831
    /// Executes query once and returns the result using Cap'n Proto serialization.
    ///
    /// Requests go to the `query/arrow-ipc/capnp` path under the configured URL.
    ///
    /// When `should_cache_queries()` is `true`, a first request carries only a query
    /// id (built by `build_query_id_from_query`). If the reply holds a cached
    /// `QueryResponse`, it is decoded and returned directly. If the server answers
    /// `NotCached`, or that first request fails with a status other than 413/429,
    /// the method falls through to a second request. That request carries the full
    /// query (built by `build_full_query_from_query`), and its body is decoded with
    /// `parse_query_response`.
    ///
    /// # Errors
    /// - `HyperSyncResponseError::PayloadTooLarge` on HTTP 413 from either request.
    /// - `HyperSyncResponseError::RateLimited` on HTTP 429 from either request.
    /// - `HyperSyncResponseError::Other` on any other non-success status of the
    ///   full-query request.
    /// - URL, transport, serialization and decoding failures are `anyhow` errors
    ///   converted through `?` (presumably into `Other`; confirm the `From` impl).
    async fn get_arrow_impl_capnp(
        &self,
        query: &Query,
    ) -> std::result::Result<ArrowImplResponse, HyperSyncResponseError> {
        let mut url = self.inner.url.clone();
        let mut segments = url.path_segments_mut().ok().context("get path segments")?;
        segments.push("query");
        segments.push("arrow-ipc");
        segments.push("capnp");
        // Dropping `PathSegmentsMut` commits the path and releases the borrow of `url`.
        std::mem::drop(segments);

        let should_cache = self.should_cache_queries();

        // First attempt (cache enabled only): send just the query id and let the
        // server answer from its cache if it can.
        if should_cache {
            let query_with_id = {
                let mut message = capnp::message::Builder::new_default();
                let mut request_builder =
                    message.init_root::<hypersync_net_types_capnp::request::Builder>();

                request_builder
                    .build_query_id_from_query(query)
                    .context("build query id")?;
                let mut query_with_id = Vec::new();
                capnp::serialize_packed::write_message(&mut query_with_id, &message)
                    .context("write capnp message")?;
                query_with_id
            };

            // `url` is cloned because the full-query request below needs it again.
            let req = self.inner.http_client.request(Method::POST, url.clone());

            let res = req
                .body(query_with_id)
                .send()
                .await
                .context("execute http req")?;

            let status = res.status();
            let rate_limit = RateLimitInfo::from_response(&res);

            if status == StatusCode::PAYLOAD_TOO_LARGE {
                return Err(HyperSyncResponseError::PayloadTooLarge);
            }
            if status == StatusCode::TOO_MANY_REQUESTS {
                return Err(HyperSyncResponseError::RateLimited { rate_limit });
            }
            if status.is_success() {
                let bytes = res.bytes().await.context("read response body bytes")?;

                // Raise the nesting limit to its maximum and disable the traversal
                // limit for this reader (presumably because responses can be very
                // large; confirm).
                let mut opts = capnp::message::ReaderOptions::new();
                opts.nesting_limit(i32::MAX).traversal_limit_in_words(None);
                let message_reader = capnp::serialize_packed::read_message(bytes.as_ref(), opts)
                    .context("create message reader")?;
                let query_response = message_reader
                    .get_root::<hypersync_net_types_capnp::cached_query_response::Reader>()
                    .context("get cached_query_response root")?;
                // The server either returns the cached response or reports a cache miss.
                match query_response.get_either().which().context("get either")? {
                hypersync_net_types_capnp::cached_query_response::either::Which::QueryResponse(
                    query_response,
                ) => {
                    // Decoding is CPU-bound, so it runs inside `block_in_place`.
                    // NOTE(review): `block_in_place` panics on a current-thread Tokio runtime.
                    let res = tokio::task::block_in_place(|| {
                        let res = query_response?;
                        read_query_response(&res).context("parse query response cached")
                    })?;
                    return Ok(ArrowImplResponse {
                        response: res,
                        // Size of the packed capnp body as received.
                        response_bytes: bytes.len().try_into().unwrap(),
                        rate_limit,
                    });
                }
                hypersync_net_types_capnp::cached_query_response::either::Which::NotCached(()) => {
                    log::trace!("query was not cached, retrying with full query");
                }
            }
            } else {
                // A failed cache lookup is not fatal: log it and fall through to the full query.
                let text = res.text().await.context("read text to see error")?;
                log::warn!(
                    "Failed cache query, will retry full query. {}, err body: {}",
                    status,
                    text
                );
            }
        };

        // Full query. `should_cache` is forwarded to the builder (presumably so the
        // server caches this query for later id-only requests; confirm in
        // `build_full_query_from_query`).
        let full_query_bytes = {
            let mut message = capnp::message::Builder::new_default();
            let mut query_builder =
                message.init_root::<hypersync_net_types_capnp::request::Builder>();

            query_builder
                .build_full_query_from_query(query, should_cache)
                .context("build full query")?;
            let mut bytes = Vec::new();
            capnp::serialize_packed::write_message(&mut bytes, &message)
                .context("write full query capnp message")?;
            bytes
        };

        let req = self.inner.http_client.request(Method::POST, url);

        let res = req
            .body(full_query_bytes)
            .send()
            .await
            .context("execute http req")?;

        let status = res.status();
        let rate_limit = RateLimitInfo::from_response(&res);

        if status == StatusCode::PAYLOAD_TOO_LARGE {
            return Err(HyperSyncResponseError::PayloadTooLarge);
        }
        if status == StatusCode::TOO_MANY_REQUESTS {
            return Err(HyperSyncResponseError::RateLimited { rate_limit });
        }
        if !status.is_success() {
            let text = res.text().await.context("read text to see error")?;

            return Err(HyperSyncResponseError::Other(anyhow!(
                "http response status code {}, err body: {}",
                status,
                text
            )));
        }

        let bytes = res.bytes().await.context("read response body bytes")?;

        // CPU-bound decode; see the `block_in_place` note above.
        let res = tokio::task::block_in_place(|| {
            parse_query_response(&bytes).context("parse query response")
        })?;

        Ok(ArrowImplResponse {
            response: res,
            response_bytes: bytes.len().try_into().unwrap(),
            rate_limit,
        })
    }
969
970    /// Executes query once and returns the result, handling payload-too-large by halving block range.
971    async fn get_arrow_impl(
972        &self,
973        query: &Query,
974    ) -> std::result::Result<ArrowImplResponse, HyperSyncResponseError> {
975        let mut query = query.clone();
976        loop {
977            let res = match self.inner.serialization_format {
978                SerializationFormat::Json => self.get_arrow_impl_json(&query).await,
979                SerializationFormat::CapnProto { .. } => self.get_arrow_impl_capnp(&query).await,
980            };
981            match res {
982                Ok(res) => return Ok(res),
983                Err(
984                    e @ (HyperSyncResponseError::Other(_)
985                    | HyperSyncResponseError::RateLimited { .. }),
986                ) => return Err(e),
987                Err(HyperSyncResponseError::PayloadTooLarge) => {
988                    let block_range = if let Some(to_block) = query.to_block {
989                        let current = to_block - query.from_block;
990                        if current < 2 {
991                            return Err(HyperSyncResponseError::Other(anyhow!(
992                                "Payload is too large and query is using the minimum block range."
993                            )));
994                        }
995                        // Half the current block range
996                        current / 2
997                    } else {
998                        200
999                    };
1000                    let to_block = query.from_block + block_range;
1001                    query.to_block = Some(to_block);
1002
1003                    log::trace!(
1004                        "Payload is too large, retrying with block range from: {} to: {}",
1005                        query.from_block,
1006                        to_block
1007                    );
1008                }
1009            }
1010        }
1011    }
1012
1013    /// Executes query with retries and returns the response in Arrow format.
1014    pub async fn get_arrow(&self, query: &Query) -> Result<ArrowResponse> {
1015        const WAIT_ON_RATE_LIMIT: bool = true;
1016        self.get_arrow_with_size(query, WAIT_ON_RATE_LIMIT)
1017            .await
1018            .map(|res| res.response)
1019    }
1020
1021    /// Internal implementation for get_arrow.
1022    ///
1023    /// When `wait_on_rate_limit` is `false`, a 429 response is returned
1024    /// immediately with the rate limit info instead of being retried.
1025    async fn get_arrow_with_size(
1026        &self,
1027        query: &Query,
1028        wait_on_rate_limit: bool,
1029    ) -> Result<ArrowImplResponse> {
1030        let mut base = self.inner.retry_base_ms;
1031
1032        let mut err = anyhow!("");
1033
1034        if self.inner.proactive_rate_limit_sleep {
1035            if wait_on_rate_limit {
1036                self.wait_for_rate_limit().await;
1037            } else if let Some(rate_limit) = self.get_proactive_rate_limit_info() {
1038                return Err(anyhow::anyhow!(HyperSyncResponseError::RateLimited {
1039                    rate_limit
1040                }));
1041            }
1042        }
1043
1044        for _ in 0..self.inner.max_num_retries + 1 {
1045            match self.get_arrow_impl(query).await {
1046                Ok(res) => {
1047                    self.update_rate_limit_state(&res.rate_limit);
1048                    return Ok(res);
1049                }
1050                Err(HyperSyncResponseError::RateLimited { rate_limit }) => {
1051                    self.update_rate_limit_state(&rate_limit);
1052                    if !wait_on_rate_limit {
1053                        return Err(anyhow::anyhow!(HyperSyncResponseError::RateLimited {
1054                            rate_limit
1055                        }));
1056                    }
1057                    let wait_secs = rate_limit.suggested_wait_secs().unwrap_or(1) + 1;
1058                    log::warn!(
1059                        "rate limited by server ({rate_limit}), waiting {wait_secs}s before retry. To increase your rate limits, upgrade your plan at https://envio.dev/app/api-tokens. For more info: https://docs.envio.dev/docs/HyperSync/api-tokens"
1060                    );
1061                    err = err.context(format!("rate limited by server ({rate_limit}). To increase your rate limits, upgrade your plan at https://envio.dev/app/api-tokens"));
1062                    tokio::time::sleep(Duration::from_secs(wait_secs)).await;
1063                    continue;
1064                }
1065                Err(HyperSyncResponseError::Other(e)) => {
1066                    log::error!(
1067                        "failed to get arrow data from server, retrying... The error was: {e:?}"
1068                    );
1069                    err = err.context(format!("{e:?}"));
1070                }
1071                Err(HyperSyncResponseError::PayloadTooLarge) => {
1072                    // This shouldn't happen since get_arrow_impl handles it, but just in case
1073                    log::error!("unexpected PayloadTooLarge from get_arrow_impl, retrying...");
1074                    err = err.context("unexpected PayloadTooLarge");
1075                }
1076            }
1077
1078            let base_ms = Duration::from_millis(base);
1079            let jitter = Duration::from_millis(fastrange_rs::fastrange_64(
1080                rand::random(),
1081                self.inner.retry_backoff_ms,
1082            ));
1083
1084            tokio::time::sleep(base_ms + jitter).await;
1085
1086            base = std::cmp::min(
1087                base + self.inner.retry_backoff_ms,
1088                self.inner.retry_ceiling_ms,
1089            );
1090        }
1091
1092        Err(err)
1093    }
1094
1095    /// Spawns task to execute query and return data via a channel.
1096    ///
1097    /// # Example
1098    /// ```no_run
1099    /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField}, StreamConfig};
1100    ///
1101    /// # async fn example() -> anyhow::Result<()> {
1102    /// let client = Client::builder()
1103    ///     .chain_id(1)
1104    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
1105    ///     .build()?;
1106    ///
1107    /// // Stream all ERC20 transfer events
1108    /// let query = Query::new()
1109    ///     .from_block(19000000)
1110    ///     .where_logs(
1111    ///         LogFilter::all()
1112    ///             .and_topic0(["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"])?
1113    ///     )
1114    ///     .select_log_fields([LogField::Address, LogField::Topic1, LogField::Topic2, LogField::Data]);
1115    /// let mut receiver = client.stream(query, StreamConfig::default()).await?;
1116    ///
1117    /// while let Some(response) = receiver.recv().await {
1118    ///     let response = response?;
1119    ///     println!("Got {} events up to block: {}", response.data.logs.len(), response.next_block);
1120    /// }
1121    /// # Ok(())
1122    /// # }
1123    /// ```
1124    pub async fn stream(
1125        &self,
1126        query: Query,
1127        config: StreamConfig,
1128    ) -> Result<mpsc::Receiver<Result<QueryResponse>>> {
1129        check_simple_stream_params(&config)?;
1130
1131        let (tx, rx): (_, mpsc::Receiver<Result<QueryResponse>>) =
1132            mpsc::channel(config.concurrency);
1133
1134        let mut inner_rx = self
1135            .stream_arrow(query, config)
1136            .await
1137            .context("start inner stream")?;
1138
1139        tokio::spawn(async move {
1140            while let Some(resp) = inner_rx.recv().await {
1141                let msg = resp
1142                    .context("inner receiver")
1143                    .and_then(|r| QueryResponse::try_from(&r));
1144                let is_err = msg.is_err();
1145                if tx.send(msg).await.is_err() || is_err {
1146                    return;
1147                }
1148            }
1149        });
1150
1151        Ok(rx)
1152    }
1153
1154    /// Add block, transaction and log fields selection to the query and spawns task to execute it,
1155    /// returning data via a channel.
1156    ///
1157    /// This method automatically joins blocks, transactions, and logs into unified events,
1158    /// then streams them via a channel for real-time processing.
1159    ///
1160    /// # Example
1161    /// ```no_run
1162    /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField, TransactionField}, StreamConfig};
1163    ///
1164    /// # async fn example() -> anyhow::Result<()> {
1165    /// let client = Client::builder()
1166    ///     .chain_id(1)
1167    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
1168    ///     .build()?;
1169    ///
1170    /// // Stream NFT transfer events with transaction context
1171    /// let query = Query::new()
1172    ///     .from_block(19000000)
1173    ///     .where_logs(
1174    ///         LogFilter::all()
1175    ///             .and_topic0(["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"])?
1176    ///     )
1177    ///     .select_log_fields([LogField::Address, LogField::Topic1, LogField::Topic2])
1178    ///     .select_transaction_fields([TransactionField::Hash, TransactionField::From]);
1179    /// let mut receiver = client.stream_events(query, StreamConfig::default()).await?;
1180    ///
1181    /// while let Some(response) = receiver.recv().await {
1182    ///     let response = response?;
1183    ///     println!("Got {} joined events up to block: {}", response.data.len(), response.next_block);
1184    /// }
1185    /// # Ok(())
1186    /// # }
1187    /// ```
1188    pub async fn stream_events(
1189        &self,
1190        mut query: Query,
1191        config: StreamConfig,
1192    ) -> Result<mpsc::Receiver<Result<EventResponse>>> {
1193        check_simple_stream_params(&config)?;
1194
1195        let event_join_strategy = InternalEventJoinStrategy::from(&query.field_selection);
1196
1197        event_join_strategy.add_join_fields_to_selection(&mut query.field_selection);
1198
1199        let (tx, rx): (_, mpsc::Receiver<Result<EventResponse>>) =
1200            mpsc::channel(config.concurrency);
1201
1202        let mut inner_rx = self
1203            .stream_arrow(query, config)
1204            .await
1205            .context("start inner stream")?;
1206
1207        tokio::spawn(async move {
1208            while let Some(resp) = inner_rx.recv().await {
1209                let msg = resp
1210                    .context("inner receiver")
1211                    .and_then(|r| EventResponse::try_from_arrow_response(&r, &event_join_strategy));
1212                let is_err = msg.is_err();
1213                if tx.send(msg).await.is_err() || is_err {
1214                    return;
1215                }
1216            }
1217        });
1218
1219        Ok(rx)
1220    }
1221
1222    /// Spawns task to execute query and return data via a channel in Arrow format.
1223    ///
1224    /// Returns raw Apache Arrow data via a channel for high-performance processing.
1225    /// Ideal for applications that need to work directly with columnar data.
1226    ///
1227    /// # Example
1228    /// ```no_run
1229    /// use hypersync_client::{Client, net_types::{Query, TransactionFilter, TransactionField}, StreamConfig};
1230    ///
1231    /// # async fn example() -> anyhow::Result<()> {
1232    /// let client = Client::builder()
1233    ///     .chain_id(1)
1234    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
1235    ///     .build()?;
1236    ///
1237    /// // Stream transaction data in Arrow format for analytics
1238    /// let query = Query::new()
1239    ///     .from_block(19000000)
1240    ///     .to_block_excl(19000100)
1241    ///     .where_transactions(
1242    ///         TransactionFilter::all()
1243    ///             .and_contract_address(["0xA0b86a33E6411b87Fd9D3DF822C8698FC06BBe4c"])?
1244    ///     )
1245    ///     .select_transaction_fields([TransactionField::Hash, TransactionField::From, TransactionField::Value]);
1246    /// let mut receiver = client.stream_arrow(query, StreamConfig::default()).await?;
1247    ///
1248    /// while let Some(response) = receiver.recv().await {
1249    ///     let response = response?;
1250    ///     println!("Got {} Arrow batches for transactions", response.data.transactions.len());
1251    /// }
1252    /// # Ok(())
1253    /// # }
1254    /// ```
1255    pub async fn stream_arrow(
1256        &self,
1257        query: Query,
1258        config: StreamConfig,
1259    ) -> Result<mpsc::Receiver<Result<ArrowResponse>>> {
1260        stream::stream_arrow(self, query, config).await
1261    }
1262
1263    /// Like [`stream_arrow`](Self::stream_arrow), but reports streaming metrics to
1264    /// the given [`StreamObserver`].
1265    ///
1266    /// The engine calls [`StreamObserver::on_request`] for every completed HTTP
1267    /// request, [`StreamObserver::on_progress`] once per scheduler iteration, and
1268    /// [`StreamObserver::on_finish`] with the final [`StreamSummary`]. Pass an
1269    /// `Arc<`[`StreamMetrics`]`>` for a ready-made aggregate handle you can read
1270    /// during or after the stream, or implement the trait for custom exporters.
1271    ///
1272    /// This is the only entry point that does any metrics work; the plain
1273    /// `stream*` methods have zero observability overhead.
1274    ///
1275    /// [`StreamMetrics`]: crate::StreamMetrics
1276    /// [`StreamObserver`]: crate::StreamObserver
1277    /// [`StreamSummary`]: crate::StreamSummary
1278    pub async fn stream_arrow_with_observer(
1279        &self,
1280        query: Query,
1281        config: StreamConfig,
1282        observer: Arc<dyn crate::StreamObserver>,
1283    ) -> Result<mpsc::Receiver<Result<ArrowResponse>>> {
1284        stream::stream_arrow_with_observer(self, query, config, observer).await
1285    }
1286
1287    /// Executes query and returns the response in Arrow format along with
1288    /// rate limit information from the server.
1289    ///
1290    /// Unlike [`get_arrow`](Self::get_arrow), this method does **not** retry on
1291    /// HTTP 429 responses. Instead it returns
1292    /// [`RateLimitResponse::RateLimited`] so the caller can implement their own
1293    /// back-off. Other transient errors are still retried normally.
1294    pub async fn get_arrow_with_rate_limit(
1295        &self,
1296        query: &Query,
1297    ) -> Result<RateLimitResponse<ArrowResponseData>> {
1298        const WAIT_ON_RATE_LIMIT: bool = false;
1299        match self.get_arrow_with_size(query, WAIT_ON_RATE_LIMIT).await {
1300            Ok(result) => Ok(RateLimitResponse::Success {
1301                response: result.response,
1302                rate_limit: result.rate_limit,
1303            }),
1304            Err(e) => match e.downcast::<HyperSyncResponseError>() {
1305                Ok(HyperSyncResponseError::RateLimited { rate_limit }) => {
1306                    Ok(RateLimitResponse::RateLimited(rate_limit))
1307                }
1308                Ok(other) => Err(other.into()),
1309                Err(e) => Err(e),
1310            },
1311        }
1312    }
1313
1314    /// Executes query and returns the response along with
1315    /// rate limit information from the server.
1316    ///
1317    /// Unlike [`get`](Self::get), this method does **not** retry on HTTP 429
1318    /// responses. Instead it returns
1319    /// [`RateLimitResponse::RateLimited`] so the caller can implement their own
1320    /// back-off. Other transient errors are still retried normally.
1321    pub async fn get_with_rate_limit(
1322        &self,
1323        query: &Query,
1324    ) -> Result<RateLimitResponse<ResponseData>> {
1325        match self.get_arrow_with_rate_limit(query).await? {
1326            RateLimitResponse::Success {
1327                response,
1328                rate_limit,
1329            } => {
1330                let converted =
1331                    QueryResponse::try_from(&response).context("convert arrow response")?;
1332                Ok(RateLimitResponse::Success {
1333                    response: converted,
1334                    rate_limit,
1335                })
1336            }
1337            RateLimitResponse::RateLimited(info) => Ok(RateLimitResponse::RateLimited(info)),
1338        }
1339    }
1340
1341    /// Executes query and returns joined events along with rate limit
1342    /// information from the server.
1343    ///
1344    /// Unlike [`get_events`](Self::get_events), this method does **not** retry
1345    /// on HTTP 429 responses. Instead it returns
1346    /// [`RateLimitResponse::RateLimited`] so the caller can implement their own
1347    /// back-off. Other transient errors are still retried normally.
1348    pub async fn get_events_with_rate_limit(
1349        &self,
1350        mut query: Query,
1351    ) -> Result<RateLimitResponse<Vec<simple_types::Event>>> {
1352        let event_join_strategy = InternalEventJoinStrategy::from(&query.field_selection);
1353        event_join_strategy.add_join_fields_to_selection(&mut query.field_selection);
1354        match self.get_arrow_with_rate_limit(&query).await? {
1355            RateLimitResponse::Success {
1356                response,
1357                rate_limit,
1358            } => {
1359                let converted =
1360                    EventResponse::try_from_arrow_response(&response, &event_join_strategy)?;
1361                Ok(RateLimitResponse::Success {
1362                    response: converted,
1363                    rate_limit,
1364                })
1365            }
1366            RateLimitResponse::RateLimited(info) => Ok(RateLimitResponse::RateLimited(info)),
1367        }
1368    }
1369
1370    /// Returns the most recently observed rate limit information, if any.
1371    ///
1372    /// Updated after every request (including inside streams). Returns `None`
1373    /// if no requests have been made yet or the server hasn't returned rate limit headers.
1374    pub fn rate_limit_info(&self) -> Option<RateLimitInfo> {
1375        self.inner
1376            .rate_limit_state
1377            .lock()
1378            .expect("rate_limit_state mutex poisoned")
1379            .as_ref()
1380            .map(|(info, _captured_at)| info.clone())
1381    }
1382
1383    /// Returns the current rate limit info if the client is known to be rate-limited
1384    /// and the reset window has not yet elapsed.
1385    fn get_proactive_rate_limit_info(&self) -> Option<RateLimitInfo> {
1386        let state = self
1387            .inner
1388            .rate_limit_state
1389            .lock()
1390            .expect("rate_limit_state mutex poisoned");
1391        match state.as_ref() {
1392            Some((info, captured_at)) if info.is_rate_limited() => {
1393                let remaining_wait = info.suggested_wait_secs().map(|secs| {
1394                    let elapsed = captured_at.elapsed().as_secs();
1395                    secs.saturating_sub(elapsed)
1396                });
1397                if remaining_wait.unwrap_or(0) > 0 {
1398                    Some(info.clone())
1399                } else {
1400                    None
1401                }
1402            }
1403            _ => None,
1404        }
1405    }
1406
1407    /// Waits until the current rate limit window resets, if the client is rate limited.
1408    ///
1409    /// Returns immediately if:
1410    /// - No rate limit information has been observed yet
1411    /// - There is remaining quota in the current window
1412    ///
1413    /// This method is useful for consumers who want to explicitly wait before making
1414    /// requests, for example when coordinating rate limits across multiple systems.
1415    pub async fn wait_for_rate_limit(&self) {
1416        if let Some(info) = self.get_proactive_rate_limit_info() {
1417            let secs = info.suggested_wait_secs().unwrap_or(0);
1418            if secs > 0 {
1419                log::warn!(
1420                    "rate limit exhausted ({info}), proactively waiting {secs}s for window reset. To increase your rate limits, upgrade your plan at https://envio.dev/app/api-tokens. For more info: https://docs.envio.dev/docs/HyperSync/api-tokens"
1421                );
1422                tokio::time::sleep(Duration::from_secs(secs)).await;
1423            }
1424        }
1425    }
1426
1427    /// Updates the internally tracked rate limit state with the current timestamp.
1428    fn update_rate_limit_state(&self, rate_limit: &RateLimitInfo) {
1429        // Only update if the response actually contained rate limit headers
1430        if rate_limit.limit.is_some()
1431            || rate_limit.remaining.is_some()
1432            || rate_limit.reset_secs.is_some()
1433        {
1434            let mut state = self
1435                .inner
1436                .rate_limit_state
1437                .lock()
1438                .expect("rate_limit_state mutex poisoned");
1439            *state = Some((rate_limit.clone(), Instant::now()));
1440        }
1441    }
1442
1443    /// Getter for url field.
1444    ///
1445    /// # Example
1446    /// ```
1447    /// use hypersync_client::Client;
1448    ///
1449    /// let client = Client::builder()
1450    ///     .chain_id(1)
1451    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1452    ///     .build()
1453    ///     .unwrap();
1454    ///
1455    /// println!("Client URL: {}", client.url());
1456    /// ```
1457    pub fn url(&self) -> &Url {
1458        &self.inner.url
1459    }
1460}
1461
/// Builder for creating a hypersync client with configuration options.
///
/// This builder provides a fluent API for configuring client settings like URL,
/// authentication, timeouts, and retry behavior.
///
/// It wraps a [`ClientConfig`]. `ClientBuilder::new()` and `Default` both start
/// from `ClientConfig::default()`. The setters take the builder by value and
/// return it, so calls can be chained.
///
/// # Example
/// ```
/// use hypersync_client::{Client, SerializationFormat};
///
/// let client = Client::builder()
///     .chain_id(1)
///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
///     .http_req_timeout_millis(30000)
///     .max_num_retries(3)
///     .build()
///     .unwrap();
/// ```
#[derive(Debug, Clone, Default)]
pub struct ClientBuilder(ClientConfig);
1481
1482impl ClientBuilder {
1483    /// Creates a new ClientBuilder with default configuration.
1484    pub fn new() -> Self {
1485        Self::default()
1486    }
1487
1488    /// Sets the chain ID and automatically configures the URL for the hypersync endpoint.
1489    ///
1490    /// This is a convenience method that sets the URL to `https://{chain_id}.hypersync.xyz`.
1491    ///
1492    /// # Arguments
1493    /// * `chain_id` - The blockchain chain ID (e.g., 1 for Ethereum mainnet)
1494    ///
1495    /// # Example
1496    /// ```
1497    /// use hypersync_client::Client;
1498    ///
1499    /// let client = Client::builder()
1500    ///     .chain_id(1) // Ethereum mainnet
1501    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1502    ///     .build()
1503    ///     .unwrap();
1504    /// ```
1505    pub fn chain_id(mut self, chain_id: u64) -> Self {
1506        self.0.url = format!("https://{chain_id}.hypersync.xyz");
1507        self
1508    }
1509
1510    /// Sets a custom URL for the hypersync server.
1511    ///
1512    /// Use this method when you need to connect to a custom hypersync endpoint
1513    /// instead of the default public endpoints.
1514    ///
1515    /// # Arguments
1516    /// * `url` - The hypersync server URL
1517    ///
1518    /// # Example
1519    /// ```
1520    /// use hypersync_client::Client;
1521    ///
1522    /// let client = Client::builder()
1523    ///     .url("https://my-custom-hypersync.example.com")
1524    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1525    ///     .build()
1526    ///     .unwrap();
1527    /// ```
1528    pub fn url<S: ToString>(mut self, url: S) -> Self {
1529        self.0.url = url.to_string();
1530        self
1531    }
1532
1533    /// Sets the api token for authentication.
1534    ///
1535    /// Required for accessing authenticated hypersync endpoints.
1536    ///
1537    /// # Arguments
1538    /// * `api_token` - The authentication token
1539    ///
1540    /// # Example
1541    /// ```
1542    /// use hypersync_client::Client;
1543    ///
1544    /// let client = Client::builder()
1545    ///     .chain_id(1)
1546    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1547    ///     .build()
1548    ///     .unwrap();
1549    /// ```
1550    pub fn api_token<S: ToString>(mut self, api_token: S) -> Self {
1551        self.0.api_token = api_token.to_string();
1552        self
1553    }
1554
1555    /// Sets the HTTP request timeout in milliseconds.
1556    ///
1557    /// # Arguments
1558    /// * `http_req_timeout_millis` - Timeout in milliseconds (default: 30000)
1559    ///
1560    /// # Example
1561    /// ```
1562    /// use hypersync_client::Client;
1563    ///
1564    /// let client = Client::builder()
1565    ///     .chain_id(1)
1566    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1567    ///     .http_req_timeout_millis(60000) // 60 second timeout
1568    ///     .build()
1569    ///     .unwrap();
1570    /// ```
1571    pub fn http_req_timeout_millis(mut self, http_req_timeout_millis: u64) -> Self {
1572        self.0.http_req_timeout_millis = http_req_timeout_millis;
1573        self
1574    }
1575
1576    /// Sets the maximum number of retries for failed requests.
1577    ///
1578    /// # Arguments
1579    /// * `max_num_retries` - Maximum number of retries (default: 10)
1580    ///
1581    /// # Example
1582    /// ```
1583    /// use hypersync_client::Client;
1584    ///
1585    /// let client = Client::builder()
1586    ///     .chain_id(1)
1587    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1588    ///     .max_num_retries(5)
1589    ///     .build()
1590    ///     .unwrap();
1591    /// ```
1592    pub fn max_num_retries(mut self, max_num_retries: usize) -> Self {
1593        self.0.max_num_retries = max_num_retries;
1594        self
1595    }
1596
1597    /// Sets the backoff increment for retry delays.
1598    ///
1599    /// This value is added to the base delay on each retry attempt.
1600    ///
1601    /// # Arguments
1602    /// * `retry_backoff_ms` - Backoff increment in milliseconds (default: 500)
1603    ///
1604    /// # Example
1605    /// ```
1606    /// use hypersync_client::Client;
1607    ///
1608    /// let client = Client::builder()
1609    ///     .chain_id(1)
1610    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1611    ///     .retry_backoff_ms(1000) // 1 second backoff increment
1612    ///     .build()
1613    ///     .unwrap();
1614    /// ```
1615    pub fn retry_backoff_ms(mut self, retry_backoff_ms: u64) -> Self {
1616        self.0.retry_backoff_ms = retry_backoff_ms;
1617        self
1618    }
1619
1620    /// Sets the initial delay for retry attempts.
1621    ///
1622    /// # Arguments
1623    /// * `retry_base_ms` - Initial retry delay in milliseconds (default: 500)
1624    ///
1625    /// # Example
1626    /// ```
1627    /// use hypersync_client::Client;
1628    ///
1629    /// let client = Client::builder()
1630    ///     .chain_id(1)
1631    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1632    ///     .retry_base_ms(1000) // Start with 1 second delay
1633    ///     .build()
1634    ///     .unwrap();
1635    /// ```
1636    pub fn retry_base_ms(mut self, retry_base_ms: u64) -> Self {
1637        self.0.retry_base_ms = retry_base_ms;
1638        self
1639    }
1640
1641    /// Sets the maximum delay for retry attempts.
1642    ///
1643    /// The retry delay will not exceed this value, even with backoff increments.
1644    ///
1645    /// # Arguments
1646    /// * `retry_ceiling_ms` - Maximum retry delay in milliseconds (default: 10000)
1647    ///
1648    /// # Example
1649    /// ```
1650    /// use hypersync_client::Client;
1651    ///
1652    /// let client = Client::builder()
1653    ///     .chain_id(1)
1654    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1655    ///     .retry_ceiling_ms(30000) // Cap at 30 seconds
1656    ///     .build()
1657    ///     .unwrap();
1658    /// ```
1659    pub fn retry_ceiling_ms(mut self, retry_ceiling_ms: u64) -> Self {
1660        self.0.retry_ceiling_ms = retry_ceiling_ms;
1661        self
1662    }
1663
1664    /// Sets whether to proactively sleep when rate limited.
1665    ///
1666    /// When enabled (default), the client will wait for the rate limit window to reset
1667    /// before sending requests it knows will be rejected. Set to `false` to disable
1668    /// this behavior and handle rate limits yourself.
1669    ///
1670    /// # Arguments
1671    /// * `proactive_rate_limit_sleep` - Whether to enable proactive rate limit sleeping (default: true)
1672    pub fn proactive_rate_limit_sleep(mut self, proactive_rate_limit_sleep: bool) -> Self {
1673        self.0.proactive_rate_limit_sleep = proactive_rate_limit_sleep;
1674        self
1675    }
1676
1677    /// Sets the serialization format for client-server communication.
1678    ///
1679    /// # Arguments
1680    /// * `serialization_format` - The format to use (JSON or CapnProto)
1681    ///
1682    /// # Example
1683    /// ```
1684    /// use hypersync_client::{Client, SerializationFormat};
1685    ///
1686    /// let client = Client::builder()
1687    ///     .chain_id(1)
1688    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1689    ///     .serialization_format(SerializationFormat::Json)
1690    ///     .build()
1691    ///     .unwrap();
1692    /// ```
1693    pub fn serialization_format(mut self, serialization_format: SerializationFormat) -> Self {
1694        self.0.serialization_format = serialization_format;
1695        self
1696    }
1697
1698    /// Builds the client with the configured settings.
1699    ///
1700    /// # Returns
1701    /// * `Result<Client>` - The configured client or an error if configuration is invalid
1702    ///
1703    /// # Errors
1704    /// Returns an error if:
1705    /// * The URL is malformed
1706    /// * Required configuration is missing
1707    ///
1708    /// # Example
1709    /// ```
1710    /// use hypersync_client::Client;
1711    ///
1712    /// let client = Client::builder()
1713    ///     .chain_id(1)
1714    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1715    ///     .build()
1716    ///     .unwrap();
1717    /// ```
1718    pub fn build(self) -> Result<Client> {
1719        if self.0.url.is_empty() {
1720            anyhow::bail!(
1721                "endpoint needs to be set, try using builder.chain_id(1) or\
1722                builder.url(\"https://eth.hypersync.xyz\") to set the endpoint"
1723            )
1724        }
1725        Client::new(self.0)
1726    }
1727}
1728
/// Delay before the first backed-off reconnect of the height stream; each further
/// consecutive failure doubles it.
const INITIAL_RECONNECT_DELAY: Duration = Duration::from_millis(200);
/// Ceiling for the height-stream reconnect delay, no matter how many failures occur in a row.
const MAX_RECONNECT_DELAY: Duration = Duration::from_secs(30);
/// How long the height stream may go without receiving any event before the connection is
/// treated as dead. The server sends a keepalive ping every 5s, so this tolerates three
/// missed pings.
const READ_TIMEOUT: Duration = Duration::from_secs(3 * 5);
1735
/// Notifications delivered by [`Client::stream_height`].
///
/// Besides archive height updates, the stream reports connection state changes so that
/// applications can surface them to users (for example a "reconnecting" indicator).
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum HeightStreamEvent {
    /// The SSE connection was established, either for the first time or after a reconnect.
    Connected,
    /// The server reported a new archive height.
    Height(u64),
    /// The previous connection ended; a new one is attempted once `delay` has passed.
    Reconnecting {
        /// How long the stream waits before the next connection attempt.
        delay: Duration,
        /// Description of the error (or other reason) that ended the previous connection.
        error_msg: String,
    },
}
1751
1752enum InternalStreamEvent {
1753    Publish(HeightStreamEvent),
1754    Ping,
1755    Unknown(String),
1756}
1757
1758impl Client {
1759    fn get_es_stream(&self) -> Result<EventSource> {
1760        // Build the GET /height/sse request
1761        let mut url = self.inner.url.clone();
1762        url.path_segments_mut()
1763            .ok()
1764            .context("invalid base URL")?
1765            .extend(&["height", "sse"]);
1766
1767        let req = self
1768            .inner
1769            .http_client
1770            // Don't set timeout for SSE stream
1771            .request_no_timeout(Method::GET, url);
1772
1773        // Configure exponential backoff for library-level retries
1774        let retry_policy = ExponentialBackoff::new(
1775            INITIAL_RECONNECT_DELAY,
1776            2.0,
1777            Some(MAX_RECONNECT_DELAY),
1778            None, // unlimited retries
1779        );
1780
1781        // Turn the request into an EventSource stream with retries
1782        let mut es = reqwest_eventsource::EventSource::new(req)
1783            .context("unexpected error creating EventSource")?;
1784        es.set_retry_policy(Box::new(retry_policy));
1785        Ok(es)
1786    }
1787
1788    async fn next_height(event_source: &mut EventSource) -> Result<Option<InternalStreamEvent>> {
1789        let Some(res) = tokio::time::timeout(READ_TIMEOUT, event_source.next())
1790            .await
1791            .map_err(|d| anyhow::anyhow!("stream timed out after {d}"))?
1792        else {
1793            return Ok(None);
1794        };
1795
1796        let e = match res.context("failed response")? {
1797            Event::Open => InternalStreamEvent::Publish(HeightStreamEvent::Connected),
1798            Event::Message(event) => match event.event.as_str() {
1799                "height" => {
1800                    let height = event
1801                        .data
1802                        .trim()
1803                        .parse::<u64>()
1804                        .context("parsing height from event data")?;
1805                    InternalStreamEvent::Publish(HeightStreamEvent::Height(height))
1806                }
1807                "ping" => InternalStreamEvent::Ping,
1808                _ => InternalStreamEvent::Unknown(format!("unknown event: {:?}", event)),
1809            },
1810        };
1811
1812        Ok(Some(e))
1813    }
1814
1815    async fn stream_height_events(
1816        es: &mut EventSource,
1817        tx: &mpsc::Sender<HeightStreamEvent>,
1818    ) -> Result<bool> {
1819        let mut received_an_event = false;
1820        while let Some(event) = Self::next_height(es).await.context("failed next height")? {
1821            match event {
1822                InternalStreamEvent::Publish(event) => {
1823                    received_an_event = true;
1824                    if tx.send(event).await.is_err() {
1825                        return Ok(received_an_event); // Receiver dropped, exit task
1826                    }
1827                }
1828                InternalStreamEvent::Ping => (), // ignore pings
1829                InternalStreamEvent::Unknown(_event) => (), // ignore unknown events
1830            }
1831        }
1832        Ok(received_an_event)
1833    }
1834
1835    fn get_delay(consecutive_failures: u32) -> Duration {
1836        if consecutive_failures > 0 {
1837            /// helper function to calculate 2^x
1838            /// optimization using bit shifting
1839            const fn two_to_pow(x: u32) -> u32 {
1840                1 << x
1841            }
1842            // Exponential backoff: 200ms, 400ms, 800ms, ... up to 30s
1843            INITIAL_RECONNECT_DELAY
1844                .saturating_mul(two_to_pow(consecutive_failures - 1))
1845                .min(MAX_RECONNECT_DELAY)
1846        } else {
1847            // On zero consecutive failures, 0 delay
1848            Duration::from_millis(0)
1849        }
1850    }
1851
1852    async fn stream_height_events_with_retry(
1853        &self,
1854        tx: &mpsc::Sender<HeightStreamEvent>,
1855    ) -> Result<()> {
1856        let mut consecutive_failures = 0u32;
1857
1858        loop {
1859            // should always be able to creat a new es stream
1860            // something is wrong with the req builder otherwise
1861            let mut es = self.get_es_stream().context("get es stream")?;
1862
1863            let mut error = anyhow!("");
1864
1865            match Self::stream_height_events(&mut es, tx).await {
1866                Ok(received_an_event) => {
1867                    if received_an_event {
1868                        consecutive_failures = 0; // Reset after successful connection that then failed
1869                    }
1870                    log::trace!("Stream height exited");
1871                }
1872                Err(e) => {
1873                    log::trace!("Stream height failed: {e:?}");
1874                    error = e;
1875                }
1876            }
1877
1878            es.close();
1879
1880            // If the receiver is closed, exit the task
1881            if tx.is_closed() {
1882                break;
1883            }
1884
1885            let delay = Self::get_delay(consecutive_failures);
1886            log::trace!("Reconnecting in {:?}...", delay);
1887
1888            let error_msg = format!("{error:?}");
1889
1890            if tx
1891                .send(HeightStreamEvent::Reconnecting { delay, error_msg })
1892                .await
1893                .is_err()
1894            {
1895                return Ok(()); // Receiver dropped, exit task
1896            }
1897            tokio::time::sleep(delay).await;
1898
1899            // increment consecutive failures so that on the next try
1900            // it will start using back offs
1901            consecutive_failures += 1;
1902        }
1903
1904        Ok(())
1905    }
1906
1907    /// Streams archive height updates from the server via Server-Sent Events.
1908    ///
1909    /// Establishes a long-lived SSE connection to `/height/sse` that automatically reconnects
1910    /// on disconnection with exponential backoff (200ms → 400ms → ... → max 30s).
1911    ///
1912    /// The stream emits [`HeightStreamEvent`] to notify consumers of connection state changes
1913    /// and height updates. This allows applications to display connection status to users.
1914    ///
1915    /// # Returns
1916    /// Channel receiver yielding [`HeightStreamEvent`]s. The background task handles connection
1917    /// lifecycle and sends events through this channel.
1918    ///
1919    /// # Example
1920    /// ```
1921    /// # use hypersync_client::{Client, HeightStreamEvent};
1922    /// # async fn example() -> anyhow::Result<()> {
1923    /// let client = Client::builder()
1924    ///     .url("https://eth.hypersync.xyz")
1925    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1926    ///     .build()?;
1927    ///
1928    /// let mut rx = client.stream_height();
1929    ///
1930    /// while let Some(event) = rx.recv().await {
1931    ///     match event {
1932    ///         HeightStreamEvent::Connected => println!("Connected to stream"),
1933    ///         HeightStreamEvent::Height(h) => println!("Height: {}", h),
1934    ///         HeightStreamEvent::Reconnecting { delay, error_msg } => {
1935    ///             println!("Reconnecting in {delay:?} due to error: {error_msg}")
1936    ///         }
1937    ///     }
1938    /// }
1939    /// # Ok(())
1940    /// # }
1941    /// ```
1942    pub fn stream_height(&self) -> mpsc::Receiver<HeightStreamEvent> {
1943        let (tx, rx) = mpsc::channel(16);
1944        let client = self.clone();
1945
1946        tokio::spawn(async move {
1947            if let Err(e) = client.stream_height_events_with_retry(&tx).await {
1948                log::error!("Stream height failed unexpectedly: {e:?}");
1949            }
1950        });
1951
1952        rx
1953    }
1954}
1955
1956fn check_simple_stream_params(config: &StreamConfig) -> Result<()> {
1957    if config.event_signature.is_some() {
1958        return Err(anyhow!(
1959            "config.event_signature can't be passed to simple type function. User is expected to \
1960             decode the logs using Decoder."
1961        ));
1962    }
1963    if config.column_mapping.is_some() {
1964        return Err(anyhow!(
1965            "config.column_mapping can't be passed to single type function. User is expected to \
1966             map values manually."
1967        ));
1968    }
1969
1970    Ok(())
1971}
1972
1973/// Used to indicate whether or not a retry should be attempted.
1974#[derive(Debug, thiserror::Error)]
1975pub enum HyperSyncResponseError {
1976    /// Means that the client should retry with a smaller block range.
1977    #[error("hypersync responded with 'payload too large' error")]
1978    PayloadTooLarge,
1979    /// Server responded with 429 Too Many Requests.
1980    #[error("rate limited by server. To increase your rate limits, upgrade your plan at https://envio.dev/app/api-tokens. For more info: https://docs.envio.dev/docs/HyperSync/api-tokens")]
1981    RateLimited {
1982        /// Rate limit information from the 429 response headers.
1983        rate_limit: RateLimitInfo,
1984    },
1985    /// Any other server error.
1986    #[error(transparent)]
1987    Other(#[from] anyhow::Error),
1988}
1989
1990/// Result of a single Arrow query execution, before retry logic.
1991struct ArrowImplResponse {
1992    /// The parsed Arrow response data.
1993    response: ArrowResponse,
1994    /// Size of the response body in bytes.
1995    response_bytes: u64,
1996    /// Rate limit information parsed from response headers.
1997    rate_limit: RateLimitInfo,
1998}
1999
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_get_delay() {
        assert_eq!(
            Client::get_delay(0),
            Duration::from_millis(0),
            "starts with 0 delay"
        );

        // Every additional consecutive failure doubles the delay, starting at 200ms.
        for (failures, expected_ms) in [(1, 200), (2, 400), (3, 800)] {
            assert_eq!(Client::get_delay(failures), Duration::from_millis(expected_ms));
        }

        // Once doubling passes the ceiling, the delay stays pinned at 30s.
        for failures in [9, 10] {
            assert_eq!(
                Client::get_delay(failures),
                Duration::from_secs(30),
                "max delay is 30s"
            );
        }
    }

    #[tokio::test]
    #[ignore = "integration test requiring live hs server"]
    async fn test_http2_is_used() -> anyhow::Result<()> {
        let api_token = std::env::var("ENVIO_API_TOKEN")?;
        let http = reqwest::Client::builder()
            .no_gzip()
            .user_agent("hscr-test")
            .build()?;

        let res = http
            .get("https://eth.hypersync.xyz/height")
            .bearer_auth(&api_token)
            .send()
            .await?;

        let version = res.version();
        assert_eq!(
            version,
            reqwest::Version::HTTP_2,
            "expected HTTP/2 but got {:?}",
            version
        );
        assert!(res.status().is_success());
        Ok(())
    }

    #[tokio::test]
    #[ignore = "integration test with live hs server for height stream"]
    async fn test_stream_height_events() -> anyhow::Result<()> {
        let (tx, mut rx) = mpsc::channel(16);
        let producer = tokio::spawn(async move {
            let client = Client::builder()
                .url("https://monad-testnet.hypersync.xyz")
                .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
                .build()?;
            let mut es = client.get_es_stream().context("get es stream")?;
            Client::stream_height_events(&mut es, &tx).await
        });

        // The first event on a fresh connection is always the open notification.
        let val = rx.recv().await;
        assert!(val.is_some());
        assert_eq!(val.unwrap(), HeightStreamEvent::Connected);

        let Some(HeightStreamEvent::Height(height)) = rx.recv().await else {
            panic!("should have received height")
        };
        let Some(HeightStreamEvent::Height(height2)) = rx.recv().await else {
            panic!("should have received height")
        };
        assert!(height2 > height);

        // Dropping the receiver makes the producer's next send fail, ending it cleanly.
        drop(rx);

        let joined = producer.await.expect("should have joined");
        let received_an_event =
            joined.expect("should have ended the stream gracefully after dropping rx");
        assert!(received_an_event, "should have received an event");

        Ok(())
    }
}