hypersync_client/
lib.rs

1#![deny(missing_docs)]
2//! # HyperSync Client
3//!
4//! A high-performance Rust client for the HyperSync protocol, enabling efficient retrieval
5//! of blockchain data including blocks, transactions, logs, and traces.
6//!
7//! ## Features
8//!
9//! - **High-performance streaming**: Parallel data fetching with automatic retries
10//! - **Flexible querying**: Rich query builder API for precise data selection
11//! - **Multiple data formats**: Support for Arrow, Parquet, and simple Rust types
12//! - **Event decoding**: Automatic ABI decoding for smart contract events
13//! - **Real-time updates**: Live height streaming via Server-Sent Events
14//! - **Production ready**: Built-in rate limiting, retries, and error handling
15//!
16//! ## Quick Start
17//!
18//! ```no_run
19//! use hypersync_client::{Client, net_types::{Query, LogFilter, LogField}, StreamConfig};
20//!
21//! #[tokio::main]
22//! async fn main() -> anyhow::Result<()> {
23//!     // Create a client for Ethereum mainnet
24//!     let client = Client::builder()
25//!         .chain_id(1)
26//!         .api_token(std::env::var("ENVIO_API_TOKEN")?)
27//!         .build()?;
28//!
29//!     // Query ERC20 transfer events from USDC contract
30//!     let query = Query::new()
31//!         .from_block(19000000)
32//!         .to_block_excl(19001000)
33//!         .where_logs(
34//!             LogFilter::all()
//!                 .and_address(["0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"])?
36//!                 .and_topic0(["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"])?
37//!         )
38//!         .select_log_fields([LogField::Address, LogField::Topic1, LogField::Topic2, LogField::Data]);
39//!
40//!     // Get all data in one response
41//!     let response = client.get(&query).await?;
42//!     println!("Retrieved {} blocks", response.data.blocks.len());
43//!
44//!     // Or stream data for large ranges
45//!     let mut receiver = client.stream(query, StreamConfig::default()).await?;
46//!     while let Some(response) = receiver.recv().await {
47//!         let response = response?;
48//!         println!("Streaming: got blocks up to {}", response.next_block);
49//!     }
50//!
51//!     Ok(())
52//! }
53//! ```
54//!
55//! ## Main Types
56//!
57//! - [`Client`] - Main client for interacting with HyperSync servers
58//! - [`net_types::Query`] - Query builder for specifying what data to fetch
59//! - [`StreamConfig`] - Configuration for streaming operations
60//! - [`QueryResponse`] - Response containing blocks, transactions, logs, and traces
61//! - [`ArrowResponse`] - Response in Apache Arrow format for high-performance processing
62//!
63//! ## Authentication
64//!
65//! You'll need a HyperSync API token to access the service. Get one from
66//! [https://envio.dev/app/api-tokens](https://envio.dev/app/api-tokens).
67//!
68//! ## Examples
69//!
70//! See the `examples/` directory for more detailed usage patterns including:
71//! - ERC20 token transfers
72//! - Wallet transaction history  
73//! - Event decoding and filtering
74//! - Real-time data streaming
75use std::{sync::Arc, time::Duration};
76
77use anyhow::{anyhow, Context, Result};
78use futures::StreamExt;
79use hypersync_net_types::{hypersync_net_types_capnp, ArchiveHeight, ChainId, Query};
80use polars_arrow::{array::Array, record_batch::RecordBatchT as Chunk};
81use reqwest::{header, Method};
82use reqwest_eventsource::retry::ExponentialBackoff;
83use reqwest_eventsource::{Event, EventSource};
84
85mod column_mapping;
86mod config;
87mod decode;
88mod decode_call;
89mod from_arrow;
90mod parquet_out;
91mod parse_response;
92pub mod preset_query;
93mod rayon_async;
94pub mod simple_types;
95mod stream;
96#[cfg(feature = "ethers")]
97pub mod to_ethers;
98mod types;
99mod util;
100
101pub use from_arrow::FromArrow;
102pub use hypersync_format as format;
103pub use hypersync_net_types as net_types;
104pub use hypersync_schema as schema;
105
106use parse_response::parse_query_response;
107use tokio::sync::mpsc;
108use types::{EventResponse, ResponseData};
109use url::Url;
110
111pub use column_mapping::{ColumnMapping, DataType};
112pub use config::HexOutput;
113pub use config::{ClientConfig, SerializationFormat, StreamConfig};
114pub use decode::Decoder;
115pub use decode_call::CallDecoder;
116pub use types::{ArrowBatch, ArrowResponse, ArrowResponseData, QueryResponse};
117
118use crate::parse_response::read_query_response;
119use crate::simple_types::InternalEventJoinStrategy;
120
/// Arrow record batch (polars-arrow `RecordBatchT`) whose columns are boxed, type-erased
/// arrays.
type ArrowChunk = Chunk<Box<dyn Array>>;
122
123/// Internal client state to handle http requests and retries.
124#[derive(Debug)]
125struct ClientInner {
126    /// Initialized reqwest instance for client url.
127    http_client: reqwest::Client,
128    /// HyperSync server URL.
129    url: Url,
130    /// HyperSync server api token.
131    api_token: String,
132    /// Number of retries to attempt before returning error.
133    max_num_retries: usize,
134    /// Milliseconds that would be used for retry backoff increasing.
135    retry_backoff_ms: u64,
136    /// Initial wait time for request backoff.
137    retry_base_ms: u64,
138    /// Ceiling time for request backoff.
139    retry_ceiling_ms: u64,
140    /// Query serialization format to use for HTTP requests.
141    serialization_format: SerializationFormat,
142}
143
/// Client to handle http requests and retries.
///
/// Cloning is cheap: every clone shares the same underlying state through an [`Arc`].
#[derive(Clone, Debug)]
pub struct Client {
    /// Shared connection, credential and retry state.
    inner: Arc<ClientInner>,
}
149
150impl Client {
151    /// Creates a new client with the given configuration.
152    ///
153    /// Configuration must include the `url` and `api_token` fields.
154    /// # Example
155    /// ```
156    /// use hypersync_client::{Client, ClientConfig};
157    ///
158    /// let config = ClientConfig {
159    ///     url: "https://eth.hypersync.xyz".to_string(),
160    ///     api_token: std::env::var("ENVIO_API_TOKEN")?,
161    ///     ..Default::default()
162    /// };
163    /// let client = Client::new(config)?;
164    /// # Ok::<(), anyhow::Error>(())
165    /// ```
166    ///
167    /// # Errors
168    /// This method fails if the config is invalid.
169    pub fn new(cfg: ClientConfig) -> Result<Self> {
170        // hscr stands for hypersync client rust
171        cfg.validate().context("invalid ClientConfig")?;
172        let user_agent = format!("hscr/{}", env!("CARGO_PKG_VERSION"));
173        Self::new_internal(cfg, user_agent)
174    }
175
176    /// Creates a new client builder.
177    ///
178    /// # Example
179    /// ```
180    /// use hypersync_client::Client;
181    ///
182    /// let client = Client::builder()
183    ///     .chain_id(1)
184    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
185    ///     .build()
186    ///     .unwrap();
187    /// ```
188    pub fn builder() -> ClientBuilder {
189        ClientBuilder::new()
190    }
191
192    #[doc(hidden)]
193    pub fn new_with_agent(cfg: ClientConfig, user_agent: impl Into<String>) -> Result<Self> {
194        // Creates a new client with the given configuration and custom user agent.
195        // This is intended for use by language bindings (Python, Node.js) and HyperIndex.
196        Self::new_internal(cfg, user_agent.into())
197    }
198
199    /// Internal constructor that takes both config and user agent.
200    fn new_internal(cfg: ClientConfig, user_agent: String) -> Result<Self> {
201        let http_client = reqwest::Client::builder()
202            .no_gzip()
203            .timeout(Duration::from_millis(cfg.http_req_timeout_millis))
204            .user_agent(user_agent)
205            .build()
206            .unwrap();
207
208        let url = Url::parse(&cfg.url).context("url is malformed")?;
209
210        Ok(Self {
211            inner: Arc::new(ClientInner {
212                http_client,
213                url,
214                api_token: cfg.api_token,
215                max_num_retries: cfg.max_num_retries,
216                retry_backoff_ms: cfg.retry_backoff_ms,
217                retry_base_ms: cfg.retry_base_ms,
218                retry_ceiling_ms: cfg.retry_ceiling_ms,
219                serialization_format: cfg.serialization_format,
220            }),
221        })
222    }
223
224    /// Retrieves blocks, transactions, traces, and logs through a stream using the provided
225    /// query and stream configuration.
226    ///
227    /// ### Implementation
228    /// Runs multiple queries simultaneously based on config.concurrency.
229    ///
230    /// Each query runs until it reaches query.to, server height, any max_num_* query param,
231    /// or execution timed out by server.
232    ///
233    /// # ⚠️ Important Warning
234    ///
235    /// This method will continue executing until the query has run to completion from beginning
236    /// to the end of the block range defined in the query. For heavy queries with large block
237    /// ranges or high data volumes, consider:
238    ///
239    /// - Use [`stream()`](Self::stream) to interact with each streamed chunk individually
240    /// - Use [`get()`](Self::get) which returns a `next_block` that can be paginated for the next query
241    /// - Break large queries into smaller block ranges
242    ///
243    /// # Example
244    /// ```no_run
245    /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField}, StreamConfig};
246    ///
247    /// # async fn example() -> anyhow::Result<()> {
248    /// let client = Client::builder()
249    ///     .chain_id(1)
250    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
251    ///     .build()?;
252    ///
253    /// // Query ERC20 transfer events
254    /// let query = Query::new()
255    ///     .from_block(19000000)
256    ///     .to_block_excl(19000010)
257    ///     .where_logs(
258    ///         LogFilter::all()
259    ///             .and_topic0(["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"])?
260    ///     )
261    ///     .select_log_fields([LogField::Address, LogField::Data]);
262    /// let response = client.collect(query, StreamConfig::default()).await?;
263    ///
264    /// println!("Collected {} events", response.data.logs.len());
265    /// # Ok(())
266    /// # }
267    /// ```
268    pub async fn collect(&self, query: Query, config: StreamConfig) -> Result<QueryResponse> {
269        check_simple_stream_params(&config)?;
270
271        let mut recv = stream::stream_arrow(self, query, config)
272            .await
273            .context("start stream")?;
274
275        let mut data = ResponseData::default();
276        let mut archive_height = None;
277        let mut next_block = 0;
278        let mut total_execution_time = 0;
279
280        while let Some(res) = recv.recv().await {
281            let res = res.context("get response")?;
282            let res: QueryResponse = QueryResponse::from(&res);
283
284            for batch in res.data.blocks {
285                data.blocks.push(batch);
286            }
287            for batch in res.data.transactions {
288                data.transactions.push(batch);
289            }
290            for batch in res.data.logs {
291                data.logs.push(batch);
292            }
293            for batch in res.data.traces {
294                data.traces.push(batch);
295            }
296
297            archive_height = res.archive_height;
298            next_block = res.next_block;
299            total_execution_time += res.total_execution_time
300        }
301
302        Ok(QueryResponse {
303            archive_height,
304            next_block,
305            total_execution_time,
306            data,
307            rollback_guard: None,
308        })
309    }
310
311    /// Retrieves events through a stream using the provided query and stream configuration.
312    ///
313    /// # ⚠️ Important Warning
314    ///
315    /// This method will continue executing until the query has run to completion from beginning
316    /// to the end of the block range defined in the query. For heavy queries with large block
317    /// ranges or high data volumes, consider:
318    ///
319    /// - Use [`stream_events()`](Self::stream_events) to interact with each streamed chunk individually
320    /// - Use [`get_events()`](Self::get_events) which returns a `next_block` that can be paginated for the next query
321    /// - Break large queries into smaller block ranges
322    ///
323    /// # Example
324    /// ```no_run
325    /// use hypersync_client::{Client, net_types::{Query, TransactionFilter, TransactionField}, StreamConfig};
326    ///
327    /// # async fn example() -> anyhow::Result<()> {
328    /// let client = Client::builder()
329    ///     .chain_id(1)
330    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
331    ///     .build()?;
332    ///
333    /// // Query transactions to a specific address
334    /// let query = Query::new()
335    ///     .from_block(19000000)
336    ///     .to_block_excl(19000100)
337    ///     .where_transactions(
338    ///         TransactionFilter::all()
339    ///             .and_to(["0xA0b86a33E6411b87Fd9D3DF822C8698FC06BBe4c"])?
340    ///     )
341    ///     .select_transaction_fields([TransactionField::Hash, TransactionField::From, TransactionField::Value]);
342    /// let response = client.collect_events(query, StreamConfig::default()).await?;
343    ///
344    /// println!("Collected {} events", response.data.len());
345    /// # Ok(())
346    /// # }
347    /// ```
348    pub async fn collect_events(
349        &self,
350        mut query: Query,
351        config: StreamConfig,
352    ) -> Result<EventResponse> {
353        check_simple_stream_params(&config)?;
354
355        let event_join_strategy = InternalEventJoinStrategy::from(&query.field_selection);
356        event_join_strategy.add_join_fields_to_selection(&mut query.field_selection);
357
358        let mut recv = stream::stream_arrow(self, query, config)
359            .await
360            .context("start stream")?;
361
362        let mut data = Vec::new();
363        let mut archive_height = None;
364        let mut next_block = 0;
365        let mut total_execution_time = 0;
366
367        while let Some(res) = recv.recv().await {
368            let res = res.context("get response")?;
369            let res: QueryResponse = QueryResponse::from(&res);
370            let events = event_join_strategy.join_from_response_data(res.data);
371
372            data.extend(events);
373
374            archive_height = res.archive_height;
375            next_block = res.next_block;
376            total_execution_time += res.total_execution_time
377        }
378
379        Ok(EventResponse {
380            archive_height,
381            next_block,
382            total_execution_time,
383            data,
384            rollback_guard: None,
385        })
386    }
387
388    /// Retrieves blocks, transactions, traces, and logs in Arrow format through a stream using
389    /// the provided query and stream configuration.
390    ///
391    /// Returns data in Apache Arrow format for high-performance columnar processing.
392    /// Useful for analytics workloads or when working with Arrow-compatible tools.
393    ///
394    /// # ⚠️ Important Warning
395    ///
396    /// This method will continue executing until the query has run to completion from beginning
397    /// to the end of the block range defined in the query. For heavy queries with large block
398    /// ranges or high data volumes, consider:
399    ///
400    /// - Use [`stream_arrow()`](Self::stream_arrow) to interact with each streamed chunk individually
401    /// - Use [`get_arrow()`](Self::get_arrow) which returns a `next_block` that can be paginated for the next query
402    /// - Break large queries into smaller block ranges
403    ///
404    /// # Example
405    /// ```no_run
406    /// use hypersync_client::{Client, net_types::{Query, BlockFilter, BlockField}, StreamConfig};
407    ///
408    /// # async fn example() -> anyhow::Result<()> {
409    /// let client = Client::builder()
410    ///     .chain_id(1)
411    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
412    ///     .build()?;
413    ///
414    /// // Get block data in Arrow format for analytics
415    /// let query = Query::new()
416    ///     .from_block(19000000)
417    ///     .to_block_excl(19000100)
418    ///     .include_all_blocks()
419    ///     .select_block_fields([BlockField::Number, BlockField::Timestamp, BlockField::GasUsed]);
420    /// let response = client.collect_arrow(query, StreamConfig::default()).await?;
421    ///
422    /// println!("Retrieved {} Arrow batches for blocks", response.data.blocks.len());
423    /// # Ok(())
424    /// # }
425    /// ```
426    pub async fn collect_arrow(&self, query: Query, config: StreamConfig) -> Result<ArrowResponse> {
427        let mut recv = stream::stream_arrow(self, query, config)
428            .await
429            .context("start stream")?;
430
431        let mut data = ArrowResponseData::default();
432        let mut archive_height = None;
433        let mut next_block = 0;
434        let mut total_execution_time = 0;
435
436        while let Some(res) = recv.recv().await {
437            let res = res.context("get response")?;
438
439            for batch in res.data.blocks {
440                data.blocks.push(batch);
441            }
442            for batch in res.data.transactions {
443                data.transactions.push(batch);
444            }
445            for batch in res.data.logs {
446                data.logs.push(batch);
447            }
448            for batch in res.data.traces {
449                data.traces.push(batch);
450            }
451            for batch in res.data.decoded_logs {
452                data.decoded_logs.push(batch);
453            }
454
455            archive_height = res.archive_height;
456            next_block = res.next_block;
457            total_execution_time += res.total_execution_time
458        }
459
460        Ok(ArrowResponse {
461            archive_height,
462            next_block,
463            total_execution_time,
464            data,
465            rollback_guard: None,
466        })
467    }
468
469    /// Writes parquet file getting data through a stream using the provided path, query,
470    /// and stream configuration.
471    ///
472    /// Streams data directly to a Parquet file for efficient storage and later analysis.
473    /// Perfect for data exports or ETL pipelines.
474    ///
475    /// # ⚠️ Important Warning
476    ///
477    /// This method will continue executing until the query has run to completion from beginning
478    /// to the end of the block range defined in the query. For heavy queries with large block
479    /// ranges or high data volumes, consider:
480    ///
481    /// - Use [`stream_arrow()`](Self::stream_arrow) and write to Parquet incrementally
482    /// - Use [`get_arrow()`](Self::get_arrow) with pagination and append to Parquet files
483    /// - Break large queries into smaller block ranges
484    ///
485    /// # Example
486    /// ```no_run
487    /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField}, StreamConfig};
488    ///
489    /// # async fn example() -> anyhow::Result<()> {
490    /// let client = Client::builder()
491    ///     .chain_id(1)
492    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
493    ///     .build()?;
494    ///
495    /// // Export all DEX trades to Parquet for analysis
496    /// let query = Query::new()
497    ///     .from_block(19000000)
498    ///     .to_block_excl(19010000)
499    ///     .where_logs(
500    ///         LogFilter::all()
501    ///             .and_topic0(["0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822"])?
502    ///     )
503    ///     .select_log_fields([LogField::Address, LogField::Data, LogField::BlockNumber]);
504    /// client.collect_parquet("./trades.parquet", query, StreamConfig::default()).await?;
505    ///
506    /// println!("Trade data exported to trades.parquet");
507    /// # Ok(())
508    /// # }
509    /// ```
510    pub async fn collect_parquet(
511        &self,
512        path: &str,
513        query: Query,
514        config: StreamConfig,
515    ) -> Result<()> {
516        parquet_out::collect_parquet(self, path, query, config).await
517    }
518
519    /// Internal implementation of getting chain_id from server
520    async fn get_chain_id_impl(&self) -> Result<u64> {
521        let mut url = self.inner.url.clone();
522        let mut segments = url.path_segments_mut().ok().context("get path segments")?;
523        segments.push("chain_id");
524        std::mem::drop(segments);
525        let req = self
526            .inner
527            .http_client
528            .request(Method::GET, url)
529            .bearer_auth(&self.inner.api_token);
530
531        let res = req.send().await.context("execute http req")?;
532
533        let status = res.status();
534        if !status.is_success() {
535            return Err(anyhow!("http response status code {}", status));
536        }
537
538        let chain_id: ChainId = res.json().await.context("read response body json")?;
539
540        Ok(chain_id.chain_id)
541    }
542
543    /// Internal implementation of getting height from server
544    async fn get_height_impl(&self, http_timeout_override: Option<Duration>) -> Result<u64> {
545        let mut url = self.inner.url.clone();
546        let mut segments = url.path_segments_mut().ok().context("get path segments")?;
547        segments.push("height");
548        std::mem::drop(segments);
549        let mut req = self
550            .inner
551            .http_client
552            .request(Method::GET, url)
553            .bearer_auth(&self.inner.api_token);
554
555        if let Some(http_timeout_override) = http_timeout_override {
556            req = req.timeout(http_timeout_override);
557        }
558
559        let res = req.send().await.context("execute http req")?;
560
561        let status = res.status();
562        if !status.is_success() {
563            return Err(anyhow!("http response status code {}", status));
564        }
565
566        let height: ArchiveHeight = res.json().await.context("read response body json")?;
567
568        Ok(height.height.unwrap_or(0))
569    }
570
571    /// Get the chain_id from the server with retries.
572    ///
573    /// # Example
574    /// ```no_run
575    /// use hypersync_client::Client;
576    ///
577    /// # async fn example() -> anyhow::Result<()> {
578    /// let client = Client::builder()
579    ///     .chain_id(1)
580    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
581    ///     .build()?;
582    ///
583    /// let chain_id = client.get_chain_id().await?;
584    /// println!("Connected to chain ID: {}", chain_id);
585    /// # Ok(())
586    /// # }
587    /// ```
588    pub async fn get_chain_id(&self) -> Result<u64> {
589        let mut base = self.inner.retry_base_ms;
590
591        let mut err = anyhow!("");
592
593        for _ in 0..self.inner.max_num_retries + 1 {
594            match self.get_chain_id_impl().await {
595                Ok(res) => return Ok(res),
596                Err(e) => {
597                    log::error!(
598                        "failed to get chain_id from server, retrying... The error was: {e:?}"
599                    );
600                    err = err.context(format!("{e:?}"));
601                }
602            }
603
604            let base_ms = Duration::from_millis(base);
605            let jitter = Duration::from_millis(fastrange_rs::fastrange_64(
606                rand::random(),
607                self.inner.retry_backoff_ms,
608            ));
609
610            tokio::time::sleep(base_ms + jitter).await;
611
612            base = std::cmp::min(
613                base + self.inner.retry_backoff_ms,
614                self.inner.retry_ceiling_ms,
615            );
616        }
617
618        Err(err)
619    }
620
621    /// Get the height of from server with retries.
622    ///
623    /// # Example
624    /// ```no_run
625    /// use hypersync_client::Client;
626    ///
627    /// # async fn example() -> anyhow::Result<()> {
628    /// let client = Client::builder()
629    ///     .chain_id(1)
630    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
631    ///     .build()?;
632    ///
633    /// let height = client.get_height().await?;
634    /// println!("Current block height: {}", height);
635    /// # Ok(())
636    /// # }
637    /// ```
638    pub async fn get_height(&self) -> Result<u64> {
639        let mut base = self.inner.retry_base_ms;
640
641        let mut err = anyhow!("");
642
643        for _ in 0..self.inner.max_num_retries + 1 {
644            match self.get_height_impl(None).await {
645                Ok(res) => return Ok(res),
646                Err(e) => {
647                    log::error!(
648                        "failed to get height from server, retrying... The error was: {e:?}"
649                    );
650                    err = err.context(format!("{e:?}"));
651                }
652            }
653
654            let base_ms = Duration::from_millis(base);
655            let jitter = Duration::from_millis(fastrange_rs::fastrange_64(
656                rand::random(),
657                self.inner.retry_backoff_ms,
658            ));
659
660            tokio::time::sleep(base_ms + jitter).await;
661
662            base = std::cmp::min(
663                base + self.inner.retry_backoff_ms,
664                self.inner.retry_ceiling_ms,
665            );
666        }
667
668        Err(err)
669    }
670
671    /// Get the height of the Client instance for health checks.
672    ///
673    /// Doesn't do any retries and the `http_req_timeout` parameter will override the http timeout config set when creating the client.
674    ///
675    /// # Example
676    /// ```no_run
677    /// use hypersync_client::Client;
678    /// use std::time::Duration;
679    ///
680    /// # async fn example() -> anyhow::Result<()> {
681    /// let client = Client::builder()
682    ///     .chain_id(1)
683    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
684    ///     .build()?;
685    ///
686    /// // Quick health check with 5 second timeout
687    /// let height = client.health_check(Some(Duration::from_secs(5))).await?;
688    /// println!("Server is healthy at block: {}", height);
689    /// # Ok(())
690    /// # }
691    /// ```
692    pub async fn health_check(&self, http_req_timeout: Option<Duration>) -> Result<u64> {
693        self.get_height_impl(http_req_timeout).await
694    }
695
696    /// Executes query with retries and returns the response.
697    ///
698    /// # Example
699    /// ```no_run
700    /// use hypersync_client::{Client, net_types::{Query, BlockFilter, BlockField}};
701    ///
702    /// # async fn example() -> anyhow::Result<()> {
703    /// let client = Client::builder()
704    ///     .chain_id(1)
705    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
706    ///     .build()?;
707    ///
708    /// // Query all blocks from a specific range
709    /// let query = Query::new()
710    ///     .from_block(19000000)
711    ///     .to_block_excl(19000010)
712    ///     .include_all_blocks()
713    ///     .select_block_fields([BlockField::Number, BlockField::Hash, BlockField::Timestamp]);
714    /// let response = client.get(&query).await?;
715    ///
716    /// println!("Retrieved {} blocks", response.data.blocks.len());
717    /// # Ok(())
718    /// # }
719    /// ```
720    pub async fn get(&self, query: &Query) -> Result<QueryResponse> {
721        let arrow_response = self.get_arrow(query).await.context("get data")?;
722        Ok(QueryResponse::from(&arrow_response))
723    }
724
725    /// Add block, transaction and log fields selection to the query, executes it with retries
726    /// and returns the response.
727    ///
728    /// This method automatically joins blocks, transactions, and logs into unified events,
729    /// making it easier to work with related blockchain data.
730    ///
731    /// # Example
732    /// ```no_run
733    /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField, TransactionField}};
734    ///
735    /// # async fn example() -> anyhow::Result<()> {
736    /// let client = Client::builder()
737    ///     .chain_id(1)
738    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
739    ///     .build()?;
740    ///
741    /// // Query ERC20 transfers with transaction context
742    /// let query = Query::new()
743    ///     .from_block(19000000)
744    ///     .to_block_excl(19000010)
745    ///     .where_logs(
746    ///         LogFilter::all()
747    ///             .and_topic0(["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"])?
748    ///     )
749    ///     .select_log_fields([LogField::Address, LogField::Data])
750    ///     .select_transaction_fields([TransactionField::Hash, TransactionField::From]);
751    /// let response = client.get_events(query).await?;
752    ///
753    /// println!("Retrieved {} joined events", response.data.len());
754    /// # Ok(())
755    /// # }
756    /// ```
757    pub async fn get_events(&self, mut query: Query) -> Result<EventResponse> {
758        let event_join_strategy = InternalEventJoinStrategy::from(&query.field_selection);
759        event_join_strategy.add_join_fields_to_selection(&mut query.field_selection);
760        let arrow_response = self.get_arrow(&query).await.context("get data")?;
761        Ok(EventResponse::from_arrow_response(
762            &arrow_response,
763            &event_join_strategy,
764        ))
765    }
766
767    /// Executes query once and returns the result in (Arrow, size) format using JSON serialization.
768    async fn get_arrow_impl_json(&self, query: &Query) -> Result<(ArrowResponse, u64)> {
769        let mut url = self.inner.url.clone();
770        let mut segments = url.path_segments_mut().ok().context("get path segments")?;
771        segments.push("query");
772        segments.push("arrow-ipc");
773        std::mem::drop(segments);
774        let req = self
775            .inner
776            .http_client
777            .request(Method::POST, url)
778            .bearer_auth(&self.inner.api_token);
779
780        let res = req.json(&query).send().await.context("execute http req")?;
781
782        let status = res.status();
783        if !status.is_success() {
784            let text = res.text().await.context("read text to see error")?;
785
786            return Err(anyhow!(
787                "http response status code {}, err body: {}",
788                status,
789                text
790            ));
791        }
792
793        let bytes = res.bytes().await.context("read response body bytes")?;
794
795        let res = tokio::task::block_in_place(|| {
796            parse_query_response(&bytes).context("parse query response")
797        })?;
798
799        Ok((res, bytes.len().try_into().unwrap()))
800    }
801
802    fn should_cache_queries(&self) -> bool {
803        matches!(
804            self.inner.serialization_format,
805            SerializationFormat::CapnProto {
806                should_cache_queries: true
807            }
808        )
809    }
810
811    /// Executes query once and returns the result in (Arrow, size) format using Cap'n Proto serialization.
812    async fn get_arrow_impl_capnp(&self, query: &Query) -> Result<(ArrowResponse, u64)> {
813        let mut url = self.inner.url.clone();
814        let mut segments = url.path_segments_mut().ok().context("get path segments")?;
815        segments.push("query");
816        segments.push("arrow-ipc");
817        segments.push("capnp");
818        std::mem::drop(segments);
819
820        let should_cache = self.should_cache_queries();
821
822        if should_cache {
823            let query_with_id = {
824                let mut message = capnp::message::Builder::new_default();
825                let mut request_builder =
826                    message.init_root::<hypersync_net_types_capnp::request::Builder>();
827
828                request_builder.build_query_id_from_query(query)?;
829                let mut query_with_id = Vec::new();
830                capnp::serialize_packed::write_message(&mut query_with_id, &message)?;
831                query_with_id
832            };
833
834            let mut req = self
835                .inner
836                .http_client
837                .request(Method::POST, url.clone())
838                .bearer_auth(&self.inner.api_token);
839            req = req.header("content-type", "application/x-capnp");
840
841            let res = req
842                .body(query_with_id)
843                .send()
844                .await
845                .context("execute http req")?;
846
847            let status = res.status();
848            if status.is_success() {
849                let bytes = res.bytes().await.context("read response body bytes")?;
850
851                let mut opts = capnp::message::ReaderOptions::new();
852                opts.nesting_limit(i32::MAX).traversal_limit_in_words(None);
853                let message_reader = capnp::serialize_packed::read_message(bytes.as_ref(), opts)
854                    .context("create message reader")?;
855                let query_response = message_reader
856                    .get_root::<hypersync_net_types_capnp::cached_query_response::Reader>()
857                    .context("get cached_query_response root")?;
858                match query_response.get_either().which()? {
859                hypersync_net_types_capnp::cached_query_response::either::Which::QueryResponse(
860                    query_response,
861                ) => {
862                    let res = tokio::task::block_in_place(|| {
863                        let res = query_response?;
864                        read_query_response(&res).context("parse query response cached")
865                    })?;
866                    return Ok((res, bytes.len().try_into().unwrap()));
867                }
868                hypersync_net_types_capnp::cached_query_response::either::Which::NotCached(()) => {
869                    log::trace!("query was not cached, retrying with full query");
870                }
871            }
872            } else {
873                let text = res.text().await.context("read text to see error")?;
874                log::warn!(
875                    "Failed cache query, will retry full query. {}, err body: {}",
876                    status,
877                    text
878                );
879            }
880        };
881
882        let full_query_bytes = {
883            let mut message = capnp::message::Builder::new_default();
884            let mut query_builder =
885                message.init_root::<hypersync_net_types_capnp::request::Builder>();
886
887            query_builder.build_full_query_from_query(query, should_cache)?;
888            let mut bytes = Vec::new();
889            capnp::serialize_packed::write_message(&mut bytes, &message)?;
890            bytes
891        };
892
893        let mut req = self
894            .inner
895            .http_client
896            .request(Method::POST, url)
897            .bearer_auth(&self.inner.api_token);
898        req = req.header("content-type", "application/x-capnp");
899
900        let res = req
901            .header("content-type", "application/x-capnp")
902            .body(full_query_bytes)
903            .send()
904            .await
905            .context("execute http req")?;
906
907        let status = res.status();
908        if !status.is_success() {
909            let text = res.text().await.context("read text to see error")?;
910
911            return Err(anyhow!(
912                "http response status code {}, err body: {}",
913                status,
914                text
915            ));
916        }
917
918        let bytes = res.bytes().await.context("read response body bytes")?;
919
920        let res = tokio::task::block_in_place(|| {
921            parse_query_response(&bytes).context("parse query response")
922        })?;
923
924        Ok((res, bytes.len().try_into().unwrap()))
925    }
926
927    /// Executes query once and returns the result in (Arrow, size) format.
928    async fn get_arrow_impl(&self, query: &Query) -> Result<(ArrowResponse, u64)> {
929        match self.inner.serialization_format {
930            SerializationFormat::Json => self.get_arrow_impl_json(query).await,
931            SerializationFormat::CapnProto { .. } => self.get_arrow_impl_capnp(query).await,
932        }
933    }
934
935    /// Executes query with retries and returns the response in Arrow format.
936    pub async fn get_arrow(&self, query: &Query) -> Result<ArrowResponse> {
937        self.get_arrow_with_size(query).await.map(|res| res.0)
938    }
939
940    /// Internal implementation for get_arrow.
941    async fn get_arrow_with_size(&self, query: &Query) -> Result<(ArrowResponse, u64)> {
942        let mut base = self.inner.retry_base_ms;
943
944        let mut err = anyhow!("");
945
946        for _ in 0..self.inner.max_num_retries + 1 {
947            match self.get_arrow_impl(query).await {
948                Ok(res) => return Ok(res),
949                Err(e) => {
950                    log::error!(
951                        "failed to get arrow data from server, retrying... The error was: {e:?}"
952                    );
953                    err = err.context(format!("{e:?}"));
954                }
955            }
956
957            let base_ms = Duration::from_millis(base);
958            let jitter = Duration::from_millis(fastrange_rs::fastrange_64(
959                rand::random(),
960                self.inner.retry_backoff_ms,
961            ));
962
963            tokio::time::sleep(base_ms + jitter).await;
964
965            base = std::cmp::min(
966                base + self.inner.retry_backoff_ms,
967                self.inner.retry_ceiling_ms,
968            );
969        }
970
971        Err(err)
972    }
973
974    /// Spawns task to execute query and return data via a channel.
975    ///
976    /// # Example
977    /// ```no_run
978    /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField}, StreamConfig};
979    ///
980    /// # async fn example() -> anyhow::Result<()> {
981    /// let client = Client::builder()
982    ///     .chain_id(1)
983    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
984    ///     .build()?;
985    ///
986    /// // Stream all ERC20 transfer events
987    /// let query = Query::new()
988    ///     .from_block(19000000)
989    ///     .where_logs(
990    ///         LogFilter::all()
991    ///             .and_topic0(["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"])?
992    ///     )
993    ///     .select_log_fields([LogField::Address, LogField::Topic1, LogField::Topic2, LogField::Data]);
994    /// let mut receiver = client.stream(query, StreamConfig::default()).await?;
995    ///
996    /// while let Some(response) = receiver.recv().await {
997    ///     let response = response?;
998    ///     println!("Got {} events up to block: {}", response.data.logs.len(), response.next_block);
999    /// }
1000    /// # Ok(())
1001    /// # }
1002    /// ```
1003    pub async fn stream(
1004        &self,
1005        query: Query,
1006        config: StreamConfig,
1007    ) -> Result<mpsc::Receiver<Result<QueryResponse>>> {
1008        check_simple_stream_params(&config)?;
1009
1010        let (tx, rx): (_, mpsc::Receiver<Result<QueryResponse>>) =
1011            mpsc::channel(config.concurrency);
1012
1013        let mut inner_rx = self
1014            .stream_arrow(query, config)
1015            .await
1016            .context("start inner stream")?;
1017
1018        tokio::spawn(async move {
1019            while let Some(resp) = inner_rx.recv().await {
1020                let is_err = resp.is_err();
1021                if tx
1022                    .send(resp.map(|r| QueryResponse::from(&r)))
1023                    .await
1024                    .is_err()
1025                    || is_err
1026                {
1027                    return;
1028                }
1029            }
1030        });
1031
1032        Ok(rx)
1033    }
1034
1035    /// Add block, transaction and log fields selection to the query and spawns task to execute it,
1036    /// returning data via a channel.
1037    ///
1038    /// This method automatically joins blocks, transactions, and logs into unified events,
1039    /// then streams them via a channel for real-time processing.
1040    ///
1041    /// # Example
1042    /// ```no_run
1043    /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField, TransactionField}, StreamConfig};
1044    ///
1045    /// # async fn example() -> anyhow::Result<()> {
1046    /// let client = Client::builder()
1047    ///     .chain_id(1)
1048    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
1049    ///     .build()?;
1050    ///
1051    /// // Stream NFT transfer events with transaction context
1052    /// let query = Query::new()
1053    ///     .from_block(19000000)
1054    ///     .where_logs(
1055    ///         LogFilter::all()
1056    ///             .and_topic0(["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"])?
1057    ///     )
1058    ///     .select_log_fields([LogField::Address, LogField::Topic1, LogField::Topic2])
1059    ///     .select_transaction_fields([TransactionField::Hash, TransactionField::From]);
1060    /// let mut receiver = client.stream_events(query, StreamConfig::default()).await?;
1061    ///
1062    /// while let Some(response) = receiver.recv().await {
1063    ///     let response = response?;
1064    ///     println!("Got {} joined events up to block: {}", response.data.len(), response.next_block);
1065    /// }
1066    /// # Ok(())
1067    /// # }
1068    /// ```
1069    pub async fn stream_events(
1070        &self,
1071        mut query: Query,
1072        config: StreamConfig,
1073    ) -> Result<mpsc::Receiver<Result<EventResponse>>> {
1074        check_simple_stream_params(&config)?;
1075
1076        let event_join_strategy = InternalEventJoinStrategy::from(&query.field_selection);
1077
1078        event_join_strategy.add_join_fields_to_selection(&mut query.field_selection);
1079
1080        let (tx, rx): (_, mpsc::Receiver<Result<EventResponse>>) =
1081            mpsc::channel(config.concurrency);
1082
1083        let mut inner_rx = self
1084            .stream_arrow(query, config)
1085            .await
1086            .context("start inner stream")?;
1087
1088        tokio::spawn(async move {
1089            while let Some(resp) = inner_rx.recv().await {
1090                let is_err = resp.is_err();
1091                if tx
1092                    .send(
1093                        resp.map(|r| EventResponse::from_arrow_response(&r, &event_join_strategy)),
1094                    )
1095                    .await
1096                    .is_err()
1097                    || is_err
1098                {
1099                    return;
1100                }
1101            }
1102        });
1103
1104        Ok(rx)
1105    }
1106
1107    /// Spawns task to execute query and return data via a channel in Arrow format.
1108    ///
1109    /// Returns raw Apache Arrow data via a channel for high-performance processing.
1110    /// Ideal for applications that need to work directly with columnar data.
1111    ///
1112    /// # Example
1113    /// ```no_run
1114    /// use hypersync_client::{Client, net_types::{Query, TransactionFilter, TransactionField}, StreamConfig};
1115    ///
1116    /// # async fn example() -> anyhow::Result<()> {
1117    /// let client = Client::builder()
1118    ///     .chain_id(1)
1119    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
1120    ///     .build()?;
1121    ///
1122    /// // Stream transaction data in Arrow format for analytics
1123    /// let query = Query::new()
1124    ///     .from_block(19000000)
1125    ///     .to_block_excl(19000100)
1126    ///     .where_transactions(
1127    ///         TransactionFilter::all()
1128    ///             .and_contract_address(["0xA0b86a33E6411b87Fd9D3DF822C8698FC06BBe4c"])?
1129    ///     )
1130    ///     .select_transaction_fields([TransactionField::Hash, TransactionField::From, TransactionField::Value]);
1131    /// let mut receiver = client.stream_arrow(query, StreamConfig::default()).await?;
1132    ///
1133    /// while let Some(response) = receiver.recv().await {
1134    ///     let response = response?;
1135    ///     println!("Got {} Arrow batches for transactions", response.data.transactions.len());
1136    /// }
1137    /// # Ok(())
1138    /// # }
1139    /// ```
1140    pub async fn stream_arrow(
1141        &self,
1142        query: Query,
1143        config: StreamConfig,
1144    ) -> Result<mpsc::Receiver<Result<ArrowResponse>>> {
1145        stream::stream_arrow(self, query, config).await
1146    }
1147
1148    /// Getter for url field.
1149    ///
1150    /// # Example
1151    /// ```
1152    /// use hypersync_client::Client;
1153    ///
1154    /// let client = Client::builder()
1155    ///     .chain_id(1)
1156    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1157    ///     .build()
1158    ///     .unwrap();
1159    ///
1160    /// println!("Client URL: {}", client.url());
1161    /// ```
1162    pub fn url(&self) -> &Url {
1163        &self.inner.url
1164    }
1165}
1166
1167/// Builder for creating a hypersync client with configuration options.
1168///
1169/// This builder provides a fluent API for configuring client settings like URL,
1170/// authentication, timeouts, and retry behavior.
1171///
1172/// # Example
1173/// ```
1174/// use hypersync_client::{Client, SerializationFormat};
1175///
1176/// let client = Client::builder()
1177///     .chain_id(1)
1178///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1179///     .http_req_timeout_millis(30000)
1180///     .max_num_retries(3)
1181///     .build()
1182///     .unwrap();
1183/// ```
1184#[derive(Debug, Clone, Default)]
1185pub struct ClientBuilder(ClientConfig);
1186
1187impl ClientBuilder {
1188    /// Creates a new ClientBuilder with default configuration.
1189    pub fn new() -> Self {
1190        Self::default()
1191    }
1192
1193    /// Sets the chain ID and automatically configures the URL for the hypersync endpoint.
1194    ///
1195    /// This is a convenience method that sets the URL to `https://{chain_id}.hypersync.xyz`.
1196    ///
1197    /// # Arguments
1198    /// * `chain_id` - The blockchain chain ID (e.g., 1 for Ethereum mainnet)
1199    ///
1200    /// # Example
1201    /// ```
1202    /// use hypersync_client::Client;
1203    ///
1204    /// let client = Client::builder()
1205    ///     .chain_id(1) // Ethereum mainnet
1206    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1207    ///     .build()
1208    ///     .unwrap();
1209    /// ```
1210    pub fn chain_id(mut self, chain_id: u64) -> Self {
1211        self.0.url = format!("https://{chain_id}.hypersync.xyz");
1212        self
1213    }
1214
1215    /// Sets a custom URL for the hypersync server.
1216    ///
1217    /// Use this method when you need to connect to a custom hypersync endpoint
1218    /// instead of the default public endpoints.
1219    ///
1220    /// # Arguments
1221    /// * `url` - The hypersync server URL
1222    ///
1223    /// # Example
1224    /// ```
1225    /// use hypersync_client::Client;
1226    ///
1227    /// let client = Client::builder()
1228    ///     .url("https://my-custom-hypersync.example.com")
1229    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1230    ///     .build()
1231    ///     .unwrap();
1232    /// ```
1233    pub fn url<S: ToString>(mut self, url: S) -> Self {
1234        self.0.url = url.to_string();
1235        self
1236    }
1237
1238    /// Sets the api token for authentication.
1239    ///
1240    /// Required for accessing authenticated hypersync endpoints.
1241    ///
1242    /// # Arguments
1243    /// * `api_token` - The authentication token
1244    ///
1245    /// # Example
1246    /// ```
1247    /// use hypersync_client::Client;
1248    ///
1249    /// let client = Client::builder()
1250    ///     .chain_id(1)
1251    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1252    ///     .build()
1253    ///     .unwrap();
1254    /// ```
1255    pub fn api_token<S: ToString>(mut self, api_token: S) -> Self {
1256        self.0.api_token = api_token.to_string();
1257        self
1258    }
1259
1260    /// Sets the HTTP request timeout in milliseconds.
1261    ///
1262    /// # Arguments
1263    /// * `http_req_timeout_millis` - Timeout in milliseconds (default: 30000)
1264    ///
1265    /// # Example
1266    /// ```
1267    /// use hypersync_client::Client;
1268    ///
1269    /// let client = Client::builder()
1270    ///     .chain_id(1)
1271    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1272    ///     .http_req_timeout_millis(60000) // 60 second timeout
1273    ///     .build()
1274    ///     .unwrap();
1275    /// ```
1276    pub fn http_req_timeout_millis(mut self, http_req_timeout_millis: u64) -> Self {
1277        self.0.http_req_timeout_millis = http_req_timeout_millis;
1278        self
1279    }
1280
1281    /// Sets the maximum number of retries for failed requests.
1282    ///
1283    /// # Arguments
1284    /// * `max_num_retries` - Maximum number of retries (default: 10)
1285    ///
1286    /// # Example
1287    /// ```
1288    /// use hypersync_client::Client;
1289    ///
1290    /// let client = Client::builder()
1291    ///     .chain_id(1)
1292    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1293    ///     .max_num_retries(5)
1294    ///     .build()
1295    ///     .unwrap();
1296    /// ```
1297    pub fn max_num_retries(mut self, max_num_retries: usize) -> Self {
1298        self.0.max_num_retries = max_num_retries;
1299        self
1300    }
1301
1302    /// Sets the backoff increment for retry delays.
1303    ///
1304    /// This value is added to the base delay on each retry attempt.
1305    ///
1306    /// # Arguments
1307    /// * `retry_backoff_ms` - Backoff increment in milliseconds (default: 500)
1308    ///
1309    /// # Example
1310    /// ```
1311    /// use hypersync_client::Client;
1312    ///
1313    /// let client = Client::builder()
1314    ///     .chain_id(1)
1315    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1316    ///     .retry_backoff_ms(1000) // 1 second backoff increment
1317    ///     .build()
1318    ///     .unwrap();
1319    /// ```
1320    pub fn retry_backoff_ms(mut self, retry_backoff_ms: u64) -> Self {
1321        self.0.retry_backoff_ms = retry_backoff_ms;
1322        self
1323    }
1324
1325    /// Sets the initial delay for retry attempts.
1326    ///
1327    /// # Arguments
1328    /// * `retry_base_ms` - Initial retry delay in milliseconds (default: 500)
1329    ///
1330    /// # Example
1331    /// ```
1332    /// use hypersync_client::Client;
1333    ///
1334    /// let client = Client::builder()
1335    ///     .chain_id(1)
1336    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1337    ///     .retry_base_ms(1000) // Start with 1 second delay
1338    ///     .build()
1339    ///     .unwrap();
1340    /// ```
1341    pub fn retry_base_ms(mut self, retry_base_ms: u64) -> Self {
1342        self.0.retry_base_ms = retry_base_ms;
1343        self
1344    }
1345
1346    /// Sets the maximum delay for retry attempts.
1347    ///
1348    /// The retry delay will not exceed this value, even with backoff increments.
1349    ///
1350    /// # Arguments
1351    /// * `retry_ceiling_ms` - Maximum retry delay in milliseconds (default: 10000)
1352    ///
1353    /// # Example
1354    /// ```
1355    /// use hypersync_client::Client;
1356    ///
1357    /// let client = Client::builder()
1358    ///     .chain_id(1)
1359    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1360    ///     .retry_ceiling_ms(30000) // Cap at 30 seconds
1361    ///     .build()
1362    ///     .unwrap();
1363    /// ```
1364    pub fn retry_ceiling_ms(mut self, retry_ceiling_ms: u64) -> Self {
1365        self.0.retry_ceiling_ms = retry_ceiling_ms;
1366        self
1367    }
1368
1369    /// Sets the serialization format for client-server communication.
1370    ///
1371    /// # Arguments
1372    /// * `serialization_format` - The format to use (JSON or CapnProto)
1373    ///
1374    /// # Example
1375    /// ```
1376    /// use hypersync_client::{Client, SerializationFormat};
1377    ///
1378    /// let client = Client::builder()
1379    ///     .chain_id(1)
1380    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1381    ///     .serialization_format(SerializationFormat::Json)
1382    ///     .build()
1383    ///     .unwrap();
1384    /// ```
1385    pub fn serialization_format(mut self, serialization_format: SerializationFormat) -> Self {
1386        self.0.serialization_format = serialization_format;
1387        self
1388    }
1389
1390    /// Builds the client with the configured settings.
1391    ///
1392    /// # Returns
1393    /// * `Result<Client>` - The configured client or an error if configuration is invalid
1394    ///
1395    /// # Errors
1396    /// Returns an error if:
1397    /// * The URL is malformed
1398    /// * Required configuration is missing
1399    ///
1400    /// # Example
1401    /// ```
1402    /// use hypersync_client::Client;
1403    ///
1404    /// let client = Client::builder()
1405    ///     .chain_id(1)
1406    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1407    ///     .build()
1408    ///     .unwrap();
1409    /// ```
1410    pub fn build(self) -> Result<Client> {
1411        if self.0.url.is_empty() {
1412            anyhow::bail!(
1413                "endpoint needs to be set, try using builder.chain_id(1) or\
1414                builder.url(\"https://eth.hypersync.xyz\") to set the endpoint"
1415            )
1416        }
1417        Client::new(self.0)
1418    }
1419}
1420
/// Base reconnection delay for the height stream (200ms); backoff doubles from here.
const INITIAL_RECONNECT_DELAY: Duration = Duration::from_millis(200);
/// Upper bound for the height stream's reconnection backoff (30s).
const MAX_RECONNECT_DELAY: Duration = Duration::from_millis(30_000);

/// Timeout for detecting dead connections. Server sends keepalive pings every 5s,
/// so we timeout after 15s (3x the ping interval).
const READ_TIMEOUT: Duration = Duration::from_millis(3 * 5_000);
1427
/// Notifications delivered to consumers of the height stream.
///
/// Besides height values, the stream reports connection-state transitions so applications
/// can surface connectivity to users.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum HeightStreamEvent {
    /// The SSE connection was opened (initially or after a reconnect).
    Connected,
    /// The server reported a new height.
    Height(u64),
    /// The connection dropped; a reconnect will be attempted after `delay`.
    Reconnecting {
        /// How long the stream waits before the next connection attempt.
        delay: Duration,
    },
}

/// Classification of a raw SSE event before it is forwarded to consumers.
enum InternalStreamEvent {
    /// An event that should be sent on to the consumer.
    Publish(HeightStreamEvent),
    /// A server keepalive; not forwarded.
    Ping,
    /// Any other event, kept as a debug description; not forwarded.
    Unknown(String),
}
1447
1448impl Client {
1449    fn get_es_stream(&self) -> Result<EventSource> {
1450        // Build the GET /height/sse request
1451        let mut url = self.inner.url.clone();
1452        url.path_segments_mut()
1453            .ok()
1454            .context("invalid base URL")?
1455            .extend(&["height", "sse"]);
1456
1457        let req = self
1458            .inner
1459            .http_client
1460            .get(url)
1461            .header(header::ACCEPT, "text/event-stream")
1462            .bearer_auth(&self.inner.api_token);
1463
1464        // Configure exponential backoff for library-level retries
1465        let retry_policy = ExponentialBackoff::new(
1466            INITIAL_RECONNECT_DELAY,
1467            2.0,
1468            Some(MAX_RECONNECT_DELAY),
1469            None, // unlimited retries
1470        );
1471
1472        // Turn the request into an EventSource stream with retries
1473        let mut es = reqwest_eventsource::EventSource::new(req)
1474            .context("unexpected error creating EventSource")?;
1475        es.set_retry_policy(Box::new(retry_policy));
1476        Ok(es)
1477    }
1478
1479    async fn next_height(event_source: &mut EventSource) -> Result<Option<InternalStreamEvent>> {
1480        let Some(res) = tokio::time::timeout(READ_TIMEOUT, event_source.next())
1481            .await
1482            .map_err(|d| anyhow::anyhow!("stream timed out after {d}"))?
1483        else {
1484            return Ok(None);
1485        };
1486
1487        let e = match res.context("failed response")? {
1488            Event::Open => InternalStreamEvent::Publish(HeightStreamEvent::Connected),
1489            Event::Message(event) => match event.event.as_str() {
1490                "height" => {
1491                    let height = event
1492                        .data
1493                        .trim()
1494                        .parse::<u64>()
1495                        .context("parsing height from event data")?;
1496                    InternalStreamEvent::Publish(HeightStreamEvent::Height(height))
1497                }
1498                "ping" => InternalStreamEvent::Ping,
1499                _ => InternalStreamEvent::Unknown(format!("unknown event: {:?}", event)),
1500            },
1501        };
1502
1503        Ok(Some(e))
1504    }
1505
1506    async fn stream_height_events(
1507        es: &mut EventSource,
1508        tx: &mpsc::Sender<HeightStreamEvent>,
1509    ) -> Result<bool> {
1510        let mut received_an_event = false;
1511        while let Some(event) = Self::next_height(es).await.context("failed next height")? {
1512            match event {
1513                InternalStreamEvent::Publish(event) => {
1514                    received_an_event = true;
1515                    if tx.send(event).await.is_err() {
1516                        return Ok(received_an_event); // Receiver dropped, exit task
1517                    }
1518                }
1519                InternalStreamEvent::Ping => (), // ignore pings
1520                InternalStreamEvent::Unknown(_event) => (), // ignore unknown events
1521            }
1522        }
1523        Ok(received_an_event)
1524    }
1525
1526    fn get_delay(consecutive_failures: u32) -> Duration {
1527        if consecutive_failures > 0 {
1528            /// helper function to calculate 2^x
1529            /// optimization using bit shifting
1530            const fn two_to_pow(x: u32) -> u32 {
1531                1 << x
1532            }
1533            // Exponential backoff: 200ms, 400ms, 800ms, ... up to 30s
1534            INITIAL_RECONNECT_DELAY
1535                .saturating_mul(two_to_pow(consecutive_failures - 1))
1536                .min(MAX_RECONNECT_DELAY)
1537        } else {
1538            // On zero consecutive failures, 0 delay
1539            Duration::from_millis(0)
1540        }
1541    }
1542
1543    async fn stream_height_events_with_retry(
1544        &self,
1545        tx: &mpsc::Sender<HeightStreamEvent>,
1546    ) -> Result<()> {
1547        let mut consecutive_failures = 0u32;
1548
1549        loop {
1550            // should always be able to creat a new es stream
1551            // something is wrong with the req builder otherwise
1552            let mut es = self.get_es_stream().context("get es stream")?;
1553
1554            match Self::stream_height_events(&mut es, tx).await {
1555                Ok(received_an_event) => {
1556                    if received_an_event {
1557                        consecutive_failures = 0; // Reset after successful connection that then failed
1558                    }
1559                    log::trace!("Stream height exited");
1560                }
1561                Err(e) => {
1562                    log::trace!("Stream height failed: {e:?}");
1563                }
1564            }
1565
1566            es.close();
1567
1568            // If the receiver is closed, exit the task
1569            if tx.is_closed() {
1570                break;
1571            }
1572
1573            let delay = Self::get_delay(consecutive_failures);
1574            log::trace!("Reconnecting in {:?}...", delay);
1575
1576            if tx
1577                .send(HeightStreamEvent::Reconnecting { delay })
1578                .await
1579                .is_err()
1580            {
1581                return Ok(()); // Receiver dropped, exit task
1582            }
1583            tokio::time::sleep(delay).await;
1584
1585            // increment consecutive failures so that on the next try
1586            // it will start using back offs
1587            consecutive_failures += 1;
1588        }
1589
1590        Ok(())
1591    }
1592
1593    /// Streams archive height updates from the server via Server-Sent Events.
1594    ///
1595    /// Establishes a long-lived SSE connection to `/height/sse` that automatically reconnects
1596    /// on disconnection with exponential backoff (200ms → 400ms → ... → max 30s).
1597    ///
1598    /// The stream emits [`HeightStreamEvent`] to notify consumers of connection state changes
1599    /// and height updates. This allows applications to display connection status to users.
1600    ///
1601    /// # Returns
1602    /// Channel receiver yielding [`HeightStreamEvent`]s. The background task handles connection
1603    /// lifecycle and sends events through this channel.
1604    ///
1605    /// # Example
1606    /// ```
1607    /// # use hypersync_client::{Client, HeightStreamEvent};
1608    /// # async fn example() -> anyhow::Result<()> {
1609    /// let client = Client::builder()
1610    ///     .url("https://eth.hypersync.xyz")
1611    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1612    ///     .build()?;
1613    ///
1614    /// let mut rx = client.stream_height();
1615    ///
1616    /// while let Some(event) = rx.recv().await {
1617    ///     match event {
1618    ///         HeightStreamEvent::Connected => println!("Connected to stream"),
1619    ///         HeightStreamEvent::Height(h) => println!("Height: {}", h),
1620    ///         HeightStreamEvent::Reconnecting { delay } => {
1621    ///             println!("Reconnecting in {:?}...", delay)
1622    ///         }
1623    ///     }
1624    /// }
1625    /// # Ok(())
1626    /// # }
1627    /// ```
1628    pub fn stream_height(&self) -> mpsc::Receiver<HeightStreamEvent> {
1629        let (tx, rx) = mpsc::channel(16);
1630        let client = self.clone();
1631
1632        tokio::spawn(async move {
1633            if let Err(e) = client.stream_height_events_with_retry(&tx).await {
1634                log::error!("Stream height failed unexpectedly: {e:?}");
1635            }
1636        });
1637
1638        rx
1639    }
1640}
1641
1642fn check_simple_stream_params(config: &StreamConfig) -> Result<()> {
1643    if config.event_signature.is_some() {
1644        return Err(anyhow!(
1645            "config.event_signature can't be passed to simple type function. User is expected to \
1646             decode the logs using Decoder."
1647        ));
1648    }
1649    if config.column_mapping.is_some() {
1650        return Err(anyhow!(
1651            "config.column_mapping can't be passed to single type function. User is expected to \
1652             map values manually."
1653        ));
1654    }
1655
1656    Ok(())
1657}
1658
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_get_delay() {
        assert_eq!(
            Client::get_delay(0),
            Duration::from_millis(0),
            "starts with 0 delay"
        );
        // powers of 2 backoff
        assert_eq!(Client::get_delay(1), Duration::from_millis(200));
        assert_eq!(Client::get_delay(2), Duration::from_millis(400));
        assert_eq!(Client::get_delay(3), Duration::from_millis(800));
        // maxes out at 30s
        assert_eq!(
            Client::get_delay(9),
            Duration::from_secs(30),
            "max delay is 30s"
        );
        assert_eq!(
            Client::get_delay(10),
            Duration::from_secs(30),
            "max delay is 30s"
        );
    }

    /// Connects to a live HyperSync server and checks the height event stream.
    ///
    /// It expects a `Connected` event followed by two strictly increasing heights. It then
    /// verifies that dropping the receiver ends the stream without an error.
    #[tokio::test]
    #[ignore = "integration test with live hs server for height stream"]
    async fn test_stream_height_events() -> anyhow::Result<()> {
        // Read the token before spawning. If the variable is missing, the test fails with a
        // clear error instead of panicking inside the task, which would otherwise surface only
        // as a closed channel in the assertions below.
        let api_token = std::env::var("ENVIO_API_TOKEN").context("ENVIO_API_TOKEN must be set")?;

        let (tx, mut rx) = mpsc::channel(16);
        let handle = tokio::spawn(async move {
            let client = Client::builder()
                .url("https://monad-testnet.hypersync.xyz")
                .api_token(api_token)
                .build()?;
            let mut es = client.get_es_stream().context("get es stream")?;
            Client::stream_height_events(&mut es, &tx).await
        });

        assert_eq!(
            rx.recv().await,
            Some(HeightStreamEvent::Connected),
            "first event should be Connected"
        );
        let Some(HeightStreamEvent::Height(height)) = rx.recv().await else {
            panic!("should have received height")
        };
        let Some(HeightStreamEvent::Height(height2)) = rx.recv().await else {
            panic!("should have received height")
        };
        assert!(height2 > height);
        // Closing the receiver should make the streaming task stop on its next send.
        drop(rx);

        let res = handle.await.expect("should have joined");
        let received_an_event =
            res.expect("should have ended the stream gracefully after dropping rx");
        assert!(received_an_event, "should have received an event");

        Ok(())
    }
}