hypersync_client/
lib.rs

1#![deny(missing_docs)]
2//! # HyperSync Client
3//!
4//! A high-performance Rust client for the HyperSync protocol, enabling efficient retrieval
5//! of blockchain data including blocks, transactions, logs, and traces.
6//!
7//! ## Features
8//!
9//! - **High-performance streaming**: Parallel data fetching with automatic retries
10//! - **Flexible querying**: Rich query builder API for precise data selection
11//! - **Multiple data formats**: Support for Arrow, Parquet, and simple Rust types
12//! - **Event decoding**: Automatic ABI decoding for smart contract events
13//! - **Real-time updates**: Live height streaming via Server-Sent Events
14//! - **Production ready**: Built-in rate limiting, retries, and error handling
15//!
16//! ## Quick Start
17//!
18//! ```no_run
19//! use hypersync_client::{Client, net_types::{Query, LogFilter, LogField}, StreamConfig};
20//!
21//! #[tokio::main]
22//! async fn main() -> anyhow::Result<()> {
23//!     // Create a client for Ethereum mainnet
24//!     let client = Client::builder()
25//!         .chain_id(1)
26//!         .api_token(std::env::var("ENVIO_API_TOKEN")?)
27//!         .build()?;
28//!
29//!     // Query ERC20 transfer events from USDC contract
30//!     let query = Query::new()
31//!         .from_block(19000000)
32//!         .to_block_excl(19001000)
33//!         .where_logs(
34//!             LogFilter::all()
35//!                 .and_address(["0xA0b86a33E6411b87Fd9D3DF822C8698FC06BBe4c"])?
36//!                 .and_topic0(["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"])?
37//!         )
38//!         .select_log_fields([LogField::Address, LogField::Topic1, LogField::Topic2, LogField::Data]);
39//!
40//!     // Get all data in one response
41//!     let response = client.get(&query).await?;
42//!     println!("Retrieved {} blocks", response.data.blocks.len());
43//!
44//!     // Or stream data for large ranges
45//!     let mut receiver = client.stream(query, StreamConfig::default()).await?;
46//!     while let Some(response) = receiver.recv().await {
47//!         let response = response?;
48//!         println!("Streaming: got blocks up to {}", response.next_block);
49//!     }
50//!
51//!     Ok(())
52//! }
53//! ```
54//!
55//! ## Main Types
56//!
57//! - [`Client`] - Main client for interacting with HyperSync servers
58//! - [`net_types::Query`] - Query builder for specifying what data to fetch
59//! - [`StreamConfig`] - Configuration for streaming operations
60//! - [`QueryResponse`] - Response containing blocks, transactions, logs, and traces
61//! - [`ArrowResponse`] - Response in Apache Arrow format for high-performance processing
62//!
63//! ## Authentication
64//!
65//! You'll need a HyperSync API token to access the service. Get one from
66//! [https://envio.dev/app/api-tokens](https://envio.dev/app/api-tokens).
67//!
68//! ## Examples
69//!
70//! See the `examples/` directory for more detailed usage patterns including:
71//! - ERC20 token transfers
72//! - Wallet transaction history  
73//! - Event decoding and filtering
74//! - Real-time data streaming
75use std::{sync::Arc, time::Duration};
76
77use anyhow::{anyhow, Context, Result};
78use futures::StreamExt;
79use hypersync_net_types::{hypersync_net_types_capnp, ArchiveHeight, ChainId, Query};
80use reqwest::Method;
81use reqwest_eventsource::retry::ExponentialBackoff;
82use reqwest_eventsource::{Event, EventSource};
83
84pub mod arrow_reader;
85mod column_mapping;
86mod config;
87mod decode;
88mod decode_call;
89mod from_arrow;
90mod parquet_out;
91mod parse_response;
92pub mod preset_query;
93mod rayon_async;
94pub mod simple_types;
95mod stream;
96mod types;
97mod util;
98
99pub use hypersync_format as format;
100pub use hypersync_net_types as net_types;
101pub use hypersync_schema as schema;
102
103use parse_response::parse_query_response;
104use tokio::sync::mpsc;
105use types::{EventResponse, ResponseData};
106use url::Url;
107
108pub use column_mapping::{ColumnMapping, DataType};
109pub use config::HexOutput;
110pub use config::{ClientConfig, SerializationFormat, StreamConfig};
111pub use decode::Decoder;
112pub use decode_call::CallDecoder;
113pub use types::{ArrowResponse, ArrowResponseData, QueryResponse};
114
115use crate::parse_response::read_query_response;
116use crate::simple_types::InternalEventJoinStrategy;
117
/// Bundles the reqwest client with the API token and default timeout so that
/// every request to the HyperSync server is built consistently.
#[derive(Debug)]
struct HttpClientWrapper {
    /// Initialized reqwest instance for client url.
    client: reqwest::Client,
    /// HyperSync server api token.
    api_token: String,
    /// Standard timeout for http requests.
    timeout: Duration,
}
127
128impl HttpClientWrapper {
129    fn request(&self, method: Method, url: Url) -> reqwest::RequestBuilder {
130        self.client
131            .request(method, url)
132            .timeout(self.timeout)
133            .bearer_auth(&self.api_token)
134    }
135
136    fn request_no_timeout(&self, method: Method, url: Url) -> reqwest::RequestBuilder {
137        self.client
138            .request(method, url)
139            .bearer_auth(&self.api_token)
140    }
141}
142
/// Internal client state to handle http requests and retries.
///
/// Kept behind an `Arc` inside [`Client`] so clones of the client share this state.
#[derive(Debug)]
struct ClientInner {
    /// HTTP client wrapper carrying the auth token and default timeout.
    http_client: HttpClientWrapper,
    /// HyperSync server URL.
    url: Url,
    /// Number of retries to attempt before returning error.
    max_num_retries: usize,
    /// Milliseconds that would be used for retry backoff increasing.
    retry_backoff_ms: u64,
    /// Initial wait time for request backoff.
    retry_base_ms: u64,
    /// Ceiling time for request backoff.
    retry_ceiling_ms: u64,
    /// Query serialization format to use for HTTP requests.
    serialization_format: SerializationFormat,
}
160
/// Client to handle http requests and retries.
///
/// Cloning is cheap: all state lives in a shared [`Arc`], so clones reuse the
/// same underlying HTTP connection pool and configuration.
#[derive(Clone, Debug)]
pub struct Client {
    // Shared state; cloning `Client` only bumps the refcount.
    inner: Arc<ClientInner>,
}
166
167impl Client {
168    /// Creates a new client with the given configuration.
169    ///
170    /// Configuration must include the `url` and `api_token` fields.
171    /// # Example
172    /// ```
173    /// use hypersync_client::{Client, ClientConfig};
174    ///
175    /// let config = ClientConfig {
176    ///     url: "https://eth.hypersync.xyz".to_string(),
177    ///     api_token: std::env::var("ENVIO_API_TOKEN")?,
178    ///     ..Default::default()
179    /// };
180    /// let client = Client::new(config)?;
181    /// # Ok::<(), anyhow::Error>(())
182    /// ```
183    ///
184    /// # Errors
185    /// This method fails if the config is invalid.
186    pub fn new(cfg: ClientConfig) -> Result<Self> {
187        // hscr stands for hypersync client rust
188        cfg.validate().context("invalid ClientConfig")?;
189        let user_agent = format!("hscr/{}", env!("CARGO_PKG_VERSION"));
190        Self::new_internal(cfg, user_agent)
191    }
192
    /// Creates a new client builder.
    ///
    /// This is the most convenient way to construct a [`Client`]; configure it
    /// with the builder's setters and finish with `build()`.
    ///
    /// # Example
    /// ```
    /// use hypersync_client::Client;
    ///
    /// let client = Client::builder()
    ///     .chain_id(1)
    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
    ///     .build()
    ///     .unwrap();
    /// ```
    pub fn builder() -> ClientBuilder {
        ClientBuilder::new()
    }
208
    #[doc(hidden)]
    pub fn new_with_agent(cfg: ClientConfig, user_agent: impl Into<String>) -> Result<Self> {
        // Creates a new client with the given configuration and custom user agent.
        // This is intended for use by language bindings (Python, Node.js) and HyperIndex.
        // NOTE(review): unlike `new`, this does not call `cfg.validate()` — presumably
        // the bindings validate on their side; confirm this is intentional.
        Self::new_internal(cfg, user_agent.into())
    }
215
216    /// Internal constructor that takes both config and user agent.
217    fn new_internal(cfg: ClientConfig, user_agent: String) -> Result<Self> {
218        let http_client = HttpClientWrapper {
219            client: reqwest::Client::builder()
220                .no_gzip()
221                .user_agent(user_agent)
222                .build()
223                .unwrap(),
224            api_token: cfg.api_token,
225            timeout: Duration::from_millis(cfg.http_req_timeout_millis),
226        };
227
228        let url = Url::parse(&cfg.url).context("url is malformed")?;
229
230        Ok(Self {
231            inner: Arc::new(ClientInner {
232                http_client,
233                url,
234                max_num_retries: cfg.max_num_retries,
235                retry_backoff_ms: cfg.retry_backoff_ms,
236                retry_base_ms: cfg.retry_base_ms,
237                retry_ceiling_ms: cfg.retry_ceiling_ms,
238                serialization_format: cfg.serialization_format,
239            }),
240        })
241    }
242
243    /// Retrieves blocks, transactions, traces, and logs through a stream using the provided
244    /// query and stream configuration.
245    ///
246    /// ### Implementation
247    /// Runs multiple queries simultaneously based on config.concurrency.
248    ///
249    /// Each query runs until it reaches query.to, server height, any max_num_* query param,
250    /// or execution timed out by server.
251    ///
252    /// # ⚠️ Important Warning
253    ///
254    /// This method will continue executing until the query has run to completion from beginning
255    /// to the end of the block range defined in the query. For heavy queries with large block
256    /// ranges or high data volumes, consider:
257    ///
258    /// - Use [`stream()`](Self::stream) to interact with each streamed chunk individually
259    /// - Use [`get()`](Self::get) which returns a `next_block` that can be paginated for the next query
260    /// - Break large queries into smaller block ranges
261    ///
262    /// # Example
263    /// ```no_run
264    /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField}, StreamConfig};
265    ///
266    /// # async fn example() -> anyhow::Result<()> {
267    /// let client = Client::builder()
268    ///     .chain_id(1)
269    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
270    ///     .build()?;
271    ///
272    /// // Query ERC20 transfer events
273    /// let query = Query::new()
274    ///     .from_block(19000000)
275    ///     .to_block_excl(19000010)
276    ///     .where_logs(
277    ///         LogFilter::all()
278    ///             .and_topic0(["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"])?
279    ///     )
280    ///     .select_log_fields([LogField::Address, LogField::Data]);
281    /// let response = client.collect(query, StreamConfig::default()).await?;
282    ///
283    /// println!("Collected {} events", response.data.logs.len());
284    /// # Ok(())
285    /// # }
286    /// ```
287    pub async fn collect(&self, query: Query, config: StreamConfig) -> Result<QueryResponse> {
288        check_simple_stream_params(&config)?;
289
290        let mut recv = stream::stream_arrow(self, query, config)
291            .await
292            .context("start stream")?;
293
294        let mut data = ResponseData::default();
295        let mut archive_height = None;
296        let mut next_block = 0;
297        let mut total_execution_time = 0;
298
299        while let Some(res) = recv.recv().await {
300            let res = res.context("get response")?;
301            let res: QueryResponse =
302                QueryResponse::try_from(&res).context("convert arrow response")?;
303
304            for batch in res.data.blocks {
305                data.blocks.push(batch);
306            }
307            for batch in res.data.transactions {
308                data.transactions.push(batch);
309            }
310            for batch in res.data.logs {
311                data.logs.push(batch);
312            }
313            for batch in res.data.traces {
314                data.traces.push(batch);
315            }
316
317            archive_height = res.archive_height;
318            next_block = res.next_block;
319            total_execution_time += res.total_execution_time
320        }
321
322        Ok(QueryResponse {
323            archive_height,
324            next_block,
325            total_execution_time,
326            data,
327            rollback_guard: None,
328        })
329    }
330
331    /// Retrieves events through a stream using the provided query and stream configuration.
332    ///
333    /// # ⚠️ Important Warning
334    ///
335    /// This method will continue executing until the query has run to completion from beginning
336    /// to the end of the block range defined in the query. For heavy queries with large block
337    /// ranges or high data volumes, consider:
338    ///
339    /// - Use [`stream_events()`](Self::stream_events) to interact with each streamed chunk individually
340    /// - Use [`get_events()`](Self::get_events) which returns a `next_block` that can be paginated for the next query
341    /// - Break large queries into smaller block ranges
342    ///
343    /// # Example
344    /// ```no_run
345    /// use hypersync_client::{Client, net_types::{Query, TransactionFilter, TransactionField}, StreamConfig};
346    ///
347    /// # async fn example() -> anyhow::Result<()> {
348    /// let client = Client::builder()
349    ///     .chain_id(1)
350    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
351    ///     .build()?;
352    ///
353    /// // Query transactions to a specific address
354    /// let query = Query::new()
355    ///     .from_block(19000000)
356    ///     .to_block_excl(19000100)
357    ///     .where_transactions(
358    ///         TransactionFilter::all()
359    ///             .and_to(["0xA0b86a33E6411b87Fd9D3DF822C8698FC06BBe4c"])?
360    ///     )
361    ///     .select_transaction_fields([TransactionField::Hash, TransactionField::From, TransactionField::Value]);
362    /// let response = client.collect_events(query, StreamConfig::default()).await?;
363    ///
364    /// println!("Collected {} events", response.data.len());
365    /// # Ok(())
366    /// # }
367    /// ```
368    pub async fn collect_events(
369        &self,
370        mut query: Query,
371        config: StreamConfig,
372    ) -> Result<EventResponse> {
373        check_simple_stream_params(&config)?;
374
375        let event_join_strategy = InternalEventJoinStrategy::from(&query.field_selection);
376        event_join_strategy.add_join_fields_to_selection(&mut query.field_selection);
377
378        let mut recv = stream::stream_arrow(self, query, config)
379            .await
380            .context("start stream")?;
381
382        let mut data = Vec::new();
383        let mut archive_height = None;
384        let mut next_block = 0;
385        let mut total_execution_time = 0;
386
387        while let Some(res) = recv.recv().await {
388            let res = res.context("get response")?;
389            let res: QueryResponse =
390                QueryResponse::try_from(&res).context("convert arrow response")?;
391            let events = event_join_strategy.join_from_response_data(res.data);
392
393            data.extend(events);
394
395            archive_height = res.archive_height;
396            next_block = res.next_block;
397            total_execution_time += res.total_execution_time
398        }
399
400        Ok(EventResponse {
401            archive_height,
402            next_block,
403            total_execution_time,
404            data,
405            rollback_guard: None,
406        })
407    }
408
409    /// Retrieves blocks, transactions, traces, and logs in Arrow format through a stream using
410    /// the provided query and stream configuration.
411    ///
412    /// Returns data in Apache Arrow format for high-performance columnar processing.
413    /// Useful for analytics workloads or when working with Arrow-compatible tools.
414    ///
415    /// # ⚠️ Important Warning
416    ///
417    /// This method will continue executing until the query has run to completion from beginning
418    /// to the end of the block range defined in the query. For heavy queries with large block
419    /// ranges or high data volumes, consider:
420    ///
421    /// - Use [`stream_arrow()`](Self::stream_arrow) to interact with each streamed chunk individually
422    /// - Use [`get_arrow()`](Self::get_arrow) which returns a `next_block` that can be paginated for the next query
423    /// - Break large queries into smaller block ranges
424    ///
425    /// # Example
426    /// ```no_run
427    /// use hypersync_client::{Client, net_types::{Query, BlockFilter, BlockField}, StreamConfig};
428    ///
429    /// # async fn example() -> anyhow::Result<()> {
430    /// let client = Client::builder()
431    ///     .chain_id(1)
432    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
433    ///     .build()?;
434    ///
435    /// // Get block data in Arrow format for analytics
436    /// let query = Query::new()
437    ///     .from_block(19000000)
438    ///     .to_block_excl(19000100)
439    ///     .include_all_blocks()
440    ///     .select_block_fields([BlockField::Number, BlockField::Timestamp, BlockField::GasUsed]);
441    /// let response = client.collect_arrow(query, StreamConfig::default()).await?;
442    ///
443    /// println!("Retrieved {} Arrow batches for blocks", response.data.blocks.len());
444    /// # Ok(())
445    /// # }
446    /// ```
447    pub async fn collect_arrow(&self, query: Query, config: StreamConfig) -> Result<ArrowResponse> {
448        let mut recv = stream::stream_arrow(self, query, config)
449            .await
450            .context("start stream")?;
451
452        let mut data = ArrowResponseData::default();
453        let mut archive_height = None;
454        let mut next_block = 0;
455        let mut total_execution_time = 0;
456
457        while let Some(res) = recv.recv().await {
458            let res = res.context("get response")?;
459
460            for batch in res.data.blocks {
461                data.blocks.push(batch);
462            }
463            for batch in res.data.transactions {
464                data.transactions.push(batch);
465            }
466            for batch in res.data.logs {
467                data.logs.push(batch);
468            }
469            for batch in res.data.traces {
470                data.traces.push(batch);
471            }
472            for batch in res.data.decoded_logs {
473                data.decoded_logs.push(batch);
474            }
475
476            archive_height = res.archive_height;
477            next_block = res.next_block;
478            total_execution_time += res.total_execution_time
479        }
480
481        Ok(ArrowResponse {
482            archive_height,
483            next_block,
484            total_execution_time,
485            data,
486            rollback_guard: None,
487        })
488    }
489
    /// Writes parquet file getting data through a stream using the provided path, query,
    /// and stream configuration.
    ///
    /// Streams data directly to a Parquet file for efficient storage and later analysis.
    /// Perfect for data exports or ETL pipelines.
    ///
    /// # ⚠️ Important Warning
    ///
    /// This method will continue executing until the query has run to completion from beginning
    /// to the end of the block range defined in the query. For heavy queries with large block
    /// ranges or high data volumes, consider:
    ///
    /// - Use [`stream_arrow()`](Self::stream_arrow) and write to Parquet incrementally
    /// - Use [`get_arrow()`](Self::get_arrow) with pagination and append to Parquet files
    /// - Break large queries into smaller block ranges
    ///
    /// # Example
    /// ```no_run
    /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField}, StreamConfig};
    ///
    /// # async fn example() -> anyhow::Result<()> {
    /// let client = Client::builder()
    ///     .chain_id(1)
    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
    ///     .build()?;
    ///
    /// // Export all DEX trades to Parquet for analysis
    /// let query = Query::new()
    ///     .from_block(19000000)
    ///     .to_block_excl(19010000)
    ///     .where_logs(
    ///         LogFilter::all()
    ///             .and_topic0(["0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822"])?
    ///     )
    ///     .select_log_fields([LogField::Address, LogField::Data, LogField::BlockNumber]);
    /// client.collect_parquet("./trades.parquet", query, StreamConfig::default()).await?;
    ///
    /// println!("Trade data exported to trades.parquet");
    /// # Ok(())
    /// # }
    /// ```
    pub async fn collect_parquet(
        &self,
        path: &str,
        query: Query,
        config: StreamConfig,
    ) -> Result<()> {
        // Thin wrapper: streaming and file-writing logic lives in `parquet_out`.
        parquet_out::collect_parquet(self, path, query, config).await
    }
539
540    /// Internal implementation of getting chain_id from server
541    async fn get_chain_id_impl(&self) -> Result<u64> {
542        let mut url = self.inner.url.clone();
543        let mut segments = url.path_segments_mut().ok().context("get path segments")?;
544        segments.push("chain_id");
545        std::mem::drop(segments);
546        let req = self.inner.http_client.request(Method::GET, url);
547
548        let res = req.send().await.context("execute http req")?;
549
550        let status = res.status();
551        if !status.is_success() {
552            return Err(anyhow!("http response status code {}", status));
553        }
554
555        let chain_id: ChainId = res.json().await.context("read response body json")?;
556
557        Ok(chain_id.chain_id)
558    }
559
560    /// Internal implementation of getting height from server
561    async fn get_height_impl(&self, http_timeout_override: Option<Duration>) -> Result<u64> {
562        let mut url = self.inner.url.clone();
563        let mut segments = url.path_segments_mut().ok().context("get path segments")?;
564        segments.push("height");
565        std::mem::drop(segments);
566        let mut req = self.inner.http_client.request(Method::GET, url);
567        if let Some(http_timeout_override) = http_timeout_override {
568            req = req.timeout(http_timeout_override);
569        }
570
571        let res = req.send().await.context("execute http req")?;
572
573        let status = res.status();
574        if !status.is_success() {
575            return Err(anyhow!("http response status code {}", status));
576        }
577
578        let height: ArchiveHeight = res.json().await.context("read response body json")?;
579
580        Ok(height.height.unwrap_or(0))
581    }
582
583    /// Get the chain_id from the server with retries.
584    ///
585    /// # Example
586    /// ```no_run
587    /// use hypersync_client::Client;
588    ///
589    /// # async fn example() -> anyhow::Result<()> {
590    /// let client = Client::builder()
591    ///     .chain_id(1)
592    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
593    ///     .build()?;
594    ///
595    /// let chain_id = client.get_chain_id().await?;
596    /// println!("Connected to chain ID: {}", chain_id);
597    /// # Ok(())
598    /// # }
599    /// ```
600    pub async fn get_chain_id(&self) -> Result<u64> {
601        let mut base = self.inner.retry_base_ms;
602
603        let mut err = anyhow!("");
604
605        for _ in 0..self.inner.max_num_retries + 1 {
606            match self.get_chain_id_impl().await {
607                Ok(res) => return Ok(res),
608                Err(e) => {
609                    log::error!(
610                        "failed to get chain_id from server, retrying... The error was: {e:?}"
611                    );
612                    err = err.context(format!("{e:?}"));
613                }
614            }
615
616            let base_ms = Duration::from_millis(base);
617            let jitter = Duration::from_millis(fastrange_rs::fastrange_64(
618                rand::random(),
619                self.inner.retry_backoff_ms,
620            ));
621
622            tokio::time::sleep(base_ms + jitter).await;
623
624            base = std::cmp::min(
625                base + self.inner.retry_backoff_ms,
626                self.inner.retry_ceiling_ms,
627            );
628        }
629
630        Err(err)
631    }
632
633    /// Get the height of from server with retries.
634    ///
635    /// # Example
636    /// ```no_run
637    /// use hypersync_client::Client;
638    ///
639    /// # async fn example() -> anyhow::Result<()> {
640    /// let client = Client::builder()
641    ///     .chain_id(1)
642    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
643    ///     .build()?;
644    ///
645    /// let height = client.get_height().await?;
646    /// println!("Current block height: {}", height);
647    /// # Ok(())
648    /// # }
649    /// ```
650    pub async fn get_height(&self) -> Result<u64> {
651        let mut base = self.inner.retry_base_ms;
652
653        let mut err = anyhow!("");
654
655        for _ in 0..self.inner.max_num_retries + 1 {
656            match self.get_height_impl(None).await {
657                Ok(res) => return Ok(res),
658                Err(e) => {
659                    log::error!(
660                        "failed to get height from server, retrying... The error was: {e:?}"
661                    );
662                    err = err.context(format!("{e:?}"));
663                }
664            }
665
666            let base_ms = Duration::from_millis(base);
667            let jitter = Duration::from_millis(fastrange_rs::fastrange_64(
668                rand::random(),
669                self.inner.retry_backoff_ms,
670            ));
671
672            tokio::time::sleep(base_ms + jitter).await;
673
674            base = std::cmp::min(
675                base + self.inner.retry_backoff_ms,
676                self.inner.retry_ceiling_ms,
677            );
678        }
679
680        Err(err)
681    }
682
    /// Get the height of the Client instance for health checks.
    ///
    /// Doesn't do any retries and the `http_req_timeout` parameter will override the http timeout config set when creating the client.
    ///
    /// Returns the server's current archive height on success, so callers can
    /// both verify liveness and inspect how far the server has indexed.
    ///
    /// # Example
    /// ```no_run
    /// use hypersync_client::Client;
    /// use std::time::Duration;
    ///
    /// # async fn example() -> anyhow::Result<()> {
    /// let client = Client::builder()
    ///     .chain_id(1)
    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
    ///     .build()?;
    ///
    /// // Quick health check with 5 second timeout
    /// let height = client.health_check(Some(Duration::from_secs(5))).await?;
    /// println!("Server is healthy at block: {}", height);
    /// # Ok(())
    /// # }
    /// ```
    pub async fn health_check(&self, http_req_timeout: Option<Duration>) -> Result<u64> {
        // Single attempt by design: a health check should fail fast, not retry.
        self.get_height_impl(http_req_timeout).await
    }
707
708    /// Executes query with retries and returns the response.
709    ///
710    /// # Example
711    /// ```no_run
712    /// use hypersync_client::{Client, net_types::{Query, BlockFilter, BlockField}};
713    ///
714    /// # async fn example() -> anyhow::Result<()> {
715    /// let client = Client::builder()
716    ///     .chain_id(1)
717    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
718    ///     .build()?;
719    ///
720    /// // Query all blocks from a specific range
721    /// let query = Query::new()
722    ///     .from_block(19000000)
723    ///     .to_block_excl(19000010)
724    ///     .include_all_blocks()
725    ///     .select_block_fields([BlockField::Number, BlockField::Hash, BlockField::Timestamp]);
726    /// let response = client.get(&query).await?;
727    ///
728    /// println!("Retrieved {} blocks", response.data.blocks.len());
729    /// # Ok(())
730    /// # }
731    /// ```
732    pub async fn get(&self, query: &Query) -> Result<QueryResponse> {
733        let arrow_response = self.get_arrow(query).await.context("get data")?;
734        let converted =
735            QueryResponse::try_from(&arrow_response).context("convert arrow response")?;
736        Ok(converted)
737    }
738
739    /// Add block, transaction and log fields selection to the query, executes it with retries
740    /// and returns the response.
741    ///
742    /// This method automatically joins blocks, transactions, and logs into unified events,
743    /// making it easier to work with related blockchain data.
744    ///
745    /// # Example
746    /// ```no_run
747    /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField, TransactionField}};
748    ///
749    /// # async fn example() -> anyhow::Result<()> {
750    /// let client = Client::builder()
751    ///     .chain_id(1)
752    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
753    ///     .build()?;
754    ///
755    /// // Query ERC20 transfers with transaction context
756    /// let query = Query::new()
757    ///     .from_block(19000000)
758    ///     .to_block_excl(19000010)
759    ///     .where_logs(
760    ///         LogFilter::all()
761    ///             .and_topic0(["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"])?
762    ///     )
763    ///     .select_log_fields([LogField::Address, LogField::Data])
764    ///     .select_transaction_fields([TransactionField::Hash, TransactionField::From]);
765    /// let response = client.get_events(query).await?;
766    ///
767    /// println!("Retrieved {} joined events", response.data.len());
768    /// # Ok(())
769    /// # }
770    /// ```
771    pub async fn get_events(&self, mut query: Query) -> Result<EventResponse> {
772        let event_join_strategy = InternalEventJoinStrategy::from(&query.field_selection);
773        event_join_strategy.add_join_fields_to_selection(&mut query.field_selection);
774        let arrow_response = self.get_arrow(&query).await.context("get data")?;
775        EventResponse::try_from_arrow_response(&arrow_response, &event_join_strategy)
776    }
777
778    /// Executes query once and returns the result in (Arrow, size) format using JSON serialization.
779    async fn get_arrow_impl_json(&self, query: &Query) -> Result<(ArrowResponse, u64)> {
780        let mut url = self.inner.url.clone();
781        let mut segments = url.path_segments_mut().ok().context("get path segments")?;
782        segments.push("query");
783        segments.push("arrow-ipc");
784        std::mem::drop(segments);
785        let req = self.inner.http_client.request(Method::POST, url);
786
787        let res = req.json(&query).send().await.context("execute http req")?;
788
789        let status = res.status();
790        if !status.is_success() {
791            let text = res.text().await.context("read text to see error")?;
792
793            return Err(anyhow!(
794                "http response status code {}, err body: {}",
795                status,
796                text
797            ));
798        }
799
800        let bytes = res.bytes().await.context("read response body bytes")?;
801
802        let res = tokio::task::block_in_place(|| {
803            parse_query_response(&bytes).context("parse query response")
804        })?;
805
806        Ok((res, bytes.len().try_into().unwrap()))
807    }
808
809    fn should_cache_queries(&self) -> bool {
810        matches!(
811            self.inner.serialization_format,
812            SerializationFormat::CapnProto {
813                should_cache_queries: true
814            }
815        )
816    }
817
818    /// Executes query once and returns the result in (Arrow, size) format using Cap'n Proto serialization.
819    async fn get_arrow_impl_capnp(&self, query: &Query) -> Result<(ArrowResponse, u64)> {
820        let mut url = self.inner.url.clone();
821        let mut segments = url.path_segments_mut().ok().context("get path segments")?;
822        segments.push("query");
823        segments.push("arrow-ipc");
824        segments.push("capnp");
825        std::mem::drop(segments);
826
827        let should_cache = self.should_cache_queries();
828
829        if should_cache {
830            let query_with_id = {
831                let mut message = capnp::message::Builder::new_default();
832                let mut request_builder =
833                    message.init_root::<hypersync_net_types_capnp::request::Builder>();
834
835                request_builder.build_query_id_from_query(query)?;
836                let mut query_with_id = Vec::new();
837                capnp::serialize_packed::write_message(&mut query_with_id, &message)?;
838                query_with_id
839            };
840
841            let mut req = self.inner.http_client.request(Method::POST, url.clone());
842            req = req.header("content-type", "application/x-capnp");
843
844            let res = req
845                .body(query_with_id)
846                .send()
847                .await
848                .context("execute http req")?;
849
850            let status = res.status();
851            if status.is_success() {
852                let bytes = res.bytes().await.context("read response body bytes")?;
853
854                let mut opts = capnp::message::ReaderOptions::new();
855                opts.nesting_limit(i32::MAX).traversal_limit_in_words(None);
856                let message_reader = capnp::serialize_packed::read_message(bytes.as_ref(), opts)
857                    .context("create message reader")?;
858                let query_response = message_reader
859                    .get_root::<hypersync_net_types_capnp::cached_query_response::Reader>()
860                    .context("get cached_query_response root")?;
861                match query_response.get_either().which()? {
862                hypersync_net_types_capnp::cached_query_response::either::Which::QueryResponse(
863                    query_response,
864                ) => {
865                    let res = tokio::task::block_in_place(|| {
866                        let res = query_response?;
867                        read_query_response(&res).context("parse query response cached")
868                    })?;
869                    return Ok((res, bytes.len().try_into().unwrap()));
870                }
871                hypersync_net_types_capnp::cached_query_response::either::Which::NotCached(()) => {
872                    log::trace!("query was not cached, retrying with full query");
873                }
874            }
875            } else {
876                let text = res.text().await.context("read text to see error")?;
877                log::warn!(
878                    "Failed cache query, will retry full query. {}, err body: {}",
879                    status,
880                    text
881                );
882            }
883        };
884
885        let full_query_bytes = {
886            let mut message = capnp::message::Builder::new_default();
887            let mut query_builder =
888                message.init_root::<hypersync_net_types_capnp::request::Builder>();
889
890            query_builder.build_full_query_from_query(query, should_cache)?;
891            let mut bytes = Vec::new();
892            capnp::serialize_packed::write_message(&mut bytes, &message)?;
893            bytes
894        };
895
896        let mut req = self.inner.http_client.request(Method::POST, url);
897        req = req.header("content-type", "application/x-capnp");
898
899        let res = req
900            .header("content-type", "application/x-capnp")
901            .body(full_query_bytes)
902            .send()
903            .await
904            .context("execute http req")?;
905
906        let status = res.status();
907        if !status.is_success() {
908            let text = res.text().await.context("read text to see error")?;
909
910            return Err(anyhow!(
911                "http response status code {}, err body: {}",
912                status,
913                text
914            ));
915        }
916
917        let bytes = res.bytes().await.context("read response body bytes")?;
918
919        let res = tokio::task::block_in_place(|| {
920            parse_query_response(&bytes).context("parse query response")
921        })?;
922
923        Ok((res, bytes.len().try_into().unwrap()))
924    }
925
926    /// Executes query once and returns the result in (Arrow, size) format.
927    async fn get_arrow_impl(&self, query: &Query) -> Result<(ArrowResponse, u64)> {
928        match self.inner.serialization_format {
929            SerializationFormat::Json => self.get_arrow_impl_json(query).await,
930            SerializationFormat::CapnProto { .. } => self.get_arrow_impl_capnp(query).await,
931        }
932    }
933
934    /// Executes query with retries and returns the response in Arrow format.
935    pub async fn get_arrow(&self, query: &Query) -> Result<ArrowResponse> {
936        self.get_arrow_with_size(query).await.map(|res| res.0)
937    }
938
939    /// Internal implementation for get_arrow.
940    async fn get_arrow_with_size(&self, query: &Query) -> Result<(ArrowResponse, u64)> {
941        let mut base = self.inner.retry_base_ms;
942
943        let mut err = anyhow!("");
944
945        for _ in 0..self.inner.max_num_retries + 1 {
946            match self.get_arrow_impl(query).await {
947                Ok(res) => return Ok(res),
948                Err(e) => {
949                    log::error!(
950                        "failed to get arrow data from server, retrying... The error was: {e:?}"
951                    );
952                    err = err.context(format!("{e:?}"));
953                }
954            }
955
956            let base_ms = Duration::from_millis(base);
957            let jitter = Duration::from_millis(fastrange_rs::fastrange_64(
958                rand::random(),
959                self.inner.retry_backoff_ms,
960            ));
961
962            tokio::time::sleep(base_ms + jitter).await;
963
964            base = std::cmp::min(
965                base + self.inner.retry_backoff_ms,
966                self.inner.retry_ceiling_ms,
967            );
968        }
969
970        Err(err)
971    }
972
973    /// Spawns task to execute query and return data via a channel.
974    ///
975    /// # Example
976    /// ```no_run
977    /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField}, StreamConfig};
978    ///
979    /// # async fn example() -> anyhow::Result<()> {
980    /// let client = Client::builder()
981    ///     .chain_id(1)
982    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
983    ///     .build()?;
984    ///
985    /// // Stream all ERC20 transfer events
986    /// let query = Query::new()
987    ///     .from_block(19000000)
988    ///     .where_logs(
989    ///         LogFilter::all()
990    ///             .and_topic0(["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"])?
991    ///     )
992    ///     .select_log_fields([LogField::Address, LogField::Topic1, LogField::Topic2, LogField::Data]);
993    /// let mut receiver = client.stream(query, StreamConfig::default()).await?;
994    ///
995    /// while let Some(response) = receiver.recv().await {
996    ///     let response = response?;
997    ///     println!("Got {} events up to block: {}", response.data.logs.len(), response.next_block);
998    /// }
999    /// # Ok(())
1000    /// # }
1001    /// ```
1002    pub async fn stream(
1003        &self,
1004        query: Query,
1005        config: StreamConfig,
1006    ) -> Result<mpsc::Receiver<Result<QueryResponse>>> {
1007        check_simple_stream_params(&config)?;
1008
1009        let (tx, rx): (_, mpsc::Receiver<Result<QueryResponse>>) =
1010            mpsc::channel(config.concurrency);
1011
1012        let mut inner_rx = self
1013            .stream_arrow(query, config)
1014            .await
1015            .context("start inner stream")?;
1016
1017        tokio::spawn(async move {
1018            while let Some(resp) = inner_rx.recv().await {
1019                let msg = resp
1020                    .context("inner receiver")
1021                    .and_then(|r| QueryResponse::try_from(&r));
1022                let is_err = msg.is_err();
1023                if tx.send(msg).await.is_err() || is_err {
1024                    return;
1025                }
1026            }
1027        });
1028
1029        Ok(rx)
1030    }
1031
1032    /// Add block, transaction and log fields selection to the query and spawns task to execute it,
1033    /// returning data via a channel.
1034    ///
1035    /// This method automatically joins blocks, transactions, and logs into unified events,
1036    /// then streams them via a channel for real-time processing.
1037    ///
1038    /// # Example
1039    /// ```no_run
1040    /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField, TransactionField}, StreamConfig};
1041    ///
1042    /// # async fn example() -> anyhow::Result<()> {
1043    /// let client = Client::builder()
1044    ///     .chain_id(1)
1045    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
1046    ///     .build()?;
1047    ///
1048    /// // Stream NFT transfer events with transaction context
1049    /// let query = Query::new()
1050    ///     .from_block(19000000)
1051    ///     .where_logs(
1052    ///         LogFilter::all()
1053    ///             .and_topic0(["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"])?
1054    ///     )
1055    ///     .select_log_fields([LogField::Address, LogField::Topic1, LogField::Topic2])
1056    ///     .select_transaction_fields([TransactionField::Hash, TransactionField::From]);
1057    /// let mut receiver = client.stream_events(query, StreamConfig::default()).await?;
1058    ///
1059    /// while let Some(response) = receiver.recv().await {
1060    ///     let response = response?;
1061    ///     println!("Got {} joined events up to block: {}", response.data.len(), response.next_block);
1062    /// }
1063    /// # Ok(())
1064    /// # }
1065    /// ```
1066    pub async fn stream_events(
1067        &self,
1068        mut query: Query,
1069        config: StreamConfig,
1070    ) -> Result<mpsc::Receiver<Result<EventResponse>>> {
1071        check_simple_stream_params(&config)?;
1072
1073        let event_join_strategy = InternalEventJoinStrategy::from(&query.field_selection);
1074
1075        event_join_strategy.add_join_fields_to_selection(&mut query.field_selection);
1076
1077        let (tx, rx): (_, mpsc::Receiver<Result<EventResponse>>) =
1078            mpsc::channel(config.concurrency);
1079
1080        let mut inner_rx = self
1081            .stream_arrow(query, config)
1082            .await
1083            .context("start inner stream")?;
1084
1085        tokio::spawn(async move {
1086            while let Some(resp) = inner_rx.recv().await {
1087                let msg = resp
1088                    .context("inner receiver")
1089                    .and_then(|r| EventResponse::try_from_arrow_response(&r, &event_join_strategy));
1090                let is_err = msg.is_err();
1091                if tx.send(msg).await.is_err() || is_err {
1092                    return;
1093                }
1094            }
1095        });
1096
1097        Ok(rx)
1098    }
1099
1100    /// Spawns task to execute query and return data via a channel in Arrow format.
1101    ///
1102    /// Returns raw Apache Arrow data via a channel for high-performance processing.
1103    /// Ideal for applications that need to work directly with columnar data.
1104    ///
1105    /// # Example
1106    /// ```no_run
1107    /// use hypersync_client::{Client, net_types::{Query, TransactionFilter, TransactionField}, StreamConfig};
1108    ///
1109    /// # async fn example() -> anyhow::Result<()> {
1110    /// let client = Client::builder()
1111    ///     .chain_id(1)
1112    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
1113    ///     .build()?;
1114    ///
1115    /// // Stream transaction data in Arrow format for analytics
1116    /// let query = Query::new()
1117    ///     .from_block(19000000)
1118    ///     .to_block_excl(19000100)
1119    ///     .where_transactions(
1120    ///         TransactionFilter::all()
1121    ///             .and_contract_address(["0xA0b86a33E6411b87Fd9D3DF822C8698FC06BBe4c"])?
1122    ///     )
1123    ///     .select_transaction_fields([TransactionField::Hash, TransactionField::From, TransactionField::Value]);
1124    /// let mut receiver = client.stream_arrow(query, StreamConfig::default()).await?;
1125    ///
1126    /// while let Some(response) = receiver.recv().await {
1127    ///     let response = response?;
1128    ///     println!("Got {} Arrow batches for transactions", response.data.transactions.len());
1129    /// }
1130    /// # Ok(())
1131    /// # }
1132    /// ```
1133    pub async fn stream_arrow(
1134        &self,
1135        query: Query,
1136        config: StreamConfig,
1137    ) -> Result<mpsc::Receiver<Result<ArrowResponse>>> {
1138        stream::stream_arrow(self, query, config).await
1139    }
1140
1141    /// Getter for url field.
1142    ///
1143    /// # Example
1144    /// ```
1145    /// use hypersync_client::Client;
1146    ///
1147    /// let client = Client::builder()
1148    ///     .chain_id(1)
1149    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1150    ///     .build()
1151    ///     .unwrap();
1152    ///
1153    /// println!("Client URL: {}", client.url());
1154    /// ```
1155    pub fn url(&self) -> &Url {
1156        &self.inner.url
1157    }
1158}
1159
/// Builder for creating a hypersync client with configuration options.
///
/// This builder provides a fluent API for configuring client settings like URL,
/// authentication, timeouts, and retry behavior.
///
/// Internally wraps a `ClientConfig` that is passed to `Client::new` when
/// [`ClientBuilder::build`] is called.
///
/// # Example
/// ```
/// use hypersync_client::{Client, SerializationFormat};
///
/// let client = Client::builder()
///     .chain_id(1)
///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
///     .http_req_timeout_millis(30000)
///     .max_num_retries(3)
///     .build()
///     .unwrap();
/// ```
#[derive(Debug, Clone, Default)]
pub struct ClientBuilder(ClientConfig);
1179
1180impl ClientBuilder {
1181    /// Creates a new ClientBuilder with default configuration.
1182    pub fn new() -> Self {
1183        Self::default()
1184    }
1185
1186    /// Sets the chain ID and automatically configures the URL for the hypersync endpoint.
1187    ///
1188    /// This is a convenience method that sets the URL to `https://{chain_id}.hypersync.xyz`.
1189    ///
1190    /// # Arguments
1191    /// * `chain_id` - The blockchain chain ID (e.g., 1 for Ethereum mainnet)
1192    ///
1193    /// # Example
1194    /// ```
1195    /// use hypersync_client::Client;
1196    ///
1197    /// let client = Client::builder()
1198    ///     .chain_id(1) // Ethereum mainnet
1199    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1200    ///     .build()
1201    ///     .unwrap();
1202    /// ```
1203    pub fn chain_id(mut self, chain_id: u64) -> Self {
1204        self.0.url = format!("https://{chain_id}.hypersync.xyz");
1205        self
1206    }
1207
1208    /// Sets a custom URL for the hypersync server.
1209    ///
1210    /// Use this method when you need to connect to a custom hypersync endpoint
1211    /// instead of the default public endpoints.
1212    ///
1213    /// # Arguments
1214    /// * `url` - The hypersync server URL
1215    ///
1216    /// # Example
1217    /// ```
1218    /// use hypersync_client::Client;
1219    ///
1220    /// let client = Client::builder()
1221    ///     .url("https://my-custom-hypersync.example.com")
1222    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1223    ///     .build()
1224    ///     .unwrap();
1225    /// ```
1226    pub fn url<S: ToString>(mut self, url: S) -> Self {
1227        self.0.url = url.to_string();
1228        self
1229    }
1230
1231    /// Sets the api token for authentication.
1232    ///
1233    /// Required for accessing authenticated hypersync endpoints.
1234    ///
1235    /// # Arguments
1236    /// * `api_token` - The authentication token
1237    ///
1238    /// # Example
1239    /// ```
1240    /// use hypersync_client::Client;
1241    ///
1242    /// let client = Client::builder()
1243    ///     .chain_id(1)
1244    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1245    ///     .build()
1246    ///     .unwrap();
1247    /// ```
1248    pub fn api_token<S: ToString>(mut self, api_token: S) -> Self {
1249        self.0.api_token = api_token.to_string();
1250        self
1251    }
1252
1253    /// Sets the HTTP request timeout in milliseconds.
1254    ///
1255    /// # Arguments
1256    /// * `http_req_timeout_millis` - Timeout in milliseconds (default: 30000)
1257    ///
1258    /// # Example
1259    /// ```
1260    /// use hypersync_client::Client;
1261    ///
1262    /// let client = Client::builder()
1263    ///     .chain_id(1)
1264    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1265    ///     .http_req_timeout_millis(60000) // 60 second timeout
1266    ///     .build()
1267    ///     .unwrap();
1268    /// ```
1269    pub fn http_req_timeout_millis(mut self, http_req_timeout_millis: u64) -> Self {
1270        self.0.http_req_timeout_millis = http_req_timeout_millis;
1271        self
1272    }
1273
1274    /// Sets the maximum number of retries for failed requests.
1275    ///
1276    /// # Arguments
1277    /// * `max_num_retries` - Maximum number of retries (default: 10)
1278    ///
1279    /// # Example
1280    /// ```
1281    /// use hypersync_client::Client;
1282    ///
1283    /// let client = Client::builder()
1284    ///     .chain_id(1)
1285    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1286    ///     .max_num_retries(5)
1287    ///     .build()
1288    ///     .unwrap();
1289    /// ```
1290    pub fn max_num_retries(mut self, max_num_retries: usize) -> Self {
1291        self.0.max_num_retries = max_num_retries;
1292        self
1293    }
1294
1295    /// Sets the backoff increment for retry delays.
1296    ///
1297    /// This value is added to the base delay on each retry attempt.
1298    ///
1299    /// # Arguments
1300    /// * `retry_backoff_ms` - Backoff increment in milliseconds (default: 500)
1301    ///
1302    /// # Example
1303    /// ```
1304    /// use hypersync_client::Client;
1305    ///
1306    /// let client = Client::builder()
1307    ///     .chain_id(1)
1308    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1309    ///     .retry_backoff_ms(1000) // 1 second backoff increment
1310    ///     .build()
1311    ///     .unwrap();
1312    /// ```
1313    pub fn retry_backoff_ms(mut self, retry_backoff_ms: u64) -> Self {
1314        self.0.retry_backoff_ms = retry_backoff_ms;
1315        self
1316    }
1317
1318    /// Sets the initial delay for retry attempts.
1319    ///
1320    /// # Arguments
1321    /// * `retry_base_ms` - Initial retry delay in milliseconds (default: 500)
1322    ///
1323    /// # Example
1324    /// ```
1325    /// use hypersync_client::Client;
1326    ///
1327    /// let client = Client::builder()
1328    ///     .chain_id(1)
1329    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1330    ///     .retry_base_ms(1000) // Start with 1 second delay
1331    ///     .build()
1332    ///     .unwrap();
1333    /// ```
1334    pub fn retry_base_ms(mut self, retry_base_ms: u64) -> Self {
1335        self.0.retry_base_ms = retry_base_ms;
1336        self
1337    }
1338
1339    /// Sets the maximum delay for retry attempts.
1340    ///
1341    /// The retry delay will not exceed this value, even with backoff increments.
1342    ///
1343    /// # Arguments
1344    /// * `retry_ceiling_ms` - Maximum retry delay in milliseconds (default: 10000)
1345    ///
1346    /// # Example
1347    /// ```
1348    /// use hypersync_client::Client;
1349    ///
1350    /// let client = Client::builder()
1351    ///     .chain_id(1)
1352    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1353    ///     .retry_ceiling_ms(30000) // Cap at 30 seconds
1354    ///     .build()
1355    ///     .unwrap();
1356    /// ```
1357    pub fn retry_ceiling_ms(mut self, retry_ceiling_ms: u64) -> Self {
1358        self.0.retry_ceiling_ms = retry_ceiling_ms;
1359        self
1360    }
1361
1362    /// Sets the serialization format for client-server communication.
1363    ///
1364    /// # Arguments
1365    /// * `serialization_format` - The format to use (JSON or CapnProto)
1366    ///
1367    /// # Example
1368    /// ```
1369    /// use hypersync_client::{Client, SerializationFormat};
1370    ///
1371    /// let client = Client::builder()
1372    ///     .chain_id(1)
1373    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1374    ///     .serialization_format(SerializationFormat::Json)
1375    ///     .build()
1376    ///     .unwrap();
1377    /// ```
1378    pub fn serialization_format(mut self, serialization_format: SerializationFormat) -> Self {
1379        self.0.serialization_format = serialization_format;
1380        self
1381    }
1382
1383    /// Builds the client with the configured settings.
1384    ///
1385    /// # Returns
1386    /// * `Result<Client>` - The configured client or an error if configuration is invalid
1387    ///
1388    /// # Errors
1389    /// Returns an error if:
1390    /// * The URL is malformed
1391    /// * Required configuration is missing
1392    ///
1393    /// # Example
1394    /// ```
1395    /// use hypersync_client::Client;
1396    ///
1397    /// let client = Client::builder()
1398    ///     .chain_id(1)
1399    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1400    ///     .build()
1401    ///     .unwrap();
1402    /// ```
1403    pub fn build(self) -> Result<Client> {
1404        if self.0.url.is_empty() {
1405            anyhow::bail!(
1406                "endpoint needs to be set, try using builder.chain_id(1) or\
1407                builder.url(\"https://eth.hypersync.xyz\") to set the endpoint"
1408            )
1409        }
1410        Client::new(self.0)
1411    }
1412}
1413
/// Initial delay (200ms) before the first reconnect attempt; doubles on each
/// consecutive failure.
const INITIAL_RECONNECT_DELAY: Duration = Duration::from_millis(200);
/// Upper bound for the exponential reconnect backoff.
const MAX_RECONNECT_DELAY: Duration = Duration::from_secs(30);
/// Timeout for detecting dead connections. Server sends keepalive pings every 5s,
/// so we timeout after 15s (3x the ping interval).
const READ_TIMEOUT: Duration = Duration::from_secs(15);
1420
/// Events emitted by the height stream.
///
/// Produced by `Client::stream_height`; lets consumers observe connection
/// state transitions in addition to height updates.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum HeightStreamEvent {
    /// Successfully connected or reconnected to the SSE stream.
    Connected,
    /// Received a height update from the server.
    Height(u64),
    /// Connection lost, will attempt to reconnect after the specified delay.
    Reconnecting {
        /// Duration to wait before attempting reconnection.
        delay: Duration,
        /// The error that caused the reconnection.
        error_msg: String,
    },
}
1436
/// Internal classification of a raw SSE event before it is (possibly)
/// forwarded to consumers of the height stream.
enum InternalStreamEvent {
    /// An event that should be forwarded on the consumer channel.
    Publish(HeightStreamEvent),
    /// Server keepalive; handled internally and never forwarded.
    Ping,
    /// Unrecognized event type; carries a debug description and is ignored.
    Unknown(String),
}
1442
1443impl Client {
1444    fn get_es_stream(&self) -> Result<EventSource> {
1445        // Build the GET /height/sse request
1446        let mut url = self.inner.url.clone();
1447        url.path_segments_mut()
1448            .ok()
1449            .context("invalid base URL")?
1450            .extend(&["height", "sse"]);
1451
1452        let req = self
1453            .inner
1454            .http_client
1455            // Don't set timeout for SSE stream
1456            .request_no_timeout(Method::GET, url);
1457
1458        // Configure exponential backoff for library-level retries
1459        let retry_policy = ExponentialBackoff::new(
1460            INITIAL_RECONNECT_DELAY,
1461            2.0,
1462            Some(MAX_RECONNECT_DELAY),
1463            None, // unlimited retries
1464        );
1465
1466        // Turn the request into an EventSource stream with retries
1467        let mut es = reqwest_eventsource::EventSource::new(req)
1468            .context("unexpected error creating EventSource")?;
1469        es.set_retry_policy(Box::new(retry_policy));
1470        Ok(es)
1471    }
1472
1473    async fn next_height(event_source: &mut EventSource) -> Result<Option<InternalStreamEvent>> {
1474        let Some(res) = tokio::time::timeout(READ_TIMEOUT, event_source.next())
1475            .await
1476            .map_err(|d| anyhow::anyhow!("stream timed out after {d}"))?
1477        else {
1478            return Ok(None);
1479        };
1480
1481        let e = match res.context("failed response")? {
1482            Event::Open => InternalStreamEvent::Publish(HeightStreamEvent::Connected),
1483            Event::Message(event) => match event.event.as_str() {
1484                "height" => {
1485                    let height = event
1486                        .data
1487                        .trim()
1488                        .parse::<u64>()
1489                        .context("parsing height from event data")?;
1490                    InternalStreamEvent::Publish(HeightStreamEvent::Height(height))
1491                }
1492                "ping" => InternalStreamEvent::Ping,
1493                _ => InternalStreamEvent::Unknown(format!("unknown event: {:?}", event)),
1494            },
1495        };
1496
1497        Ok(Some(e))
1498    }
1499
1500    async fn stream_height_events(
1501        es: &mut EventSource,
1502        tx: &mpsc::Sender<HeightStreamEvent>,
1503    ) -> Result<bool> {
1504        let mut received_an_event = false;
1505        while let Some(event) = Self::next_height(es).await.context("failed next height")? {
1506            match event {
1507                InternalStreamEvent::Publish(event) => {
1508                    received_an_event = true;
1509                    if tx.send(event).await.is_err() {
1510                        return Ok(received_an_event); // Receiver dropped, exit task
1511                    }
1512                }
1513                InternalStreamEvent::Ping => (), // ignore pings
1514                InternalStreamEvent::Unknown(_event) => (), // ignore unknown events
1515            }
1516        }
1517        Ok(received_an_event)
1518    }
1519
1520    fn get_delay(consecutive_failures: u32) -> Duration {
1521        if consecutive_failures > 0 {
1522            /// helper function to calculate 2^x
1523            /// optimization using bit shifting
1524            const fn two_to_pow(x: u32) -> u32 {
1525                1 << x
1526            }
1527            // Exponential backoff: 200ms, 400ms, 800ms, ... up to 30s
1528            INITIAL_RECONNECT_DELAY
1529                .saturating_mul(two_to_pow(consecutive_failures - 1))
1530                .min(MAX_RECONNECT_DELAY)
1531        } else {
1532            // On zero consecutive failures, 0 delay
1533            Duration::from_millis(0)
1534        }
1535    }
1536
1537    async fn stream_height_events_with_retry(
1538        &self,
1539        tx: &mpsc::Sender<HeightStreamEvent>,
1540    ) -> Result<()> {
1541        let mut consecutive_failures = 0u32;
1542
1543        loop {
1544            // should always be able to creat a new es stream
1545            // something is wrong with the req builder otherwise
1546            let mut es = self.get_es_stream().context("get es stream")?;
1547
1548            let mut error = anyhow!("");
1549
1550            match Self::stream_height_events(&mut es, tx).await {
1551                Ok(received_an_event) => {
1552                    if received_an_event {
1553                        consecutive_failures = 0; // Reset after successful connection that then failed
1554                    }
1555                    log::trace!("Stream height exited");
1556                }
1557                Err(e) => {
1558                    log::trace!("Stream height failed: {e:?}");
1559                    error = e;
1560                }
1561            }
1562
1563            es.close();
1564
1565            // If the receiver is closed, exit the task
1566            if tx.is_closed() {
1567                break;
1568            }
1569
1570            let delay = Self::get_delay(consecutive_failures);
1571            log::trace!("Reconnecting in {:?}...", delay);
1572
1573            let error_msg = format!("{error:?}");
1574
1575            if tx
1576                .send(HeightStreamEvent::Reconnecting { delay, error_msg })
1577                .await
1578                .is_err()
1579            {
1580                return Ok(()); // Receiver dropped, exit task
1581            }
1582            tokio::time::sleep(delay).await;
1583
1584            // increment consecutive failures so that on the next try
1585            // it will start using back offs
1586            consecutive_failures += 1;
1587        }
1588
1589        Ok(())
1590    }
1591
1592    /// Streams archive height updates from the server via Server-Sent Events.
1593    ///
1594    /// Establishes a long-lived SSE connection to `/height/sse` that automatically reconnects
1595    /// on disconnection with exponential backoff (200ms → 400ms → ... → max 30s).
1596    ///
1597    /// The stream emits [`HeightStreamEvent`] to notify consumers of connection state changes
1598    /// and height updates. This allows applications to display connection status to users.
1599    ///
1600    /// # Returns
1601    /// Channel receiver yielding [`HeightStreamEvent`]s. The background task handles connection
1602    /// lifecycle and sends events through this channel.
1603    ///
1604    /// # Example
1605    /// ```
1606    /// # use hypersync_client::{Client, HeightStreamEvent};
1607    /// # async fn example() -> anyhow::Result<()> {
1608    /// let client = Client::builder()
1609    ///     .url("https://eth.hypersync.xyz")
1610    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1611    ///     .build()?;
1612    ///
1613    /// let mut rx = client.stream_height();
1614    ///
1615    /// while let Some(event) = rx.recv().await {
1616    ///     match event {
1617    ///         HeightStreamEvent::Connected => println!("Connected to stream"),
1618    ///         HeightStreamEvent::Height(h) => println!("Height: {}", h),
1619    ///         HeightStreamEvent::Reconnecting { delay, error_msg } => {
1620    ///             println!("Reconnecting in {delay:?} due to error: {error_msg}")
1621    ///         }
1622    ///     }
1623    /// }
1624    /// # Ok(())
1625    /// # }
1626    /// ```
1627    pub fn stream_height(&self) -> mpsc::Receiver<HeightStreamEvent> {
1628        let (tx, rx) = mpsc::channel(16);
1629        let client = self.clone();
1630
1631        tokio::spawn(async move {
1632            if let Err(e) = client.stream_height_events_with_retry(&tx).await {
1633                log::error!("Stream height failed unexpectedly: {e:?}");
1634            }
1635        });
1636
1637        rx
1638    }
1639}
1640
1641fn check_simple_stream_params(config: &StreamConfig) -> Result<()> {
1642    if config.event_signature.is_some() {
1643        return Err(anyhow!(
1644            "config.event_signature can't be passed to simple type function. User is expected to \
1645             decode the logs using Decoder."
1646        ));
1647    }
1648    if config.column_mapping.is_some() {
1649        return Err(anyhow!(
1650            "config.column_mapping can't be passed to single type function. User is expected to \
1651             map values manually."
1652        ));
1653    }
1654
1655    Ok(())
1656}
1657
#[cfg(test)]
mod tests {
    use super::*;

    /// Verifies the exponential backoff schedule used between retries.
    #[test]
    fn test_get_delay() {
        // The first attempt fires immediately.
        assert_eq!(
            Client::get_delay(0),
            Duration::from_millis(0),
            "starts with 0 delay"
        );
        // Each subsequent retry doubles the wait (powers of 2 backoff).
        for (attempt, millis) in [(1, 200), (2, 400), (3, 800)] {
            assert_eq!(Client::get_delay(attempt), Duration::from_millis(millis));
        }
        // The delay is capped once it reaches 30 seconds.
        for attempt in [9, 10] {
            assert_eq!(
                Client::get_delay(attempt),
                Duration::from_secs(30),
                "max delay is 30s"
            );
        }
    }

    #[tokio::test]
    #[ignore = "integration test with live hs server for height stream"]
    async fn test_stream_height_events() -> anyhow::Result<()> {
        let (tx, mut rx) = mpsc::channel(16);
        // Drive the event stream from a background task so this side of the
        // channel can consume events.
        let handle = tokio::spawn(async move {
            let client = Client::builder()
                .url("https://monad-testnet.hypersync.xyz")
                .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
                .build()?;
            let mut es = client.get_es_stream().context("get es stream")?;
            Client::stream_height_events(&mut es, &tx).await
        });

        // The very first event must signal a successful connection.
        let first = rx.recv().await;
        assert!(first.is_some());
        assert_eq!(first.unwrap(), HeightStreamEvent::Connected);
        // Two consecutive height events should arrive, strictly increasing.
        let earlier = match rx.recv().await {
            Some(HeightStreamEvent::Height(h)) => h,
            _ => panic!("should have received height"),
        };
        let later = match rx.recv().await {
            Some(HeightStreamEvent::Height(h)) => h,
            _ => panic!("should have received height"),
        };
        assert!(later > earlier);
        // Closing the receiver should make the producer wind down cleanly.
        drop(rx);

        let res = handle.await.expect("should have joined");
        let received_an_event =
            res.expect("should have ended the stream gracefully after dropping rx");
        assert!(received_an_event, "should have received an event");

        Ok(())
    }
}
1717}