Skip to main content

hypersync_client/
lib.rs

1#![deny(missing_docs)]
2// README.md is a symlink (ln -s ../README.md README.md) to the root workspace
3// README. This is needed so that both `cargo doc` and `cargo package` work:
4// - `cargo doc`: include_str! resolves the symlink to the root README.
5// - `cargo package`: cargo copies the symlink target into the tarball, so
6//   include_str! finds README.md at the package root during verification.
7#![doc = include_str!("../README.md")]
8use std::time::Instant;
9use std::{sync::Arc, time::Duration};
10
11use anyhow::{anyhow, Context, Result};
12use futures::StreamExt;
13use hypersync_net_types::{hypersync_net_types_capnp, ArchiveHeight, ChainId, Query};
14use reqwest::{Method, StatusCode};
15use reqwest_eventsource::retry::ExponentialBackoff;
16use reqwest_eventsource::{Event, EventSource};
17
18pub mod arrow_reader;
19mod column_mapping;
20mod config;
21mod decode;
22mod decode_call;
23mod from_arrow;
24mod parquet_out;
25mod parse_response;
26pub mod preset_query;
27mod rate_limit;
28mod rayon_async;
29pub mod simple_types;
30mod stream;
31mod types;
32mod util;
33
34pub use hypersync_format as format;
35pub use hypersync_net_types as net_types;
36pub use hypersync_schema as schema;
37
38use parse_response::parse_query_response;
39use tokio::sync::mpsc;
40use types::ResponseData;
41use url::Url;
42
43pub use column_mapping::{ColumnMapping, DataType};
44pub use config::HexOutput;
45pub use config::{ClientConfig, SerializationFormat, StreamConfig};
46pub use decode::Decoder;
47pub use decode_call::CallDecoder;
48pub use rate_limit::RateLimitInfo;
49pub use types::{
50    ArrowResponse, ArrowResponseData, EventResponse, QueryResponse, RateLimitResponse,
51};
52
53use crate::parse_response::read_query_response;
54use crate::simple_types::InternalEventJoinStrategy;
55
/// Wrapper around a `reqwest::Client` that is periodically rebuilt so that
/// long-lived processes resolve DNS again (see `max_connection_age`).
#[derive(Debug)]
struct HttpClientWrapper {
    /// Mutable state that is replaced whenever the client is refreshed.
    inner: std::sync::Mutex<HttpClientWrapperInner>,
    /// HyperSync server API token, attached as a bearer token to every request.
    api_token: String,
    /// Standard timeout applied to requests created via `request`.
    timeout: Duration,
    /// Maximum age of the underlying `reqwest::Client` before it is rebuilt.
    ///
    /// Because the client polls the server frequently, pooled connections never
    /// go idle, so they never time out and DNS is never looked up again. That is
    /// a problem when the server's IP address changes during a HyperSync
    /// failover. reqwest has no built-in option for this, so the client is
    /// simply recreated once it is older than this duration.
    max_connection_age: Duration,
    /// User agent string, kept so the client can be rebuilt with the same value.
    user_agent: String,
}
73
/// The refreshable part of `HttpClientWrapper`, guarded by its mutex.
#[derive(Debug)]
struct HttpClientWrapperInner {
    /// Current reqwest client instance handed out for new requests.
    client: reqwest::Client,
    /// When `client` was (re)built; compared against `max_connection_age`.
    created_at: Instant,
}
81
82impl HttpClientWrapper {
83    fn new(user_agent: String, api_token: String, timeout: Duration) -> Self {
84        let client = reqwest::Client::builder()
85            .no_gzip()
86            .user_agent(&user_agent)
87            .build()
88            .unwrap();
89
90        Self {
91            inner: std::sync::Mutex::new(HttpClientWrapperInner {
92                client,
93                created_at: Instant::now(),
94            }),
95            api_token,
96            timeout,
97            max_connection_age: Duration::from_secs(60),
98            user_agent,
99        }
100    }
101
102    fn get_client(&self) -> reqwest::Client {
103        let mut inner = self.inner.lock().expect("HttpClientWrapper mutex poisoned");
104
105        // Check if client needs refresh due to age
106        if inner.created_at.elapsed() > self.max_connection_age {
107            // Recreate client to force new DNS lookup for failover scenarios
108            inner.client = reqwest::Client::builder()
109                .no_gzip()
110                .user_agent(&self.user_agent)
111                .build()
112                .unwrap();
113            inner.created_at = Instant::now();
114        }
115
116        // Clone is cheap for reqwest::Client (it's Arc-wrapped internally)
117        inner.client.clone()
118    }
119
120    fn request(&self, method: Method, url: Url) -> reqwest::RequestBuilder {
121        let client = self.get_client();
122        client
123            .request(method, url)
124            .timeout(self.timeout)
125            .bearer_auth(&self.api_token)
126    }
127
128    fn request_no_timeout(&self, method: Method, url: Url) -> reqwest::RequestBuilder {
129        let client = self.get_client();
130        client.request(method, url).bearer_auth(&self.api_token)
131    }
132}
133
/// Internal client state to handle http requests and retries.
#[derive(Debug)]
struct ClientInner {
    /// HTTP client wrapper that periodically rebuilds the reqwest client.
    http_client: HttpClientWrapper,
    /// HyperSync server URL.
    url: Url,
    /// Number of retries to attempt before returning an error.
    max_num_retries: usize,
    /// Milliseconds added to the backoff base after each failed attempt; also
    /// used as the range of the random jitter added to each sleep.
    retry_backoff_ms: u64,
    /// Initial backoff base in milliseconds.
    retry_base_ms: u64,
    /// Upper bound in milliseconds for the backoff base (jitter is added on top).
    retry_ceiling_ms: u64,
    /// Query serialization format to use for HTTP requests.
    serialization_format: SerializationFormat,
    /// Most recently observed rate limit info from the server, paired with the
    /// instant it was captured so elapsed time can be subtracted from `reset_secs`.
    rate_limit_state: std::sync::Mutex<Option<(RateLimitInfo, Instant)>>,
    /// Whether to proactively sleep when the rate limit is exhausted.
    proactive_rate_limit_sleep: bool,
}
156
/// Client to handle http requests and retries.
///
/// Cloning is cheap: all clones share the same internal state through an `Arc`.
#[derive(Clone, Debug)]
pub struct Client {
    /// Shared client state.
    inner: Arc<ClientInner>,
}
162
163impl Client {
164    /// Creates a new client with the given configuration.
165    ///
166    /// Configuration must include the `url` and `api_token` fields.
167    /// # Example
168    /// ```
169    /// use hypersync_client::{Client, ClientConfig};
170    ///
171    /// let config = ClientConfig {
172    ///     url: "https://eth.hypersync.xyz".to_string(),
173    ///     api_token: std::env::var("ENVIO_API_TOKEN")?,
174    ///     ..Default::default()
175    /// };
176    /// let client = Client::new(config)?;
177    /// # Ok::<(), anyhow::Error>(())
178    /// ```
179    ///
180    /// # Errors
181    /// This method fails if the config is invalid.
182    pub fn new(cfg: ClientConfig) -> Result<Self> {
183        // hscr stands for hypersync client rust
184        cfg.validate().context("invalid ClientConfig")?;
185        let user_agent = format!("hscr/{}", env!("CARGO_PKG_VERSION"));
186        Self::new_internal(cfg, user_agent)
187    }
188
189    /// Creates a new client builder.
190    ///
191    /// # Example
192    /// ```
193    /// use hypersync_client::Client;
194    ///
195    /// let client = Client::builder()
196    ///     .chain_id(1)
197    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
198    ///     .build()
199    ///     .unwrap();
200    /// ```
201    pub fn builder() -> ClientBuilder {
202        ClientBuilder::new()
203    }
204
205    #[doc(hidden)]
206    pub fn new_with_agent(cfg: ClientConfig, user_agent: impl Into<String>) -> Result<Self> {
207        // Creates a new client with the given configuration and custom user agent.
208        // This is intended for use by language bindings (Python, Node.js) and HyperIndex.
209        Self::new_internal(cfg, user_agent.into())
210    }
211
212    /// Internal constructor that takes both config and user agent.
213    fn new_internal(cfg: ClientConfig, user_agent: String) -> Result<Self> {
214        let http_client = HttpClientWrapper::new(
215            user_agent,
216            cfg.api_token,
217            Duration::from_millis(cfg.http_req_timeout_millis),
218        );
219
220        let url = Url::parse(&cfg.url).context("url is malformed")?;
221
222        Ok(Self {
223            inner: Arc::new(ClientInner {
224                http_client,
225                url,
226                max_num_retries: cfg.max_num_retries,
227                retry_backoff_ms: cfg.retry_backoff_ms,
228                retry_base_ms: cfg.retry_base_ms,
229                retry_ceiling_ms: cfg.retry_ceiling_ms,
230                serialization_format: cfg.serialization_format,
231                rate_limit_state: std::sync::Mutex::new(None),
232                proactive_rate_limit_sleep: cfg.proactive_rate_limit_sleep,
233            }),
234        })
235    }
236
237    /// Retrieves blocks, transactions, traces, and logs through a stream using the provided
238    /// query and stream configuration.
239    ///
240    /// ### Implementation
241    /// Runs multiple queries simultaneously based on config.concurrency.
242    ///
243    /// Each query runs until it reaches query.to, server height, any max_num_* query param,
244    /// or execution timed out by server.
245    ///
246    /// # ⚠️ Important Warning
247    ///
248    /// This method will continue executing until the query has run to completion from beginning
249    /// to the end of the block range defined in the query. For heavy queries with large block
250    /// ranges or high data volumes, consider:
251    ///
252    /// - Use [`stream()`](Self::stream) to interact with each streamed chunk individually
253    /// - Use [`get()`](Self::get) which returns a `next_block` that can be paginated for the next query
254    /// - Break large queries into smaller block ranges
255    ///
256    /// # Example
257    /// ```no_run
258    /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField}, StreamConfig};
259    ///
260    /// # async fn example() -> anyhow::Result<()> {
261    /// let client = Client::builder()
262    ///     .chain_id(1)
263    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
264    ///     .build()?;
265    ///
266    /// // Query ERC20 transfer events
267    /// let query = Query::new()
268    ///     .from_block(19000000)
269    ///     .to_block_excl(19000010)
270    ///     .where_logs(
271    ///         LogFilter::all()
272    ///             .and_topic0(["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"])?
273    ///     )
274    ///     .select_log_fields([LogField::Address, LogField::Data]);
275    /// let response = client.collect(query, StreamConfig::default()).await?;
276    ///
277    /// println!("Collected {} events", response.data.logs.len());
278    /// # Ok(())
279    /// # }
280    /// ```
281    pub async fn collect(&self, query: Query, config: StreamConfig) -> Result<QueryResponse> {
282        check_simple_stream_params(&config)?;
283
284        let mut recv = stream::stream_arrow(self, query, config)
285            .await
286            .context("start stream")?;
287
288        let mut data = ResponseData::default();
289        let mut archive_height = None;
290        let mut next_block = 0;
291        let mut total_execution_time = 0;
292
293        while let Some(res) = recv.recv().await {
294            let res = res.context("get response")?;
295            let res: QueryResponse =
296                QueryResponse::try_from(&res).context("convert arrow response")?;
297
298            for batch in res.data.blocks {
299                data.blocks.push(batch);
300            }
301            for batch in res.data.transactions {
302                data.transactions.push(batch);
303            }
304            for batch in res.data.logs {
305                data.logs.push(batch);
306            }
307            for batch in res.data.traces {
308                data.traces.push(batch);
309            }
310
311            archive_height = res.archive_height;
312            next_block = res.next_block;
313            total_execution_time += res.total_execution_time
314        }
315
316        Ok(QueryResponse {
317            archive_height,
318            next_block,
319            total_execution_time,
320            data,
321            rollback_guard: None,
322        })
323    }
324
325    /// Retrieves events through a stream using the provided query and stream configuration.
326    ///
327    /// # ⚠️ Important Warning
328    ///
329    /// This method will continue executing until the query has run to completion from beginning
330    /// to the end of the block range defined in the query. For heavy queries with large block
331    /// ranges or high data volumes, consider:
332    ///
333    /// - Use [`stream_events()`](Self::stream_events) to interact with each streamed chunk individually
334    /// - Use [`get_events()`](Self::get_events) which returns a `next_block` that can be paginated for the next query
335    /// - Break large queries into smaller block ranges
336    ///
337    /// # Example
338    /// ```no_run
339    /// use hypersync_client::{Client, net_types::{Query, TransactionFilter, TransactionField}, StreamConfig};
340    ///
341    /// # async fn example() -> anyhow::Result<()> {
342    /// let client = Client::builder()
343    ///     .chain_id(1)
344    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
345    ///     .build()?;
346    ///
347    /// // Query transactions to a specific address
348    /// let query = Query::new()
349    ///     .from_block(19000000)
350    ///     .to_block_excl(19000100)
351    ///     .where_transactions(
352    ///         TransactionFilter::all()
353    ///             .and_to(["0xA0b86a33E6411b87Fd9D3DF822C8698FC06BBe4c"])?
354    ///     )
355    ///     .select_transaction_fields([TransactionField::Hash, TransactionField::From, TransactionField::Value]);
356    /// let response = client.collect_events(query, StreamConfig::default()).await?;
357    ///
358    /// println!("Collected {} events", response.data.len());
359    /// # Ok(())
360    /// # }
361    /// ```
362    pub async fn collect_events(
363        &self,
364        mut query: Query,
365        config: StreamConfig,
366    ) -> Result<EventResponse> {
367        check_simple_stream_params(&config)?;
368
369        let event_join_strategy = InternalEventJoinStrategy::from(&query.field_selection);
370        event_join_strategy.add_join_fields_to_selection(&mut query.field_selection);
371
372        let mut recv = stream::stream_arrow(self, query, config)
373            .await
374            .context("start stream")?;
375
376        let mut data = Vec::new();
377        let mut archive_height = None;
378        let mut next_block = 0;
379        let mut total_execution_time = 0;
380
381        while let Some(res) = recv.recv().await {
382            let res = res.context("get response")?;
383            let res: QueryResponse =
384                QueryResponse::try_from(&res).context("convert arrow response")?;
385            let events = event_join_strategy.join_from_response_data(res.data);
386
387            data.extend(events);
388
389            archive_height = res.archive_height;
390            next_block = res.next_block;
391            total_execution_time += res.total_execution_time
392        }
393
394        Ok(EventResponse {
395            archive_height,
396            next_block,
397            total_execution_time,
398            data,
399            rollback_guard: None,
400        })
401    }
402
403    /// Retrieves blocks, transactions, traces, and logs in Arrow format through a stream using
404    /// the provided query and stream configuration.
405    ///
406    /// Returns data in Apache Arrow format for high-performance columnar processing.
407    /// Useful for analytics workloads or when working with Arrow-compatible tools.
408    ///
409    /// # ⚠️ Important Warning
410    ///
411    /// This method will continue executing until the query has run to completion from beginning
412    /// to the end of the block range defined in the query. For heavy queries with large block
413    /// ranges or high data volumes, consider:
414    ///
415    /// - Use [`stream_arrow()`](Self::stream_arrow) to interact with each streamed chunk individually
416    /// - Use [`get_arrow()`](Self::get_arrow) which returns a `next_block` that can be paginated for the next query
417    /// - Break large queries into smaller block ranges
418    ///
419    /// # Example
420    /// ```no_run
421    /// use hypersync_client::{Client, net_types::{Query, BlockFilter, BlockField}, StreamConfig};
422    ///
423    /// # async fn example() -> anyhow::Result<()> {
424    /// let client = Client::builder()
425    ///     .chain_id(1)
426    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
427    ///     .build()?;
428    ///
429    /// // Get block data in Arrow format for analytics
430    /// let query = Query::new()
431    ///     .from_block(19000000)
432    ///     .to_block_excl(19000100)
433    ///     .include_all_blocks()
434    ///     .select_block_fields([BlockField::Number, BlockField::Timestamp, BlockField::GasUsed]);
435    /// let response = client.collect_arrow(query, StreamConfig::default()).await?;
436    ///
437    /// println!("Retrieved {} Arrow batches for blocks", response.data.blocks.len());
438    /// # Ok(())
439    /// # }
440    /// ```
441    pub async fn collect_arrow(&self, query: Query, config: StreamConfig) -> Result<ArrowResponse> {
442        let mut recv = stream::stream_arrow(self, query, config)
443            .await
444            .context("start stream")?;
445
446        let mut data = ArrowResponseData::default();
447        let mut archive_height = None;
448        let mut next_block = 0;
449        let mut total_execution_time = 0;
450
451        while let Some(res) = recv.recv().await {
452            let res = res.context("get response")?;
453
454            for batch in res.data.blocks {
455                data.blocks.push(batch);
456            }
457            for batch in res.data.transactions {
458                data.transactions.push(batch);
459            }
460            for batch in res.data.logs {
461                data.logs.push(batch);
462            }
463            for batch in res.data.traces {
464                data.traces.push(batch);
465            }
466            for batch in res.data.decoded_logs {
467                data.decoded_logs.push(batch);
468            }
469
470            archive_height = res.archive_height;
471            next_block = res.next_block;
472            total_execution_time += res.total_execution_time
473        }
474
475        Ok(ArrowResponse {
476            archive_height,
477            next_block,
478            total_execution_time,
479            data,
480            rollback_guard: None,
481        })
482    }
483
484    /// Writes parquet file getting data through a stream using the provided path, query,
485    /// and stream configuration.
486    ///
487    /// Streams data directly to a Parquet file for efficient storage and later analysis.
488    /// Perfect for data exports or ETL pipelines.
489    ///
490    /// # ⚠️ Important Warning
491    ///
492    /// This method will continue executing until the query has run to completion from beginning
493    /// to the end of the block range defined in the query. For heavy queries with large block
494    /// ranges or high data volumes, consider:
495    ///
496    /// - Use [`stream_arrow()`](Self::stream_arrow) and write to Parquet incrementally
497    /// - Use [`get_arrow()`](Self::get_arrow) with pagination and append to Parquet files
498    /// - Break large queries into smaller block ranges
499    ///
500    /// # Example
501    /// ```no_run
502    /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField}, StreamConfig};
503    ///
504    /// # async fn example() -> anyhow::Result<()> {
505    /// let client = Client::builder()
506    ///     .chain_id(1)
507    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
508    ///     .build()?;
509    ///
510    /// // Export all DEX trades to Parquet for analysis
511    /// let query = Query::new()
512    ///     .from_block(19000000)
513    ///     .to_block_excl(19010000)
514    ///     .where_logs(
515    ///         LogFilter::all()
516    ///             .and_topic0(["0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822"])?
517    ///     )
518    ///     .select_log_fields([LogField::Address, LogField::Data, LogField::BlockNumber]);
519    /// client.collect_parquet("./trades.parquet", query, StreamConfig::default()).await?;
520    ///
521    /// println!("Trade data exported to trades.parquet");
522    /// # Ok(())
523    /// # }
524    /// ```
525    pub async fn collect_parquet(
526        &self,
527        path: &str,
528        query: Query,
529        config: StreamConfig,
530    ) -> Result<()> {
531        parquet_out::collect_parquet(self, path, query, config).await
532    }
533
534    /// Internal implementation of getting chain_id from server
535    async fn get_chain_id_impl(&self) -> Result<u64> {
536        let mut url = self.inner.url.clone();
537        let mut segments = url.path_segments_mut().ok().context("get path segments")?;
538        segments.push("chain_id");
539        std::mem::drop(segments);
540        let req = self.inner.http_client.request(Method::GET, url);
541
542        let res = req.send().await.context("execute http req")?;
543
544        let status = res.status();
545        if !status.is_success() {
546            return Err(anyhow!("http response status code {}", status));
547        }
548
549        let chain_id: ChainId = res.json().await.context("read response body json")?;
550
551        Ok(chain_id.chain_id)
552    }
553
554    /// Internal implementation of getting height from server
555    async fn get_height_impl(&self, http_timeout_override: Option<Duration>) -> Result<u64> {
556        let mut url = self.inner.url.clone();
557        let mut segments = url.path_segments_mut().ok().context("get path segments")?;
558        segments.push("height");
559        std::mem::drop(segments);
560        let mut req = self.inner.http_client.request(Method::GET, url);
561        if let Some(http_timeout_override) = http_timeout_override {
562            req = req.timeout(http_timeout_override);
563        }
564
565        let res = req.send().await.context("execute http req")?;
566
567        let status = res.status();
568        if !status.is_success() {
569            return Err(anyhow!("http response status code {}", status));
570        }
571
572        let height: ArchiveHeight = res.json().await.context("read response body json")?;
573
574        Ok(height.height.unwrap_or(0))
575    }
576
577    /// Get the chain_id from the server with retries.
578    ///
579    /// # Example
580    /// ```no_run
581    /// use hypersync_client::Client;
582    ///
583    /// # async fn example() -> anyhow::Result<()> {
584    /// let client = Client::builder()
585    ///     .chain_id(1)
586    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
587    ///     .build()?;
588    ///
589    /// let chain_id = client.get_chain_id().await?;
590    /// println!("Connected to chain ID: {}", chain_id);
591    /// # Ok(())
592    /// # }
593    /// ```
594    pub async fn get_chain_id(&self) -> Result<u64> {
595        let mut base = self.inner.retry_base_ms;
596
597        let mut err = anyhow!("");
598
599        for _ in 0..self.inner.max_num_retries + 1 {
600            match self.get_chain_id_impl().await {
601                Ok(res) => return Ok(res),
602                Err(e) => {
603                    log::error!(
604                        "failed to get chain_id from server, retrying... The error was: {e:?}"
605                    );
606                    err = err.context(format!("{e:?}"));
607                }
608            }
609
610            let base_ms = Duration::from_millis(base);
611            let jitter = Duration::from_millis(fastrange_rs::fastrange_64(
612                rand::random(),
613                self.inner.retry_backoff_ms,
614            ));
615
616            tokio::time::sleep(base_ms + jitter).await;
617
618            base = std::cmp::min(
619                base + self.inner.retry_backoff_ms,
620                self.inner.retry_ceiling_ms,
621            );
622        }
623
624        Err(err)
625    }
626
627    /// Get the height of from server with retries.
628    ///
629    /// # Example
630    /// ```no_run
631    /// use hypersync_client::Client;
632    ///
633    /// # async fn example() -> anyhow::Result<()> {
634    /// let client = Client::builder()
635    ///     .chain_id(1)
636    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
637    ///     .build()?;
638    ///
639    /// let height = client.get_height().await?;
640    /// println!("Current block height: {}", height);
641    /// # Ok(())
642    /// # }
643    /// ```
644    pub async fn get_height(&self) -> Result<u64> {
645        let mut base = self.inner.retry_base_ms;
646
647        let mut err = anyhow!("");
648
649        for _ in 0..self.inner.max_num_retries + 1 {
650            match self.get_height_impl(None).await {
651                Ok(res) => return Ok(res),
652                Err(e) => {
653                    log::error!(
654                        "failed to get height from server, retrying... The error was: {e:?}"
655                    );
656                    err = err.context(format!("{e:?}"));
657                }
658            }
659
660            let base_ms = Duration::from_millis(base);
661            let jitter = Duration::from_millis(fastrange_rs::fastrange_64(
662                rand::random(),
663                self.inner.retry_backoff_ms,
664            ));
665
666            tokio::time::sleep(base_ms + jitter).await;
667
668            base = std::cmp::min(
669                base + self.inner.retry_backoff_ms,
670                self.inner.retry_ceiling_ms,
671            );
672        }
673
674        Err(err)
675    }
676
677    /// Get the height of the Client instance for health checks.
678    ///
679    /// Doesn't do any retries and the `http_req_timeout` parameter will override the http timeout config set when creating the client.
680    ///
681    /// # Example
682    /// ```no_run
683    /// use hypersync_client::Client;
684    /// use std::time::Duration;
685    ///
686    /// # async fn example() -> anyhow::Result<()> {
687    /// let client = Client::builder()
688    ///     .chain_id(1)
689    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
690    ///     .build()?;
691    ///
692    /// // Quick health check with 5 second timeout
693    /// let height = client.health_check(Some(Duration::from_secs(5))).await?;
694    /// println!("Server is healthy at block: {}", height);
695    /// # Ok(())
696    /// # }
697    /// ```
698    pub async fn health_check(&self, http_req_timeout: Option<Duration>) -> Result<u64> {
699        self.get_height_impl(http_req_timeout).await
700    }
701
702    /// Executes query with retries and returns the response.
703    ///
704    /// # Example
705    /// ```no_run
706    /// use hypersync_client::{Client, net_types::{Query, BlockFilter, BlockField}};
707    ///
708    /// # async fn example() -> anyhow::Result<()> {
709    /// let client = Client::builder()
710    ///     .chain_id(1)
711    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
712    ///     .build()?;
713    ///
714    /// // Query all blocks from a specific range
715    /// let query = Query::new()
716    ///     .from_block(19000000)
717    ///     .to_block_excl(19000010)
718    ///     .include_all_blocks()
719    ///     .select_block_fields([BlockField::Number, BlockField::Hash, BlockField::Timestamp]);
720    /// let response = client.get(&query).await?;
721    ///
722    /// println!("Retrieved {} blocks", response.data.blocks.len());
723    /// # Ok(())
724    /// # }
725    /// ```
726    pub async fn get(&self, query: &Query) -> Result<QueryResponse> {
727        let arrow_response = self.get_arrow(query).await.context("get data")?;
728        let converted =
729            QueryResponse::try_from(&arrow_response).context("convert arrow response")?;
730        Ok(converted)
731    }
732
733    /// Add block, transaction and log fields selection to the query, executes it with retries
734    /// and returns the response.
735    ///
736    /// This method automatically joins blocks, transactions, and logs into unified events,
737    /// making it easier to work with related blockchain data.
738    ///
739    /// # Example
740    /// ```no_run
741    /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField, TransactionField}};
742    ///
743    /// # async fn example() -> anyhow::Result<()> {
744    /// let client = Client::builder()
745    ///     .chain_id(1)
746    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
747    ///     .build()?;
748    ///
749    /// // Query ERC20 transfers with transaction context
750    /// let query = Query::new()
751    ///     .from_block(19000000)
752    ///     .to_block_excl(19000010)
753    ///     .where_logs(
754    ///         LogFilter::all()
755    ///             .and_topic0(["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"])?
756    ///     )
757    ///     .select_log_fields([LogField::Address, LogField::Data])
758    ///     .select_transaction_fields([TransactionField::Hash, TransactionField::From]);
759    /// let response = client.get_events(query).await?;
760    ///
761    /// println!("Retrieved {} joined events", response.data.len());
762    /// # Ok(())
763    /// # }
764    /// ```
765    pub async fn get_events(&self, mut query: Query) -> Result<EventResponse> {
766        let event_join_strategy = InternalEventJoinStrategy::from(&query.field_selection);
767        event_join_strategy.add_join_fields_to_selection(&mut query.field_selection);
768        let arrow_response = self.get_arrow(&query).await.context("get data")?;
769        EventResponse::try_from_arrow_response(&arrow_response, &event_join_strategy)
770    }
771
772    /// Executes query once and returns the result using JSON serialization.
773    async fn get_arrow_impl_json(
774        &self,
775        query: &Query,
776    ) -> std::result::Result<ArrowImplResponse, HyperSyncResponseError> {
777        let mut url = self.inner.url.clone();
778        let mut segments = url.path_segments_mut().ok().context("get path segments")?;
779        segments.push("query");
780        segments.push("arrow-ipc");
781        std::mem::drop(segments);
782        let req = self.inner.http_client.request(Method::POST, url);
783
784        let res = req.json(&query).send().await.context("execute http req")?;
785
786        let status = res.status();
787        let rate_limit = RateLimitInfo::from_response(&res);
788
789        if status == StatusCode::PAYLOAD_TOO_LARGE {
790            return Err(HyperSyncResponseError::PayloadTooLarge);
791        }
792        if status == StatusCode::TOO_MANY_REQUESTS {
793            return Err(HyperSyncResponseError::RateLimited { rate_limit });
794        }
795        if !status.is_success() {
796            let text = res.text().await.context("read text to see error")?;
797
798            return Err(HyperSyncResponseError::Other(anyhow!(
799                "http response status code {}, err body: {}",
800                status,
801                text
802            )));
803        }
804
805        let bytes = res.bytes().await.context("read response body bytes")?;
806
807        let res = tokio::task::block_in_place(|| {
808            parse_query_response(&bytes).context("parse query response")
809        })?;
810
811        Ok(ArrowImplResponse {
812            response: res,
813            response_bytes: bytes.len().try_into().unwrap(),
814            rate_limit,
815        })
816    }
817
818    fn should_cache_queries(&self) -> bool {
819        matches!(
820            self.inner.serialization_format,
821            SerializationFormat::CapnProto {
822                should_cache_queries: true
823            }
824        )
825    }
826
827    /// Executes query once and returns the result using Cap'n Proto serialization.
828    async fn get_arrow_impl_capnp(
829        &self,
830        query: &Query,
831    ) -> std::result::Result<ArrowImplResponse, HyperSyncResponseError> {
832        let mut url = self.inner.url.clone();
833        let mut segments = url.path_segments_mut().ok().context("get path segments")?;
834        segments.push("query");
835        segments.push("arrow-ipc");
836        segments.push("capnp");
837        std::mem::drop(segments);
838
839        let should_cache = self.should_cache_queries();
840
841        if should_cache {
842            let query_with_id = {
843                let mut message = capnp::message::Builder::new_default();
844                let mut request_builder =
845                    message.init_root::<hypersync_net_types_capnp::request::Builder>();
846
847                request_builder
848                    .build_query_id_from_query(query)
849                    .context("build query id")?;
850                let mut query_with_id = Vec::new();
851                capnp::serialize_packed::write_message(&mut query_with_id, &message)
852                    .context("write capnp message")?;
853                query_with_id
854            };
855
856            let req = self.inner.http_client.request(Method::POST, url.clone());
857
858            let res = req
859                .body(query_with_id)
860                .send()
861                .await
862                .context("execute http req")?;
863
864            let status = res.status();
865            let rate_limit = RateLimitInfo::from_response(&res);
866
867            if status == StatusCode::PAYLOAD_TOO_LARGE {
868                return Err(HyperSyncResponseError::PayloadTooLarge);
869            }
870            if status == StatusCode::TOO_MANY_REQUESTS {
871                return Err(HyperSyncResponseError::RateLimited { rate_limit });
872            }
873            if status.is_success() {
874                let bytes = res.bytes().await.context("read response body bytes")?;
875
876                let mut opts = capnp::message::ReaderOptions::new();
877                opts.nesting_limit(i32::MAX).traversal_limit_in_words(None);
878                let message_reader = capnp::serialize_packed::read_message(bytes.as_ref(), opts)
879                    .context("create message reader")?;
880                let query_response = message_reader
881                    .get_root::<hypersync_net_types_capnp::cached_query_response::Reader>()
882                    .context("get cached_query_response root")?;
883                match query_response.get_either().which().context("get either")? {
884                hypersync_net_types_capnp::cached_query_response::either::Which::QueryResponse(
885                    query_response,
886                ) => {
887                    let res = tokio::task::block_in_place(|| {
888                        let res = query_response?;
889                        read_query_response(&res).context("parse query response cached")
890                    })?;
891                    return Ok(ArrowImplResponse {
892                        response: res,
893                        response_bytes: bytes.len().try_into().unwrap(),
894                        rate_limit,
895                    });
896                }
897                hypersync_net_types_capnp::cached_query_response::either::Which::NotCached(()) => {
898                    log::trace!("query was not cached, retrying with full query");
899                }
900            }
901            } else {
902                let text = res.text().await.context("read text to see error")?;
903                log::warn!(
904                    "Failed cache query, will retry full query. {}, err body: {}",
905                    status,
906                    text
907                );
908            }
909        };
910
911        let full_query_bytes = {
912            let mut message = capnp::message::Builder::new_default();
913            let mut query_builder =
914                message.init_root::<hypersync_net_types_capnp::request::Builder>();
915
916            query_builder
917                .build_full_query_from_query(query, should_cache)
918                .context("build full query")?;
919            let mut bytes = Vec::new();
920            capnp::serialize_packed::write_message(&mut bytes, &message)
921                .context("write full query capnp message")?;
922            bytes
923        };
924
925        let req = self.inner.http_client.request(Method::POST, url);
926
927        let res = req
928            .body(full_query_bytes)
929            .send()
930            .await
931            .context("execute http req")?;
932
933        let status = res.status();
934        let rate_limit = RateLimitInfo::from_response(&res);
935
936        if status == StatusCode::PAYLOAD_TOO_LARGE {
937            return Err(HyperSyncResponseError::PayloadTooLarge);
938        }
939        if status == StatusCode::TOO_MANY_REQUESTS {
940            return Err(HyperSyncResponseError::RateLimited { rate_limit });
941        }
942        if !status.is_success() {
943            let text = res.text().await.context("read text to see error")?;
944
945            return Err(HyperSyncResponseError::Other(anyhow!(
946                "http response status code {}, err body: {}",
947                status,
948                text
949            )));
950        }
951
952        let bytes = res.bytes().await.context("read response body bytes")?;
953
954        let res = tokio::task::block_in_place(|| {
955            parse_query_response(&bytes).context("parse query response")
956        })?;
957
958        Ok(ArrowImplResponse {
959            response: res,
960            response_bytes: bytes.len().try_into().unwrap(),
961            rate_limit,
962        })
963    }
964
965    /// Executes query once and returns the result, handling payload-too-large by halving block range.
966    async fn get_arrow_impl(
967        &self,
968        query: &Query,
969    ) -> std::result::Result<ArrowImplResponse, HyperSyncResponseError> {
970        let mut query = query.clone();
971        loop {
972            let res = match self.inner.serialization_format {
973                SerializationFormat::Json => self.get_arrow_impl_json(&query).await,
974                SerializationFormat::CapnProto { .. } => self.get_arrow_impl_capnp(&query).await,
975            };
976            match res {
977                Ok(res) => return Ok(res),
978                Err(
979                    e @ (HyperSyncResponseError::Other(_)
980                    | HyperSyncResponseError::RateLimited { .. }),
981                ) => return Err(e),
982                Err(HyperSyncResponseError::PayloadTooLarge) => {
983                    let block_range = if let Some(to_block) = query.to_block {
984                        let current = to_block - query.from_block;
985                        if current < 2 {
986                            return Err(HyperSyncResponseError::Other(anyhow!(
987                                "Payload is too large and query is using the minimum block range."
988                            )));
989                        }
990                        // Half the current block range
991                        current / 2
992                    } else {
993                        200
994                    };
995                    let to_block = query.from_block + block_range;
996                    query.to_block = Some(to_block);
997
998                    log::trace!(
999                        "Payload is too large, retrying with block range from: {} to: {}",
1000                        query.from_block,
1001                        to_block
1002                    );
1003                }
1004            }
1005        }
1006    }
1007
1008    /// Executes query with retries and returns the response in Arrow format.
1009    pub async fn get_arrow(&self, query: &Query) -> Result<ArrowResponse> {
1010        const WAIT_ON_RATE_LIMIT: bool = true;
1011        self.get_arrow_with_size(query, WAIT_ON_RATE_LIMIT)
1012            .await
1013            .map(|res| res.response)
1014    }
1015
1016    /// Internal implementation for get_arrow.
1017    ///
1018    /// When `wait_on_rate_limit` is `false`, a 429 response is returned
1019    /// immediately with the rate limit info instead of being retried.
1020    async fn get_arrow_with_size(
1021        &self,
1022        query: &Query,
1023        wait_on_rate_limit: bool,
1024    ) -> Result<ArrowImplResponse> {
1025        let mut base = self.inner.retry_base_ms;
1026
1027        let mut err = anyhow!("");
1028
1029        if self.inner.proactive_rate_limit_sleep {
1030            if wait_on_rate_limit {
1031                self.wait_for_rate_limit().await;
1032            } else if let Some(rate_limit) = self.get_proactive_rate_limit_info() {
1033                return Err(anyhow::anyhow!(HyperSyncResponseError::RateLimited {
1034                    rate_limit
1035                }));
1036            }
1037        }
1038
1039        for _ in 0..self.inner.max_num_retries + 1 {
1040            match self.get_arrow_impl(query).await {
1041                Ok(res) => {
1042                    self.update_rate_limit_state(&res.rate_limit);
1043                    return Ok(res);
1044                }
1045                Err(HyperSyncResponseError::RateLimited { rate_limit }) => {
1046                    self.update_rate_limit_state(&rate_limit);
1047                    if !wait_on_rate_limit {
1048                        return Err(anyhow::anyhow!(HyperSyncResponseError::RateLimited {
1049                            rate_limit
1050                        }));
1051                    }
1052                    let wait_secs = rate_limit.suggested_wait_secs().unwrap_or(1) + 1;
1053                    log::warn!(
1054                        "rate limited by server ({rate_limit}), waiting {wait_secs}s before retry. To increase your rate limits, upgrade your plan at https://envio.dev/app/api-tokens. For more info: https://docs.envio.dev/docs/HyperSync/api-tokens"
1055                    );
1056                    err = err.context(format!("rate limited by server ({rate_limit}). To increase your rate limits, upgrade your plan at https://envio.dev/app/api-tokens"));
1057                    tokio::time::sleep(Duration::from_secs(wait_secs)).await;
1058                    continue;
1059                }
1060                Err(HyperSyncResponseError::Other(e)) => {
1061                    log::error!(
1062                        "failed to get arrow data from server, retrying... The error was: {e:?}"
1063                    );
1064                    err = err.context(format!("{e:?}"));
1065                }
1066                Err(HyperSyncResponseError::PayloadTooLarge) => {
1067                    // This shouldn't happen since get_arrow_impl handles it, but just in case
1068                    log::error!("unexpected PayloadTooLarge from get_arrow_impl, retrying...");
1069                    err = err.context("unexpected PayloadTooLarge");
1070                }
1071            }
1072
1073            let base_ms = Duration::from_millis(base);
1074            let jitter = Duration::from_millis(fastrange_rs::fastrange_64(
1075                rand::random(),
1076                self.inner.retry_backoff_ms,
1077            ));
1078
1079            tokio::time::sleep(base_ms + jitter).await;
1080
1081            base = std::cmp::min(
1082                base + self.inner.retry_backoff_ms,
1083                self.inner.retry_ceiling_ms,
1084            );
1085        }
1086
1087        Err(err)
1088    }
1089
1090    /// Spawns task to execute query and return data via a channel.
1091    ///
1092    /// # Example
1093    /// ```no_run
1094    /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField}, StreamConfig};
1095    ///
1096    /// # async fn example() -> anyhow::Result<()> {
1097    /// let client = Client::builder()
1098    ///     .chain_id(1)
1099    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
1100    ///     .build()?;
1101    ///
1102    /// // Stream all ERC20 transfer events
1103    /// let query = Query::new()
1104    ///     .from_block(19000000)
1105    ///     .where_logs(
1106    ///         LogFilter::all()
1107    ///             .and_topic0(["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"])?
1108    ///     )
1109    ///     .select_log_fields([LogField::Address, LogField::Topic1, LogField::Topic2, LogField::Data]);
1110    /// let mut receiver = client.stream(query, StreamConfig::default()).await?;
1111    ///
1112    /// while let Some(response) = receiver.recv().await {
1113    ///     let response = response?;
1114    ///     println!("Got {} events up to block: {}", response.data.logs.len(), response.next_block);
1115    /// }
1116    /// # Ok(())
1117    /// # }
1118    /// ```
1119    pub async fn stream(
1120        &self,
1121        query: Query,
1122        config: StreamConfig,
1123    ) -> Result<mpsc::Receiver<Result<QueryResponse>>> {
1124        check_simple_stream_params(&config)?;
1125
1126        let (tx, rx): (_, mpsc::Receiver<Result<QueryResponse>>) =
1127            mpsc::channel(config.concurrency);
1128
1129        let mut inner_rx = self
1130            .stream_arrow(query, config)
1131            .await
1132            .context("start inner stream")?;
1133
1134        tokio::spawn(async move {
1135            while let Some(resp) = inner_rx.recv().await {
1136                let msg = resp
1137                    .context("inner receiver")
1138                    .and_then(|r| QueryResponse::try_from(&r));
1139                let is_err = msg.is_err();
1140                if tx.send(msg).await.is_err() || is_err {
1141                    return;
1142                }
1143            }
1144        });
1145
1146        Ok(rx)
1147    }
1148
1149    /// Add block, transaction and log fields selection to the query and spawns task to execute it,
1150    /// returning data via a channel.
1151    ///
1152    /// This method automatically joins blocks, transactions, and logs into unified events,
1153    /// then streams them via a channel for real-time processing.
1154    ///
1155    /// # Example
1156    /// ```no_run
1157    /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField, TransactionField}, StreamConfig};
1158    ///
1159    /// # async fn example() -> anyhow::Result<()> {
1160    /// let client = Client::builder()
1161    ///     .chain_id(1)
1162    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
1163    ///     .build()?;
1164    ///
1165    /// // Stream NFT transfer events with transaction context
1166    /// let query = Query::new()
1167    ///     .from_block(19000000)
1168    ///     .where_logs(
1169    ///         LogFilter::all()
1170    ///             .and_topic0(["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"])?
1171    ///     )
1172    ///     .select_log_fields([LogField::Address, LogField::Topic1, LogField::Topic2])
1173    ///     .select_transaction_fields([TransactionField::Hash, TransactionField::From]);
1174    /// let mut receiver = client.stream_events(query, StreamConfig::default()).await?;
1175    ///
1176    /// while let Some(response) = receiver.recv().await {
1177    ///     let response = response?;
1178    ///     println!("Got {} joined events up to block: {}", response.data.len(), response.next_block);
1179    /// }
1180    /// # Ok(())
1181    /// # }
1182    /// ```
1183    pub async fn stream_events(
1184        &self,
1185        mut query: Query,
1186        config: StreamConfig,
1187    ) -> Result<mpsc::Receiver<Result<EventResponse>>> {
1188        check_simple_stream_params(&config)?;
1189
1190        let event_join_strategy = InternalEventJoinStrategy::from(&query.field_selection);
1191
1192        event_join_strategy.add_join_fields_to_selection(&mut query.field_selection);
1193
1194        let (tx, rx): (_, mpsc::Receiver<Result<EventResponse>>) =
1195            mpsc::channel(config.concurrency);
1196
1197        let mut inner_rx = self
1198            .stream_arrow(query, config)
1199            .await
1200            .context("start inner stream")?;
1201
1202        tokio::spawn(async move {
1203            while let Some(resp) = inner_rx.recv().await {
1204                let msg = resp
1205                    .context("inner receiver")
1206                    .and_then(|r| EventResponse::try_from_arrow_response(&r, &event_join_strategy));
1207                let is_err = msg.is_err();
1208                if tx.send(msg).await.is_err() || is_err {
1209                    return;
1210                }
1211            }
1212        });
1213
1214        Ok(rx)
1215    }
1216
1217    /// Spawns task to execute query and return data via a channel in Arrow format.
1218    ///
1219    /// Returns raw Apache Arrow data via a channel for high-performance processing.
1220    /// Ideal for applications that need to work directly with columnar data.
1221    ///
1222    /// # Example
1223    /// ```no_run
1224    /// use hypersync_client::{Client, net_types::{Query, TransactionFilter, TransactionField}, StreamConfig};
1225    ///
1226    /// # async fn example() -> anyhow::Result<()> {
1227    /// let client = Client::builder()
1228    ///     .chain_id(1)
1229    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
1230    ///     .build()?;
1231    ///
1232    /// // Stream transaction data in Arrow format for analytics
1233    /// let query = Query::new()
1234    ///     .from_block(19000000)
1235    ///     .to_block_excl(19000100)
1236    ///     .where_transactions(
1237    ///         TransactionFilter::all()
1238    ///             .and_contract_address(["0xA0b86a33E6411b87Fd9D3DF822C8698FC06BBe4c"])?
1239    ///     )
1240    ///     .select_transaction_fields([TransactionField::Hash, TransactionField::From, TransactionField::Value]);
1241    /// let mut receiver = client.stream_arrow(query, StreamConfig::default()).await?;
1242    ///
1243    /// while let Some(response) = receiver.recv().await {
1244    ///     let response = response?;
1245    ///     println!("Got {} Arrow batches for transactions", response.data.transactions.len());
1246    /// }
1247    /// # Ok(())
1248    /// # }
1249    /// ```
1250    pub async fn stream_arrow(
1251        &self,
1252        query: Query,
1253        config: StreamConfig,
1254    ) -> Result<mpsc::Receiver<Result<ArrowResponse>>> {
1255        stream::stream_arrow(self, query, config).await
1256    }
1257
1258    /// Executes query and returns the response in Arrow format along with
1259    /// rate limit information from the server.
1260    ///
1261    /// Unlike [`get_arrow`](Self::get_arrow), this method does **not** retry on
1262    /// HTTP 429 responses. Instead it returns
1263    /// [`RateLimitResponse::RateLimited`] so the caller can implement their own
1264    /// back-off. Other transient errors are still retried normally.
1265    pub async fn get_arrow_with_rate_limit(
1266        &self,
1267        query: &Query,
1268    ) -> Result<RateLimitResponse<ArrowResponseData>> {
1269        const WAIT_ON_RATE_LIMIT: bool = false;
1270        match self.get_arrow_with_size(query, WAIT_ON_RATE_LIMIT).await {
1271            Ok(result) => Ok(RateLimitResponse::Success {
1272                response: result.response,
1273                rate_limit: result.rate_limit,
1274            }),
1275            Err(e) => match e.downcast::<HyperSyncResponseError>() {
1276                Ok(HyperSyncResponseError::RateLimited { rate_limit }) => {
1277                    Ok(RateLimitResponse::RateLimited(rate_limit))
1278                }
1279                Ok(other) => Err(other.into()),
1280                Err(e) => Err(e),
1281            },
1282        }
1283    }
1284
1285    /// Executes query and returns the response along with
1286    /// rate limit information from the server.
1287    ///
1288    /// Unlike [`get`](Self::get), this method does **not** retry on HTTP 429
1289    /// responses. Instead it returns
1290    /// [`RateLimitResponse::RateLimited`] so the caller can implement their own
1291    /// back-off. Other transient errors are still retried normally.
1292    pub async fn get_with_rate_limit(
1293        &self,
1294        query: &Query,
1295    ) -> Result<RateLimitResponse<ResponseData>> {
1296        match self.get_arrow_with_rate_limit(query).await? {
1297            RateLimitResponse::Success {
1298                response,
1299                rate_limit,
1300            } => {
1301                let converted =
1302                    QueryResponse::try_from(&response).context("convert arrow response")?;
1303                Ok(RateLimitResponse::Success {
1304                    response: converted,
1305                    rate_limit,
1306                })
1307            }
1308            RateLimitResponse::RateLimited(info) => Ok(RateLimitResponse::RateLimited(info)),
1309        }
1310    }
1311
1312    /// Executes query and returns joined events along with rate limit
1313    /// information from the server.
1314    ///
1315    /// Unlike [`get_events`](Self::get_events), this method does **not** retry
1316    /// on HTTP 429 responses. Instead it returns
1317    /// [`RateLimitResponse::RateLimited`] so the caller can implement their own
1318    /// back-off. Other transient errors are still retried normally.
1319    pub async fn get_events_with_rate_limit(
1320        &self,
1321        mut query: Query,
1322    ) -> Result<RateLimitResponse<Vec<simple_types::Event>>> {
1323        let event_join_strategy = InternalEventJoinStrategy::from(&query.field_selection);
1324        event_join_strategy.add_join_fields_to_selection(&mut query.field_selection);
1325        match self.get_arrow_with_rate_limit(&query).await? {
1326            RateLimitResponse::Success {
1327                response,
1328                rate_limit,
1329            } => {
1330                let converted =
1331                    EventResponse::try_from_arrow_response(&response, &event_join_strategy)?;
1332                Ok(RateLimitResponse::Success {
1333                    response: converted,
1334                    rate_limit,
1335                })
1336            }
1337            RateLimitResponse::RateLimited(info) => Ok(RateLimitResponse::RateLimited(info)),
1338        }
1339    }
1340
1341    /// Returns the most recently observed rate limit information, if any.
1342    ///
1343    /// Updated after every request (including inside streams). Returns `None`
1344    /// if no requests have been made yet or the server hasn't returned rate limit headers.
1345    pub fn rate_limit_info(&self) -> Option<RateLimitInfo> {
1346        self.inner
1347            .rate_limit_state
1348            .lock()
1349            .expect("rate_limit_state mutex poisoned")
1350            .as_ref()
1351            .map(|(info, _captured_at)| info.clone())
1352    }
1353
1354    /// Returns the current rate limit info if the client is known to be rate-limited
1355    /// and the reset window has not yet elapsed.
1356    fn get_proactive_rate_limit_info(&self) -> Option<RateLimitInfo> {
1357        let state = self
1358            .inner
1359            .rate_limit_state
1360            .lock()
1361            .expect("rate_limit_state mutex poisoned");
1362        match state.as_ref() {
1363            Some((info, captured_at)) if info.is_rate_limited() => {
1364                let remaining_wait = info.suggested_wait_secs().map(|secs| {
1365                    let elapsed = captured_at.elapsed().as_secs();
1366                    secs.saturating_sub(elapsed)
1367                });
1368                if remaining_wait.unwrap_or(0) > 0 {
1369                    Some(info.clone())
1370                } else {
1371                    None
1372                }
1373            }
1374            _ => None,
1375        }
1376    }
1377
1378    /// Waits until the current rate limit window resets, if the client is rate limited.
1379    ///
1380    /// Returns immediately if:
1381    /// - No rate limit information has been observed yet
1382    /// - There is remaining quota in the current window
1383    ///
1384    /// This method is useful for consumers who want to explicitly wait before making
1385    /// requests, for example when coordinating rate limits across multiple systems.
1386    pub async fn wait_for_rate_limit(&self) {
1387        if let Some(info) = self.get_proactive_rate_limit_info() {
1388            let secs = info.suggested_wait_secs().unwrap_or(0);
1389            if secs > 0 {
1390                log::warn!(
1391                    "rate limit exhausted ({info}), proactively waiting {secs}s for window reset. To increase your rate limits, upgrade your plan at https://envio.dev/app/api-tokens. For more info: https://docs.envio.dev/docs/HyperSync/api-tokens"
1392                );
1393                tokio::time::sleep(Duration::from_secs(secs)).await;
1394            }
1395        }
1396    }
1397
1398    /// Updates the internally tracked rate limit state with the current timestamp.
1399    fn update_rate_limit_state(&self, rate_limit: &RateLimitInfo) {
1400        // Only update if the response actually contained rate limit headers
1401        if rate_limit.limit.is_some()
1402            || rate_limit.remaining.is_some()
1403            || rate_limit.reset_secs.is_some()
1404        {
1405            let mut state = self
1406                .inner
1407                .rate_limit_state
1408                .lock()
1409                .expect("rate_limit_state mutex poisoned");
1410            *state = Some((rate_limit.clone(), Instant::now()));
1411        }
1412    }
1413
1414    /// Getter for url field.
1415    ///
1416    /// # Example
1417    /// ```
1418    /// use hypersync_client::Client;
1419    ///
1420    /// let client = Client::builder()
1421    ///     .chain_id(1)
1422    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1423    ///     .build()
1424    ///     .unwrap();
1425    ///
1426    /// println!("Client URL: {}", client.url());
1427    /// ```
1428    pub fn url(&self) -> &Url {
1429        &self.inner.url
1430    }
1431}
1432
1433/// Builder for creating a hypersync client with configuration options.
1434///
1435/// This builder provides a fluent API for configuring client settings like URL,
1436/// authentication, timeouts, and retry behavior.
1437///
1438/// # Example
1439/// ```
1440/// use hypersync_client::{Client, SerializationFormat};
1441///
1442/// let client = Client::builder()
1443///     .chain_id(1)
1444///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1445///     .http_req_timeout_millis(30000)
1446///     .max_num_retries(3)
1447///     .build()
1448///     .unwrap();
1449/// ```
1450#[derive(Debug, Clone, Default)]
1451pub struct ClientBuilder(ClientConfig);
1452
1453impl ClientBuilder {
1454    /// Creates a new ClientBuilder with default configuration.
1455    pub fn new() -> Self {
1456        Self::default()
1457    }
1458
1459    /// Sets the chain ID and automatically configures the URL for the hypersync endpoint.
1460    ///
1461    /// This is a convenience method that sets the URL to `https://{chain_id}.hypersync.xyz`.
1462    ///
1463    /// # Arguments
1464    /// * `chain_id` - The blockchain chain ID (e.g., 1 for Ethereum mainnet)
1465    ///
1466    /// # Example
1467    /// ```
1468    /// use hypersync_client::Client;
1469    ///
1470    /// let client = Client::builder()
1471    ///     .chain_id(1) // Ethereum mainnet
1472    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1473    ///     .build()
1474    ///     .unwrap();
1475    /// ```
1476    pub fn chain_id(mut self, chain_id: u64) -> Self {
1477        self.0.url = format!("https://{chain_id}.hypersync.xyz");
1478        self
1479    }
1480
1481    /// Sets a custom URL for the hypersync server.
1482    ///
1483    /// Use this method when you need to connect to a custom hypersync endpoint
1484    /// instead of the default public endpoints.
1485    ///
1486    /// # Arguments
1487    /// * `url` - The hypersync server URL
1488    ///
1489    /// # Example
1490    /// ```
1491    /// use hypersync_client::Client;
1492    ///
1493    /// let client = Client::builder()
1494    ///     .url("https://my-custom-hypersync.example.com")
1495    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1496    ///     .build()
1497    ///     .unwrap();
1498    /// ```
1499    pub fn url<S: ToString>(mut self, url: S) -> Self {
1500        self.0.url = url.to_string();
1501        self
1502    }
1503
1504    /// Sets the api token for authentication.
1505    ///
1506    /// Required for accessing authenticated hypersync endpoints.
1507    ///
1508    /// # Arguments
1509    /// * `api_token` - The authentication token
1510    ///
1511    /// # Example
1512    /// ```
1513    /// use hypersync_client::Client;
1514    ///
1515    /// let client = Client::builder()
1516    ///     .chain_id(1)
1517    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1518    ///     .build()
1519    ///     .unwrap();
1520    /// ```
1521    pub fn api_token<S: ToString>(mut self, api_token: S) -> Self {
1522        self.0.api_token = api_token.to_string();
1523        self
1524    }
1525
1526    /// Sets the HTTP request timeout in milliseconds.
1527    ///
1528    /// # Arguments
1529    /// * `http_req_timeout_millis` - Timeout in milliseconds (default: 30000)
1530    ///
1531    /// # Example
1532    /// ```
1533    /// use hypersync_client::Client;
1534    ///
1535    /// let client = Client::builder()
1536    ///     .chain_id(1)
1537    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1538    ///     .http_req_timeout_millis(60000) // 60 second timeout
1539    ///     .build()
1540    ///     .unwrap();
1541    /// ```
1542    pub fn http_req_timeout_millis(mut self, http_req_timeout_millis: u64) -> Self {
1543        self.0.http_req_timeout_millis = http_req_timeout_millis;
1544        self
1545    }
1546
1547    /// Sets the maximum number of retries for failed requests.
1548    ///
1549    /// # Arguments
1550    /// * `max_num_retries` - Maximum number of retries (default: 10)
1551    ///
1552    /// # Example
1553    /// ```
1554    /// use hypersync_client::Client;
1555    ///
1556    /// let client = Client::builder()
1557    ///     .chain_id(1)
1558    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1559    ///     .max_num_retries(5)
1560    ///     .build()
1561    ///     .unwrap();
1562    /// ```
1563    pub fn max_num_retries(mut self, max_num_retries: usize) -> Self {
1564        self.0.max_num_retries = max_num_retries;
1565        self
1566    }
1567
1568    /// Sets the backoff increment for retry delays.
1569    ///
1570    /// This value is added to the base delay on each retry attempt.
1571    ///
1572    /// # Arguments
1573    /// * `retry_backoff_ms` - Backoff increment in milliseconds (default: 500)
1574    ///
1575    /// # Example
1576    /// ```
1577    /// use hypersync_client::Client;
1578    ///
1579    /// let client = Client::builder()
1580    ///     .chain_id(1)
1581    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1582    ///     .retry_backoff_ms(1000) // 1 second backoff increment
1583    ///     .build()
1584    ///     .unwrap();
1585    /// ```
1586    pub fn retry_backoff_ms(mut self, retry_backoff_ms: u64) -> Self {
1587        self.0.retry_backoff_ms = retry_backoff_ms;
1588        self
1589    }
1590
1591    /// Sets the initial delay for retry attempts.
1592    ///
1593    /// # Arguments
1594    /// * `retry_base_ms` - Initial retry delay in milliseconds (default: 500)
1595    ///
1596    /// # Example
1597    /// ```
1598    /// use hypersync_client::Client;
1599    ///
1600    /// let client = Client::builder()
1601    ///     .chain_id(1)
1602    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1603    ///     .retry_base_ms(1000) // Start with 1 second delay
1604    ///     .build()
1605    ///     .unwrap();
1606    /// ```
1607    pub fn retry_base_ms(mut self, retry_base_ms: u64) -> Self {
1608        self.0.retry_base_ms = retry_base_ms;
1609        self
1610    }
1611
1612    /// Sets the maximum delay for retry attempts.
1613    ///
1614    /// The retry delay will not exceed this value, even with backoff increments.
1615    ///
1616    /// # Arguments
1617    /// * `retry_ceiling_ms` - Maximum retry delay in milliseconds (default: 10000)
1618    ///
1619    /// # Example
1620    /// ```
1621    /// use hypersync_client::Client;
1622    ///
1623    /// let client = Client::builder()
1624    ///     .chain_id(1)
1625    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1626    ///     .retry_ceiling_ms(30000) // Cap at 30 seconds
1627    ///     .build()
1628    ///     .unwrap();
1629    /// ```
1630    pub fn retry_ceiling_ms(mut self, retry_ceiling_ms: u64) -> Self {
1631        self.0.retry_ceiling_ms = retry_ceiling_ms;
1632        self
1633    }
1634
1635    /// Sets whether to proactively sleep when rate limited.
1636    ///
1637    /// When enabled (default), the client will wait for the rate limit window to reset
1638    /// before sending requests it knows will be rejected. Set to `false` to disable
1639    /// this behavior and handle rate limits yourself.
1640    ///
1641    /// # Arguments
1642    /// * `proactive_rate_limit_sleep` - Whether to enable proactive rate limit sleeping (default: true)
1643    pub fn proactive_rate_limit_sleep(mut self, proactive_rate_limit_sleep: bool) -> Self {
1644        self.0.proactive_rate_limit_sleep = proactive_rate_limit_sleep;
1645        self
1646    }
1647
1648    /// Sets the serialization format for client-server communication.
1649    ///
1650    /// # Arguments
1651    /// * `serialization_format` - The format to use (JSON or CapnProto)
1652    ///
1653    /// # Example
1654    /// ```
1655    /// use hypersync_client::{Client, SerializationFormat};
1656    ///
1657    /// let client = Client::builder()
1658    ///     .chain_id(1)
1659    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1660    ///     .serialization_format(SerializationFormat::Json)
1661    ///     .build()
1662    ///     .unwrap();
1663    /// ```
1664    pub fn serialization_format(mut self, serialization_format: SerializationFormat) -> Self {
1665        self.0.serialization_format = serialization_format;
1666        self
1667    }
1668
1669    /// Builds the client with the configured settings.
1670    ///
1671    /// # Returns
1672    /// * `Result<Client>` - The configured client or an error if configuration is invalid
1673    ///
1674    /// # Errors
1675    /// Returns an error if:
1676    /// * The URL is malformed
1677    /// * Required configuration is missing
1678    ///
1679    /// # Example
1680    /// ```
1681    /// use hypersync_client::Client;
1682    ///
1683    /// let client = Client::builder()
1684    ///     .chain_id(1)
1685    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1686    ///     .build()
1687    ///     .unwrap();
1688    /// ```
1689    pub fn build(self) -> Result<Client> {
1690        if self.0.url.is_empty() {
1691            anyhow::bail!(
1692                "endpoint needs to be set, try using builder.chain_id(1) or\
1693                builder.url(\"https://eth.hypersync.xyz\") to set the endpoint"
1694            )
1695        }
1696        Client::new(self.0)
1697    }
1698}
1699
/// Base delay of the height stream's exponential reconnect backoff (200ms).
const INITIAL_RECONNECT_DELAY: Duration = Duration::from_millis(200);
/// Upper bound on any single reconnect delay of the height stream (30s).
const MAX_RECONNECT_DELAY: Duration = Duration::from_millis(30_000);
/// Timeout for detecting dead connections: the server sends keepalive pings
/// every 5s, so a connection silent for three ping intervals (15s) is treated as dead.
const READ_TIMEOUT: Duration = Duration::from_secs(3 * 5);
1706
/// Notifications delivered by the height stream.
///
/// Besides archive height updates, the stream reports connection lifecycle changes
/// so that consumers can surface the connection status.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum HeightStreamEvent {
    /// The SSE connection was established (or re-established).
    Connected,
    /// The server reported a new archive height.
    Height(u64),
    /// The connection was lost; a reconnect is attempted once `delay` has elapsed.
    Reconnecting {
        /// How long the stream waits before trying to reconnect.
        delay: Duration,
        /// Description of why the previous connection ended.
        error_msg: String,
    },
}
1722
1723enum InternalStreamEvent {
1724    Publish(HeightStreamEvent),
1725    Ping,
1726    Unknown(String),
1727}
1728
1729impl Client {
1730    fn get_es_stream(&self) -> Result<EventSource> {
1731        // Build the GET /height/sse request
1732        let mut url = self.inner.url.clone();
1733        url.path_segments_mut()
1734            .ok()
1735            .context("invalid base URL")?
1736            .extend(&["height", "sse"]);
1737
1738        let req = self
1739            .inner
1740            .http_client
1741            // Don't set timeout for SSE stream
1742            .request_no_timeout(Method::GET, url);
1743
1744        // Configure exponential backoff for library-level retries
1745        let retry_policy = ExponentialBackoff::new(
1746            INITIAL_RECONNECT_DELAY,
1747            2.0,
1748            Some(MAX_RECONNECT_DELAY),
1749            None, // unlimited retries
1750        );
1751
1752        // Turn the request into an EventSource stream with retries
1753        let mut es = reqwest_eventsource::EventSource::new(req)
1754            .context("unexpected error creating EventSource")?;
1755        es.set_retry_policy(Box::new(retry_policy));
1756        Ok(es)
1757    }
1758
1759    async fn next_height(event_source: &mut EventSource) -> Result<Option<InternalStreamEvent>> {
1760        let Some(res) = tokio::time::timeout(READ_TIMEOUT, event_source.next())
1761            .await
1762            .map_err(|d| anyhow::anyhow!("stream timed out after {d}"))?
1763        else {
1764            return Ok(None);
1765        };
1766
1767        let e = match res.context("failed response")? {
1768            Event::Open => InternalStreamEvent::Publish(HeightStreamEvent::Connected),
1769            Event::Message(event) => match event.event.as_str() {
1770                "height" => {
1771                    let height = event
1772                        .data
1773                        .trim()
1774                        .parse::<u64>()
1775                        .context("parsing height from event data")?;
1776                    InternalStreamEvent::Publish(HeightStreamEvent::Height(height))
1777                }
1778                "ping" => InternalStreamEvent::Ping,
1779                _ => InternalStreamEvent::Unknown(format!("unknown event: {:?}", event)),
1780            },
1781        };
1782
1783        Ok(Some(e))
1784    }
1785
1786    async fn stream_height_events(
1787        es: &mut EventSource,
1788        tx: &mpsc::Sender<HeightStreamEvent>,
1789    ) -> Result<bool> {
1790        let mut received_an_event = false;
1791        while let Some(event) = Self::next_height(es).await.context("failed next height")? {
1792            match event {
1793                InternalStreamEvent::Publish(event) => {
1794                    received_an_event = true;
1795                    if tx.send(event).await.is_err() {
1796                        return Ok(received_an_event); // Receiver dropped, exit task
1797                    }
1798                }
1799                InternalStreamEvent::Ping => (), // ignore pings
1800                InternalStreamEvent::Unknown(_event) => (), // ignore unknown events
1801            }
1802        }
1803        Ok(received_an_event)
1804    }
1805
1806    fn get_delay(consecutive_failures: u32) -> Duration {
1807        if consecutive_failures > 0 {
1808            /// helper function to calculate 2^x
1809            /// optimization using bit shifting
1810            const fn two_to_pow(x: u32) -> u32 {
1811                1 << x
1812            }
1813            // Exponential backoff: 200ms, 400ms, 800ms, ... up to 30s
1814            INITIAL_RECONNECT_DELAY
1815                .saturating_mul(two_to_pow(consecutive_failures - 1))
1816                .min(MAX_RECONNECT_DELAY)
1817        } else {
1818            // On zero consecutive failures, 0 delay
1819            Duration::from_millis(0)
1820        }
1821    }
1822
1823    async fn stream_height_events_with_retry(
1824        &self,
1825        tx: &mpsc::Sender<HeightStreamEvent>,
1826    ) -> Result<()> {
1827        let mut consecutive_failures = 0u32;
1828
1829        loop {
1830            // should always be able to creat a new es stream
1831            // something is wrong with the req builder otherwise
1832            let mut es = self.get_es_stream().context("get es stream")?;
1833
1834            let mut error = anyhow!("");
1835
1836            match Self::stream_height_events(&mut es, tx).await {
1837                Ok(received_an_event) => {
1838                    if received_an_event {
1839                        consecutive_failures = 0; // Reset after successful connection that then failed
1840                    }
1841                    log::trace!("Stream height exited");
1842                }
1843                Err(e) => {
1844                    log::trace!("Stream height failed: {e:?}");
1845                    error = e;
1846                }
1847            }
1848
1849            es.close();
1850
1851            // If the receiver is closed, exit the task
1852            if tx.is_closed() {
1853                break;
1854            }
1855
1856            let delay = Self::get_delay(consecutive_failures);
1857            log::trace!("Reconnecting in {:?}...", delay);
1858
1859            let error_msg = format!("{error:?}");
1860
1861            if tx
1862                .send(HeightStreamEvent::Reconnecting { delay, error_msg })
1863                .await
1864                .is_err()
1865            {
1866                return Ok(()); // Receiver dropped, exit task
1867            }
1868            tokio::time::sleep(delay).await;
1869
1870            // increment consecutive failures so that on the next try
1871            // it will start using back offs
1872            consecutive_failures += 1;
1873        }
1874
1875        Ok(())
1876    }
1877
1878    /// Streams archive height updates from the server via Server-Sent Events.
1879    ///
1880    /// Establishes a long-lived SSE connection to `/height/sse` that automatically reconnects
1881    /// on disconnection with exponential backoff (200ms → 400ms → ... → max 30s).
1882    ///
1883    /// The stream emits [`HeightStreamEvent`] to notify consumers of connection state changes
1884    /// and height updates. This allows applications to display connection status to users.
1885    ///
1886    /// # Returns
1887    /// Channel receiver yielding [`HeightStreamEvent`]s. The background task handles connection
1888    /// lifecycle and sends events through this channel.
1889    ///
1890    /// # Example
1891    /// ```
1892    /// # use hypersync_client::{Client, HeightStreamEvent};
1893    /// # async fn example() -> anyhow::Result<()> {
1894    /// let client = Client::builder()
1895    ///     .url("https://eth.hypersync.xyz")
1896    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1897    ///     .build()?;
1898    ///
1899    /// let mut rx = client.stream_height();
1900    ///
1901    /// while let Some(event) = rx.recv().await {
1902    ///     match event {
1903    ///         HeightStreamEvent::Connected => println!("Connected to stream"),
1904    ///         HeightStreamEvent::Height(h) => println!("Height: {}", h),
1905    ///         HeightStreamEvent::Reconnecting { delay, error_msg } => {
1906    ///             println!("Reconnecting in {delay:?} due to error: {error_msg}")
1907    ///         }
1908    ///     }
1909    /// }
1910    /// # Ok(())
1911    /// # }
1912    /// ```
1913    pub fn stream_height(&self) -> mpsc::Receiver<HeightStreamEvent> {
1914        let (tx, rx) = mpsc::channel(16);
1915        let client = self.clone();
1916
1917        tokio::spawn(async move {
1918            if let Err(e) = client.stream_height_events_with_retry(&tx).await {
1919                log::error!("Stream height failed unexpectedly: {e:?}");
1920            }
1921        });
1922
1923        rx
1924    }
1925}
1926
1927fn check_simple_stream_params(config: &StreamConfig) -> Result<()> {
1928    if config.event_signature.is_some() {
1929        return Err(anyhow!(
1930            "config.event_signature can't be passed to simple type function. User is expected to \
1931             decode the logs using Decoder."
1932        ));
1933    }
1934    if config.column_mapping.is_some() {
1935        return Err(anyhow!(
1936            "config.column_mapping can't be passed to single type function. User is expected to \
1937             map values manually."
1938        ));
1939    }
1940
1941    Ok(())
1942}
1943
1944/// Used to indicate whether or not a retry should be attempted.
1945#[derive(Debug, thiserror::Error)]
1946pub enum HyperSyncResponseError {
1947    /// Means that the client should retry with a smaller block range.
1948    #[error("hypersync responded with 'payload too large' error")]
1949    PayloadTooLarge,
1950    /// Server responded with 429 Too Many Requests.
1951    #[error("rate limited by server. To increase your rate limits, upgrade your plan at https://envio.dev/app/api-tokens. For more info: https://docs.envio.dev/docs/HyperSync/api-tokens")]
1952    RateLimited {
1953        /// Rate limit information from the 429 response headers.
1954        rate_limit: RateLimitInfo,
1955    },
1956    /// Any other server error.
1957    #[error(transparent)]
1958    Other(#[from] anyhow::Error),
1959}
1960
1961/// Result of a single Arrow query execution, before retry logic.
1962struct ArrowImplResponse {
1963    /// The parsed Arrow response data.
1964    response: ArrowResponse,
1965    /// Size of the response body in bytes.
1966    response_bytes: u64,
1967    /// Rate limit information parsed from response headers.
1968    rate_limit: RateLimitInfo,
1969}
1970
#[cfg(test)]
mod tests {
    use super::*;

    /// The reconnect delay is zero before any failure, doubles from 200ms per
    /// consecutive failure, and is capped at 30s.
    #[test]
    fn test_get_delay() {
        assert_eq!(
            Client::get_delay(0),
            Duration::from_millis(0),
            "starts with 0 delay"
        );

        // powers of 2 backoff
        let doubling = [(1, 200), (2, 400), (3, 800)];
        for (failures, millis) in doubling {
            assert_eq!(Client::get_delay(failures), Duration::from_millis(millis));
        }

        // maxes out at 30s
        for failures in [9, 10] {
            assert_eq!(
                Client::get_delay(failures),
                Duration::from_secs(30),
                "max delay is 30s"
            );
        }
    }

    /// The live server should negotiate HTTP/2 for a plain height request.
    #[tokio::test]
    #[ignore = "integration test requiring live hs server"]
    async fn test_http2_is_used() -> anyhow::Result<()> {
        let api_token = std::env::var("ENVIO_API_TOKEN")?;
        let http = reqwest::Client::builder()
            .no_gzip()
            .user_agent("hscr-test")
            .build()?;

        let response = http
            .get("https://eth.hypersync.xyz/height")
            .bearer_auth(&api_token)
            .send()
            .await?;

        let version = response.version();
        assert_eq!(
            version,
            reqwest::Version::HTTP_2,
            "expected HTTP/2 but got {:?}",
            version
        );
        assert!(response.status().is_success());
        Ok(())
    }

    /// A live SSE connection should report `Connected`, then increasing heights,
    /// and end gracefully once the receiver is dropped.
    #[tokio::test]
    #[ignore = "integration test with live hs server for height stream"]
    async fn test_stream_height_events() -> anyhow::Result<()> {
        let (tx, mut rx) = mpsc::channel(16);
        let handle = tokio::spawn(async move {
            let client = Client::builder()
                .url("https://monad-testnet.hypersync.xyz")
                .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
                .build()?;
            let mut es = client.get_es_stream().context("get es stream")?;
            Client::stream_height_events(&mut es, &tx).await
        });

        let first = rx.recv().await;
        assert!(first.is_some());
        assert_eq!(first.unwrap(), HeightStreamEvent::Connected);

        let Some(HeightStreamEvent::Height(earlier)) = rx.recv().await else {
            panic!("should have received height")
        };
        let Some(HeightStreamEvent::Height(later)) = rx.recv().await else {
            panic!("should have received height")
        };
        assert!(later > earlier);
        drop(rx);

        let received_an_event = handle
            .await
            .expect("should have joined")
            .expect("should have ended the stream gracefully after dropping rx");
        assert!(received_an_event, "should have received an event");

        Ok(())
    }
}