Skip to main content

hypersync_client/
lib.rs

1#![deny(missing_docs)]
2//! # HyperSync Client
3//!
4//! A high-performance Rust client for the HyperSync protocol, enabling efficient retrieval
5//! of blockchain data including blocks, transactions, logs, and traces.
6//!
7//! ## Features
8//!
9//! - **High-performance streaming**: Parallel data fetching with automatic retries
10//! - **Flexible querying**: Rich query builder API for precise data selection
11//! - **Multiple data formats**: Support for Arrow, Parquet, and simple Rust types
12//! - **Event decoding**: Automatic ABI decoding for smart contract events
13//! - **Real-time updates**: Live height streaming via Server-Sent Events
14//! - **Production ready**: Built-in rate limiting, retries, and error handling
15//!
16//! ## Quick Start
17//!
18//! ```no_run
19//! use hypersync_client::{Client, net_types::{Query, LogFilter, LogField}, StreamConfig};
20//!
21//! #[tokio::main]
22//! async fn main() -> anyhow::Result<()> {
23//!     // Create a client for Ethereum mainnet
24//!     let client = Client::builder()
25//!         .chain_id(1)
26//!         .api_token(std::env::var("ENVIO_API_TOKEN")?)
27//!         .build()?;
28//!
29//!     // Query ERC20 transfer events from USDC contract
30//!     let query = Query::new()
31//!         .from_block(19000000)
32//!         .to_block_excl(19001000)
33//!         .where_logs(
34//!             LogFilter::all()
35//!                 .and_address(["0xA0b86a33E6411b87Fd9D3DF822C8698FC06BBe4c"])?
36//!                 .and_topic0(["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"])?
37//!         )
38//!         .select_log_fields([LogField::Address, LogField::Topic1, LogField::Topic2, LogField::Data]);
39//!
40//!     // Get all data in one response
41//!     let response = client.get(&query).await?;
42//!     println!("Retrieved {} blocks", response.data.blocks.len());
43//!
44//!     // Or stream data for large ranges
45//!     let mut receiver = client.stream(query, StreamConfig::default()).await?;
46//!     while let Some(response) = receiver.recv().await {
47//!         let response = response?;
48//!         println!("Streaming: got blocks up to {}", response.next_block);
49//!     }
50//!
51//!     Ok(())
52//! }
53//! ```
54//!
55//! ## Main Types
56//!
57//! - [`Client`] - Main client for interacting with HyperSync servers
58//! - [`net_types::Query`] - Query builder for specifying what data to fetch
59//! - [`StreamConfig`] - Configuration for streaming operations
60//! - [`QueryResponse`] - Response containing blocks, transactions, logs, and traces
61//! - [`ArrowResponse`] - Response in Apache Arrow format for high-performance processing
62//!
63//! ## Authentication
64//!
65//! You'll need a HyperSync API token to access the service. Get one from
66//! [https://envio.dev/app/api-tokens](https://envio.dev/app/api-tokens).
67//!
68//! ## Examples
69//!
70//! See the `examples/` directory for more detailed usage patterns including:
71//! - ERC20 token transfers
72//! - Wallet transaction history  
73//! - Event decoding and filtering
74//! - Real-time data streaming
75use std::time::Instant;
76use std::{sync::Arc, time::Duration};
77
78use anyhow::{anyhow, Context, Result};
79use futures::StreamExt;
80use hypersync_net_types::{hypersync_net_types_capnp, ArchiveHeight, ChainId, Query};
81use reqwest::{Method, StatusCode};
82use reqwest_eventsource::retry::ExponentialBackoff;
83use reqwest_eventsource::{Event, EventSource};
84
85pub mod arrow_reader;
86mod column_mapping;
87mod config;
88mod decode;
89mod decode_call;
90mod from_arrow;
91mod parquet_out;
92mod parse_response;
93pub mod preset_query;
94mod rayon_async;
95pub mod simple_types;
96mod stream;
97mod types;
98mod util;
99
100pub use hypersync_format as format;
101pub use hypersync_net_types as net_types;
102pub use hypersync_schema as schema;
103
104use parse_response::parse_query_response;
105use tokio::sync::mpsc;
106use types::{EventResponse, ResponseData};
107use url::Url;
108
109pub use column_mapping::{ColumnMapping, DataType};
110pub use config::HexOutput;
111pub use config::{ClientConfig, SerializationFormat, StreamConfig};
112pub use decode::Decoder;
113pub use decode_call::CallDecoder;
114pub use types::{ArrowResponse, ArrowResponseData, QueryResponse};
115
116use crate::parse_response::read_query_response;
117use crate::simple_types::InternalEventJoinStrategy;
118
/// Wrapper around `reqwest::Client` that periodically rebuilds the underlying
/// client so DNS gets re-resolved (see `max_connection_age` below).
#[derive(Debug)]
struct HttpClientWrapper {
    /// Mutable state that needs to be refreshed periodically
    inner: std::sync::Mutex<HttpClientWrapperInner>,
    /// HyperSync server api token, attached to every request as a bearer token.
    api_token: String,
    /// Standard timeout for http requests.
    timeout: Duration,
    /// Pools are never idle since polling often happens on the client
    /// so connections never timeout and dns lookups never happen again
    /// this is problematic if the underlying ip address changes during
    /// failovers on hypersync. Since reqwest doesn't implement this we
    /// simply recreate the client every time this age is exceeded.
    max_connection_age: Duration,
    /// User agent string, kept so the client can be rebuilt with the same
    /// agent when it is refreshed.
    user_agent: String,
}
136
/// Mutable part of [`HttpClientWrapper`], guarded by a mutex.
#[derive(Debug)]
struct HttpClientWrapperInner {
    /// Initialized reqwest instance for client url.
    client: reqwest::Client,
    /// Timestamp when the client was created; used to decide when to rebuild
    /// the client for a fresh DNS lookup.
    created_at: Instant,
}
144
145impl HttpClientWrapper {
146    fn new(user_agent: String, api_token: String, timeout: Duration) -> Self {
147        let client = reqwest::Client::builder()
148            .no_gzip()
149            .user_agent(&user_agent)
150            .build()
151            .unwrap();
152
153        Self {
154            inner: std::sync::Mutex::new(HttpClientWrapperInner {
155                client,
156                created_at: Instant::now(),
157            }),
158            api_token,
159            timeout,
160            max_connection_age: Duration::from_secs(60),
161            user_agent,
162        }
163    }
164
165    fn get_client(&self) -> reqwest::Client {
166        let mut inner = self.inner.lock().expect("HttpClientWrapper mutex poisoned");
167
168        // Check if client needs refresh due to age
169        if inner.created_at.elapsed() > self.max_connection_age {
170            // Recreate client to force new DNS lookup for failover scenarios
171            inner.client = reqwest::Client::builder()
172                .no_gzip()
173                .user_agent(&self.user_agent)
174                .build()
175                .unwrap();
176            inner.created_at = Instant::now();
177        }
178
179        // Clone is cheap for reqwest::Client (it's Arc-wrapped internally)
180        inner.client.clone()
181    }
182
183    fn request(&self, method: Method, url: Url) -> reqwest::RequestBuilder {
184        let client = self.get_client();
185        client
186            .request(method, url)
187            .timeout(self.timeout)
188            .bearer_auth(&self.api_token)
189    }
190
191    fn request_no_timeout(&self, method: Method, url: Url) -> reqwest::RequestBuilder {
192        let client = self.get_client();
193        client.request(method, url).bearer_auth(&self.api_token)
194    }
195}
196
/// Internal client state to handle http requests and retries.
#[derive(Debug)]
struct ClientInner {
    /// Wrapped http client used for all requests to the HyperSync server.
    http_client: HttpClientWrapper,
    /// HyperSync server URL.
    url: Url,
    /// Number of retries to attempt before returning error.
    max_num_retries: usize,
    /// Milliseconds added to the backoff delay after each failed attempt.
    retry_backoff_ms: u64,
    /// Initial wait time for request backoff.
    retry_base_ms: u64,
    /// Ceiling time for request backoff.
    retry_ceiling_ms: u64,
    /// Query serialization format to use for HTTP requests.
    serialization_format: SerializationFormat,
}
214
/// Client to handle http requests and retries.
///
/// Cheap to clone: all state is shared behind an `Arc`.
#[derive(Clone, Debug)]
pub struct Client {
    inner: Arc<ClientInner>,
}
220
221impl Client {
222    /// Creates a new client with the given configuration.
223    ///
224    /// Configuration must include the `url` and `api_token` fields.
225    /// # Example
226    /// ```
227    /// use hypersync_client::{Client, ClientConfig};
228    ///
229    /// let config = ClientConfig {
230    ///     url: "https://eth.hypersync.xyz".to_string(),
231    ///     api_token: std::env::var("ENVIO_API_TOKEN")?,
232    ///     ..Default::default()
233    /// };
234    /// let client = Client::new(config)?;
235    /// # Ok::<(), anyhow::Error>(())
236    /// ```
237    ///
238    /// # Errors
239    /// This method fails if the config is invalid.
240    pub fn new(cfg: ClientConfig) -> Result<Self> {
241        // hscr stands for hypersync client rust
242        cfg.validate().context("invalid ClientConfig")?;
243        let user_agent = format!("hscr/{}", env!("CARGO_PKG_VERSION"));
244        Self::new_internal(cfg, user_agent)
245    }
246
    /// Creates a new client builder.
    ///
    /// The builder is the preferred way to construct a [`Client`] when
    /// starting from a chain id rather than a full URL.
    ///
    /// # Example
    /// ```
    /// use hypersync_client::Client;
    ///
    /// let client = Client::builder()
    ///     .chain_id(1)
    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
    ///     .build()
    ///     .unwrap();
    /// ```
    pub fn builder() -> ClientBuilder {
        ClientBuilder::new()
    }
262
    #[doc(hidden)]
    pub fn new_with_agent(cfg: ClientConfig, user_agent: impl Into<String>) -> Result<Self> {
        // Creates a new client with the given configuration and custom user agent.
        // This is intended for use by language bindings (Python, Node.js) and HyperIndex,
        // which identify themselves with their own user agent string.
        // NOTE(review): unlike `Client::new`, this skips `cfg.validate()` —
        // presumably the bindings validate earlier; confirm before relying on it.
        Self::new_internal(cfg, user_agent.into())
    }
269
270    /// Internal constructor that takes both config and user agent.
271    fn new_internal(cfg: ClientConfig, user_agent: String) -> Result<Self> {
272        let http_client = HttpClientWrapper::new(
273            user_agent,
274            cfg.api_token,
275            Duration::from_millis(cfg.http_req_timeout_millis),
276        );
277
278        let url = Url::parse(&cfg.url).context("url is malformed")?;
279
280        Ok(Self {
281            inner: Arc::new(ClientInner {
282                http_client,
283                url,
284                max_num_retries: cfg.max_num_retries,
285                retry_backoff_ms: cfg.retry_backoff_ms,
286                retry_base_ms: cfg.retry_base_ms,
287                retry_ceiling_ms: cfg.retry_ceiling_ms,
288                serialization_format: cfg.serialization_format,
289            }),
290        })
291    }
292
293    /// Retrieves blocks, transactions, traces, and logs through a stream using the provided
294    /// query and stream configuration.
295    ///
296    /// ### Implementation
297    /// Runs multiple queries simultaneously based on config.concurrency.
298    ///
299    /// Each query runs until it reaches query.to, server height, any max_num_* query param,
300    /// or execution timed out by server.
301    ///
302    /// # ⚠️ Important Warning
303    ///
304    /// This method will continue executing until the query has run to completion from beginning
305    /// to the end of the block range defined in the query. For heavy queries with large block
306    /// ranges or high data volumes, consider:
307    ///
308    /// - Use [`stream()`](Self::stream) to interact with each streamed chunk individually
309    /// - Use [`get()`](Self::get) which returns a `next_block` that can be paginated for the next query
310    /// - Break large queries into smaller block ranges
311    ///
312    /// # Example
313    /// ```no_run
314    /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField}, StreamConfig};
315    ///
316    /// # async fn example() -> anyhow::Result<()> {
317    /// let client = Client::builder()
318    ///     .chain_id(1)
319    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
320    ///     .build()?;
321    ///
322    /// // Query ERC20 transfer events
323    /// let query = Query::new()
324    ///     .from_block(19000000)
325    ///     .to_block_excl(19000010)
326    ///     .where_logs(
327    ///         LogFilter::all()
328    ///             .and_topic0(["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"])?
329    ///     )
330    ///     .select_log_fields([LogField::Address, LogField::Data]);
331    /// let response = client.collect(query, StreamConfig::default()).await?;
332    ///
333    /// println!("Collected {} events", response.data.logs.len());
334    /// # Ok(())
335    /// # }
336    /// ```
337    pub async fn collect(&self, query: Query, config: StreamConfig) -> Result<QueryResponse> {
338        check_simple_stream_params(&config)?;
339
340        let mut recv = stream::stream_arrow(self, query, config)
341            .await
342            .context("start stream")?;
343
344        let mut data = ResponseData::default();
345        let mut archive_height = None;
346        let mut next_block = 0;
347        let mut total_execution_time = 0;
348
349        while let Some(res) = recv.recv().await {
350            let res = res.context("get response")?;
351            let res: QueryResponse =
352                QueryResponse::try_from(&res).context("convert arrow response")?;
353
354            for batch in res.data.blocks {
355                data.blocks.push(batch);
356            }
357            for batch in res.data.transactions {
358                data.transactions.push(batch);
359            }
360            for batch in res.data.logs {
361                data.logs.push(batch);
362            }
363            for batch in res.data.traces {
364                data.traces.push(batch);
365            }
366
367            archive_height = res.archive_height;
368            next_block = res.next_block;
369            total_execution_time += res.total_execution_time
370        }
371
372        Ok(QueryResponse {
373            archive_height,
374            next_block,
375            total_execution_time,
376            data,
377            rollback_guard: None,
378        })
379    }
380
381    /// Retrieves events through a stream using the provided query and stream configuration.
382    ///
383    /// # ⚠️ Important Warning
384    ///
385    /// This method will continue executing until the query has run to completion from beginning
386    /// to the end of the block range defined in the query. For heavy queries with large block
387    /// ranges or high data volumes, consider:
388    ///
389    /// - Use [`stream_events()`](Self::stream_events) to interact with each streamed chunk individually
390    /// - Use [`get_events()`](Self::get_events) which returns a `next_block` that can be paginated for the next query
391    /// - Break large queries into smaller block ranges
392    ///
393    /// # Example
394    /// ```no_run
395    /// use hypersync_client::{Client, net_types::{Query, TransactionFilter, TransactionField}, StreamConfig};
396    ///
397    /// # async fn example() -> anyhow::Result<()> {
398    /// let client = Client::builder()
399    ///     .chain_id(1)
400    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
401    ///     .build()?;
402    ///
403    /// // Query transactions to a specific address
404    /// let query = Query::new()
405    ///     .from_block(19000000)
406    ///     .to_block_excl(19000100)
407    ///     .where_transactions(
408    ///         TransactionFilter::all()
409    ///             .and_to(["0xA0b86a33E6411b87Fd9D3DF822C8698FC06BBe4c"])?
410    ///     )
411    ///     .select_transaction_fields([TransactionField::Hash, TransactionField::From, TransactionField::Value]);
412    /// let response = client.collect_events(query, StreamConfig::default()).await?;
413    ///
414    /// println!("Collected {} events", response.data.len());
415    /// # Ok(())
416    /// # }
417    /// ```
418    pub async fn collect_events(
419        &self,
420        mut query: Query,
421        config: StreamConfig,
422    ) -> Result<EventResponse> {
423        check_simple_stream_params(&config)?;
424
425        let event_join_strategy = InternalEventJoinStrategy::from(&query.field_selection);
426        event_join_strategy.add_join_fields_to_selection(&mut query.field_selection);
427
428        let mut recv = stream::stream_arrow(self, query, config)
429            .await
430            .context("start stream")?;
431
432        let mut data = Vec::new();
433        let mut archive_height = None;
434        let mut next_block = 0;
435        let mut total_execution_time = 0;
436
437        while let Some(res) = recv.recv().await {
438            let res = res.context("get response")?;
439            let res: QueryResponse =
440                QueryResponse::try_from(&res).context("convert arrow response")?;
441            let events = event_join_strategy.join_from_response_data(res.data);
442
443            data.extend(events);
444
445            archive_height = res.archive_height;
446            next_block = res.next_block;
447            total_execution_time += res.total_execution_time
448        }
449
450        Ok(EventResponse {
451            archive_height,
452            next_block,
453            total_execution_time,
454            data,
455            rollback_guard: None,
456        })
457    }
458
459    /// Retrieves blocks, transactions, traces, and logs in Arrow format through a stream using
460    /// the provided query and stream configuration.
461    ///
462    /// Returns data in Apache Arrow format for high-performance columnar processing.
463    /// Useful for analytics workloads or when working with Arrow-compatible tools.
464    ///
465    /// # ⚠️ Important Warning
466    ///
467    /// This method will continue executing until the query has run to completion from beginning
468    /// to the end of the block range defined in the query. For heavy queries with large block
469    /// ranges or high data volumes, consider:
470    ///
471    /// - Use [`stream_arrow()`](Self::stream_arrow) to interact with each streamed chunk individually
472    /// - Use [`get_arrow()`](Self::get_arrow) which returns a `next_block` that can be paginated for the next query
473    /// - Break large queries into smaller block ranges
474    ///
475    /// # Example
476    /// ```no_run
477    /// use hypersync_client::{Client, net_types::{Query, BlockFilter, BlockField}, StreamConfig};
478    ///
479    /// # async fn example() -> anyhow::Result<()> {
480    /// let client = Client::builder()
481    ///     .chain_id(1)
482    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
483    ///     .build()?;
484    ///
485    /// // Get block data in Arrow format for analytics
486    /// let query = Query::new()
487    ///     .from_block(19000000)
488    ///     .to_block_excl(19000100)
489    ///     .include_all_blocks()
490    ///     .select_block_fields([BlockField::Number, BlockField::Timestamp, BlockField::GasUsed]);
491    /// let response = client.collect_arrow(query, StreamConfig::default()).await?;
492    ///
493    /// println!("Retrieved {} Arrow batches for blocks", response.data.blocks.len());
494    /// # Ok(())
495    /// # }
496    /// ```
497    pub async fn collect_arrow(&self, query: Query, config: StreamConfig) -> Result<ArrowResponse> {
498        let mut recv = stream::stream_arrow(self, query, config)
499            .await
500            .context("start stream")?;
501
502        let mut data = ArrowResponseData::default();
503        let mut archive_height = None;
504        let mut next_block = 0;
505        let mut total_execution_time = 0;
506
507        while let Some(res) = recv.recv().await {
508            let res = res.context("get response")?;
509
510            for batch in res.data.blocks {
511                data.blocks.push(batch);
512            }
513            for batch in res.data.transactions {
514                data.transactions.push(batch);
515            }
516            for batch in res.data.logs {
517                data.logs.push(batch);
518            }
519            for batch in res.data.traces {
520                data.traces.push(batch);
521            }
522            for batch in res.data.decoded_logs {
523                data.decoded_logs.push(batch);
524            }
525
526            archive_height = res.archive_height;
527            next_block = res.next_block;
528            total_execution_time += res.total_execution_time
529        }
530
531        Ok(ArrowResponse {
532            archive_height,
533            next_block,
534            total_execution_time,
535            data,
536            rollback_guard: None,
537        })
538    }
539
    /// Writes parquet file getting data through a stream using the provided path, query,
    /// and stream configuration.
    ///
    /// Streams data directly to a Parquet file for efficient storage and later analysis.
    /// Perfect for data exports or ETL pipelines.
    ///
    /// # ⚠️ Important Warning
    ///
    /// This method will continue executing until the query has run to completion from beginning
    /// to the end of the block range defined in the query. For heavy queries with large block
    /// ranges or high data volumes, consider:
    ///
    /// - Use [`stream_arrow()`](Self::stream_arrow) and write to Parquet incrementally
    /// - Use [`get_arrow()`](Self::get_arrow) with pagination and append to Parquet files
    /// - Break large queries into smaller block ranges
    ///
    /// # Example
    /// ```no_run
    /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField}, StreamConfig};
    ///
    /// # async fn example() -> anyhow::Result<()> {
    /// let client = Client::builder()
    ///     .chain_id(1)
    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
    ///     .build()?;
    ///
    /// // Export all DEX trades to Parquet for analysis
    /// let query = Query::new()
    ///     .from_block(19000000)
    ///     .to_block_excl(19010000)
    ///     .where_logs(
    ///         LogFilter::all()
    ///             .and_topic0(["0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822"])?
    ///     )
    ///     .select_log_fields([LogField::Address, LogField::Data, LogField::BlockNumber]);
    /// client.collect_parquet("./trades.parquet", query, StreamConfig::default()).await?;
    ///
    /// println!("Trade data exported to trades.parquet");
    /// # Ok(())
    /// # }
    /// ```
    pub async fn collect_parquet(
        &self,
        path: &str,
        query: Query,
        config: StreamConfig,
    ) -> Result<()> {
        // Delegates to the parquet_out module which handles the stream-to-file plumbing.
        parquet_out::collect_parquet(self, path, query, config).await
    }
589
590    /// Internal implementation of getting chain_id from server
591    async fn get_chain_id_impl(&self) -> Result<u64> {
592        let mut url = self.inner.url.clone();
593        let mut segments = url.path_segments_mut().ok().context("get path segments")?;
594        segments.push("chain_id");
595        std::mem::drop(segments);
596        let req = self.inner.http_client.request(Method::GET, url);
597
598        let res = req.send().await.context("execute http req")?;
599
600        let status = res.status();
601        if !status.is_success() {
602            return Err(anyhow!("http response status code {}", status));
603        }
604
605        let chain_id: ChainId = res.json().await.context("read response body json")?;
606
607        Ok(chain_id.chain_id)
608    }
609
610    /// Internal implementation of getting height from server
611    async fn get_height_impl(&self, http_timeout_override: Option<Duration>) -> Result<u64> {
612        let mut url = self.inner.url.clone();
613        let mut segments = url.path_segments_mut().ok().context("get path segments")?;
614        segments.push("height");
615        std::mem::drop(segments);
616        let mut req = self.inner.http_client.request(Method::GET, url);
617        if let Some(http_timeout_override) = http_timeout_override {
618            req = req.timeout(http_timeout_override);
619        }
620
621        let res = req.send().await.context("execute http req")?;
622
623        let status = res.status();
624        if !status.is_success() {
625            return Err(anyhow!("http response status code {}", status));
626        }
627
628        let height: ArchiveHeight = res.json().await.context("read response body json")?;
629
630        Ok(height.height.unwrap_or(0))
631    }
632
633    /// Get the chain_id from the server with retries.
634    ///
635    /// # Example
636    /// ```no_run
637    /// use hypersync_client::Client;
638    ///
639    /// # async fn example() -> anyhow::Result<()> {
640    /// let client = Client::builder()
641    ///     .chain_id(1)
642    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
643    ///     .build()?;
644    ///
645    /// let chain_id = client.get_chain_id().await?;
646    /// println!("Connected to chain ID: {}", chain_id);
647    /// # Ok(())
648    /// # }
649    /// ```
650    pub async fn get_chain_id(&self) -> Result<u64> {
651        let mut base = self.inner.retry_base_ms;
652
653        let mut err = anyhow!("");
654
655        for _ in 0..self.inner.max_num_retries + 1 {
656            match self.get_chain_id_impl().await {
657                Ok(res) => return Ok(res),
658                Err(e) => {
659                    log::error!(
660                        "failed to get chain_id from server, retrying... The error was: {e:?}"
661                    );
662                    err = err.context(format!("{e:?}"));
663                }
664            }
665
666            let base_ms = Duration::from_millis(base);
667            let jitter = Duration::from_millis(fastrange_rs::fastrange_64(
668                rand::random(),
669                self.inner.retry_backoff_ms,
670            ));
671
672            tokio::time::sleep(base_ms + jitter).await;
673
674            base = std::cmp::min(
675                base + self.inner.retry_backoff_ms,
676                self.inner.retry_ceiling_ms,
677            );
678        }
679
680        Err(err)
681    }
682
683    /// Get the height of from server with retries.
684    ///
685    /// # Example
686    /// ```no_run
687    /// use hypersync_client::Client;
688    ///
689    /// # async fn example() -> anyhow::Result<()> {
690    /// let client = Client::builder()
691    ///     .chain_id(1)
692    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
693    ///     .build()?;
694    ///
695    /// let height = client.get_height().await?;
696    /// println!("Current block height: {}", height);
697    /// # Ok(())
698    /// # }
699    /// ```
700    pub async fn get_height(&self) -> Result<u64> {
701        let mut base = self.inner.retry_base_ms;
702
703        let mut err = anyhow!("");
704
705        for _ in 0..self.inner.max_num_retries + 1 {
706            match self.get_height_impl(None).await {
707                Ok(res) => return Ok(res),
708                Err(e) => {
709                    log::error!(
710                        "failed to get height from server, retrying... The error was: {e:?}"
711                    );
712                    err = err.context(format!("{e:?}"));
713                }
714            }
715
716            let base_ms = Duration::from_millis(base);
717            let jitter = Duration::from_millis(fastrange_rs::fastrange_64(
718                rand::random(),
719                self.inner.retry_backoff_ms,
720            ));
721
722            tokio::time::sleep(base_ms + jitter).await;
723
724            base = std::cmp::min(
725                base + self.inner.retry_backoff_ms,
726                self.inner.retry_ceiling_ms,
727            );
728        }
729
730        Err(err)
731    }
732
    /// Get the height of the Client instance for health checks.
    ///
    /// Doesn't do any retries and the `http_req_timeout` parameter will override the http timeout config set when creating the client.
    ///
    /// # Example
    /// ```no_run
    /// use hypersync_client::Client;
    /// use std::time::Duration;
    ///
    /// # async fn example() -> anyhow::Result<()> {
    /// let client = Client::builder()
    ///     .chain_id(1)
    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
    ///     .build()?;
    ///
    /// // Quick health check with 5 second timeout
    /// let height = client.health_check(Some(Duration::from_secs(5))).await?;
    /// println!("Server is healthy at block: {}", height);
    /// # Ok(())
    /// # }
    /// ```
    pub async fn health_check(&self, http_req_timeout: Option<Duration>) -> Result<u64> {
        // Single attempt, no retry loop: a health check should fail fast.
        self.get_height_impl(http_req_timeout).await
    }
757
758    /// Executes query with retries and returns the response.
759    ///
760    /// # Example
761    /// ```no_run
762    /// use hypersync_client::{Client, net_types::{Query, BlockFilter, BlockField}};
763    ///
764    /// # async fn example() -> anyhow::Result<()> {
765    /// let client = Client::builder()
766    ///     .chain_id(1)
767    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
768    ///     .build()?;
769    ///
770    /// // Query all blocks from a specific range
771    /// let query = Query::new()
772    ///     .from_block(19000000)
773    ///     .to_block_excl(19000010)
774    ///     .include_all_blocks()
775    ///     .select_block_fields([BlockField::Number, BlockField::Hash, BlockField::Timestamp]);
776    /// let response = client.get(&query).await?;
777    ///
778    /// println!("Retrieved {} blocks", response.data.blocks.len());
779    /// # Ok(())
780    /// # }
781    /// ```
782    pub async fn get(&self, query: &Query) -> Result<QueryResponse> {
783        let arrow_response = self.get_arrow(query).await.context("get data")?;
784        let converted =
785            QueryResponse::try_from(&arrow_response).context("convert arrow response")?;
786        Ok(converted)
787    }
788
789    /// Add block, transaction and log fields selection to the query, executes it with retries
790    /// and returns the response.
791    ///
792    /// This method automatically joins blocks, transactions, and logs into unified events,
793    /// making it easier to work with related blockchain data.
794    ///
795    /// # Example
796    /// ```no_run
797    /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField, TransactionField}};
798    ///
799    /// # async fn example() -> anyhow::Result<()> {
800    /// let client = Client::builder()
801    ///     .chain_id(1)
802    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
803    ///     .build()?;
804    ///
805    /// // Query ERC20 transfers with transaction context
806    /// let query = Query::new()
807    ///     .from_block(19000000)
808    ///     .to_block_excl(19000010)
809    ///     .where_logs(
810    ///         LogFilter::all()
811    ///             .and_topic0(["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"])?
812    ///     )
813    ///     .select_log_fields([LogField::Address, LogField::Data])
814    ///     .select_transaction_fields([TransactionField::Hash, TransactionField::From]);
815    /// let response = client.get_events(query).await?;
816    ///
817    /// println!("Retrieved {} joined events", response.data.len());
818    /// # Ok(())
819    /// # }
820    /// ```
821    pub async fn get_events(&self, mut query: Query) -> Result<EventResponse> {
822        let event_join_strategy = InternalEventJoinStrategy::from(&query.field_selection);
823        event_join_strategy.add_join_fields_to_selection(&mut query.field_selection);
824        let arrow_response = self.get_arrow(&query).await.context("get data")?;
825        EventResponse::try_from_arrow_response(&arrow_response, &event_join_strategy)
826    }
827
828    /// Executes query once and returns the result in (Arrow, size) format using JSON serialization.
829    async fn get_arrow_impl_json(
830        &self,
831        query: &Query,
832    ) -> std::result::Result<(ArrowResponse, u64), HyperSyncResponseError> {
833        let mut url = self.inner.url.clone();
834        let mut segments = url.path_segments_mut().ok().context("get path segments")?;
835        segments.push("query");
836        segments.push("arrow-ipc");
837        std::mem::drop(segments);
838        let req = self.inner.http_client.request(Method::POST, url);
839
840        let res = req.json(&query).send().await.context("execute http req")?;
841
842        let status = res.status();
843        if status == StatusCode::PAYLOAD_TOO_LARGE {
844            return Err(HyperSyncResponseError::PayloadTooLarge);
845        }
846        if !status.is_success() {
847            let text = res.text().await.context("read text to see error")?;
848
849            return Err(HyperSyncResponseError::Other(anyhow!(
850                "http response status code {}, err body: {}",
851                status,
852                text
853            )));
854        }
855
856        let bytes = res.bytes().await.context("read response body bytes")?;
857
858        let res = tokio::task::block_in_place(|| {
859            parse_query_response(&bytes).context("parse query response")
860        })?;
861
862        Ok((res, bytes.len().try_into().unwrap()))
863    }
864
865    fn should_cache_queries(&self) -> bool {
866        matches!(
867            self.inner.serialization_format,
868            SerializationFormat::CapnProto {
869                should_cache_queries: true
870            }
871        )
872    }
873
    /// Executes query once and returns the result in (Arrow, size) format using Cap'n Proto serialization.
    ///
    /// When query caching is enabled, a lightweight query-id request is sent
    /// first; on a cache hit the cached response is decoded and returned
    /// directly, otherwise (cache miss or probe failure) the full query is sent.
    async fn get_arrow_impl_capnp(
        &self,
        query: &Query,
    ) -> std::result::Result<(ArrowResponse, u64), HyperSyncResponseError> {
        // POST {base}/query/arrow-ipc/capnp
        let mut url = self.inner.url.clone();
        let mut segments = url.path_segments_mut().ok().context("get path segments")?;
        segments.push("query");
        segments.push("arrow-ipc");
        segments.push("capnp");
        // Release the mutable borrow on `url` so it can be used for requests below.
        std::mem::drop(segments);

        let should_cache = self.should_cache_queries();

        if should_cache {
            // Build the packed capnp message carrying only the query id (cache probe).
            let query_with_id = {
                let mut message = capnp::message::Builder::new_default();
                let mut request_builder =
                    message.init_root::<hypersync_net_types_capnp::request::Builder>();

                request_builder
                    .build_query_id_from_query(query)
                    .context("build query id")?;
                let mut query_with_id = Vec::new();
                capnp::serialize_packed::write_message(&mut query_with_id, &message)
                    .context("write capnp message")?;
                query_with_id
            };

            let req = self.inner.http_client.request(Method::POST, url.clone());

            let res = req
                .body(query_with_id)
                .send()
                .await
                .context("execute http req")?;

            let status = res.status();
            if status == StatusCode::PAYLOAD_TOO_LARGE {
                // Caller interprets this as "shrink the block range and retry".
                return Err(HyperSyncResponseError::PayloadTooLarge);
            }
            if status.is_success() {
                let bytes = res.bytes().await.context("read response body bytes")?;

                // Lift capnp's default read limits; responses can be large and deep.
                let mut opts = capnp::message::ReaderOptions::new();
                opts.nesting_limit(i32::MAX).traversal_limit_in_words(None);
                let message_reader = capnp::serialize_packed::read_message(bytes.as_ref(), opts)
                    .context("create message reader")?;
                let query_response = message_reader
                    .get_root::<hypersync_net_types_capnp::cached_query_response::Reader>()
                    .context("get cached_query_response root")?;
                match query_response.get_either().which().context("get either")? {
                hypersync_net_types_capnp::cached_query_response::either::Which::QueryResponse(
                    query_response,
                ) => {
                    // Cache hit: decode the cached response off the async threads and return.
                    let res = tokio::task::block_in_place(|| {
                        let res = query_response?;
                        read_query_response(&res).context("parse query response cached")
                    })?;
                    return Ok((res, bytes.len().try_into().unwrap()));
                }
                hypersync_net_types_capnp::cached_query_response::either::Which::NotCached(()) => {
                    // Cache miss: fall through to send the full query below.
                    log::trace!("query was not cached, retrying with full query");
                }
            }
            } else {
                // Cache probe failed; log the server error and fall back to the full query.
                let text = res.text().await.context("read text to see error")?;
                log::warn!(
                    "Failed cache query, will retry full query. {}, err body: {}",
                    status,
                    text
                );
            }
        };

        // Build the packed capnp message carrying the full query, forwarding the
        // caching flag to the request builder.
        let full_query_bytes = {
            let mut message = capnp::message::Builder::new_default();
            let mut query_builder =
                message.init_root::<hypersync_net_types_capnp::request::Builder>();

            query_builder
                .build_full_query_from_query(query, should_cache)
                .context("build full query")?;
            let mut bytes = Vec::new();
            capnp::serialize_packed::write_message(&mut bytes, &message)
                .context("write full query capnp message")?;
            bytes
        };

        let req = self.inner.http_client.request(Method::POST, url);

        let res = req
            .body(full_query_bytes)
            .send()
            .await
            .context("execute http req")?;

        let status = res.status();
        if status == StatusCode::PAYLOAD_TOO_LARGE {
            return Err(HyperSyncResponseError::PayloadTooLarge);
        }
        if !status.is_success() {
            let text = res.text().await.context("read text to see error")?;

            return Err(HyperSyncResponseError::Other(anyhow!(
                "http response status code {}, err body: {}",
                status,
                text
            )));
        }

        let bytes = res.bytes().await.context("read response body bytes")?;

        // Arrow IPC parsing is CPU-bound; keep it off the async executor threads.
        let res = tokio::task::block_in_place(|| {
            parse_query_response(&bytes).context("parse query response")
        })?;

        Ok((res, bytes.len().try_into().unwrap()))
    }
993
994    /// Executes query once and returns the result in (Arrow, size) format.
995    async fn get_arrow_impl(&self, query: &Query) -> Result<(ArrowResponse, u64)> {
996        let mut query = query.clone();
997        loop {
998            let res = match self.inner.serialization_format {
999                SerializationFormat::Json => self.get_arrow_impl_json(&query).await,
1000                SerializationFormat::CapnProto { .. } => self.get_arrow_impl_capnp(&query).await,
1001            };
1002            match res {
1003                Ok(res) => return Ok(res),
1004                Err(HyperSyncResponseError::Other(e)) => return Err(e),
1005                Err(HyperSyncResponseError::PayloadTooLarge) => {
1006                    let block_range = if let Some(to_block) = query.to_block {
1007                        let current = to_block - query.from_block;
1008                        if current < 2 {
1009                            anyhow::bail!(
1010                                "Payload is too large and query is using the minimum block range."
1011                            )
1012                        }
1013                        // Half the current block range
1014                        current / 2
1015                    } else {
1016                        200
1017                    };
1018                    let to_block = query.from_block + block_range;
1019                    query.to_block = Some(to_block);
1020
1021                    log::trace!(
1022                        "Payload is too large, retrying with block range from: {} to: {}",
1023                        query.from_block,
1024                        to_block
1025                    );
1026                }
1027            }
1028        }
1029    }
1030
1031    /// Executes query with retries and returns the response in Arrow format.
1032    pub async fn get_arrow(&self, query: &Query) -> Result<ArrowResponse> {
1033        self.get_arrow_with_size(query).await.map(|res| res.0)
1034    }
1035
1036    /// Internal implementation for get_arrow.
1037    async fn get_arrow_with_size(&self, query: &Query) -> Result<(ArrowResponse, u64)> {
1038        let mut base = self.inner.retry_base_ms;
1039
1040        let mut err = anyhow!("");
1041
1042        for _ in 0..self.inner.max_num_retries + 1 {
1043            match self.get_arrow_impl(query).await {
1044                Ok(res) => return Ok(res),
1045                Err(e) => {
1046                    log::error!(
1047                        "failed to get arrow data from server, retrying... The error was: {e:?}"
1048                    );
1049                    err = err.context(format!("{e:?}"));
1050                }
1051            }
1052
1053            let base_ms = Duration::from_millis(base);
1054            let jitter = Duration::from_millis(fastrange_rs::fastrange_64(
1055                rand::random(),
1056                self.inner.retry_backoff_ms,
1057            ));
1058
1059            tokio::time::sleep(base_ms + jitter).await;
1060
1061            base = std::cmp::min(
1062                base + self.inner.retry_backoff_ms,
1063                self.inner.retry_ceiling_ms,
1064            );
1065        }
1066
1067        Err(err)
1068    }
1069
1070    /// Spawns task to execute query and return data via a channel.
1071    ///
1072    /// # Example
1073    /// ```no_run
1074    /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField}, StreamConfig};
1075    ///
1076    /// # async fn example() -> anyhow::Result<()> {
1077    /// let client = Client::builder()
1078    ///     .chain_id(1)
1079    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
1080    ///     .build()?;
1081    ///
1082    /// // Stream all ERC20 transfer events
1083    /// let query = Query::new()
1084    ///     .from_block(19000000)
1085    ///     .where_logs(
1086    ///         LogFilter::all()
1087    ///             .and_topic0(["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"])?
1088    ///     )
1089    ///     .select_log_fields([LogField::Address, LogField::Topic1, LogField::Topic2, LogField::Data]);
1090    /// let mut receiver = client.stream(query, StreamConfig::default()).await?;
1091    ///
1092    /// while let Some(response) = receiver.recv().await {
1093    ///     let response = response?;
1094    ///     println!("Got {} events up to block: {}", response.data.logs.len(), response.next_block);
1095    /// }
1096    /// # Ok(())
1097    /// # }
1098    /// ```
1099    pub async fn stream(
1100        &self,
1101        query: Query,
1102        config: StreamConfig,
1103    ) -> Result<mpsc::Receiver<Result<QueryResponse>>> {
1104        check_simple_stream_params(&config)?;
1105
1106        let (tx, rx): (_, mpsc::Receiver<Result<QueryResponse>>) =
1107            mpsc::channel(config.concurrency);
1108
1109        let mut inner_rx = self
1110            .stream_arrow(query, config)
1111            .await
1112            .context("start inner stream")?;
1113
1114        tokio::spawn(async move {
1115            while let Some(resp) = inner_rx.recv().await {
1116                let msg = resp
1117                    .context("inner receiver")
1118                    .and_then(|r| QueryResponse::try_from(&r));
1119                let is_err = msg.is_err();
1120                if tx.send(msg).await.is_err() || is_err {
1121                    return;
1122                }
1123            }
1124        });
1125
1126        Ok(rx)
1127    }
1128
1129    /// Add block, transaction and log fields selection to the query and spawns task to execute it,
1130    /// returning data via a channel.
1131    ///
1132    /// This method automatically joins blocks, transactions, and logs into unified events,
1133    /// then streams them via a channel for real-time processing.
1134    ///
1135    /// # Example
1136    /// ```no_run
1137    /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField, TransactionField}, StreamConfig};
1138    ///
1139    /// # async fn example() -> anyhow::Result<()> {
1140    /// let client = Client::builder()
1141    ///     .chain_id(1)
1142    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
1143    ///     .build()?;
1144    ///
1145    /// // Stream NFT transfer events with transaction context
1146    /// let query = Query::new()
1147    ///     .from_block(19000000)
1148    ///     .where_logs(
1149    ///         LogFilter::all()
1150    ///             .and_topic0(["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"])?
1151    ///     )
1152    ///     .select_log_fields([LogField::Address, LogField::Topic1, LogField::Topic2])
1153    ///     .select_transaction_fields([TransactionField::Hash, TransactionField::From]);
1154    /// let mut receiver = client.stream_events(query, StreamConfig::default()).await?;
1155    ///
1156    /// while let Some(response) = receiver.recv().await {
1157    ///     let response = response?;
1158    ///     println!("Got {} joined events up to block: {}", response.data.len(), response.next_block);
1159    /// }
1160    /// # Ok(())
1161    /// # }
1162    /// ```
1163    pub async fn stream_events(
1164        &self,
1165        mut query: Query,
1166        config: StreamConfig,
1167    ) -> Result<mpsc::Receiver<Result<EventResponse>>> {
1168        check_simple_stream_params(&config)?;
1169
1170        let event_join_strategy = InternalEventJoinStrategy::from(&query.field_selection);
1171
1172        event_join_strategy.add_join_fields_to_selection(&mut query.field_selection);
1173
1174        let (tx, rx): (_, mpsc::Receiver<Result<EventResponse>>) =
1175            mpsc::channel(config.concurrency);
1176
1177        let mut inner_rx = self
1178            .stream_arrow(query, config)
1179            .await
1180            .context("start inner stream")?;
1181
1182        tokio::spawn(async move {
1183            while let Some(resp) = inner_rx.recv().await {
1184                let msg = resp
1185                    .context("inner receiver")
1186                    .and_then(|r| EventResponse::try_from_arrow_response(&r, &event_join_strategy));
1187                let is_err = msg.is_err();
1188                if tx.send(msg).await.is_err() || is_err {
1189                    return;
1190                }
1191            }
1192        });
1193
1194        Ok(rx)
1195    }
1196
    /// Spawns task to execute query and return data via a channel in Arrow format.
    ///
    /// Returns raw Apache Arrow data via a channel for high-performance processing.
    /// Ideal for applications that need to work directly with columnar data.
    ///
    /// This is the lowest-level streaming entry point; `stream` and
    /// `stream_events` are built on top of it.
    ///
    /// # Example
    /// ```no_run
    /// use hypersync_client::{Client, net_types::{Query, TransactionFilter, TransactionField}, StreamConfig};
    ///
    /// # async fn example() -> anyhow::Result<()> {
    /// let client = Client::builder()
    ///     .chain_id(1)
    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
    ///     .build()?;
    ///
    /// // Stream transaction data in Arrow format for analytics
    /// let query = Query::new()
    ///     .from_block(19000000)
    ///     .to_block_excl(19000100)
    ///     .where_transactions(
    ///         TransactionFilter::all()
    ///             .and_contract_address(["0xA0b86a33E6411b87Fd9D3DF822C8698FC06BBe4c"])?
    ///     )
    ///     .select_transaction_fields([TransactionField::Hash, TransactionField::From, TransactionField::Value]);
    /// let mut receiver = client.stream_arrow(query, StreamConfig::default()).await?;
    ///
    /// while let Some(response) = receiver.recv().await {
    ///     let response = response?;
    ///     println!("Got {} Arrow batches for transactions", response.data.transactions.len());
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub async fn stream_arrow(
        &self,
        query: Query,
        config: StreamConfig,
    ) -> Result<mpsc::Receiver<Result<ArrowResponse>>> {
        // All streaming logic (pagination, concurrency) lives in the stream module.
        stream::stream_arrow(self, query, config).await
    }
1237
    /// Returns the hypersync server URL this client was configured with.
    ///
    /// # Example
    /// ```
    /// use hypersync_client::Client;
    ///
    /// let client = Client::builder()
    ///     .chain_id(1)
    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
    ///     .build()
    ///     .unwrap();
    ///
    /// println!("Client URL: {}", client.url());
    /// ```
    pub fn url(&self) -> &Url {
        &self.inner.url
    }
1255}
1256
/// Builder for creating a hypersync client with configuration options.
///
/// This builder provides a fluent API for configuring client settings like URL,
/// authentication, timeouts, and retry behavior. It wraps a `ClientConfig`
/// that the setter methods fill in; `build` consumes the builder and validates
/// the configuration.
///
/// # Example
/// ```
/// use hypersync_client::{Client, SerializationFormat};
///
/// let client = Client::builder()
///     .chain_id(1)
///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
///     .http_req_timeout_millis(30000)
///     .max_num_retries(3)
///     .build()
///     .unwrap();
/// ```
#[derive(Debug, Clone, Default)]
pub struct ClientBuilder(ClientConfig);
1276
1277impl ClientBuilder {
1278    /// Creates a new ClientBuilder with default configuration.
1279    pub fn new() -> Self {
1280        Self::default()
1281    }
1282
1283    /// Sets the chain ID and automatically configures the URL for the hypersync endpoint.
1284    ///
1285    /// This is a convenience method that sets the URL to `https://{chain_id}.hypersync.xyz`.
1286    ///
1287    /// # Arguments
1288    /// * `chain_id` - The blockchain chain ID (e.g., 1 for Ethereum mainnet)
1289    ///
1290    /// # Example
1291    /// ```
1292    /// use hypersync_client::Client;
1293    ///
1294    /// let client = Client::builder()
1295    ///     .chain_id(1) // Ethereum mainnet
1296    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1297    ///     .build()
1298    ///     .unwrap();
1299    /// ```
1300    pub fn chain_id(mut self, chain_id: u64) -> Self {
1301        self.0.url = format!("https://{chain_id}.hypersync.xyz");
1302        self
1303    }
1304
1305    /// Sets a custom URL for the hypersync server.
1306    ///
1307    /// Use this method when you need to connect to a custom hypersync endpoint
1308    /// instead of the default public endpoints.
1309    ///
1310    /// # Arguments
1311    /// * `url` - The hypersync server URL
1312    ///
1313    /// # Example
1314    /// ```
1315    /// use hypersync_client::Client;
1316    ///
1317    /// let client = Client::builder()
1318    ///     .url("https://my-custom-hypersync.example.com")
1319    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1320    ///     .build()
1321    ///     .unwrap();
1322    /// ```
1323    pub fn url<S: ToString>(mut self, url: S) -> Self {
1324        self.0.url = url.to_string();
1325        self
1326    }
1327
1328    /// Sets the api token for authentication.
1329    ///
1330    /// Required for accessing authenticated hypersync endpoints.
1331    ///
1332    /// # Arguments
1333    /// * `api_token` - The authentication token
1334    ///
1335    /// # Example
1336    /// ```
1337    /// use hypersync_client::Client;
1338    ///
1339    /// let client = Client::builder()
1340    ///     .chain_id(1)
1341    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1342    ///     .build()
1343    ///     .unwrap();
1344    /// ```
1345    pub fn api_token<S: ToString>(mut self, api_token: S) -> Self {
1346        self.0.api_token = api_token.to_string();
1347        self
1348    }
1349
1350    /// Sets the HTTP request timeout in milliseconds.
1351    ///
1352    /// # Arguments
1353    /// * `http_req_timeout_millis` - Timeout in milliseconds (default: 30000)
1354    ///
1355    /// # Example
1356    /// ```
1357    /// use hypersync_client::Client;
1358    ///
1359    /// let client = Client::builder()
1360    ///     .chain_id(1)
1361    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1362    ///     .http_req_timeout_millis(60000) // 60 second timeout
1363    ///     .build()
1364    ///     .unwrap();
1365    /// ```
1366    pub fn http_req_timeout_millis(mut self, http_req_timeout_millis: u64) -> Self {
1367        self.0.http_req_timeout_millis = http_req_timeout_millis;
1368        self
1369    }
1370
1371    /// Sets the maximum number of retries for failed requests.
1372    ///
1373    /// # Arguments
1374    /// * `max_num_retries` - Maximum number of retries (default: 10)
1375    ///
1376    /// # Example
1377    /// ```
1378    /// use hypersync_client::Client;
1379    ///
1380    /// let client = Client::builder()
1381    ///     .chain_id(1)
1382    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1383    ///     .max_num_retries(5)
1384    ///     .build()
1385    ///     .unwrap();
1386    /// ```
1387    pub fn max_num_retries(mut self, max_num_retries: usize) -> Self {
1388        self.0.max_num_retries = max_num_retries;
1389        self
1390    }
1391
1392    /// Sets the backoff increment for retry delays.
1393    ///
1394    /// This value is added to the base delay on each retry attempt.
1395    ///
1396    /// # Arguments
1397    /// * `retry_backoff_ms` - Backoff increment in milliseconds (default: 500)
1398    ///
1399    /// # Example
1400    /// ```
1401    /// use hypersync_client::Client;
1402    ///
1403    /// let client = Client::builder()
1404    ///     .chain_id(1)
1405    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1406    ///     .retry_backoff_ms(1000) // 1 second backoff increment
1407    ///     .build()
1408    ///     .unwrap();
1409    /// ```
1410    pub fn retry_backoff_ms(mut self, retry_backoff_ms: u64) -> Self {
1411        self.0.retry_backoff_ms = retry_backoff_ms;
1412        self
1413    }
1414
1415    /// Sets the initial delay for retry attempts.
1416    ///
1417    /// # Arguments
1418    /// * `retry_base_ms` - Initial retry delay in milliseconds (default: 500)
1419    ///
1420    /// # Example
1421    /// ```
1422    /// use hypersync_client::Client;
1423    ///
1424    /// let client = Client::builder()
1425    ///     .chain_id(1)
1426    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1427    ///     .retry_base_ms(1000) // Start with 1 second delay
1428    ///     .build()
1429    ///     .unwrap();
1430    /// ```
1431    pub fn retry_base_ms(mut self, retry_base_ms: u64) -> Self {
1432        self.0.retry_base_ms = retry_base_ms;
1433        self
1434    }
1435
1436    /// Sets the maximum delay for retry attempts.
1437    ///
1438    /// The retry delay will not exceed this value, even with backoff increments.
1439    ///
1440    /// # Arguments
1441    /// * `retry_ceiling_ms` - Maximum retry delay in milliseconds (default: 10000)
1442    ///
1443    /// # Example
1444    /// ```
1445    /// use hypersync_client::Client;
1446    ///
1447    /// let client = Client::builder()
1448    ///     .chain_id(1)
1449    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1450    ///     .retry_ceiling_ms(30000) // Cap at 30 seconds
1451    ///     .build()
1452    ///     .unwrap();
1453    /// ```
1454    pub fn retry_ceiling_ms(mut self, retry_ceiling_ms: u64) -> Self {
1455        self.0.retry_ceiling_ms = retry_ceiling_ms;
1456        self
1457    }
1458
1459    /// Sets the serialization format for client-server communication.
1460    ///
1461    /// # Arguments
1462    /// * `serialization_format` - The format to use (JSON or CapnProto)
1463    ///
1464    /// # Example
1465    /// ```
1466    /// use hypersync_client::{Client, SerializationFormat};
1467    ///
1468    /// let client = Client::builder()
1469    ///     .chain_id(1)
1470    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1471    ///     .serialization_format(SerializationFormat::Json)
1472    ///     .build()
1473    ///     .unwrap();
1474    /// ```
1475    pub fn serialization_format(mut self, serialization_format: SerializationFormat) -> Self {
1476        self.0.serialization_format = serialization_format;
1477        self
1478    }
1479
1480    /// Builds the client with the configured settings.
1481    ///
1482    /// # Returns
1483    /// * `Result<Client>` - The configured client or an error if configuration is invalid
1484    ///
1485    /// # Errors
1486    /// Returns an error if:
1487    /// * The URL is malformed
1488    /// * Required configuration is missing
1489    ///
1490    /// # Example
1491    /// ```
1492    /// use hypersync_client::Client;
1493    ///
1494    /// let client = Client::builder()
1495    ///     .chain_id(1)
1496    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1497    ///     .build()
1498    ///     .unwrap();
1499    /// ```
1500    pub fn build(self) -> Result<Client> {
1501        if self.0.url.is_empty() {
1502            anyhow::bail!(
1503                "endpoint needs to be set, try using builder.chain_id(1) or\
1504                builder.url(\"https://eth.hypersync.xyz\") to set the endpoint"
1505            )
1506        }
1507        Client::new(self.0)
1508    }
1509}
1510
/// Initial SSE reconnect delay (200ms); doubles on each consecutive failure.
const INITIAL_RECONNECT_DELAY: Duration = Duration::from_millis(200);
/// Upper bound on the exponential SSE reconnect backoff.
const MAX_RECONNECT_DELAY: Duration = Duration::from_secs(30);
/// Timeout for detecting dead connections. Server sends keepalive pings every 5s,
/// so we timeout after 15s (3x the ping interval).
const READ_TIMEOUT: Duration = Duration::from_secs(15);
1517
/// Events emitted by the height stream.
///
/// Lets consumers observe both height updates and connection state changes
/// (e.g. to display connection status in a UI).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum HeightStreamEvent {
    /// Successfully connected or reconnected to the SSE stream.
    Connected,
    /// Received a height update from the server.
    Height(u64),
    /// Connection lost, will attempt to reconnect after the specified delay.
    Reconnecting {
        /// Duration to wait before attempting reconnection.
        delay: Duration,
        /// Debug-formatted description of the error that caused the reconnection.
        error_msg: String,
    },
}
1533
/// Internal classification of raw SSE events before they are surfaced to consumers.
enum InternalStreamEvent {
    /// An event that should be forwarded to the subscriber channel.
    Publish(HeightStreamEvent),
    /// Server keepalive; consumed internally and never forwarded.
    Ping,
    /// Unrecognized event type (debug-formatted); ignored by the stream loop.
    Unknown(String),
}
1539
1540impl Client {
1541    fn get_es_stream(&self) -> Result<EventSource> {
1542        // Build the GET /height/sse request
1543        let mut url = self.inner.url.clone();
1544        url.path_segments_mut()
1545            .ok()
1546            .context("invalid base URL")?
1547            .extend(&["height", "sse"]);
1548
1549        let req = self
1550            .inner
1551            .http_client
1552            // Don't set timeout for SSE stream
1553            .request_no_timeout(Method::GET, url);
1554
1555        // Configure exponential backoff for library-level retries
1556        let retry_policy = ExponentialBackoff::new(
1557            INITIAL_RECONNECT_DELAY,
1558            2.0,
1559            Some(MAX_RECONNECT_DELAY),
1560            None, // unlimited retries
1561        );
1562
1563        // Turn the request into an EventSource stream with retries
1564        let mut es = reqwest_eventsource::EventSource::new(req)
1565            .context("unexpected error creating EventSource")?;
1566        es.set_retry_policy(Box::new(retry_policy));
1567        Ok(es)
1568    }
1569
1570    async fn next_height(event_source: &mut EventSource) -> Result<Option<InternalStreamEvent>> {
1571        let Some(res) = tokio::time::timeout(READ_TIMEOUT, event_source.next())
1572            .await
1573            .map_err(|d| anyhow::anyhow!("stream timed out after {d}"))?
1574        else {
1575            return Ok(None);
1576        };
1577
1578        let e = match res.context("failed response")? {
1579            Event::Open => InternalStreamEvent::Publish(HeightStreamEvent::Connected),
1580            Event::Message(event) => match event.event.as_str() {
1581                "height" => {
1582                    let height = event
1583                        .data
1584                        .trim()
1585                        .parse::<u64>()
1586                        .context("parsing height from event data")?;
1587                    InternalStreamEvent::Publish(HeightStreamEvent::Height(height))
1588                }
1589                "ping" => InternalStreamEvent::Ping,
1590                _ => InternalStreamEvent::Unknown(format!("unknown event: {:?}", event)),
1591            },
1592        };
1593
1594        Ok(Some(e))
1595    }
1596
1597    async fn stream_height_events(
1598        es: &mut EventSource,
1599        tx: &mpsc::Sender<HeightStreamEvent>,
1600    ) -> Result<bool> {
1601        let mut received_an_event = false;
1602        while let Some(event) = Self::next_height(es).await.context("failed next height")? {
1603            match event {
1604                InternalStreamEvent::Publish(event) => {
1605                    received_an_event = true;
1606                    if tx.send(event).await.is_err() {
1607                        return Ok(received_an_event); // Receiver dropped, exit task
1608                    }
1609                }
1610                InternalStreamEvent::Ping => (), // ignore pings
1611                InternalStreamEvent::Unknown(_event) => (), // ignore unknown events
1612            }
1613        }
1614        Ok(received_an_event)
1615    }
1616
1617    fn get_delay(consecutive_failures: u32) -> Duration {
1618        if consecutive_failures > 0 {
1619            /// helper function to calculate 2^x
1620            /// optimization using bit shifting
1621            const fn two_to_pow(x: u32) -> u32 {
1622                1 << x
1623            }
1624            // Exponential backoff: 200ms, 400ms, 800ms, ... up to 30s
1625            INITIAL_RECONNECT_DELAY
1626                .saturating_mul(two_to_pow(consecutive_failures - 1))
1627                .min(MAX_RECONNECT_DELAY)
1628        } else {
1629            // On zero consecutive failures, 0 delay
1630            Duration::from_millis(0)
1631        }
1632    }
1633
    /// Drives the SSE height stream forever, reconnecting with backoff.
    ///
    /// Repeatedly opens a fresh EventSource and pumps its events into `tx`.
    /// After each disconnect it publishes a `Reconnecting` event (carrying the
    /// upcoming delay and the causing error), sleeps, and retries. Exits
    /// cleanly once the receiving side of `tx` is dropped.
    async fn stream_height_events_with_retry(
        &self,
        tx: &mpsc::Sender<HeightStreamEvent>,
    ) -> Result<()> {
        let mut consecutive_failures = 0u32;

        loop {
            // should always be able to create a new es stream;
            // something is wrong with the req builder otherwise
            let mut es = self.get_es_stream().context("get es stream")?;

            // Placeholder for the "stream ended without an error" case;
            // its debug form is what ends up in the Reconnecting event.
            let mut error = anyhow!("");

            match Self::stream_height_events(&mut es, tx).await {
                Ok(received_an_event) => {
                    if received_an_event {
                        consecutive_failures = 0; // Reset after successful connection that then failed
                    }
                    log::trace!("Stream height exited");
                }
                Err(e) => {
                    log::trace!("Stream height failed: {e:?}");
                    error = e;
                }
            }

            es.close();

            // If the receiver is closed, exit the task
            if tx.is_closed() {
                break;
            }

            let delay = Self::get_delay(consecutive_failures);
            log::trace!("Reconnecting in {:?}...", delay);

            let error_msg = format!("{error:?}");

            // Tell the consumer we are about to reconnect (and why).
            if tx
                .send(HeightStreamEvent::Reconnecting { delay, error_msg })
                .await
                .is_err()
            {
                return Ok(()); // Receiver dropped, exit task
            }
            tokio::time::sleep(delay).await;

            // increment consecutive failures so that on the next try
            // it will start using back offs
            consecutive_failures += 1;
        }

        Ok(())
    }
1688
1689    /// Streams archive height updates from the server via Server-Sent Events.
1690    ///
1691    /// Establishes a long-lived SSE connection to `/height/sse` that automatically reconnects
1692    /// on disconnection with exponential backoff (200ms → 400ms → ... → max 30s).
1693    ///
1694    /// The stream emits [`HeightStreamEvent`] to notify consumers of connection state changes
1695    /// and height updates. This allows applications to display connection status to users.
1696    ///
1697    /// # Returns
1698    /// Channel receiver yielding [`HeightStreamEvent`]s. The background task handles connection
1699    /// lifecycle and sends events through this channel.
1700    ///
1701    /// # Example
1702    /// ```
1703    /// # use hypersync_client::{Client, HeightStreamEvent};
1704    /// # async fn example() -> anyhow::Result<()> {
1705    /// let client = Client::builder()
1706    ///     .url("https://eth.hypersync.xyz")
1707    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1708    ///     .build()?;
1709    ///
1710    /// let mut rx = client.stream_height();
1711    ///
1712    /// while let Some(event) = rx.recv().await {
1713    ///     match event {
1714    ///         HeightStreamEvent::Connected => println!("Connected to stream"),
1715    ///         HeightStreamEvent::Height(h) => println!("Height: {}", h),
1716    ///         HeightStreamEvent::Reconnecting { delay, error_msg } => {
1717    ///             println!("Reconnecting in {delay:?} due to error: {error_msg}")
1718    ///         }
1719    ///     }
1720    /// }
1721    /// # Ok(())
1722    /// # }
1723    /// ```
1724    pub fn stream_height(&self) -> mpsc::Receiver<HeightStreamEvent> {
1725        let (tx, rx) = mpsc::channel(16);
1726        let client = self.clone();
1727
1728        tokio::spawn(async move {
1729            if let Err(e) = client.stream_height_events_with_retry(&tx).await {
1730                log::error!("Stream height failed unexpectedly: {e:?}");
1731            }
1732        });
1733
1734        rx
1735    }
1736}
1737
1738fn check_simple_stream_params(config: &StreamConfig) -> Result<()> {
1739    if config.event_signature.is_some() {
1740        return Err(anyhow!(
1741            "config.event_signature can't be passed to simple type function. User is expected to \
1742             decode the logs using Decoder."
1743        ));
1744    }
1745    if config.column_mapping.is_some() {
1746        return Err(anyhow!(
1747            "config.column_mapping can't be passed to single type function. User is expected to \
1748             map values manually."
1749        ));
1750    }
1751
1752    Ok(())
1753}
1754
/// Used to indicate whether or not a retry should be attempted.
///
/// Lets callers distinguish the recoverable "payload too large" response
/// (retry with a smaller block range) from any other error.
#[derive(Debug, thiserror::Error)]
pub enum HyperSyncResponseError {
    /// Means that the client should retry with a smaller block range.
    #[error("hypersync responded with 'payload too large' error")]
    PayloadTooLarge,
    /// Any other server error.
    ///
    /// Converted from [`anyhow::Error`] via `#[from]`, so `?` can propagate
    /// arbitrary errors into this type.
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}
1765
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_get_delay() {
        assert_eq!(
            Client::get_delay(0),
            Duration::from_millis(0),
            "starts with 0 delay"
        );
        // powers of 2 backoff
        for (failures, expected_ms) in [(1, 200), (2, 400), (3, 800)] {
            assert_eq!(Client::get_delay(failures), Duration::from_millis(expected_ms));
        }
        // maxes out at 30s
        for failures in [9, 10] {
            assert_eq!(
                Client::get_delay(failures),
                Duration::from_secs(30),
                "max delay is 30s"
            );
        }
    }

    #[tokio::test]
    #[ignore = "integration test with live hs server for height stream"]
    async fn test_stream_height_events() -> anyhow::Result<()> {
        let (tx, mut rx) = mpsc::channel(16);
        let handle = tokio::spawn(async move {
            let client = Client::builder()
                .url("https://monad-testnet.hypersync.xyz")
                .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
                .build()?;
            let mut es = client.get_es_stream().context("get es stream")?;
            Client::stream_height_events(&mut es, &tx).await
        });

        // The first event announces the connection.
        let first = rx.recv().await;
        assert!(first.is_some());
        assert_eq!(first.unwrap(), HeightStreamEvent::Connected);

        // Two consecutive height events; the chain should be advancing.
        let Some(HeightStreamEvent::Height(height)) = rx.recv().await else {
            panic!("should have received height")
        };
        let Some(HeightStreamEvent::Height(height2)) = rx.recv().await else {
            panic!("should have received height")
        };
        assert!(height2 > height);

        // Dropping the receiver must make the stream task exit gracefully.
        drop(rx);

        let res = handle.await.expect("should have joined");
        let received_an_event =
            res.expect("should have ended the stream gracefully after dropping rx");
        assert!(received_an_event, "should have received an event");

        Ok(())
    }
}