Skip to main content

hypersync_client/
lib.rs

1#![deny(missing_docs)]
2//! # HyperSync Client
3//!
4//! A high-performance Rust client for the HyperSync protocol, enabling efficient retrieval
5//! of blockchain data including blocks, transactions, logs, and traces.
6//!
7//! ## Features
8//!
9//! - **High-performance streaming**: Parallel data fetching with automatic retries
10//! - **Flexible querying**: Rich query builder API for precise data selection
11//! - **Multiple data formats**: Support for Arrow, Parquet, and simple Rust types
12//! - **Event decoding**: Automatic ABI decoding for smart contract events
13//! - **Real-time updates**: Live height streaming via Server-Sent Events
14//! - **Production ready**: Built-in rate limiting, retries, and error handling
15//!
16//! ## Quick Start
17//!
18//! ```no_run
19//! use hypersync_client::{Client, net_types::{Query, LogFilter, LogField}, StreamConfig};
20//!
21//! #[tokio::main]
22//! async fn main() -> anyhow::Result<()> {
23//!     // Create a client for Ethereum mainnet
24//!     let client = Client::builder()
25//!         .chain_id(1)
26//!         .api_token(std::env::var("ENVIO_API_TOKEN")?)
27//!         .build()?;
28//!
29//!     // Query ERC20 transfer events from USDC contract
30//!     let query = Query::new()
31//!         .from_block(19000000)
32//!         .to_block_excl(19001000)
33//!         .where_logs(
34//!             LogFilter::all()
35//!                 .and_address(["0xA0b86a33E6411b87Fd9D3DF822C8698FC06BBe4c"])?
36//!                 .and_topic0(["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"])?
37//!         )
38//!         .select_log_fields([LogField::Address, LogField::Topic1, LogField::Topic2, LogField::Data]);
39//!
40//!     // Get all data in one response
41//!     let response = client.get(&query).await?;
42//!     println!("Retrieved {} blocks", response.data.blocks.len());
43//!
44//!     // Or stream data for large ranges
45//!     let mut receiver = client.stream(query, StreamConfig::default()).await?;
46//!     while let Some(response) = receiver.recv().await {
47//!         let response = response?;
48//!         println!("Streaming: got blocks up to {}", response.next_block);
49//!     }
50//!
51//!     Ok(())
52//! }
53//! ```
54//!
55//! ## Main Types
56//!
57//! - [`Client`] - Main client for interacting with HyperSync servers
58//! - [`net_types::Query`] - Query builder for specifying what data to fetch
59//! - [`StreamConfig`] - Configuration for streaming operations
60//! - [`QueryResponse`] - Response containing blocks, transactions, logs, and traces
61//! - [`ArrowResponse`] - Response in Apache Arrow format for high-performance processing
62//!
63//! ## Authentication
64//!
65//! You'll need a HyperSync API token to access the service. Get one from
66//! [https://envio.dev/app/api-tokens](https://envio.dev/app/api-tokens).
67//!
68//! ## Examples
69//!
70//! See the `examples/` directory for more detailed usage patterns including:
71//! - ERC20 token transfers
72//! - Wallet transaction history  
73//! - Event decoding and filtering
74//! - Real-time data streaming
75use std::time::Instant;
76use std::{sync::Arc, time::Duration};
77
78use anyhow::{anyhow, Context, Result};
79use futures::StreamExt;
80use hypersync_net_types::{hypersync_net_types_capnp, ArchiveHeight, ChainId, Query};
81use reqwest::{Method, StatusCode};
82use reqwest_eventsource::retry::ExponentialBackoff;
83use reqwest_eventsource::{Event, EventSource};
84
85pub mod arrow_reader;
86mod column_mapping;
87mod config;
88mod decode;
89mod decode_call;
90mod from_arrow;
91mod parquet_out;
92mod parse_response;
93pub mod preset_query;
94mod rate_limit;
95mod rayon_async;
96pub mod simple_types;
97mod stream;
98mod types;
99mod util;
100
101pub use hypersync_format as format;
102pub use hypersync_net_types as net_types;
103pub use hypersync_schema as schema;
104
105use parse_response::parse_query_response;
106use tokio::sync::mpsc;
107use types::ResponseData;
108use url::Url;
109
110pub use column_mapping::{ColumnMapping, DataType};
111pub use config::HexOutput;
112pub use config::{ClientConfig, SerializationFormat, StreamConfig};
113pub use decode::Decoder;
114pub use decode_call::CallDecoder;
115pub use rate_limit::RateLimitInfo;
116pub use types::{
117    ArrowResponse, ArrowResponseData, EventResponse, QueryResponse, QueryResponseWithRateLimit,
118};
119
120use crate::parse_response::read_query_response;
121use crate::simple_types::InternalEventJoinStrategy;
122
123#[derive(Debug)]
124struct HttpClientWrapper {
125    /// Mutable state that needs to be refreshed periodically
126    inner: std::sync::Mutex<HttpClientWrapperInner>,
127    /// HyperSync server api token.
128    api_token: String,
129    /// Standard timeout for http requests.
130    timeout: Duration,
131    /// Pools are never idle since polling often happens on the client
132    /// so connections never timeout and dns lookups never happen again
133    /// this is problematic if the underlying ip address changes during
134    /// failovers on hypersync. Since reqwest doesn't implement this we
135    /// simply recreate the client every time
136    max_connection_age: Duration,
137    /// For refreshing the client
138    user_agent: String,
139}
140
141#[derive(Debug)]
142struct HttpClientWrapperInner {
143    /// Initialized reqwest instance for client url.
144    client: reqwest::Client,
145    /// Timestamp when the client was created
146    created_at: Instant,
147}
148
149impl HttpClientWrapper {
150    fn new(user_agent: String, api_token: String, timeout: Duration) -> Self {
151        let client = reqwest::Client::builder()
152            .no_gzip()
153            .user_agent(&user_agent)
154            .build()
155            .unwrap();
156
157        Self {
158            inner: std::sync::Mutex::new(HttpClientWrapperInner {
159                client,
160                created_at: Instant::now(),
161            }),
162            api_token,
163            timeout,
164            max_connection_age: Duration::from_secs(60),
165            user_agent,
166        }
167    }
168
169    fn get_client(&self) -> reqwest::Client {
170        let mut inner = self.inner.lock().expect("HttpClientWrapper mutex poisoned");
171
172        // Check if client needs refresh due to age
173        if inner.created_at.elapsed() > self.max_connection_age {
174            // Recreate client to force new DNS lookup for failover scenarios
175            inner.client = reqwest::Client::builder()
176                .no_gzip()
177                .user_agent(&self.user_agent)
178                .build()
179                .unwrap();
180            inner.created_at = Instant::now();
181        }
182
183        // Clone is cheap for reqwest::Client (it's Arc-wrapped internally)
184        inner.client.clone()
185    }
186
187    fn request(&self, method: Method, url: Url) -> reqwest::RequestBuilder {
188        let client = self.get_client();
189        client
190            .request(method, url)
191            .timeout(self.timeout)
192            .bearer_auth(&self.api_token)
193    }
194
195    fn request_no_timeout(&self, method: Method, url: Url) -> reqwest::RequestBuilder {
196        let client = self.get_client();
197        client.request(method, url).bearer_auth(&self.api_token)
198    }
199}
200
201/// Internal client state to handle http requests and retries.
202#[derive(Debug)]
203struct ClientInner {
204    http_client: HttpClientWrapper,
205    /// HyperSync server URL.
206    url: Url,
207    /// Number of retries to attempt before returning error.
208    max_num_retries: usize,
209    /// Milliseconds that would be used for retry backoff increasing.
210    retry_backoff_ms: u64,
211    /// Initial wait time for request backoff.
212    retry_base_ms: u64,
213    /// Ceiling time for request backoff.
214    retry_ceiling_ms: u64,
215    /// Query serialization format to use for HTTP requests.
216    serialization_format: SerializationFormat,
217    /// Most recently observed rate limit info from the server, paired with the
218    /// instant it was captured so elapsed time can be subtracted from `reset_secs`.
219    rate_limit_state: std::sync::Mutex<Option<(RateLimitInfo, Instant)>>,
220    /// Whether to proactively sleep when the rate limit is exhausted.
221    proactive_rate_limit_sleep: bool,
222}
223
224/// Client to handle http requests and retries.
225#[derive(Clone, Debug)]
226pub struct Client {
227    inner: Arc<ClientInner>,
228}
229
230impl Client {
231    /// Creates a new client with the given configuration.
232    ///
233    /// Configuration must include the `url` and `api_token` fields.
234    /// # Example
235    /// ```
236    /// use hypersync_client::{Client, ClientConfig};
237    ///
238    /// let config = ClientConfig {
239    ///     url: "https://eth.hypersync.xyz".to_string(),
240    ///     api_token: std::env::var("ENVIO_API_TOKEN")?,
241    ///     ..Default::default()
242    /// };
243    /// let client = Client::new(config)?;
244    /// # Ok::<(), anyhow::Error>(())
245    /// ```
246    ///
247    /// # Errors
248    /// This method fails if the config is invalid.
249    pub fn new(cfg: ClientConfig) -> Result<Self> {
250        // hscr stands for hypersync client rust
251        cfg.validate().context("invalid ClientConfig")?;
252        let user_agent = format!("hscr/{}", env!("CARGO_PKG_VERSION"));
253        Self::new_internal(cfg, user_agent)
254    }
255
256    /// Creates a new client builder.
257    ///
258    /// # Example
259    /// ```
260    /// use hypersync_client::Client;
261    ///
262    /// let client = Client::builder()
263    ///     .chain_id(1)
264    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
265    ///     .build()
266    ///     .unwrap();
267    /// ```
268    pub fn builder() -> ClientBuilder {
269        ClientBuilder::new()
270    }
271
272    #[doc(hidden)]
273    pub fn new_with_agent(cfg: ClientConfig, user_agent: impl Into<String>) -> Result<Self> {
274        // Creates a new client with the given configuration and custom user agent.
275        // This is intended for use by language bindings (Python, Node.js) and HyperIndex.
276        Self::new_internal(cfg, user_agent.into())
277    }
278
279    /// Internal constructor that takes both config and user agent.
280    fn new_internal(cfg: ClientConfig, user_agent: String) -> Result<Self> {
281        let http_client = HttpClientWrapper::new(
282            user_agent,
283            cfg.api_token,
284            Duration::from_millis(cfg.http_req_timeout_millis),
285        );
286
287        let url = Url::parse(&cfg.url).context("url is malformed")?;
288
289        Ok(Self {
290            inner: Arc::new(ClientInner {
291                http_client,
292                url,
293                max_num_retries: cfg.max_num_retries,
294                retry_backoff_ms: cfg.retry_backoff_ms,
295                retry_base_ms: cfg.retry_base_ms,
296                retry_ceiling_ms: cfg.retry_ceiling_ms,
297                serialization_format: cfg.serialization_format,
298                rate_limit_state: std::sync::Mutex::new(None),
299                proactive_rate_limit_sleep: cfg.proactive_rate_limit_sleep,
300            }),
301        })
302    }
303
304    /// Retrieves blocks, transactions, traces, and logs through a stream using the provided
305    /// query and stream configuration.
306    ///
307    /// ### Implementation
308    /// Runs multiple queries simultaneously based on config.concurrency.
309    ///
310    /// Each query runs until it reaches query.to, server height, any max_num_* query param,
311    /// or execution timed out by server.
312    ///
313    /// # ⚠️ Important Warning
314    ///
315    /// This method will continue executing until the query has run to completion from beginning
316    /// to the end of the block range defined in the query. For heavy queries with large block
317    /// ranges or high data volumes, consider:
318    ///
319    /// - Use [`stream()`](Self::stream) to interact with each streamed chunk individually
320    /// - Use [`get()`](Self::get) which returns a `next_block` that can be paginated for the next query
321    /// - Break large queries into smaller block ranges
322    ///
323    /// # Example
324    /// ```no_run
325    /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField}, StreamConfig};
326    ///
327    /// # async fn example() -> anyhow::Result<()> {
328    /// let client = Client::builder()
329    ///     .chain_id(1)
330    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
331    ///     .build()?;
332    ///
333    /// // Query ERC20 transfer events
334    /// let query = Query::new()
335    ///     .from_block(19000000)
336    ///     .to_block_excl(19000010)
337    ///     .where_logs(
338    ///         LogFilter::all()
339    ///             .and_topic0(["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"])?
340    ///     )
341    ///     .select_log_fields([LogField::Address, LogField::Data]);
342    /// let response = client.collect(query, StreamConfig::default()).await?;
343    ///
344    /// println!("Collected {} events", response.data.logs.len());
345    /// # Ok(())
346    /// # }
347    /// ```
348    pub async fn collect(&self, query: Query, config: StreamConfig) -> Result<QueryResponse> {
349        check_simple_stream_params(&config)?;
350
351        let mut recv = stream::stream_arrow(self, query, config)
352            .await
353            .context("start stream")?;
354
355        let mut data = ResponseData::default();
356        let mut archive_height = None;
357        let mut next_block = 0;
358        let mut total_execution_time = 0;
359
360        while let Some(res) = recv.recv().await {
361            let res = res.context("get response")?;
362            let res: QueryResponse =
363                QueryResponse::try_from(&res).context("convert arrow response")?;
364
365            for batch in res.data.blocks {
366                data.blocks.push(batch);
367            }
368            for batch in res.data.transactions {
369                data.transactions.push(batch);
370            }
371            for batch in res.data.logs {
372                data.logs.push(batch);
373            }
374            for batch in res.data.traces {
375                data.traces.push(batch);
376            }
377
378            archive_height = res.archive_height;
379            next_block = res.next_block;
380            total_execution_time += res.total_execution_time
381        }
382
383        Ok(QueryResponse {
384            archive_height,
385            next_block,
386            total_execution_time,
387            data,
388            rollback_guard: None,
389        })
390    }
391
392    /// Retrieves events through a stream using the provided query and stream configuration.
393    ///
394    /// # ⚠️ Important Warning
395    ///
396    /// This method will continue executing until the query has run to completion from beginning
397    /// to the end of the block range defined in the query. For heavy queries with large block
398    /// ranges or high data volumes, consider:
399    ///
400    /// - Use [`stream_events()`](Self::stream_events) to interact with each streamed chunk individually
401    /// - Use [`get_events()`](Self::get_events) which returns a `next_block` that can be paginated for the next query
402    /// - Break large queries into smaller block ranges
403    ///
404    /// # Example
405    /// ```no_run
406    /// use hypersync_client::{Client, net_types::{Query, TransactionFilter, TransactionField}, StreamConfig};
407    ///
408    /// # async fn example() -> anyhow::Result<()> {
409    /// let client = Client::builder()
410    ///     .chain_id(1)
411    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
412    ///     .build()?;
413    ///
414    /// // Query transactions to a specific address
415    /// let query = Query::new()
416    ///     .from_block(19000000)
417    ///     .to_block_excl(19000100)
418    ///     .where_transactions(
419    ///         TransactionFilter::all()
420    ///             .and_to(["0xA0b86a33E6411b87Fd9D3DF822C8698FC06BBe4c"])?
421    ///     )
422    ///     .select_transaction_fields([TransactionField::Hash, TransactionField::From, TransactionField::Value]);
423    /// let response = client.collect_events(query, StreamConfig::default()).await?;
424    ///
425    /// println!("Collected {} events", response.data.len());
426    /// # Ok(())
427    /// # }
428    /// ```
429    pub async fn collect_events(
430        &self,
431        mut query: Query,
432        config: StreamConfig,
433    ) -> Result<EventResponse> {
434        check_simple_stream_params(&config)?;
435
436        let event_join_strategy = InternalEventJoinStrategy::from(&query.field_selection);
437        event_join_strategy.add_join_fields_to_selection(&mut query.field_selection);
438
439        let mut recv = stream::stream_arrow(self, query, config)
440            .await
441            .context("start stream")?;
442
443        let mut data = Vec::new();
444        let mut archive_height = None;
445        let mut next_block = 0;
446        let mut total_execution_time = 0;
447
448        while let Some(res) = recv.recv().await {
449            let res = res.context("get response")?;
450            let res: QueryResponse =
451                QueryResponse::try_from(&res).context("convert arrow response")?;
452            let events = event_join_strategy.join_from_response_data(res.data);
453
454            data.extend(events);
455
456            archive_height = res.archive_height;
457            next_block = res.next_block;
458            total_execution_time += res.total_execution_time
459        }
460
461        Ok(EventResponse {
462            archive_height,
463            next_block,
464            total_execution_time,
465            data,
466            rollback_guard: None,
467        })
468    }
469
470    /// Retrieves blocks, transactions, traces, and logs in Arrow format through a stream using
471    /// the provided query and stream configuration.
472    ///
473    /// Returns data in Apache Arrow format for high-performance columnar processing.
474    /// Useful for analytics workloads or when working with Arrow-compatible tools.
475    ///
476    /// # ⚠️ Important Warning
477    ///
478    /// This method will continue executing until the query has run to completion from beginning
479    /// to the end of the block range defined in the query. For heavy queries with large block
480    /// ranges or high data volumes, consider:
481    ///
482    /// - Use [`stream_arrow()`](Self::stream_arrow) to interact with each streamed chunk individually
483    /// - Use [`get_arrow()`](Self::get_arrow) which returns a `next_block` that can be paginated for the next query
484    /// - Break large queries into smaller block ranges
485    ///
486    /// # Example
487    /// ```no_run
488    /// use hypersync_client::{Client, net_types::{Query, BlockFilter, BlockField}, StreamConfig};
489    ///
490    /// # async fn example() -> anyhow::Result<()> {
491    /// let client = Client::builder()
492    ///     .chain_id(1)
493    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
494    ///     .build()?;
495    ///
496    /// // Get block data in Arrow format for analytics
497    /// let query = Query::new()
498    ///     .from_block(19000000)
499    ///     .to_block_excl(19000100)
500    ///     .include_all_blocks()
501    ///     .select_block_fields([BlockField::Number, BlockField::Timestamp, BlockField::GasUsed]);
502    /// let response = client.collect_arrow(query, StreamConfig::default()).await?;
503    ///
504    /// println!("Retrieved {} Arrow batches for blocks", response.data.blocks.len());
505    /// # Ok(())
506    /// # }
507    /// ```
508    pub async fn collect_arrow(&self, query: Query, config: StreamConfig) -> Result<ArrowResponse> {
509        let mut recv = stream::stream_arrow(self, query, config)
510            .await
511            .context("start stream")?;
512
513        let mut data = ArrowResponseData::default();
514        let mut archive_height = None;
515        let mut next_block = 0;
516        let mut total_execution_time = 0;
517
518        while let Some(res) = recv.recv().await {
519            let res = res.context("get response")?;
520
521            for batch in res.data.blocks {
522                data.blocks.push(batch);
523            }
524            for batch in res.data.transactions {
525                data.transactions.push(batch);
526            }
527            for batch in res.data.logs {
528                data.logs.push(batch);
529            }
530            for batch in res.data.traces {
531                data.traces.push(batch);
532            }
533            for batch in res.data.decoded_logs {
534                data.decoded_logs.push(batch);
535            }
536
537            archive_height = res.archive_height;
538            next_block = res.next_block;
539            total_execution_time += res.total_execution_time
540        }
541
542        Ok(ArrowResponse {
543            archive_height,
544            next_block,
545            total_execution_time,
546            data,
547            rollback_guard: None,
548        })
549    }
550
551    /// Writes parquet file getting data through a stream using the provided path, query,
552    /// and stream configuration.
553    ///
554    /// Streams data directly to a Parquet file for efficient storage and later analysis.
555    /// Perfect for data exports or ETL pipelines.
556    ///
557    /// # ⚠️ Important Warning
558    ///
559    /// This method will continue executing until the query has run to completion from beginning
560    /// to the end of the block range defined in the query. For heavy queries with large block
561    /// ranges or high data volumes, consider:
562    ///
563    /// - Use [`stream_arrow()`](Self::stream_arrow) and write to Parquet incrementally
564    /// - Use [`get_arrow()`](Self::get_arrow) with pagination and append to Parquet files
565    /// - Break large queries into smaller block ranges
566    ///
567    /// # Example
568    /// ```no_run
569    /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField}, StreamConfig};
570    ///
571    /// # async fn example() -> anyhow::Result<()> {
572    /// let client = Client::builder()
573    ///     .chain_id(1)
574    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
575    ///     .build()?;
576    ///
577    /// // Export all DEX trades to Parquet for analysis
578    /// let query = Query::new()
579    ///     .from_block(19000000)
580    ///     .to_block_excl(19010000)
581    ///     .where_logs(
582    ///         LogFilter::all()
583    ///             .and_topic0(["0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822"])?
584    ///     )
585    ///     .select_log_fields([LogField::Address, LogField::Data, LogField::BlockNumber]);
586    /// client.collect_parquet("./trades.parquet", query, StreamConfig::default()).await?;
587    ///
588    /// println!("Trade data exported to trades.parquet");
589    /// # Ok(())
590    /// # }
591    /// ```
592    pub async fn collect_parquet(
593        &self,
594        path: &str,
595        query: Query,
596        config: StreamConfig,
597    ) -> Result<()> {
598        parquet_out::collect_parquet(self, path, query, config).await
599    }
600
601    /// Internal implementation of getting chain_id from server
602    async fn get_chain_id_impl(&self) -> Result<u64> {
603        let mut url = self.inner.url.clone();
604        let mut segments = url.path_segments_mut().ok().context("get path segments")?;
605        segments.push("chain_id");
606        std::mem::drop(segments);
607        let req = self.inner.http_client.request(Method::GET, url);
608
609        let res = req.send().await.context("execute http req")?;
610
611        let status = res.status();
612        if !status.is_success() {
613            return Err(anyhow!("http response status code {}", status));
614        }
615
616        let chain_id: ChainId = res.json().await.context("read response body json")?;
617
618        Ok(chain_id.chain_id)
619    }
620
621    /// Internal implementation of getting height from server
622    async fn get_height_impl(&self, http_timeout_override: Option<Duration>) -> Result<u64> {
623        let mut url = self.inner.url.clone();
624        let mut segments = url.path_segments_mut().ok().context("get path segments")?;
625        segments.push("height");
626        std::mem::drop(segments);
627        let mut req = self.inner.http_client.request(Method::GET, url);
628        if let Some(http_timeout_override) = http_timeout_override {
629            req = req.timeout(http_timeout_override);
630        }
631
632        let res = req.send().await.context("execute http req")?;
633
634        let status = res.status();
635        if !status.is_success() {
636            return Err(anyhow!("http response status code {}", status));
637        }
638
639        let height: ArchiveHeight = res.json().await.context("read response body json")?;
640
641        Ok(height.height.unwrap_or(0))
642    }
643
644    /// Get the chain_id from the server with retries.
645    ///
646    /// # Example
647    /// ```no_run
648    /// use hypersync_client::Client;
649    ///
650    /// # async fn example() -> anyhow::Result<()> {
651    /// let client = Client::builder()
652    ///     .chain_id(1)
653    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
654    ///     .build()?;
655    ///
656    /// let chain_id = client.get_chain_id().await?;
657    /// println!("Connected to chain ID: {}", chain_id);
658    /// # Ok(())
659    /// # }
660    /// ```
661    pub async fn get_chain_id(&self) -> Result<u64> {
662        let mut base = self.inner.retry_base_ms;
663
664        let mut err = anyhow!("");
665
666        for _ in 0..self.inner.max_num_retries + 1 {
667            match self.get_chain_id_impl().await {
668                Ok(res) => return Ok(res),
669                Err(e) => {
670                    log::error!(
671                        "failed to get chain_id from server, retrying... The error was: {e:?}"
672                    );
673                    err = err.context(format!("{e:?}"));
674                }
675            }
676
677            let base_ms = Duration::from_millis(base);
678            let jitter = Duration::from_millis(fastrange_rs::fastrange_64(
679                rand::random(),
680                self.inner.retry_backoff_ms,
681            ));
682
683            tokio::time::sleep(base_ms + jitter).await;
684
685            base = std::cmp::min(
686                base + self.inner.retry_backoff_ms,
687                self.inner.retry_ceiling_ms,
688            );
689        }
690
691        Err(err)
692    }
693
694    /// Get the height of from server with retries.
695    ///
696    /// # Example
697    /// ```no_run
698    /// use hypersync_client::Client;
699    ///
700    /// # async fn example() -> anyhow::Result<()> {
701    /// let client = Client::builder()
702    ///     .chain_id(1)
703    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
704    ///     .build()?;
705    ///
706    /// let height = client.get_height().await?;
707    /// println!("Current block height: {}", height);
708    /// # Ok(())
709    /// # }
710    /// ```
711    pub async fn get_height(&self) -> Result<u64> {
712        let mut base = self.inner.retry_base_ms;
713
714        let mut err = anyhow!("");
715
716        for _ in 0..self.inner.max_num_retries + 1 {
717            match self.get_height_impl(None).await {
718                Ok(res) => return Ok(res),
719                Err(e) => {
720                    log::error!(
721                        "failed to get height from server, retrying... The error was: {e:?}"
722                    );
723                    err = err.context(format!("{e:?}"));
724                }
725            }
726
727            let base_ms = Duration::from_millis(base);
728            let jitter = Duration::from_millis(fastrange_rs::fastrange_64(
729                rand::random(),
730                self.inner.retry_backoff_ms,
731            ));
732
733            tokio::time::sleep(base_ms + jitter).await;
734
735            base = std::cmp::min(
736                base + self.inner.retry_backoff_ms,
737                self.inner.retry_ceiling_ms,
738            );
739        }
740
741        Err(err)
742    }
743
744    /// Get the height of the Client instance for health checks.
745    ///
746    /// Doesn't do any retries and the `http_req_timeout` parameter will override the http timeout config set when creating the client.
747    ///
748    /// # Example
749    /// ```no_run
750    /// use hypersync_client::Client;
751    /// use std::time::Duration;
752    ///
753    /// # async fn example() -> anyhow::Result<()> {
754    /// let client = Client::builder()
755    ///     .chain_id(1)
756    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
757    ///     .build()?;
758    ///
759    /// // Quick health check with 5 second timeout
760    /// let height = client.health_check(Some(Duration::from_secs(5))).await?;
761    /// println!("Server is healthy at block: {}", height);
762    /// # Ok(())
763    /// # }
764    /// ```
765    pub async fn health_check(&self, http_req_timeout: Option<Duration>) -> Result<u64> {
766        self.get_height_impl(http_req_timeout).await
767    }
768
769    /// Executes query with retries and returns the response.
770    ///
771    /// # Example
772    /// ```no_run
773    /// use hypersync_client::{Client, net_types::{Query, BlockFilter, BlockField}};
774    ///
775    /// # async fn example() -> anyhow::Result<()> {
776    /// let client = Client::builder()
777    ///     .chain_id(1)
778    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
779    ///     .build()?;
780    ///
781    /// // Query all blocks from a specific range
782    /// let query = Query::new()
783    ///     .from_block(19000000)
784    ///     .to_block_excl(19000010)
785    ///     .include_all_blocks()
786    ///     .select_block_fields([BlockField::Number, BlockField::Hash, BlockField::Timestamp]);
787    /// let response = client.get(&query).await?;
788    ///
789    /// println!("Retrieved {} blocks", response.data.blocks.len());
790    /// # Ok(())
791    /// # }
792    /// ```
793    pub async fn get(&self, query: &Query) -> Result<QueryResponse> {
794        let arrow_response = self.get_arrow(query).await.context("get data")?;
795        let converted =
796            QueryResponse::try_from(&arrow_response).context("convert arrow response")?;
797        Ok(converted)
798    }
799
800    /// Add block, transaction and log fields selection to the query, executes it with retries
801    /// and returns the response.
802    ///
803    /// This method automatically joins blocks, transactions, and logs into unified events,
804    /// making it easier to work with related blockchain data.
805    ///
806    /// # Example
807    /// ```no_run
808    /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField, TransactionField}};
809    ///
810    /// # async fn example() -> anyhow::Result<()> {
811    /// let client = Client::builder()
812    ///     .chain_id(1)
813    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
814    ///     .build()?;
815    ///
816    /// // Query ERC20 transfers with transaction context
817    /// let query = Query::new()
818    ///     .from_block(19000000)
819    ///     .to_block_excl(19000010)
820    ///     .where_logs(
821    ///         LogFilter::all()
822    ///             .and_topic0(["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"])?
823    ///     )
824    ///     .select_log_fields([LogField::Address, LogField::Data])
825    ///     .select_transaction_fields([TransactionField::Hash, TransactionField::From]);
826    /// let response = client.get_events(query).await?;
827    ///
828    /// println!("Retrieved {} joined events", response.data.len());
829    /// # Ok(())
830    /// # }
831    /// ```
832    pub async fn get_events(&self, mut query: Query) -> Result<EventResponse> {
833        let event_join_strategy = InternalEventJoinStrategy::from(&query.field_selection);
834        event_join_strategy.add_join_fields_to_selection(&mut query.field_selection);
835        let arrow_response = self.get_arrow(&query).await.context("get data")?;
836        EventResponse::try_from_arrow_response(&arrow_response, &event_join_strategy)
837    }
838
839    /// Executes query once and returns the result using JSON serialization.
840    async fn get_arrow_impl_json(
841        &self,
842        query: &Query,
843    ) -> std::result::Result<ArrowImplResponse, HyperSyncResponseError> {
844        let mut url = self.inner.url.clone();
845        let mut segments = url.path_segments_mut().ok().context("get path segments")?;
846        segments.push("query");
847        segments.push("arrow-ipc");
848        std::mem::drop(segments);
849        let req = self.inner.http_client.request(Method::POST, url);
850
851        let res = req.json(&query).send().await.context("execute http req")?;
852
853        let status = res.status();
854        let rate_limit = RateLimitInfo::from_response(&res);
855
856        if status == StatusCode::PAYLOAD_TOO_LARGE {
857            return Err(HyperSyncResponseError::PayloadTooLarge);
858        }
859        if status == StatusCode::TOO_MANY_REQUESTS {
860            return Err(HyperSyncResponseError::RateLimited { rate_limit });
861        }
862        if !status.is_success() {
863            let text = res.text().await.context("read text to see error")?;
864
865            return Err(HyperSyncResponseError::Other(anyhow!(
866                "http response status code {}, err body: {}",
867                status,
868                text
869            )));
870        }
871
872        let bytes = res.bytes().await.context("read response body bytes")?;
873
874        let res = tokio::task::block_in_place(|| {
875            parse_query_response(&bytes).context("parse query response")
876        })?;
877
878        Ok(ArrowImplResponse {
879            response: res,
880            response_bytes: bytes.len().try_into().unwrap(),
881            rate_limit,
882        })
883    }
884
885    fn should_cache_queries(&self) -> bool {
886        matches!(
887            self.inner.serialization_format,
888            SerializationFormat::CapnProto {
889                should_cache_queries: true
890            }
891        )
892    }
893
894    /// Executes query once and returns the result using Cap'n Proto serialization.
895    async fn get_arrow_impl_capnp(
896        &self,
897        query: &Query,
898    ) -> std::result::Result<ArrowImplResponse, HyperSyncResponseError> {
899        let mut url = self.inner.url.clone();
900        let mut segments = url.path_segments_mut().ok().context("get path segments")?;
901        segments.push("query");
902        segments.push("arrow-ipc");
903        segments.push("capnp");
904        std::mem::drop(segments);
905
906        let should_cache = self.should_cache_queries();
907
908        if should_cache {
909            let query_with_id = {
910                let mut message = capnp::message::Builder::new_default();
911                let mut request_builder =
912                    message.init_root::<hypersync_net_types_capnp::request::Builder>();
913
914                request_builder
915                    .build_query_id_from_query(query)
916                    .context("build query id")?;
917                let mut query_with_id = Vec::new();
918                capnp::serialize_packed::write_message(&mut query_with_id, &message)
919                    .context("write capnp message")?;
920                query_with_id
921            };
922
923            let req = self.inner.http_client.request(Method::POST, url.clone());
924
925            let res = req
926                .body(query_with_id)
927                .send()
928                .await
929                .context("execute http req")?;
930
931            let status = res.status();
932            let rate_limit = RateLimitInfo::from_response(&res);
933
934            if status == StatusCode::PAYLOAD_TOO_LARGE {
935                return Err(HyperSyncResponseError::PayloadTooLarge);
936            }
937            if status == StatusCode::TOO_MANY_REQUESTS {
938                return Err(HyperSyncResponseError::RateLimited { rate_limit });
939            }
940            if status.is_success() {
941                let bytes = res.bytes().await.context("read response body bytes")?;
942
943                let mut opts = capnp::message::ReaderOptions::new();
944                opts.nesting_limit(i32::MAX).traversal_limit_in_words(None);
945                let message_reader = capnp::serialize_packed::read_message(bytes.as_ref(), opts)
946                    .context("create message reader")?;
947                let query_response = message_reader
948                    .get_root::<hypersync_net_types_capnp::cached_query_response::Reader>()
949                    .context("get cached_query_response root")?;
950                match query_response.get_either().which().context("get either")? {
951                hypersync_net_types_capnp::cached_query_response::either::Which::QueryResponse(
952                    query_response,
953                ) => {
954                    let res = tokio::task::block_in_place(|| {
955                        let res = query_response?;
956                        read_query_response(&res).context("parse query response cached")
957                    })?;
958                    return Ok(ArrowImplResponse {
959                        response: res,
960                        response_bytes: bytes.len().try_into().unwrap(),
961                        rate_limit,
962                    });
963                }
964                hypersync_net_types_capnp::cached_query_response::either::Which::NotCached(()) => {
965                    log::trace!("query was not cached, retrying with full query");
966                }
967            }
968            } else {
969                let text = res.text().await.context("read text to see error")?;
970                log::warn!(
971                    "Failed cache query, will retry full query. {}, err body: {}",
972                    status,
973                    text
974                );
975            }
976        };
977
978        let full_query_bytes = {
979            let mut message = capnp::message::Builder::new_default();
980            let mut query_builder =
981                message.init_root::<hypersync_net_types_capnp::request::Builder>();
982
983            query_builder
984                .build_full_query_from_query(query, should_cache)
985                .context("build full query")?;
986            let mut bytes = Vec::new();
987            capnp::serialize_packed::write_message(&mut bytes, &message)
988                .context("write full query capnp message")?;
989            bytes
990        };
991
992        let req = self.inner.http_client.request(Method::POST, url);
993
994        let res = req
995            .body(full_query_bytes)
996            .send()
997            .await
998            .context("execute http req")?;
999
1000        let status = res.status();
1001        let rate_limit = RateLimitInfo::from_response(&res);
1002
1003        if status == StatusCode::PAYLOAD_TOO_LARGE {
1004            return Err(HyperSyncResponseError::PayloadTooLarge);
1005        }
1006        if status == StatusCode::TOO_MANY_REQUESTS {
1007            return Err(HyperSyncResponseError::RateLimited { rate_limit });
1008        }
1009        if !status.is_success() {
1010            let text = res.text().await.context("read text to see error")?;
1011
1012            return Err(HyperSyncResponseError::Other(anyhow!(
1013                "http response status code {}, err body: {}",
1014                status,
1015                text
1016            )));
1017        }
1018
1019        let bytes = res.bytes().await.context("read response body bytes")?;
1020
1021        let res = tokio::task::block_in_place(|| {
1022            parse_query_response(&bytes).context("parse query response")
1023        })?;
1024
1025        Ok(ArrowImplResponse {
1026            response: res,
1027            response_bytes: bytes.len().try_into().unwrap(),
1028            rate_limit,
1029        })
1030    }
1031
1032    /// Executes query once and returns the result, handling payload-too-large by halving block range.
1033    async fn get_arrow_impl(
1034        &self,
1035        query: &Query,
1036    ) -> std::result::Result<ArrowImplResponse, HyperSyncResponseError> {
1037        let mut query = query.clone();
1038        loop {
1039            let res = match self.inner.serialization_format {
1040                SerializationFormat::Json => self.get_arrow_impl_json(&query).await,
1041                SerializationFormat::CapnProto { .. } => self.get_arrow_impl_capnp(&query).await,
1042            };
1043            match res {
1044                Ok(res) => return Ok(res),
1045                Err(
1046                    e @ (HyperSyncResponseError::Other(_)
1047                    | HyperSyncResponseError::RateLimited { .. }),
1048                ) => return Err(e),
1049                Err(HyperSyncResponseError::PayloadTooLarge) => {
1050                    let block_range = if let Some(to_block) = query.to_block {
1051                        let current = to_block - query.from_block;
1052                        if current < 2 {
1053                            return Err(HyperSyncResponseError::Other(anyhow!(
1054                                "Payload is too large and query is using the minimum block range."
1055                            )));
1056                        }
1057                        // Half the current block range
1058                        current / 2
1059                    } else {
1060                        200
1061                    };
1062                    let to_block = query.from_block + block_range;
1063                    query.to_block = Some(to_block);
1064
1065                    log::trace!(
1066                        "Payload is too large, retrying with block range from: {} to: {}",
1067                        query.from_block,
1068                        to_block
1069                    );
1070                }
1071            }
1072        }
1073    }
1074
1075    /// Executes query with retries and returns the response in Arrow format.
1076    pub async fn get_arrow(&self, query: &Query) -> Result<ArrowResponse> {
1077        self.get_arrow_with_size(query)
1078            .await
1079            .map(|res| res.response)
1080    }
1081
1082    /// Internal implementation for get_arrow.
1083    async fn get_arrow_with_size(&self, query: &Query) -> Result<ArrowImplResponse> {
1084        let mut base = self.inner.retry_base_ms;
1085
1086        let mut err = anyhow!("");
1087
1088        // Proactive throttling: if we know we're rate limited, wait before sending
1089        if self.inner.proactive_rate_limit_sleep {
1090            self.wait_for_rate_limit().await;
1091        }
1092
1093        for _ in 0..self.inner.max_num_retries + 1 {
1094            match self.get_arrow_impl(query).await {
1095                Ok(res) => {
1096                    self.update_rate_limit_state(&res.rate_limit);
1097                    return Ok(res);
1098                }
1099                Err(HyperSyncResponseError::RateLimited { rate_limit }) => {
1100                    self.update_rate_limit_state(&rate_limit);
1101                    let wait_secs = rate_limit.suggested_wait_secs().unwrap_or(1) + 1;
1102                    log::warn!(
1103                        "rate limited by server ({rate_limit}), waiting {wait_secs}s before retry"
1104                    );
1105                    err = err.context(format!("rate limited by server ({rate_limit})"));
1106                    tokio::time::sleep(Duration::from_secs(wait_secs)).await;
1107                    continue;
1108                }
1109                Err(HyperSyncResponseError::Other(e)) => {
1110                    log::error!(
1111                        "failed to get arrow data from server, retrying... The error was: {e:?}"
1112                    );
1113                    err = err.context(format!("{e:?}"));
1114                }
1115                Err(HyperSyncResponseError::PayloadTooLarge) => {
1116                    // This shouldn't happen since get_arrow_impl handles it, but just in case
1117                    log::error!("unexpected PayloadTooLarge from get_arrow_impl, retrying...");
1118                    err = err.context("unexpected PayloadTooLarge");
1119                }
1120            }
1121
1122            let base_ms = Duration::from_millis(base);
1123            let jitter = Duration::from_millis(fastrange_rs::fastrange_64(
1124                rand::random(),
1125                self.inner.retry_backoff_ms,
1126            ));
1127
1128            tokio::time::sleep(base_ms + jitter).await;
1129
1130            base = std::cmp::min(
1131                base + self.inner.retry_backoff_ms,
1132                self.inner.retry_ceiling_ms,
1133            );
1134        }
1135
1136        Err(err)
1137    }
1138
1139    /// Spawns task to execute query and return data via a channel.
1140    ///
1141    /// # Example
1142    /// ```no_run
1143    /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField}, StreamConfig};
1144    ///
1145    /// # async fn example() -> anyhow::Result<()> {
1146    /// let client = Client::builder()
1147    ///     .chain_id(1)
1148    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
1149    ///     .build()?;
1150    ///
1151    /// // Stream all ERC20 transfer events
1152    /// let query = Query::new()
1153    ///     .from_block(19000000)
1154    ///     .where_logs(
1155    ///         LogFilter::all()
1156    ///             .and_topic0(["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"])?
1157    ///     )
1158    ///     .select_log_fields([LogField::Address, LogField::Topic1, LogField::Topic2, LogField::Data]);
1159    /// let mut receiver = client.stream(query, StreamConfig::default()).await?;
1160    ///
1161    /// while let Some(response) = receiver.recv().await {
1162    ///     let response = response?;
1163    ///     println!("Got {} events up to block: {}", response.data.logs.len(), response.next_block);
1164    /// }
1165    /// # Ok(())
1166    /// # }
1167    /// ```
1168    pub async fn stream(
1169        &self,
1170        query: Query,
1171        config: StreamConfig,
1172    ) -> Result<mpsc::Receiver<Result<QueryResponse>>> {
1173        check_simple_stream_params(&config)?;
1174
1175        let (tx, rx): (_, mpsc::Receiver<Result<QueryResponse>>) =
1176            mpsc::channel(config.concurrency);
1177
1178        let mut inner_rx = self
1179            .stream_arrow(query, config)
1180            .await
1181            .context("start inner stream")?;
1182
1183        tokio::spawn(async move {
1184            while let Some(resp) = inner_rx.recv().await {
1185                let msg = resp
1186                    .context("inner receiver")
1187                    .and_then(|r| QueryResponse::try_from(&r));
1188                let is_err = msg.is_err();
1189                if tx.send(msg).await.is_err() || is_err {
1190                    return;
1191                }
1192            }
1193        });
1194
1195        Ok(rx)
1196    }
1197
1198    /// Add block, transaction and log fields selection to the query and spawns task to execute it,
1199    /// returning data via a channel.
1200    ///
1201    /// This method automatically joins blocks, transactions, and logs into unified events,
1202    /// then streams them via a channel for real-time processing.
1203    ///
1204    /// # Example
1205    /// ```no_run
1206    /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField, TransactionField}, StreamConfig};
1207    ///
1208    /// # async fn example() -> anyhow::Result<()> {
1209    /// let client = Client::builder()
1210    ///     .chain_id(1)
1211    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
1212    ///     .build()?;
1213    ///
1214    /// // Stream NFT transfer events with transaction context
1215    /// let query = Query::new()
1216    ///     .from_block(19000000)
1217    ///     .where_logs(
1218    ///         LogFilter::all()
1219    ///             .and_topic0(["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"])?
1220    ///     )
1221    ///     .select_log_fields([LogField::Address, LogField::Topic1, LogField::Topic2])
1222    ///     .select_transaction_fields([TransactionField::Hash, TransactionField::From]);
1223    /// let mut receiver = client.stream_events(query, StreamConfig::default()).await?;
1224    ///
1225    /// while let Some(response) = receiver.recv().await {
1226    ///     let response = response?;
1227    ///     println!("Got {} joined events up to block: {}", response.data.len(), response.next_block);
1228    /// }
1229    /// # Ok(())
1230    /// # }
1231    /// ```
1232    pub async fn stream_events(
1233        &self,
1234        mut query: Query,
1235        config: StreamConfig,
1236    ) -> Result<mpsc::Receiver<Result<EventResponse>>> {
1237        check_simple_stream_params(&config)?;
1238
1239        let event_join_strategy = InternalEventJoinStrategy::from(&query.field_selection);
1240
1241        event_join_strategy.add_join_fields_to_selection(&mut query.field_selection);
1242
1243        let (tx, rx): (_, mpsc::Receiver<Result<EventResponse>>) =
1244            mpsc::channel(config.concurrency);
1245
1246        let mut inner_rx = self
1247            .stream_arrow(query, config)
1248            .await
1249            .context("start inner stream")?;
1250
1251        tokio::spawn(async move {
1252            while let Some(resp) = inner_rx.recv().await {
1253                let msg = resp
1254                    .context("inner receiver")
1255                    .and_then(|r| EventResponse::try_from_arrow_response(&r, &event_join_strategy));
1256                let is_err = msg.is_err();
1257                if tx.send(msg).await.is_err() || is_err {
1258                    return;
1259                }
1260            }
1261        });
1262
1263        Ok(rx)
1264    }
1265
1266    /// Spawns task to execute query and return data via a channel in Arrow format.
1267    ///
1268    /// Returns raw Apache Arrow data via a channel for high-performance processing.
1269    /// Ideal for applications that need to work directly with columnar data.
1270    ///
1271    /// # Example
1272    /// ```no_run
1273    /// use hypersync_client::{Client, net_types::{Query, TransactionFilter, TransactionField}, StreamConfig};
1274    ///
1275    /// # async fn example() -> anyhow::Result<()> {
1276    /// let client = Client::builder()
1277    ///     .chain_id(1)
1278    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
1279    ///     .build()?;
1280    ///
1281    /// // Stream transaction data in Arrow format for analytics
1282    /// let query = Query::new()
1283    ///     .from_block(19000000)
1284    ///     .to_block_excl(19000100)
1285    ///     .where_transactions(
1286    ///         TransactionFilter::all()
1287    ///             .and_contract_address(["0xA0b86a33E6411b87Fd9D3DF822C8698FC06BBe4c"])?
1288    ///     )
1289    ///     .select_transaction_fields([TransactionField::Hash, TransactionField::From, TransactionField::Value]);
1290    /// let mut receiver = client.stream_arrow(query, StreamConfig::default()).await?;
1291    ///
1292    /// while let Some(response) = receiver.recv().await {
1293    ///     let response = response?;
1294    ///     println!("Got {} Arrow batches for transactions", response.data.transactions.len());
1295    /// }
1296    /// # Ok(())
1297    /// # }
1298    /// ```
1299    pub async fn stream_arrow(
1300        &self,
1301        query: Query,
1302        config: StreamConfig,
1303    ) -> Result<mpsc::Receiver<Result<ArrowResponse>>> {
1304        stream::stream_arrow(self, query, config).await
1305    }
1306
1307    /// Executes query with retries and returns the response in Arrow format along with
1308    /// rate limit information from the server.
1309    ///
1310    /// This is useful for consumers that want to inspect rate limit headers and implement
1311    /// their own rate limiting logic in external systems.
1312    pub async fn get_arrow_with_rate_limit(
1313        &self,
1314        query: &Query,
1315    ) -> Result<QueryResponseWithRateLimit<ArrowResponseData>> {
1316        let result = self.get_arrow_with_size(query).await?;
1317        Ok(QueryResponseWithRateLimit {
1318            response: result.response,
1319            rate_limit: result.rate_limit,
1320        })
1321    }
1322
1323    /// Executes query with retries and returns the response along with
1324    /// rate limit information from the server.
1325    ///
1326    /// This is useful for consumers that want to inspect rate limit headers and implement
1327    /// their own rate limiting logic in external systems.
1328    pub async fn get_with_rate_limit(
1329        &self,
1330        query: &Query,
1331    ) -> Result<QueryResponseWithRateLimit<ResponseData>> {
1332        let result = self.get_arrow_with_rate_limit(query).await?;
1333        let converted =
1334            QueryResponse::try_from(&result.response).context("convert arrow response")?;
1335        Ok(QueryResponseWithRateLimit {
1336            response: converted,
1337            rate_limit: result.rate_limit,
1338        })
1339    }
1340
1341    /// Returns the most recently observed rate limit information, if any.
1342    ///
1343    /// Updated after every request (including inside streams). Returns `None`
1344    /// if no requests have been made yet or the server hasn't returned rate limit headers.
1345    pub fn rate_limit_info(&self) -> Option<RateLimitInfo> {
1346        self.inner
1347            .rate_limit_state
1348            .lock()
1349            .expect("rate_limit_state mutex poisoned")
1350            .as_ref()
1351            .map(|(info, _captured_at)| info.clone())
1352    }
1353
1354    /// Waits until the current rate limit window resets, if the client is rate limited.
1355    ///
1356    /// Returns immediately if:
1357    /// - No rate limit information has been observed yet
1358    /// - There is remaining quota in the current window
1359    ///
1360    /// This method is useful for consumers who want to explicitly wait before making
1361    /// requests, for example when coordinating rate limits across multiple systems.
1362    pub async fn wait_for_rate_limit(&self) {
1363        let wait_info = {
1364            let state = self
1365                .inner
1366                .rate_limit_state
1367                .lock()
1368                .expect("rate_limit_state mutex poisoned");
1369            match state.as_ref() {
1370                Some((info, captured_at)) if info.is_rate_limited() => {
1371                    info.suggested_wait_secs().map(|secs| {
1372                        let elapsed = captured_at.elapsed().as_secs();
1373                        let remaining_wait = secs.saturating_sub(elapsed);
1374                        (remaining_wait, info.clone())
1375                    })
1376                }
1377                _ => None,
1378            }
1379        };
1380        if let Some((secs, info)) = wait_info {
1381            if secs > 0 {
1382                log::warn!(
1383                    "rate limit exhausted ({info}), proactively waiting {secs}s for window reset"
1384                );
1385                tokio::time::sleep(Duration::from_secs(secs)).await;
1386            }
1387        }
1388    }
1389
1390    /// Updates the internally tracked rate limit state with the current timestamp.
1391    fn update_rate_limit_state(&self, rate_limit: &RateLimitInfo) {
1392        // Only update if the response actually contained rate limit headers
1393        if rate_limit.limit.is_some()
1394            || rate_limit.remaining.is_some()
1395            || rate_limit.reset_secs.is_some()
1396        {
1397            let mut state = self
1398                .inner
1399                .rate_limit_state
1400                .lock()
1401                .expect("rate_limit_state mutex poisoned");
1402            *state = Some((rate_limit.clone(), Instant::now()));
1403        }
1404    }
1405
1406    /// Getter for url field.
1407    ///
1408    /// # Example
1409    /// ```
1410    /// use hypersync_client::Client;
1411    ///
1412    /// let client = Client::builder()
1413    ///     .chain_id(1)
1414    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1415    ///     .build()
1416    ///     .unwrap();
1417    ///
1418    /// println!("Client URL: {}", client.url());
1419    /// ```
1420    pub fn url(&self) -> &Url {
1421        &self.inner.url
1422    }
1423}
1424
1425/// Builder for creating a hypersync client with configuration options.
1426///
1427/// This builder provides a fluent API for configuring client settings like URL,
1428/// authentication, timeouts, and retry behavior.
1429///
1430/// # Example
1431/// ```
1432/// use hypersync_client::{Client, SerializationFormat};
1433///
1434/// let client = Client::builder()
1435///     .chain_id(1)
1436///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1437///     .http_req_timeout_millis(30000)
1438///     .max_num_retries(3)
1439///     .build()
1440///     .unwrap();
1441/// ```
1442#[derive(Debug, Clone, Default)]
1443pub struct ClientBuilder(ClientConfig);
1444
1445impl ClientBuilder {
1446    /// Creates a new ClientBuilder with default configuration.
1447    pub fn new() -> Self {
1448        Self::default()
1449    }
1450
1451    /// Sets the chain ID and automatically configures the URL for the hypersync endpoint.
1452    ///
1453    /// This is a convenience method that sets the URL to `https://{chain_id}.hypersync.xyz`.
1454    ///
1455    /// # Arguments
1456    /// * `chain_id` - The blockchain chain ID (e.g., 1 for Ethereum mainnet)
1457    ///
1458    /// # Example
1459    /// ```
1460    /// use hypersync_client::Client;
1461    ///
1462    /// let client = Client::builder()
1463    ///     .chain_id(1) // Ethereum mainnet
1464    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1465    ///     .build()
1466    ///     .unwrap();
1467    /// ```
1468    pub fn chain_id(mut self, chain_id: u64) -> Self {
1469        self.0.url = format!("https://{chain_id}.hypersync.xyz");
1470        self
1471    }
1472
1473    /// Sets a custom URL for the hypersync server.
1474    ///
1475    /// Use this method when you need to connect to a custom hypersync endpoint
1476    /// instead of the default public endpoints.
1477    ///
1478    /// # Arguments
1479    /// * `url` - The hypersync server URL
1480    ///
1481    /// # Example
1482    /// ```
1483    /// use hypersync_client::Client;
1484    ///
1485    /// let client = Client::builder()
1486    ///     .url("https://my-custom-hypersync.example.com")
1487    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1488    ///     .build()
1489    ///     .unwrap();
1490    /// ```
1491    pub fn url<S: ToString>(mut self, url: S) -> Self {
1492        self.0.url = url.to_string();
1493        self
1494    }
1495
1496    /// Sets the api token for authentication.
1497    ///
1498    /// Required for accessing authenticated hypersync endpoints.
1499    ///
1500    /// # Arguments
1501    /// * `api_token` - The authentication token
1502    ///
1503    /// # Example
1504    /// ```
1505    /// use hypersync_client::Client;
1506    ///
1507    /// let client = Client::builder()
1508    ///     .chain_id(1)
1509    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1510    ///     .build()
1511    ///     .unwrap();
1512    /// ```
1513    pub fn api_token<S: ToString>(mut self, api_token: S) -> Self {
1514        self.0.api_token = api_token.to_string();
1515        self
1516    }
1517
1518    /// Sets the HTTP request timeout in milliseconds.
1519    ///
1520    /// # Arguments
1521    /// * `http_req_timeout_millis` - Timeout in milliseconds (default: 30000)
1522    ///
1523    /// # Example
1524    /// ```
1525    /// use hypersync_client::Client;
1526    ///
1527    /// let client = Client::builder()
1528    ///     .chain_id(1)
1529    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1530    ///     .http_req_timeout_millis(60000) // 60 second timeout
1531    ///     .build()
1532    ///     .unwrap();
1533    /// ```
1534    pub fn http_req_timeout_millis(mut self, http_req_timeout_millis: u64) -> Self {
1535        self.0.http_req_timeout_millis = http_req_timeout_millis;
1536        self
1537    }
1538
1539    /// Sets the maximum number of retries for failed requests.
1540    ///
1541    /// # Arguments
1542    /// * `max_num_retries` - Maximum number of retries (default: 10)
1543    ///
1544    /// # Example
1545    /// ```
1546    /// use hypersync_client::Client;
1547    ///
1548    /// let client = Client::builder()
1549    ///     .chain_id(1)
1550    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1551    ///     .max_num_retries(5)
1552    ///     .build()
1553    ///     .unwrap();
1554    /// ```
1555    pub fn max_num_retries(mut self, max_num_retries: usize) -> Self {
1556        self.0.max_num_retries = max_num_retries;
1557        self
1558    }
1559
1560    /// Sets the backoff increment for retry delays.
1561    ///
1562    /// This value is added to the base delay on each retry attempt.
1563    ///
1564    /// # Arguments
1565    /// * `retry_backoff_ms` - Backoff increment in milliseconds (default: 500)
1566    ///
1567    /// # Example
1568    /// ```
1569    /// use hypersync_client::Client;
1570    ///
1571    /// let client = Client::builder()
1572    ///     .chain_id(1)
1573    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1574    ///     .retry_backoff_ms(1000) // 1 second backoff increment
1575    ///     .build()
1576    ///     .unwrap();
1577    /// ```
1578    pub fn retry_backoff_ms(mut self, retry_backoff_ms: u64) -> Self {
1579        self.0.retry_backoff_ms = retry_backoff_ms;
1580        self
1581    }
1582
1583    /// Sets the initial delay for retry attempts.
1584    ///
1585    /// # Arguments
1586    /// * `retry_base_ms` - Initial retry delay in milliseconds (default: 500)
1587    ///
1588    /// # Example
1589    /// ```
1590    /// use hypersync_client::Client;
1591    ///
1592    /// let client = Client::builder()
1593    ///     .chain_id(1)
1594    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1595    ///     .retry_base_ms(1000) // Start with 1 second delay
1596    ///     .build()
1597    ///     .unwrap();
1598    /// ```
1599    pub fn retry_base_ms(mut self, retry_base_ms: u64) -> Self {
1600        self.0.retry_base_ms = retry_base_ms;
1601        self
1602    }
1603
1604    /// Sets the maximum delay for retry attempts.
1605    ///
1606    /// The retry delay will not exceed this value, even with backoff increments.
1607    ///
1608    /// # Arguments
1609    /// * `retry_ceiling_ms` - Maximum retry delay in milliseconds (default: 10000)
1610    ///
1611    /// # Example
1612    /// ```
1613    /// use hypersync_client::Client;
1614    ///
1615    /// let client = Client::builder()
1616    ///     .chain_id(1)
1617    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1618    ///     .retry_ceiling_ms(30000) // Cap at 30 seconds
1619    ///     .build()
1620    ///     .unwrap();
1621    /// ```
1622    pub fn retry_ceiling_ms(mut self, retry_ceiling_ms: u64) -> Self {
1623        self.0.retry_ceiling_ms = retry_ceiling_ms;
1624        self
1625    }
1626
1627    /// Sets whether to proactively sleep when rate limited.
1628    ///
1629    /// When enabled (default), the client will wait for the rate limit window to reset
1630    /// before sending requests it knows will be rejected. Set to `false` to disable
1631    /// this behavior and handle rate limits yourself.
1632    ///
1633    /// # Arguments
1634    /// * `proactive_rate_limit_sleep` - Whether to enable proactive rate limit sleeping (default: true)
1635    pub fn proactive_rate_limit_sleep(mut self, proactive_rate_limit_sleep: bool) -> Self {
1636        self.0.proactive_rate_limit_sleep = proactive_rate_limit_sleep;
1637        self
1638    }
1639
1640    /// Sets the serialization format for client-server communication.
1641    ///
1642    /// # Arguments
1643    /// * `serialization_format` - The format to use (JSON or CapnProto)
1644    ///
1645    /// # Example
1646    /// ```
1647    /// use hypersync_client::{Client, SerializationFormat};
1648    ///
1649    /// let client = Client::builder()
1650    ///     .chain_id(1)
1651    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1652    ///     .serialization_format(SerializationFormat::Json)
1653    ///     .build()
1654    ///     .unwrap();
1655    /// ```
1656    pub fn serialization_format(mut self, serialization_format: SerializationFormat) -> Self {
1657        self.0.serialization_format = serialization_format;
1658        self
1659    }
1660
1661    /// Builds the client with the configured settings.
1662    ///
1663    /// # Returns
1664    /// * `Result<Client>` - The configured client or an error if configuration is invalid
1665    ///
1666    /// # Errors
1667    /// Returns an error if:
1668    /// * The URL is malformed
1669    /// * Required configuration is missing
1670    ///
1671    /// # Example
1672    /// ```
1673    /// use hypersync_client::Client;
1674    ///
1675    /// let client = Client::builder()
1676    ///     .chain_id(1)
1677    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1678    ///     .build()
1679    ///     .unwrap();
1680    /// ```
1681    pub fn build(self) -> Result<Client> {
1682        if self.0.url.is_empty() {
1683            anyhow::bail!(
1684                "endpoint needs to be set, try using builder.chain_id(1) or\
1685                builder.url(\"https://eth.hypersync.xyz\") to set the endpoint"
1686            )
1687        }
1688        Client::new(self.0)
1689    }
1690}
1691
1692/// 200ms
1693const INITIAL_RECONNECT_DELAY: Duration = Duration::from_millis(200);
1694const MAX_RECONNECT_DELAY: Duration = Duration::from_secs(30);
1695/// Timeout for detecting dead connections. Server sends keepalive pings every 5s,
1696/// so we timeout after 15s (3x the ping interval).
1697const READ_TIMEOUT: Duration = Duration::from_secs(15);
1698
1699/// Events emitted by the height stream.
1700#[derive(Debug, Clone, PartialEq, Eq)]
1701pub enum HeightStreamEvent {
1702    /// Successfully connected or reconnected to the SSE stream.
1703    Connected,
1704    /// Received a height update from the server.
1705    Height(u64),
1706    /// Connection lost, will attempt to reconnect after the specified delay.
1707    Reconnecting {
1708        /// Duration to wait before attempting reconnection.
1709        delay: Duration,
1710        /// The error that caused the reconnection.
1711        error_msg: String,
1712    },
1713}
1714
1715enum InternalStreamEvent {
1716    Publish(HeightStreamEvent),
1717    Ping,
1718    Unknown(String),
1719}
1720
1721impl Client {
1722    fn get_es_stream(&self) -> Result<EventSource> {
1723        // Build the GET /height/sse request
1724        let mut url = self.inner.url.clone();
1725        url.path_segments_mut()
1726            .ok()
1727            .context("invalid base URL")?
1728            .extend(&["height", "sse"]);
1729
1730        let req = self
1731            .inner
1732            .http_client
1733            // Don't set timeout for SSE stream
1734            .request_no_timeout(Method::GET, url);
1735
1736        // Configure exponential backoff for library-level retries
1737        let retry_policy = ExponentialBackoff::new(
1738            INITIAL_RECONNECT_DELAY,
1739            2.0,
1740            Some(MAX_RECONNECT_DELAY),
1741            None, // unlimited retries
1742        );
1743
1744        // Turn the request into an EventSource stream with retries
1745        let mut es = reqwest_eventsource::EventSource::new(req)
1746            .context("unexpected error creating EventSource")?;
1747        es.set_retry_policy(Box::new(retry_policy));
1748        Ok(es)
1749    }
1750
1751    async fn next_height(event_source: &mut EventSource) -> Result<Option<InternalStreamEvent>> {
1752        let Some(res) = tokio::time::timeout(READ_TIMEOUT, event_source.next())
1753            .await
1754            .map_err(|d| anyhow::anyhow!("stream timed out after {d}"))?
1755        else {
1756            return Ok(None);
1757        };
1758
1759        let e = match res.context("failed response")? {
1760            Event::Open => InternalStreamEvent::Publish(HeightStreamEvent::Connected),
1761            Event::Message(event) => match event.event.as_str() {
1762                "height" => {
1763                    let height = event
1764                        .data
1765                        .trim()
1766                        .parse::<u64>()
1767                        .context("parsing height from event data")?;
1768                    InternalStreamEvent::Publish(HeightStreamEvent::Height(height))
1769                }
1770                "ping" => InternalStreamEvent::Ping,
1771                _ => InternalStreamEvent::Unknown(format!("unknown event: {:?}", event)),
1772            },
1773        };
1774
1775        Ok(Some(e))
1776    }
1777
1778    async fn stream_height_events(
1779        es: &mut EventSource,
1780        tx: &mpsc::Sender<HeightStreamEvent>,
1781    ) -> Result<bool> {
1782        let mut received_an_event = false;
1783        while let Some(event) = Self::next_height(es).await.context("failed next height")? {
1784            match event {
1785                InternalStreamEvent::Publish(event) => {
1786                    received_an_event = true;
1787                    if tx.send(event).await.is_err() {
1788                        return Ok(received_an_event); // Receiver dropped, exit task
1789                    }
1790                }
1791                InternalStreamEvent::Ping => (), // ignore pings
1792                InternalStreamEvent::Unknown(_event) => (), // ignore unknown events
1793            }
1794        }
1795        Ok(received_an_event)
1796    }
1797
1798    fn get_delay(consecutive_failures: u32) -> Duration {
1799        if consecutive_failures > 0 {
1800            /// helper function to calculate 2^x
1801            /// optimization using bit shifting
1802            const fn two_to_pow(x: u32) -> u32 {
1803                1 << x
1804            }
1805            // Exponential backoff: 200ms, 400ms, 800ms, ... up to 30s
1806            INITIAL_RECONNECT_DELAY
1807                .saturating_mul(two_to_pow(consecutive_failures - 1))
1808                .min(MAX_RECONNECT_DELAY)
1809        } else {
1810            // On zero consecutive failures, 0 delay
1811            Duration::from_millis(0)
1812        }
1813    }
1814
1815    async fn stream_height_events_with_retry(
1816        &self,
1817        tx: &mpsc::Sender<HeightStreamEvent>,
1818    ) -> Result<()> {
1819        let mut consecutive_failures = 0u32;
1820
1821        loop {
1822            // should always be able to creat a new es stream
1823            // something is wrong with the req builder otherwise
1824            let mut es = self.get_es_stream().context("get es stream")?;
1825
1826            let mut error = anyhow!("");
1827
1828            match Self::stream_height_events(&mut es, tx).await {
1829                Ok(received_an_event) => {
1830                    if received_an_event {
1831                        consecutive_failures = 0; // Reset after successful connection that then failed
1832                    }
1833                    log::trace!("Stream height exited");
1834                }
1835                Err(e) => {
1836                    log::trace!("Stream height failed: {e:?}");
1837                    error = e;
1838                }
1839            }
1840
1841            es.close();
1842
1843            // If the receiver is closed, exit the task
1844            if tx.is_closed() {
1845                break;
1846            }
1847
1848            let delay = Self::get_delay(consecutive_failures);
1849            log::trace!("Reconnecting in {:?}...", delay);
1850
1851            let error_msg = format!("{error:?}");
1852
1853            if tx
1854                .send(HeightStreamEvent::Reconnecting { delay, error_msg })
1855                .await
1856                .is_err()
1857            {
1858                return Ok(()); // Receiver dropped, exit task
1859            }
1860            tokio::time::sleep(delay).await;
1861
1862            // increment consecutive failures so that on the next try
1863            // it will start using back offs
1864            consecutive_failures += 1;
1865        }
1866
1867        Ok(())
1868    }
1869
1870    /// Streams archive height updates from the server via Server-Sent Events.
1871    ///
1872    /// Establishes a long-lived SSE connection to `/height/sse` that automatically reconnects
1873    /// on disconnection with exponential backoff (200ms → 400ms → ... → max 30s).
1874    ///
1875    /// The stream emits [`HeightStreamEvent`] to notify consumers of connection state changes
1876    /// and height updates. This allows applications to display connection status to users.
1877    ///
1878    /// # Returns
1879    /// Channel receiver yielding [`HeightStreamEvent`]s. The background task handles connection
1880    /// lifecycle and sends events through this channel.
1881    ///
1882    /// # Example
1883    /// ```
1884    /// # use hypersync_client::{Client, HeightStreamEvent};
1885    /// # async fn example() -> anyhow::Result<()> {
1886    /// let client = Client::builder()
1887    ///     .url("https://eth.hypersync.xyz")
1888    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1889    ///     .build()?;
1890    ///
1891    /// let mut rx = client.stream_height();
1892    ///
1893    /// while let Some(event) = rx.recv().await {
1894    ///     match event {
1895    ///         HeightStreamEvent::Connected => println!("Connected to stream"),
1896    ///         HeightStreamEvent::Height(h) => println!("Height: {}", h),
1897    ///         HeightStreamEvent::Reconnecting { delay, error_msg } => {
1898    ///             println!("Reconnecting in {delay:?} due to error: {error_msg}")
1899    ///         }
1900    ///     }
1901    /// }
1902    /// # Ok(())
1903    /// # }
1904    /// ```
1905    pub fn stream_height(&self) -> mpsc::Receiver<HeightStreamEvent> {
1906        let (tx, rx) = mpsc::channel(16);
1907        let client = self.clone();
1908
1909        tokio::spawn(async move {
1910            if let Err(e) = client.stream_height_events_with_retry(&tx).await {
1911                log::error!("Stream height failed unexpectedly: {e:?}");
1912            }
1913        });
1914
1915        rx
1916    }
1917}
1918
1919fn check_simple_stream_params(config: &StreamConfig) -> Result<()> {
1920    if config.event_signature.is_some() {
1921        return Err(anyhow!(
1922            "config.event_signature can't be passed to simple type function. User is expected to \
1923             decode the logs using Decoder."
1924        ));
1925    }
1926    if config.column_mapping.is_some() {
1927        return Err(anyhow!(
1928            "config.column_mapping can't be passed to single type function. User is expected to \
1929             map values manually."
1930        ));
1931    }
1932
1933    Ok(())
1934}
1935
1936/// Used to indicate whether or not a retry should be attempted.
1937#[derive(Debug, thiserror::Error)]
1938pub enum HyperSyncResponseError {
1939    /// Means that the client should retry with a smaller block range.
1940    #[error("hypersync responded with 'payload too large' error")]
1941    PayloadTooLarge,
1942    /// Server responded with 429 Too Many Requests.
1943    #[error("rate limited by server")]
1944    RateLimited {
1945        /// Rate limit information from the 429 response headers.
1946        rate_limit: RateLimitInfo,
1947    },
1948    /// Any other server error.
1949    #[error(transparent)]
1950    Other(#[from] anyhow::Error),
1951}
1952
1953/// Result of a single Arrow query execution, before retry logic.
1954struct ArrowImplResponse {
1955    /// The parsed Arrow response data.
1956    response: ArrowResponse,
1957    /// Size of the response body in bytes.
1958    response_bytes: u64,
1959    /// Rate limit information parsed from response headers.
1960    rate_limit: RateLimitInfo,
1961}
1962
1963#[cfg(test)]
1964mod tests {
1965    use super::*;
1966    #[test]
1967    fn test_get_delay() {
1968        assert_eq!(
1969            Client::get_delay(0),
1970            Duration::from_millis(0),
1971            "starts with 0 delay"
1972        );
1973        // powers of 2 backoff
1974        assert_eq!(Client::get_delay(1), Duration::from_millis(200));
1975        assert_eq!(Client::get_delay(2), Duration::from_millis(400));
1976        assert_eq!(Client::get_delay(3), Duration::from_millis(800));
1977        // maxes out at 30s
1978        assert_eq!(
1979            Client::get_delay(9),
1980            Duration::from_secs(30),
1981            "max delay is 30s"
1982        );
1983        assert_eq!(
1984            Client::get_delay(10),
1985            Duration::from_secs(30),
1986            "max delay is 30s"
1987        );
1988    }
1989
1990    #[tokio::test]
1991    #[ignore = "integration test requiring live hs server"]
1992    async fn test_http2_is_used() -> anyhow::Result<()> {
1993        let api_token = std::env::var("ENVIO_API_TOKEN")?;
1994        let client = reqwest::Client::builder()
1995            .no_gzip()
1996            .user_agent("hscr-test")
1997            .build()?;
1998
1999        let res = client
2000            .get("https://eth.hypersync.xyz/height")
2001            .bearer_auth(&api_token)
2002            .send()
2003            .await?;
2004
2005        assert_eq!(
2006            res.version(),
2007            reqwest::Version::HTTP_2,
2008            "expected HTTP/2 but got {:?}",
2009            res.version()
2010        );
2011        assert!(res.status().is_success());
2012        Ok(())
2013    }
2014
2015    #[tokio::test]
2016    #[ignore = "integration test with live hs server for height stream"]
2017    async fn test_stream_height_events() -> anyhow::Result<()> {
2018        let (tx, mut rx) = mpsc::channel(16);
2019        let handle = tokio::spawn(async move {
2020            let client = Client::builder()
2021                .url("https://monad-testnet.hypersync.xyz")
2022                .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
2023                .build()?;
2024            let mut es = client.get_es_stream().context("get es stream")?;
2025            Client::stream_height_events(&mut es, &tx).await
2026        });
2027
2028        let val = rx.recv().await;
2029        assert!(val.is_some());
2030        assert_eq!(val.unwrap(), HeightStreamEvent::Connected);
2031        let Some(HeightStreamEvent::Height(height)) = rx.recv().await else {
2032            panic!("should have received height")
2033        };
2034        let Some(HeightStreamEvent::Height(height2)) = rx.recv().await else {
2035            panic!("should have received height")
2036        };
2037        assert!(height2 > height);
2038        drop(rx);
2039
2040        let res = handle.await.expect("should have joined");
2041        let received_an_event =
2042            res.expect("should have ended the stream gracefully after dropping rx");
2043        assert!(received_an_event, "should have received an event");
2044
2045        Ok(())
2046    }
2047}