hypersync_client/lib.rs
1#![deny(missing_docs)]
2//! # HyperSync Client
3//!
4//! A high-performance Rust client for the HyperSync protocol, enabling efficient retrieval
5//! of blockchain data including blocks, transactions, logs, and traces.
6//!
7//! ## Features
8//!
9//! - **High-performance streaming**: Parallel data fetching with automatic retries
10//! - **Flexible querying**: Rich query builder API for precise data selection
11//! - **Multiple data formats**: Support for Arrow, Parquet, and simple Rust types
12//! - **Event decoding**: Automatic ABI decoding for smart contract events
13//! - **Real-time updates**: Live height streaming via Server-Sent Events
14//! - **Production ready**: Built-in rate limiting, retries, and error handling
15//!
16//! ## Quick Start
17//!
18//! ```no_run
19//! use hypersync_client::{Client, net_types::{Query, LogFilter, LogField}, StreamConfig};
20//!
21//! #[tokio::main]
22//! async fn main() -> anyhow::Result<()> {
23//! // Create a client for Ethereum mainnet
24//! let client = Client::builder()
25//! .chain_id(1)
26//! .api_token(std::env::var("ENVIO_API_TOKEN")?)
27//! .build()?;
28//!
29//! // Query ERC20 transfer events from USDC contract
30//! let query = Query::new()
31//! .from_block(19000000)
32//! .to_block_excl(19001000)
33//! .where_logs(
34//! LogFilter::all()
35//! .and_address(["0xA0b86a33E6411b87Fd9D3DF822C8698FC06BBe4c"])?
36//! .and_topic0(["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"])?
37//! )
38//! .select_log_fields([LogField::Address, LogField::Topic1, LogField::Topic2, LogField::Data]);
39//!
40//! // Get all data in one response
41//! let response = client.get(&query).await?;
//! println!("Retrieved {} logs", response.data.logs.len());
43//!
44//! // Or stream data for large ranges
45//! let mut receiver = client.stream(query, StreamConfig::default()).await?;
46//! while let Some(response) = receiver.recv().await {
47//! let response = response?;
48//! println!("Streaming: got blocks up to {}", response.next_block);
49//! }
50//!
51//! Ok(())
52//! }
53//! ```
54//!
55//! ## Main Types
56//!
57//! - [`Client`] - Main client for interacting with HyperSync servers
58//! - [`net_types::Query`] - Query builder for specifying what data to fetch
59//! - [`StreamConfig`] - Configuration for streaming operations
60//! - [`QueryResponse`] - Response containing blocks, transactions, logs, and traces
61//! - [`ArrowResponse`] - Response in Apache Arrow format for high-performance processing
62//!
63//! ## Authentication
64//!
65//! You'll need a HyperSync API token to access the service. Get one from
66//! [https://envio.dev/app/api-tokens](https://envio.dev/app/api-tokens).
67//!
68//! ## Examples
69//!
70//! See the `examples/` directory for more detailed usage patterns including:
71//! - ERC20 token transfers
72//! - Wallet transaction history
73//! - Event decoding and filtering
74//! - Real-time data streaming
75use std::time::Instant;
76use std::{sync::Arc, time::Duration};
77
78use anyhow::{anyhow, Context, Result};
79use futures::StreamExt;
80use hypersync_net_types::{hypersync_net_types_capnp, ArchiveHeight, ChainId, Query};
81use reqwest::{Method, StatusCode};
82use reqwest_eventsource::retry::ExponentialBackoff;
83use reqwest_eventsource::{Event, EventSource};
84
85pub mod arrow_reader;
86mod column_mapping;
87mod config;
88mod decode;
89mod decode_call;
90mod from_arrow;
91mod parquet_out;
92mod parse_response;
93pub mod preset_query;
94mod rayon_async;
95pub mod simple_types;
96mod stream;
97mod types;
98mod util;
99
100pub use hypersync_format as format;
101pub use hypersync_net_types as net_types;
102pub use hypersync_schema as schema;
103
104use parse_response::parse_query_response;
105use tokio::sync::mpsc;
106use types::{EventResponse, ResponseData};
107use url::Url;
108
109pub use column_mapping::{ColumnMapping, DataType};
110pub use config::HexOutput;
111pub use config::{ClientConfig, SerializationFormat, StreamConfig};
112pub use decode::Decoder;
113pub use decode_call::CallDecoder;
114pub use types::{ArrowResponse, ArrowResponseData, QueryResponse};
115
116use crate::parse_response::read_query_response;
117use crate::simple_types::InternalEventJoinStrategy;
118
/// Wrapper around `reqwest::Client` that periodically rebuilds the client so
/// DNS is re-resolved (see `max_connection_age`), and applies the shared
/// api token and timeout to every request.
#[derive(Debug)]
struct HttpClientWrapper {
    /// Mutable state that needs to be refreshed periodically
    /// (the live client plus its creation timestamp).
    inner: std::sync::Mutex<HttpClientWrapperInner>,
    /// HyperSync server api token, sent as a bearer token on each request.
    api_token: String,
    /// Standard timeout for http requests.
    timeout: Duration,
    /// Pools are never idle since polling often happens on the client
    /// so connections never timeout and dns lookups never happen again.
    /// This is problematic if the underlying ip address changes during
    /// failovers on hypersync. Since reqwest doesn't implement this we
    /// simply recreate the client once it is older than this duration.
    max_connection_age: Duration,
    /// Kept so the client can be recreated with the same user agent.
    user_agent: String,
}
136
/// The refreshable part of [`HttpClientWrapper`]: the live reqwest client
/// and the timestamp used to decide when it must be rebuilt.
#[derive(Debug)]
struct HttpClientWrapperInner {
    /// Initialized reqwest instance for client url.
    client: reqwest::Client,
    /// Timestamp when the client was created; compared against
    /// `max_connection_age` to trigger a rebuild.
    created_at: Instant,
}
144
145impl HttpClientWrapper {
146 fn new(user_agent: String, api_token: String, timeout: Duration) -> Self {
147 let client = reqwest::Client::builder()
148 .no_gzip()
149 .user_agent(&user_agent)
150 .build()
151 .unwrap();
152
153 Self {
154 inner: std::sync::Mutex::new(HttpClientWrapperInner {
155 client,
156 created_at: Instant::now(),
157 }),
158 api_token,
159 timeout,
160 max_connection_age: Duration::from_secs(60),
161 user_agent,
162 }
163 }
164
165 fn get_client(&self) -> reqwest::Client {
166 let mut inner = self.inner.lock().expect("HttpClientWrapper mutex poisoned");
167
168 // Check if client needs refresh due to age
169 if inner.created_at.elapsed() > self.max_connection_age {
170 // Recreate client to force new DNS lookup for failover scenarios
171 inner.client = reqwest::Client::builder()
172 .no_gzip()
173 .user_agent(&self.user_agent)
174 .build()
175 .unwrap();
176 inner.created_at = Instant::now();
177 }
178
179 // Clone is cheap for reqwest::Client (it's Arc-wrapped internally)
180 inner.client.clone()
181 }
182
183 fn request(&self, method: Method, url: Url) -> reqwest::RequestBuilder {
184 let client = self.get_client();
185 client
186 .request(method, url)
187 .timeout(self.timeout)
188 .bearer_auth(&self.api_token)
189 }
190
191 fn request_no_timeout(&self, method: Method, url: Url) -> reqwest::RequestBuilder {
192 let client = self.get_client();
193 client.request(method, url).bearer_auth(&self.api_token)
194 }
195}
196
/// Internal client state to handle http requests and retries.
///
/// Held behind an `Arc` by [`Client`], so cloned clients share one instance.
#[derive(Debug)]
struct ClientInner {
    /// Wrapper owning the periodically-refreshed reqwest client.
    http_client: HttpClientWrapper,
    /// HyperSync server URL.
    url: Url,
    /// Number of retries to attempt before returning error.
    max_num_retries: usize,
    /// Milliseconds used to grow the retry backoff after each failed attempt
    /// (also used as the random jitter range).
    retry_backoff_ms: u64,
    /// Initial wait time for request backoff.
    retry_base_ms: u64,
    /// Ceiling time for request backoff.
    retry_ceiling_ms: u64,
    /// Query serialization format to use for HTTP requests.
    serialization_format: SerializationFormat,
}
214
/// Client to handle http requests and retries.
///
/// Cloning is cheap: all state lives in a shared `Arc`.
#[derive(Clone, Debug)]
pub struct Client {
    /// Shared internal state (http client, server url, retry settings).
    inner: Arc<ClientInner>,
}
220
221impl Client {
    /// Creates a new client with the given configuration.
    ///
    /// Configuration must include the `url` and `api_token` fields.
    /// # Example
    /// ```
    /// use hypersync_client::{Client, ClientConfig};
    ///
    /// let config = ClientConfig {
    ///     url: "https://eth.hypersync.xyz".to_string(),
    ///     api_token: std::env::var("ENVIO_API_TOKEN")?,
    ///     ..Default::default()
    /// };
    /// let client = Client::new(config)?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    ///
    /// # Errors
    /// This method fails if the config is invalid.
    pub fn new(cfg: ClientConfig) -> Result<Self> {
        cfg.validate().context("invalid ClientConfig")?;
        // "hscr" stands for hypersync client rust; the crate version
        // identifies this client build to the server.
        let user_agent = format!("hscr/{}", env!("CARGO_PKG_VERSION"));
        Self::new_internal(cfg, user_agent)
    }
246
    /// Creates a new client builder.
    ///
    /// Configure the target chain (or URL) and the API token on the builder,
    /// then call `build()` to obtain a [`Client`].
    ///
    /// # Example
    /// ```
    /// use hypersync_client::Client;
    ///
    /// let client = Client::builder()
    ///     .chain_id(1)
    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
    ///     .build()
    ///     .unwrap();
    /// ```
    pub fn builder() -> ClientBuilder {
        ClientBuilder::new()
    }
262
    #[doc(hidden)]
    pub fn new_with_agent(cfg: ClientConfig, user_agent: impl Into<String>) -> Result<Self> {
        // Creates a new client with the given configuration and custom user agent.
        // This is intended for use by language bindings (Python, Node.js) and HyperIndex.
        // NOTE(review): unlike `new`, this path does not call `cfg.validate()` —
        // presumably the bindings validate on their side; confirm intentional.
        Self::new_internal(cfg, user_agent.into())
    }
269
270 /// Internal constructor that takes both config and user agent.
271 fn new_internal(cfg: ClientConfig, user_agent: String) -> Result<Self> {
272 let http_client = HttpClientWrapper::new(
273 user_agent,
274 cfg.api_token,
275 Duration::from_millis(cfg.http_req_timeout_millis),
276 );
277
278 let url = Url::parse(&cfg.url).context("url is malformed")?;
279
280 Ok(Self {
281 inner: Arc::new(ClientInner {
282 http_client,
283 url,
284 max_num_retries: cfg.max_num_retries,
285 retry_backoff_ms: cfg.retry_backoff_ms,
286 retry_base_ms: cfg.retry_base_ms,
287 retry_ceiling_ms: cfg.retry_ceiling_ms,
288 serialization_format: cfg.serialization_format,
289 }),
290 })
291 }
292
293 /// Retrieves blocks, transactions, traces, and logs through a stream using the provided
294 /// query and stream configuration.
295 ///
296 /// ### Implementation
297 /// Runs multiple queries simultaneously based on config.concurrency.
298 ///
299 /// Each query runs until it reaches query.to, server height, any max_num_* query param,
300 /// or execution timed out by server.
301 ///
302 /// # ⚠️ Important Warning
303 ///
304 /// This method will continue executing until the query has run to completion from beginning
305 /// to the end of the block range defined in the query. For heavy queries with large block
306 /// ranges or high data volumes, consider:
307 ///
308 /// - Use [`stream()`](Self::stream) to interact with each streamed chunk individually
309 /// - Use [`get()`](Self::get) which returns a `next_block` that can be paginated for the next query
310 /// - Break large queries into smaller block ranges
311 ///
312 /// # Example
313 /// ```no_run
314 /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField}, StreamConfig};
315 ///
316 /// # async fn example() -> anyhow::Result<()> {
317 /// let client = Client::builder()
318 /// .chain_id(1)
319 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
320 /// .build()?;
321 ///
322 /// // Query ERC20 transfer events
323 /// let query = Query::new()
324 /// .from_block(19000000)
325 /// .to_block_excl(19000010)
326 /// .where_logs(
327 /// LogFilter::all()
328 /// .and_topic0(["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"])?
329 /// )
330 /// .select_log_fields([LogField::Address, LogField::Data]);
331 /// let response = client.collect(query, StreamConfig::default()).await?;
332 ///
333 /// println!("Collected {} events", response.data.logs.len());
334 /// # Ok(())
335 /// # }
336 /// ```
337 pub async fn collect(&self, query: Query, config: StreamConfig) -> Result<QueryResponse> {
338 check_simple_stream_params(&config)?;
339
340 let mut recv = stream::stream_arrow(self, query, config)
341 .await
342 .context("start stream")?;
343
344 let mut data = ResponseData::default();
345 let mut archive_height = None;
346 let mut next_block = 0;
347 let mut total_execution_time = 0;
348
349 while let Some(res) = recv.recv().await {
350 let res = res.context("get response")?;
351 let res: QueryResponse =
352 QueryResponse::try_from(&res).context("convert arrow response")?;
353
354 for batch in res.data.blocks {
355 data.blocks.push(batch);
356 }
357 for batch in res.data.transactions {
358 data.transactions.push(batch);
359 }
360 for batch in res.data.logs {
361 data.logs.push(batch);
362 }
363 for batch in res.data.traces {
364 data.traces.push(batch);
365 }
366
367 archive_height = res.archive_height;
368 next_block = res.next_block;
369 total_execution_time += res.total_execution_time
370 }
371
372 Ok(QueryResponse {
373 archive_height,
374 next_block,
375 total_execution_time,
376 data,
377 rollback_guard: None,
378 })
379 }
380
381 /// Retrieves events through a stream using the provided query and stream configuration.
382 ///
383 /// # ⚠️ Important Warning
384 ///
385 /// This method will continue executing until the query has run to completion from beginning
386 /// to the end of the block range defined in the query. For heavy queries with large block
387 /// ranges or high data volumes, consider:
388 ///
389 /// - Use [`stream_events()`](Self::stream_events) to interact with each streamed chunk individually
390 /// - Use [`get_events()`](Self::get_events) which returns a `next_block` that can be paginated for the next query
391 /// - Break large queries into smaller block ranges
392 ///
393 /// # Example
394 /// ```no_run
395 /// use hypersync_client::{Client, net_types::{Query, TransactionFilter, TransactionField}, StreamConfig};
396 ///
397 /// # async fn example() -> anyhow::Result<()> {
398 /// let client = Client::builder()
399 /// .chain_id(1)
400 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
401 /// .build()?;
402 ///
403 /// // Query transactions to a specific address
404 /// let query = Query::new()
405 /// .from_block(19000000)
406 /// .to_block_excl(19000100)
407 /// .where_transactions(
408 /// TransactionFilter::all()
409 /// .and_to(["0xA0b86a33E6411b87Fd9D3DF822C8698FC06BBe4c"])?
410 /// )
411 /// .select_transaction_fields([TransactionField::Hash, TransactionField::From, TransactionField::Value]);
412 /// let response = client.collect_events(query, StreamConfig::default()).await?;
413 ///
414 /// println!("Collected {} events", response.data.len());
415 /// # Ok(())
416 /// # }
417 /// ```
418 pub async fn collect_events(
419 &self,
420 mut query: Query,
421 config: StreamConfig,
422 ) -> Result<EventResponse> {
423 check_simple_stream_params(&config)?;
424
425 let event_join_strategy = InternalEventJoinStrategy::from(&query.field_selection);
426 event_join_strategy.add_join_fields_to_selection(&mut query.field_selection);
427
428 let mut recv = stream::stream_arrow(self, query, config)
429 .await
430 .context("start stream")?;
431
432 let mut data = Vec::new();
433 let mut archive_height = None;
434 let mut next_block = 0;
435 let mut total_execution_time = 0;
436
437 while let Some(res) = recv.recv().await {
438 let res = res.context("get response")?;
439 let res: QueryResponse =
440 QueryResponse::try_from(&res).context("convert arrow response")?;
441 let events = event_join_strategy.join_from_response_data(res.data);
442
443 data.extend(events);
444
445 archive_height = res.archive_height;
446 next_block = res.next_block;
447 total_execution_time += res.total_execution_time
448 }
449
450 Ok(EventResponse {
451 archive_height,
452 next_block,
453 total_execution_time,
454 data,
455 rollback_guard: None,
456 })
457 }
458
459 /// Retrieves blocks, transactions, traces, and logs in Arrow format through a stream using
460 /// the provided query and stream configuration.
461 ///
462 /// Returns data in Apache Arrow format for high-performance columnar processing.
463 /// Useful for analytics workloads or when working with Arrow-compatible tools.
464 ///
465 /// # ⚠️ Important Warning
466 ///
467 /// This method will continue executing until the query has run to completion from beginning
468 /// to the end of the block range defined in the query. For heavy queries with large block
469 /// ranges or high data volumes, consider:
470 ///
471 /// - Use [`stream_arrow()`](Self::stream_arrow) to interact with each streamed chunk individually
472 /// - Use [`get_arrow()`](Self::get_arrow) which returns a `next_block` that can be paginated for the next query
473 /// - Break large queries into smaller block ranges
474 ///
475 /// # Example
476 /// ```no_run
477 /// use hypersync_client::{Client, net_types::{Query, BlockFilter, BlockField}, StreamConfig};
478 ///
479 /// # async fn example() -> anyhow::Result<()> {
480 /// let client = Client::builder()
481 /// .chain_id(1)
482 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
483 /// .build()?;
484 ///
485 /// // Get block data in Arrow format for analytics
486 /// let query = Query::new()
487 /// .from_block(19000000)
488 /// .to_block_excl(19000100)
489 /// .include_all_blocks()
490 /// .select_block_fields([BlockField::Number, BlockField::Timestamp, BlockField::GasUsed]);
491 /// let response = client.collect_arrow(query, StreamConfig::default()).await?;
492 ///
493 /// println!("Retrieved {} Arrow batches for blocks", response.data.blocks.len());
494 /// # Ok(())
495 /// # }
496 /// ```
497 pub async fn collect_arrow(&self, query: Query, config: StreamConfig) -> Result<ArrowResponse> {
498 let mut recv = stream::stream_arrow(self, query, config)
499 .await
500 .context("start stream")?;
501
502 let mut data = ArrowResponseData::default();
503 let mut archive_height = None;
504 let mut next_block = 0;
505 let mut total_execution_time = 0;
506
507 while let Some(res) = recv.recv().await {
508 let res = res.context("get response")?;
509
510 for batch in res.data.blocks {
511 data.blocks.push(batch);
512 }
513 for batch in res.data.transactions {
514 data.transactions.push(batch);
515 }
516 for batch in res.data.logs {
517 data.logs.push(batch);
518 }
519 for batch in res.data.traces {
520 data.traces.push(batch);
521 }
522 for batch in res.data.decoded_logs {
523 data.decoded_logs.push(batch);
524 }
525
526 archive_height = res.archive_height;
527 next_block = res.next_block;
528 total_execution_time += res.total_execution_time
529 }
530
531 Ok(ArrowResponse {
532 archive_height,
533 next_block,
534 total_execution_time,
535 data,
536 rollback_guard: None,
537 })
538 }
539
    /// Writes parquet file getting data through a stream using the provided path, query,
    /// and stream configuration.
    ///
    /// Streams data directly to a Parquet file for efficient storage and later analysis.
    /// Perfect for data exports or ETL pipelines.
    ///
    /// # ⚠️ Important Warning
    ///
    /// This method will continue executing until the query has run to completion from beginning
    /// to the end of the block range defined in the query. For heavy queries with large block
    /// ranges or high data volumes, consider:
    ///
    /// - Use [`stream_arrow()`](Self::stream_arrow) and write to Parquet incrementally
    /// - Use [`get_arrow()`](Self::get_arrow) with pagination and append to Parquet files
    /// - Break large queries into smaller block ranges
    ///
    /// # Example
    /// ```no_run
    /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField}, StreamConfig};
    ///
    /// # async fn example() -> anyhow::Result<()> {
    /// let client = Client::builder()
    ///     .chain_id(1)
    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
    ///     .build()?;
    ///
    /// // Export all DEX trades to Parquet for analysis
    /// let query = Query::new()
    ///     .from_block(19000000)
    ///     .to_block_excl(19010000)
    ///     .where_logs(
    ///         LogFilter::all()
    ///             .and_topic0(["0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822"])?
    ///     )
    ///     .select_log_fields([LogField::Address, LogField::Data, LogField::BlockNumber]);
    /// client.collect_parquet("./trades.parquet", query, StreamConfig::default()).await?;
    ///
    /// println!("Trade data exported to trades.parquet");
    /// # Ok(())
    /// # }
    /// ```
    pub async fn collect_parquet(
        &self,
        path: &str,
        query: Query,
        config: StreamConfig,
    ) -> Result<()> {
        // All streaming and file-writing logic lives in the parquet_out module.
        parquet_out::collect_parquet(self, path, query, config).await
    }
589
590 /// Internal implementation of getting chain_id from server
591 async fn get_chain_id_impl(&self) -> Result<u64> {
592 let mut url = self.inner.url.clone();
593 let mut segments = url.path_segments_mut().ok().context("get path segments")?;
594 segments.push("chain_id");
595 std::mem::drop(segments);
596 let req = self.inner.http_client.request(Method::GET, url);
597
598 let res = req.send().await.context("execute http req")?;
599
600 let status = res.status();
601 if !status.is_success() {
602 return Err(anyhow!("http response status code {}", status));
603 }
604
605 let chain_id: ChainId = res.json().await.context("read response body json")?;
606
607 Ok(chain_id.chain_id)
608 }
609
610 /// Internal implementation of getting height from server
611 async fn get_height_impl(&self, http_timeout_override: Option<Duration>) -> Result<u64> {
612 let mut url = self.inner.url.clone();
613 let mut segments = url.path_segments_mut().ok().context("get path segments")?;
614 segments.push("height");
615 std::mem::drop(segments);
616 let mut req = self.inner.http_client.request(Method::GET, url);
617 if let Some(http_timeout_override) = http_timeout_override {
618 req = req.timeout(http_timeout_override);
619 }
620
621 let res = req.send().await.context("execute http req")?;
622
623 let status = res.status();
624 if !status.is_success() {
625 return Err(anyhow!("http response status code {}", status));
626 }
627
628 let height: ArchiveHeight = res.json().await.context("read response body json")?;
629
630 Ok(height.height.unwrap_or(0))
631 }
632
633 /// Get the chain_id from the server with retries.
634 ///
635 /// # Example
636 /// ```no_run
637 /// use hypersync_client::Client;
638 ///
639 /// # async fn example() -> anyhow::Result<()> {
640 /// let client = Client::builder()
641 /// .chain_id(1)
642 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
643 /// .build()?;
644 ///
645 /// let chain_id = client.get_chain_id().await?;
646 /// println!("Connected to chain ID: {}", chain_id);
647 /// # Ok(())
648 /// # }
649 /// ```
650 pub async fn get_chain_id(&self) -> Result<u64> {
651 let mut base = self.inner.retry_base_ms;
652
653 let mut err = anyhow!("");
654
655 for _ in 0..self.inner.max_num_retries + 1 {
656 match self.get_chain_id_impl().await {
657 Ok(res) => return Ok(res),
658 Err(e) => {
659 log::error!(
660 "failed to get chain_id from server, retrying... The error was: {e:?}"
661 );
662 err = err.context(format!("{e:?}"));
663 }
664 }
665
666 let base_ms = Duration::from_millis(base);
667 let jitter = Duration::from_millis(fastrange_rs::fastrange_64(
668 rand::random(),
669 self.inner.retry_backoff_ms,
670 ));
671
672 tokio::time::sleep(base_ms + jitter).await;
673
674 base = std::cmp::min(
675 base + self.inner.retry_backoff_ms,
676 self.inner.retry_ceiling_ms,
677 );
678 }
679
680 Err(err)
681 }
682
683 /// Get the height of from server with retries.
684 ///
685 /// # Example
686 /// ```no_run
687 /// use hypersync_client::Client;
688 ///
689 /// # async fn example() -> anyhow::Result<()> {
690 /// let client = Client::builder()
691 /// .chain_id(1)
692 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
693 /// .build()?;
694 ///
695 /// let height = client.get_height().await?;
696 /// println!("Current block height: {}", height);
697 /// # Ok(())
698 /// # }
699 /// ```
700 pub async fn get_height(&self) -> Result<u64> {
701 let mut base = self.inner.retry_base_ms;
702
703 let mut err = anyhow!("");
704
705 for _ in 0..self.inner.max_num_retries + 1 {
706 match self.get_height_impl(None).await {
707 Ok(res) => return Ok(res),
708 Err(e) => {
709 log::error!(
710 "failed to get height from server, retrying... The error was: {e:?}"
711 );
712 err = err.context(format!("{e:?}"));
713 }
714 }
715
716 let base_ms = Duration::from_millis(base);
717 let jitter = Duration::from_millis(fastrange_rs::fastrange_64(
718 rand::random(),
719 self.inner.retry_backoff_ms,
720 ));
721
722 tokio::time::sleep(base_ms + jitter).await;
723
724 base = std::cmp::min(
725 base + self.inner.retry_backoff_ms,
726 self.inner.retry_ceiling_ms,
727 );
728 }
729
730 Err(err)
731 }
732
    /// Get the height of the Client instance for health checks.
    ///
    /// Doesn't do any retries and the `http_req_timeout` parameter will override the http timeout config set when creating the client.
    ///
    /// # Errors
    /// Fails if the single request fails or the server returns a non-success status.
    ///
    /// # Example
    /// ```no_run
    /// use hypersync_client::Client;
    /// use std::time::Duration;
    ///
    /// # async fn example() -> anyhow::Result<()> {
    /// let client = Client::builder()
    ///     .chain_id(1)
    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
    ///     .build()?;
    ///
    /// // Quick health check with 5 second timeout
    /// let height = client.health_check(Some(Duration::from_secs(5))).await?;
    /// println!("Server is healthy at block: {}", height);
    /// # Ok(())
    /// # }
    /// ```
    pub async fn health_check(&self, http_req_timeout: Option<Duration>) -> Result<u64> {
        // Single attempt by design: a health probe should fail fast, not retry.
        self.get_height_impl(http_req_timeout).await
    }
757
758 /// Executes query with retries and returns the response.
759 ///
760 /// # Example
761 /// ```no_run
762 /// use hypersync_client::{Client, net_types::{Query, BlockFilter, BlockField}};
763 ///
764 /// # async fn example() -> anyhow::Result<()> {
765 /// let client = Client::builder()
766 /// .chain_id(1)
767 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
768 /// .build()?;
769 ///
770 /// // Query all blocks from a specific range
771 /// let query = Query::new()
772 /// .from_block(19000000)
773 /// .to_block_excl(19000010)
774 /// .include_all_blocks()
775 /// .select_block_fields([BlockField::Number, BlockField::Hash, BlockField::Timestamp]);
776 /// let response = client.get(&query).await?;
777 ///
778 /// println!("Retrieved {} blocks", response.data.blocks.len());
779 /// # Ok(())
780 /// # }
781 /// ```
782 pub async fn get(&self, query: &Query) -> Result<QueryResponse> {
783 let arrow_response = self.get_arrow(query).await.context("get data")?;
784 let converted =
785 QueryResponse::try_from(&arrow_response).context("convert arrow response")?;
786 Ok(converted)
787 }
788
789 /// Add block, transaction and log fields selection to the query, executes it with retries
790 /// and returns the response.
791 ///
792 /// This method automatically joins blocks, transactions, and logs into unified events,
793 /// making it easier to work with related blockchain data.
794 ///
795 /// # Example
796 /// ```no_run
797 /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField, TransactionField}};
798 ///
799 /// # async fn example() -> anyhow::Result<()> {
800 /// let client = Client::builder()
801 /// .chain_id(1)
802 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
803 /// .build()?;
804 ///
805 /// // Query ERC20 transfers with transaction context
806 /// let query = Query::new()
807 /// .from_block(19000000)
808 /// .to_block_excl(19000010)
809 /// .where_logs(
810 /// LogFilter::all()
811 /// .and_topic0(["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"])?
812 /// )
813 /// .select_log_fields([LogField::Address, LogField::Data])
814 /// .select_transaction_fields([TransactionField::Hash, TransactionField::From]);
815 /// let response = client.get_events(query).await?;
816 ///
817 /// println!("Retrieved {} joined events", response.data.len());
818 /// # Ok(())
819 /// # }
820 /// ```
821 pub async fn get_events(&self, mut query: Query) -> Result<EventResponse> {
822 let event_join_strategy = InternalEventJoinStrategy::from(&query.field_selection);
823 event_join_strategy.add_join_fields_to_selection(&mut query.field_selection);
824 let arrow_response = self.get_arrow(&query).await.context("get data")?;
825 EventResponse::try_from_arrow_response(&arrow_response, &event_join_strategy)
826 }
827
828 /// Executes query once and returns the result in (Arrow, size) format using JSON serialization.
829 async fn get_arrow_impl_json(
830 &self,
831 query: &Query,
832 ) -> std::result::Result<(ArrowResponse, u64), HyperSyncResponseError> {
833 let mut url = self.inner.url.clone();
834 let mut segments = url.path_segments_mut().ok().context("get path segments")?;
835 segments.push("query");
836 segments.push("arrow-ipc");
837 std::mem::drop(segments);
838 let req = self.inner.http_client.request(Method::POST, url);
839
840 let res = req.json(&query).send().await.context("execute http req")?;
841
842 let status = res.status();
843 if status == StatusCode::PAYLOAD_TOO_LARGE {
844 return Err(HyperSyncResponseError::PayloadTooLarge);
845 }
846 if !status.is_success() {
847 let text = res.text().await.context("read text to see error")?;
848
849 return Err(HyperSyncResponseError::Other(anyhow!(
850 "http response status code {}, err body: {}",
851 status,
852 text
853 )));
854 }
855
856 let bytes = res.bytes().await.context("read response body bytes")?;
857
858 let res = tokio::task::block_in_place(|| {
859 parse_query_response(&bytes).context("parse query response")
860 })?;
861
862 Ok((res, bytes.len().try_into().unwrap()))
863 }
864
865 fn should_cache_queries(&self) -> bool {
866 matches!(
867 self.inner.serialization_format,
868 SerializationFormat::CapnProto {
869 should_cache_queries: true
870 }
871 )
872 }
873
    /// Executes query once and returns the result in (Arrow, size) format using Cap'n Proto serialization.
    ///
    /// When caching is enabled, first sends only a query id; if the server has
    /// the query cached it replies with the full response immediately. On a
    /// cache miss (or any cache-path failure) it falls back to sending the
    /// full query. A `413` status on either path maps to `PayloadTooLarge`.
    async fn get_arrow_impl_capnp(
        &self,
        query: &Query,
    ) -> std::result::Result<(ArrowResponse, u64), HyperSyncResponseError> {
        let mut url = self.inner.url.clone();
        let mut segments = url.path_segments_mut().ok().context("get path segments")?;
        segments.push("query");
        segments.push("arrow-ipc");
        segments.push("capnp");
        // Release the mutable borrow of `url` before using it in requests.
        std::mem::drop(segments);

        let should_cache = self.should_cache_queries();

        if should_cache {
            // First round-trip: send only the query id (a compact capnp message).
            let query_with_id = {
                let mut message = capnp::message::Builder::new_default();
                let mut request_builder =
                    message.init_root::<hypersync_net_types_capnp::request::Builder>();

                request_builder
                    .build_query_id_from_query(query)
                    .context("build query id")?;
                let mut query_with_id = Vec::new();
                capnp::serialize_packed::write_message(&mut query_with_id, &message)
                    .context("write capnp message")?;
                query_with_id
            };

            // `url.clone()` keeps the original for the full-query fallback below.
            let req = self.inner.http_client.request(Method::POST, url.clone());

            let res = req
                .body(query_with_id)
                .send()
                .await
                .context("execute http req")?;

            let status = res.status();
            if status == StatusCode::PAYLOAD_TOO_LARGE {
                return Err(HyperSyncResponseError::PayloadTooLarge);
            }
            if status.is_success() {
                let bytes = res.bytes().await.context("read response body bytes")?;

                // Lift the default capnp read limits; responses can be large.
                let mut opts = capnp::message::ReaderOptions::new();
                opts.nesting_limit(i32::MAX).traversal_limit_in_words(None);
                let message_reader = capnp::serialize_packed::read_message(bytes.as_ref(), opts)
                    .context("create message reader")?;
                let query_response = message_reader
                    .get_root::<hypersync_net_types_capnp::cached_query_response::Reader>()
                    .context("get cached_query_response root")?;
                match query_response.get_either().which().context("get either")? {
                    // Cache hit: the response body is already the full result.
                    hypersync_net_types_capnp::cached_query_response::either::Which::QueryResponse(
                        query_response,
                    ) => {
                        let res = tokio::task::block_in_place(|| {
                            let res = query_response?;
                            read_query_response(&res).context("parse query response cached")
                        })?;
                        return Ok((res, bytes.len().try_into().unwrap()));
                    }
                    // Cache miss: fall through to the full-query request below.
                    hypersync_net_types_capnp::cached_query_response::either::Which::NotCached(()) => {
                        log::trace!("query was not cached, retrying with full query");
                    }
                }
            } else {
                // Cache-path errors are non-fatal: log and fall back to the
                // full query instead of failing the whole call.
                let text = res.text().await.context("read text to see error")?;
                log::warn!(
                    "Failed cache query, will retry full query. {}, err body: {}",
                    status,
                    text
                );
            }
        };

        // Fallback (or cache-disabled) path: serialize and send the full query.
        let full_query_bytes = {
            let mut message = capnp::message::Builder::new_default();
            let mut query_builder =
                message.init_root::<hypersync_net_types_capnp::request::Builder>();

            query_builder
                .build_full_query_from_query(query, should_cache)
                .context("build full query")?;
            let mut bytes = Vec::new();
            capnp::serialize_packed::write_message(&mut bytes, &message)
                .context("write full query capnp message")?;
            bytes
        };

        let req = self.inner.http_client.request(Method::POST, url);

        let res = req
            .body(full_query_bytes)
            .send()
            .await
            .context("execute http req")?;

        let status = res.status();
        if status == StatusCode::PAYLOAD_TOO_LARGE {
            return Err(HyperSyncResponseError::PayloadTooLarge);
        }
        if !status.is_success() {
            let text = res.text().await.context("read text to see error")?;

            return Err(HyperSyncResponseError::Other(anyhow!(
                "http response status code {}, err body: {}",
                status,
                text
            )));
        }

        let bytes = res.bytes().await.context("read response body bytes")?;

        // Parse off the async hot path via block_in_place.
        let res = tokio::task::block_in_place(|| {
            parse_query_response(&bytes).context("parse query response")
        })?;

        Ok((res, bytes.len().try_into().unwrap()))
    }
993
994 /// Executes query once and returns the result in (Arrow, size) format.
995 async fn get_arrow_impl(&self, query: &Query) -> Result<(ArrowResponse, u64)> {
996 let mut query = query.clone();
997 loop {
998 let res = match self.inner.serialization_format {
999 SerializationFormat::Json => self.get_arrow_impl_json(&query).await,
1000 SerializationFormat::CapnProto { .. } => self.get_arrow_impl_capnp(&query).await,
1001 };
1002 match res {
1003 Ok(res) => return Ok(res),
1004 Err(HyperSyncResponseError::Other(e)) => return Err(e),
1005 Err(HyperSyncResponseError::PayloadTooLarge) => {
1006 let block_range = if let Some(to_block) = query.to_block {
1007 let current = to_block - query.from_block;
1008 if current < 2 {
1009 anyhow::bail!(
1010 "Payload is too large and query is using the minimum block range."
1011 )
1012 }
1013 // Half the current block range
1014 current / 2
1015 } else {
1016 200
1017 };
1018 let to_block = query.from_block + block_range;
1019 query.to_block = Some(to_block);
1020
1021 log::trace!(
1022 "Payload is too large, retrying with block range from: {} to: {}",
1023 query.from_block,
1024 to_block
1025 );
1026 }
1027 }
1028 }
1029 }
1030
1031 /// Executes query with retries and returns the response in Arrow format.
1032 pub async fn get_arrow(&self, query: &Query) -> Result<ArrowResponse> {
1033 self.get_arrow_with_size(query).await.map(|res| res.0)
1034 }
1035
1036 /// Internal implementation for get_arrow.
1037 async fn get_arrow_with_size(&self, query: &Query) -> Result<(ArrowResponse, u64)> {
1038 let mut base = self.inner.retry_base_ms;
1039
1040 let mut err = anyhow!("");
1041
1042 for _ in 0..self.inner.max_num_retries + 1 {
1043 match self.get_arrow_impl(query).await {
1044 Ok(res) => return Ok(res),
1045 Err(e) => {
1046 log::error!(
1047 "failed to get arrow data from server, retrying... The error was: {e:?}"
1048 );
1049 err = err.context(format!("{e:?}"));
1050 }
1051 }
1052
1053 let base_ms = Duration::from_millis(base);
1054 let jitter = Duration::from_millis(fastrange_rs::fastrange_64(
1055 rand::random(),
1056 self.inner.retry_backoff_ms,
1057 ));
1058
1059 tokio::time::sleep(base_ms + jitter).await;
1060
1061 base = std::cmp::min(
1062 base + self.inner.retry_backoff_ms,
1063 self.inner.retry_ceiling_ms,
1064 );
1065 }
1066
1067 Err(err)
1068 }
1069
1070 /// Spawns task to execute query and return data via a channel.
1071 ///
1072 /// # Example
1073 /// ```no_run
1074 /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField}, StreamConfig};
1075 ///
1076 /// # async fn example() -> anyhow::Result<()> {
1077 /// let client = Client::builder()
1078 /// .chain_id(1)
1079 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
1080 /// .build()?;
1081 ///
1082 /// // Stream all ERC20 transfer events
1083 /// let query = Query::new()
1084 /// .from_block(19000000)
1085 /// .where_logs(
1086 /// LogFilter::all()
1087 /// .and_topic0(["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"])?
1088 /// )
1089 /// .select_log_fields([LogField::Address, LogField::Topic1, LogField::Topic2, LogField::Data]);
1090 /// let mut receiver = client.stream(query, StreamConfig::default()).await?;
1091 ///
1092 /// while let Some(response) = receiver.recv().await {
1093 /// let response = response?;
1094 /// println!("Got {} events up to block: {}", response.data.logs.len(), response.next_block);
1095 /// }
1096 /// # Ok(())
1097 /// # }
1098 /// ```
1099 pub async fn stream(
1100 &self,
1101 query: Query,
1102 config: StreamConfig,
1103 ) -> Result<mpsc::Receiver<Result<QueryResponse>>> {
1104 check_simple_stream_params(&config)?;
1105
1106 let (tx, rx): (_, mpsc::Receiver<Result<QueryResponse>>) =
1107 mpsc::channel(config.concurrency);
1108
1109 let mut inner_rx = self
1110 .stream_arrow(query, config)
1111 .await
1112 .context("start inner stream")?;
1113
1114 tokio::spawn(async move {
1115 while let Some(resp) = inner_rx.recv().await {
1116 let msg = resp
1117 .context("inner receiver")
1118 .and_then(|r| QueryResponse::try_from(&r));
1119 let is_err = msg.is_err();
1120 if tx.send(msg).await.is_err() || is_err {
1121 return;
1122 }
1123 }
1124 });
1125
1126 Ok(rx)
1127 }
1128
1129 /// Add block, transaction and log fields selection to the query and spawns task to execute it,
1130 /// returning data via a channel.
1131 ///
1132 /// This method automatically joins blocks, transactions, and logs into unified events,
1133 /// then streams them via a channel for real-time processing.
1134 ///
1135 /// # Example
1136 /// ```no_run
1137 /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField, TransactionField}, StreamConfig};
1138 ///
1139 /// # async fn example() -> anyhow::Result<()> {
1140 /// let client = Client::builder()
1141 /// .chain_id(1)
1142 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
1143 /// .build()?;
1144 ///
1145 /// // Stream NFT transfer events with transaction context
1146 /// let query = Query::new()
1147 /// .from_block(19000000)
1148 /// .where_logs(
1149 /// LogFilter::all()
1150 /// .and_topic0(["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"])?
1151 /// )
1152 /// .select_log_fields([LogField::Address, LogField::Topic1, LogField::Topic2])
1153 /// .select_transaction_fields([TransactionField::Hash, TransactionField::From]);
1154 /// let mut receiver = client.stream_events(query, StreamConfig::default()).await?;
1155 ///
1156 /// while let Some(response) = receiver.recv().await {
1157 /// let response = response?;
1158 /// println!("Got {} joined events up to block: {}", response.data.len(), response.next_block);
1159 /// }
1160 /// # Ok(())
1161 /// # }
1162 /// ```
1163 pub async fn stream_events(
1164 &self,
1165 mut query: Query,
1166 config: StreamConfig,
1167 ) -> Result<mpsc::Receiver<Result<EventResponse>>> {
1168 check_simple_stream_params(&config)?;
1169
1170 let event_join_strategy = InternalEventJoinStrategy::from(&query.field_selection);
1171
1172 event_join_strategy.add_join_fields_to_selection(&mut query.field_selection);
1173
1174 let (tx, rx): (_, mpsc::Receiver<Result<EventResponse>>) =
1175 mpsc::channel(config.concurrency);
1176
1177 let mut inner_rx = self
1178 .stream_arrow(query, config)
1179 .await
1180 .context("start inner stream")?;
1181
1182 tokio::spawn(async move {
1183 while let Some(resp) = inner_rx.recv().await {
1184 let msg = resp
1185 .context("inner receiver")
1186 .and_then(|r| EventResponse::try_from_arrow_response(&r, &event_join_strategy));
1187 let is_err = msg.is_err();
1188 if tx.send(msg).await.is_err() || is_err {
1189 return;
1190 }
1191 }
1192 });
1193
1194 Ok(rx)
1195 }
1196
1197 /// Spawns task to execute query and return data via a channel in Arrow format.
1198 ///
1199 /// Returns raw Apache Arrow data via a channel for high-performance processing.
1200 /// Ideal for applications that need to work directly with columnar data.
1201 ///
1202 /// # Example
1203 /// ```no_run
1204 /// use hypersync_client::{Client, net_types::{Query, TransactionFilter, TransactionField}, StreamConfig};
1205 ///
1206 /// # async fn example() -> anyhow::Result<()> {
1207 /// let client = Client::builder()
1208 /// .chain_id(1)
1209 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
1210 /// .build()?;
1211 ///
1212 /// // Stream transaction data in Arrow format for analytics
1213 /// let query = Query::new()
1214 /// .from_block(19000000)
1215 /// .to_block_excl(19000100)
1216 /// .where_transactions(
1217 /// TransactionFilter::all()
1218 /// .and_contract_address(["0xA0b86a33E6411b87Fd9D3DF822C8698FC06BBe4c"])?
1219 /// )
1220 /// .select_transaction_fields([TransactionField::Hash, TransactionField::From, TransactionField::Value]);
1221 /// let mut receiver = client.stream_arrow(query, StreamConfig::default()).await?;
1222 ///
1223 /// while let Some(response) = receiver.recv().await {
1224 /// let response = response?;
1225 /// println!("Got {} Arrow batches for transactions", response.data.transactions.len());
1226 /// }
1227 /// # Ok(())
1228 /// # }
1229 /// ```
1230 pub async fn stream_arrow(
1231 &self,
1232 query: Query,
1233 config: StreamConfig,
1234 ) -> Result<mpsc::Receiver<Result<ArrowResponse>>> {
1235 stream::stream_arrow(self, query, config).await
1236 }
1237
1238 /// Getter for url field.
1239 ///
1240 /// # Example
1241 /// ```
1242 /// use hypersync_client::Client;
1243 ///
1244 /// let client = Client::builder()
1245 /// .chain_id(1)
1246 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1247 /// .build()
1248 /// .unwrap();
1249 ///
1250 /// println!("Client URL: {}", client.url());
1251 /// ```
1252 pub fn url(&self) -> &Url {
1253 &self.inner.url
1254 }
1255}
1256
/// Builder for creating a hypersync client with configuration options.
///
/// This builder provides a fluent API for configuring client settings like URL,
/// authentication, timeouts, and retry behavior. Obtain one via
/// [`Client::builder`] (or [`ClientBuilder::new`]) and finish with
/// [`ClientBuilder::build`].
///
/// # Example
/// ```
/// use hypersync_client::{Client, SerializationFormat};
///
/// let client = Client::builder()
///     .chain_id(1)
///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
///     .http_req_timeout_millis(30000)
///     .max_num_retries(3)
///     .build()
///     .unwrap();
/// ```
// Thin wrapper over the underlying `ClientConfig` that the setters mutate.
#[derive(Debug, Clone, Default)]
pub struct ClientBuilder(ClientConfig);
1276
1277impl ClientBuilder {
1278 /// Creates a new ClientBuilder with default configuration.
1279 pub fn new() -> Self {
1280 Self::default()
1281 }
1282
1283 /// Sets the chain ID and automatically configures the URL for the hypersync endpoint.
1284 ///
1285 /// This is a convenience method that sets the URL to `https://{chain_id}.hypersync.xyz`.
1286 ///
1287 /// # Arguments
1288 /// * `chain_id` - The blockchain chain ID (e.g., 1 for Ethereum mainnet)
1289 ///
1290 /// # Example
1291 /// ```
1292 /// use hypersync_client::Client;
1293 ///
1294 /// let client = Client::builder()
1295 /// .chain_id(1) // Ethereum mainnet
1296 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1297 /// .build()
1298 /// .unwrap();
1299 /// ```
1300 pub fn chain_id(mut self, chain_id: u64) -> Self {
1301 self.0.url = format!("https://{chain_id}.hypersync.xyz");
1302 self
1303 }
1304
1305 /// Sets a custom URL for the hypersync server.
1306 ///
1307 /// Use this method when you need to connect to a custom hypersync endpoint
1308 /// instead of the default public endpoints.
1309 ///
1310 /// # Arguments
1311 /// * `url` - The hypersync server URL
1312 ///
1313 /// # Example
1314 /// ```
1315 /// use hypersync_client::Client;
1316 ///
1317 /// let client = Client::builder()
1318 /// .url("https://my-custom-hypersync.example.com")
1319 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1320 /// .build()
1321 /// .unwrap();
1322 /// ```
1323 pub fn url<S: ToString>(mut self, url: S) -> Self {
1324 self.0.url = url.to_string();
1325 self
1326 }
1327
1328 /// Sets the api token for authentication.
1329 ///
1330 /// Required for accessing authenticated hypersync endpoints.
1331 ///
1332 /// # Arguments
1333 /// * `api_token` - The authentication token
1334 ///
1335 /// # Example
1336 /// ```
1337 /// use hypersync_client::Client;
1338 ///
1339 /// let client = Client::builder()
1340 /// .chain_id(1)
1341 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1342 /// .build()
1343 /// .unwrap();
1344 /// ```
1345 pub fn api_token<S: ToString>(mut self, api_token: S) -> Self {
1346 self.0.api_token = api_token.to_string();
1347 self
1348 }
1349
1350 /// Sets the HTTP request timeout in milliseconds.
1351 ///
1352 /// # Arguments
1353 /// * `http_req_timeout_millis` - Timeout in milliseconds (default: 30000)
1354 ///
1355 /// # Example
1356 /// ```
1357 /// use hypersync_client::Client;
1358 ///
1359 /// let client = Client::builder()
1360 /// .chain_id(1)
1361 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1362 /// .http_req_timeout_millis(60000) // 60 second timeout
1363 /// .build()
1364 /// .unwrap();
1365 /// ```
1366 pub fn http_req_timeout_millis(mut self, http_req_timeout_millis: u64) -> Self {
1367 self.0.http_req_timeout_millis = http_req_timeout_millis;
1368 self
1369 }
1370
1371 /// Sets the maximum number of retries for failed requests.
1372 ///
1373 /// # Arguments
1374 /// * `max_num_retries` - Maximum number of retries (default: 10)
1375 ///
1376 /// # Example
1377 /// ```
1378 /// use hypersync_client::Client;
1379 ///
1380 /// let client = Client::builder()
1381 /// .chain_id(1)
1382 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1383 /// .max_num_retries(5)
1384 /// .build()
1385 /// .unwrap();
1386 /// ```
1387 pub fn max_num_retries(mut self, max_num_retries: usize) -> Self {
1388 self.0.max_num_retries = max_num_retries;
1389 self
1390 }
1391
1392 /// Sets the backoff increment for retry delays.
1393 ///
1394 /// This value is added to the base delay on each retry attempt.
1395 ///
1396 /// # Arguments
1397 /// * `retry_backoff_ms` - Backoff increment in milliseconds (default: 500)
1398 ///
1399 /// # Example
1400 /// ```
1401 /// use hypersync_client::Client;
1402 ///
1403 /// let client = Client::builder()
1404 /// .chain_id(1)
1405 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1406 /// .retry_backoff_ms(1000) // 1 second backoff increment
1407 /// .build()
1408 /// .unwrap();
1409 /// ```
1410 pub fn retry_backoff_ms(mut self, retry_backoff_ms: u64) -> Self {
1411 self.0.retry_backoff_ms = retry_backoff_ms;
1412 self
1413 }
1414
1415 /// Sets the initial delay for retry attempts.
1416 ///
1417 /// # Arguments
1418 /// * `retry_base_ms` - Initial retry delay in milliseconds (default: 500)
1419 ///
1420 /// # Example
1421 /// ```
1422 /// use hypersync_client::Client;
1423 ///
1424 /// let client = Client::builder()
1425 /// .chain_id(1)
1426 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1427 /// .retry_base_ms(1000) // Start with 1 second delay
1428 /// .build()
1429 /// .unwrap();
1430 /// ```
1431 pub fn retry_base_ms(mut self, retry_base_ms: u64) -> Self {
1432 self.0.retry_base_ms = retry_base_ms;
1433 self
1434 }
1435
1436 /// Sets the maximum delay for retry attempts.
1437 ///
1438 /// The retry delay will not exceed this value, even with backoff increments.
1439 ///
1440 /// # Arguments
1441 /// * `retry_ceiling_ms` - Maximum retry delay in milliseconds (default: 10000)
1442 ///
1443 /// # Example
1444 /// ```
1445 /// use hypersync_client::Client;
1446 ///
1447 /// let client = Client::builder()
1448 /// .chain_id(1)
1449 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1450 /// .retry_ceiling_ms(30000) // Cap at 30 seconds
1451 /// .build()
1452 /// .unwrap();
1453 /// ```
1454 pub fn retry_ceiling_ms(mut self, retry_ceiling_ms: u64) -> Self {
1455 self.0.retry_ceiling_ms = retry_ceiling_ms;
1456 self
1457 }
1458
1459 /// Sets the serialization format for client-server communication.
1460 ///
1461 /// # Arguments
1462 /// * `serialization_format` - The format to use (JSON or CapnProto)
1463 ///
1464 /// # Example
1465 /// ```
1466 /// use hypersync_client::{Client, SerializationFormat};
1467 ///
1468 /// let client = Client::builder()
1469 /// .chain_id(1)
1470 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1471 /// .serialization_format(SerializationFormat::Json)
1472 /// .build()
1473 /// .unwrap();
1474 /// ```
1475 pub fn serialization_format(mut self, serialization_format: SerializationFormat) -> Self {
1476 self.0.serialization_format = serialization_format;
1477 self
1478 }
1479
1480 /// Builds the client with the configured settings.
1481 ///
1482 /// # Returns
1483 /// * `Result<Client>` - The configured client or an error if configuration is invalid
1484 ///
1485 /// # Errors
1486 /// Returns an error if:
1487 /// * The URL is malformed
1488 /// * Required configuration is missing
1489 ///
1490 /// # Example
1491 /// ```
1492 /// use hypersync_client::Client;
1493 ///
1494 /// let client = Client::builder()
1495 /// .chain_id(1)
1496 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1497 /// .build()
1498 /// .unwrap();
1499 /// ```
1500 pub fn build(self) -> Result<Client> {
1501 if self.0.url.is_empty() {
1502 anyhow::bail!(
1503 "endpoint needs to be set, try using builder.chain_id(1) or\
1504 builder.url(\"https://eth.hypersync.xyz\") to set the endpoint"
1505 )
1506 }
1507 Client::new(self.0)
1508 }
1509}
1510
/// Initial delay (200ms) before the first SSE reconnection attempt; the delay
/// doubles on each consecutive failure.
const INITIAL_RECONNECT_DELAY: Duration = Duration::from_millis(200);
/// Upper bound (30s) on the exponential reconnect backoff.
const MAX_RECONNECT_DELAY: Duration = Duration::from_secs(30);
/// Timeout for detecting dead connections. Server sends keepalive pings every 5s,
/// so we timeout after 15s (3x the ping interval).
const READ_TIMEOUT: Duration = Duration::from_secs(15);
1517
/// Events emitted by the height stream.
///
/// Yielded by the receiver returned from [`Client::stream_height`]; lets
/// consumers observe both height updates and connection state changes.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum HeightStreamEvent {
    /// Successfully connected or reconnected to the SSE stream.
    Connected,
    /// Received a height update from the server.
    Height(u64),
    /// Connection lost, will attempt to reconnect after the specified delay.
    Reconnecting {
        /// Duration to wait before attempting reconnection.
        delay: Duration,
        /// The error that caused the reconnection.
        error_msg: String,
    },
}
1533
/// Classification of a raw SSE event, before filtering down to the public
/// [`HeightStreamEvent`]s that get forwarded to the consumer.
enum InternalStreamEvent {
    /// An event that should be forwarded to the consumer.
    Publish(HeightStreamEvent),
    /// Server keepalive; handled internally and never forwarded.
    Ping,
    /// An event type we don't recognize; ignored (debug text kept for tracing).
    Unknown(String),
}
1539
1540impl Client {
1541 fn get_es_stream(&self) -> Result<EventSource> {
1542 // Build the GET /height/sse request
1543 let mut url = self.inner.url.clone();
1544 url.path_segments_mut()
1545 .ok()
1546 .context("invalid base URL")?
1547 .extend(&["height", "sse"]);
1548
1549 let req = self
1550 .inner
1551 .http_client
1552 // Don't set timeout for SSE stream
1553 .request_no_timeout(Method::GET, url);
1554
1555 // Configure exponential backoff for library-level retries
1556 let retry_policy = ExponentialBackoff::new(
1557 INITIAL_RECONNECT_DELAY,
1558 2.0,
1559 Some(MAX_RECONNECT_DELAY),
1560 None, // unlimited retries
1561 );
1562
1563 // Turn the request into an EventSource stream with retries
1564 let mut es = reqwest_eventsource::EventSource::new(req)
1565 .context("unexpected error creating EventSource")?;
1566 es.set_retry_policy(Box::new(retry_policy));
1567 Ok(es)
1568 }
1569
1570 async fn next_height(event_source: &mut EventSource) -> Result<Option<InternalStreamEvent>> {
1571 let Some(res) = tokio::time::timeout(READ_TIMEOUT, event_source.next())
1572 .await
1573 .map_err(|d| anyhow::anyhow!("stream timed out after {d}"))?
1574 else {
1575 return Ok(None);
1576 };
1577
1578 let e = match res.context("failed response")? {
1579 Event::Open => InternalStreamEvent::Publish(HeightStreamEvent::Connected),
1580 Event::Message(event) => match event.event.as_str() {
1581 "height" => {
1582 let height = event
1583 .data
1584 .trim()
1585 .parse::<u64>()
1586 .context("parsing height from event data")?;
1587 InternalStreamEvent::Publish(HeightStreamEvent::Height(height))
1588 }
1589 "ping" => InternalStreamEvent::Ping,
1590 _ => InternalStreamEvent::Unknown(format!("unknown event: {:?}", event)),
1591 },
1592 };
1593
1594 Ok(Some(e))
1595 }
1596
1597 async fn stream_height_events(
1598 es: &mut EventSource,
1599 tx: &mpsc::Sender<HeightStreamEvent>,
1600 ) -> Result<bool> {
1601 let mut received_an_event = false;
1602 while let Some(event) = Self::next_height(es).await.context("failed next height")? {
1603 match event {
1604 InternalStreamEvent::Publish(event) => {
1605 received_an_event = true;
1606 if tx.send(event).await.is_err() {
1607 return Ok(received_an_event); // Receiver dropped, exit task
1608 }
1609 }
1610 InternalStreamEvent::Ping => (), // ignore pings
1611 InternalStreamEvent::Unknown(_event) => (), // ignore unknown events
1612 }
1613 }
1614 Ok(received_an_event)
1615 }
1616
1617 fn get_delay(consecutive_failures: u32) -> Duration {
1618 if consecutive_failures > 0 {
1619 /// helper function to calculate 2^x
1620 /// optimization using bit shifting
1621 const fn two_to_pow(x: u32) -> u32 {
1622 1 << x
1623 }
1624 // Exponential backoff: 200ms, 400ms, 800ms, ... up to 30s
1625 INITIAL_RECONNECT_DELAY
1626 .saturating_mul(two_to_pow(consecutive_failures - 1))
1627 .min(MAX_RECONNECT_DELAY)
1628 } else {
1629 // On zero consecutive failures, 0 delay
1630 Duration::from_millis(0)
1631 }
1632 }
1633
1634 async fn stream_height_events_with_retry(
1635 &self,
1636 tx: &mpsc::Sender<HeightStreamEvent>,
1637 ) -> Result<()> {
1638 let mut consecutive_failures = 0u32;
1639
1640 loop {
1641 // should always be able to creat a new es stream
1642 // something is wrong with the req builder otherwise
1643 let mut es = self.get_es_stream().context("get es stream")?;
1644
1645 let mut error = anyhow!("");
1646
1647 match Self::stream_height_events(&mut es, tx).await {
1648 Ok(received_an_event) => {
1649 if received_an_event {
1650 consecutive_failures = 0; // Reset after successful connection that then failed
1651 }
1652 log::trace!("Stream height exited");
1653 }
1654 Err(e) => {
1655 log::trace!("Stream height failed: {e:?}");
1656 error = e;
1657 }
1658 }
1659
1660 es.close();
1661
1662 // If the receiver is closed, exit the task
1663 if tx.is_closed() {
1664 break;
1665 }
1666
1667 let delay = Self::get_delay(consecutive_failures);
1668 log::trace!("Reconnecting in {:?}...", delay);
1669
1670 let error_msg = format!("{error:?}");
1671
1672 if tx
1673 .send(HeightStreamEvent::Reconnecting { delay, error_msg })
1674 .await
1675 .is_err()
1676 {
1677 return Ok(()); // Receiver dropped, exit task
1678 }
1679 tokio::time::sleep(delay).await;
1680
1681 // increment consecutive failures so that on the next try
1682 // it will start using back offs
1683 consecutive_failures += 1;
1684 }
1685
1686 Ok(())
1687 }
1688
1689 /// Streams archive height updates from the server via Server-Sent Events.
1690 ///
1691 /// Establishes a long-lived SSE connection to `/height/sse` that automatically reconnects
1692 /// on disconnection with exponential backoff (200ms → 400ms → ... → max 30s).
1693 ///
1694 /// The stream emits [`HeightStreamEvent`] to notify consumers of connection state changes
1695 /// and height updates. This allows applications to display connection status to users.
1696 ///
1697 /// # Returns
1698 /// Channel receiver yielding [`HeightStreamEvent`]s. The background task handles connection
1699 /// lifecycle and sends events through this channel.
1700 ///
1701 /// # Example
1702 /// ```
1703 /// # use hypersync_client::{Client, HeightStreamEvent};
1704 /// # async fn example() -> anyhow::Result<()> {
1705 /// let client = Client::builder()
1706 /// .url("https://eth.hypersync.xyz")
1707 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1708 /// .build()?;
1709 ///
1710 /// let mut rx = client.stream_height();
1711 ///
1712 /// while let Some(event) = rx.recv().await {
1713 /// match event {
1714 /// HeightStreamEvent::Connected => println!("Connected to stream"),
1715 /// HeightStreamEvent::Height(h) => println!("Height: {}", h),
1716 /// HeightStreamEvent::Reconnecting { delay, error_msg } => {
1717 /// println!("Reconnecting in {delay:?} due to error: {error_msg}")
1718 /// }
1719 /// }
1720 /// }
1721 /// # Ok(())
1722 /// # }
1723 /// ```
1724 pub fn stream_height(&self) -> mpsc::Receiver<HeightStreamEvent> {
1725 let (tx, rx) = mpsc::channel(16);
1726 let client = self.clone();
1727
1728 tokio::spawn(async move {
1729 if let Err(e) = client.stream_height_events_with_retry(&tx).await {
1730 log::error!("Stream height failed unexpectedly: {e:?}");
1731 }
1732 });
1733
1734 rx
1735 }
1736}
1737
1738fn check_simple_stream_params(config: &StreamConfig) -> Result<()> {
1739 if config.event_signature.is_some() {
1740 return Err(anyhow!(
1741 "config.event_signature can't be passed to simple type function. User is expected to \
1742 decode the logs using Decoder."
1743 ));
1744 }
1745 if config.column_mapping.is_some() {
1746 return Err(anyhow!(
1747 "config.column_mapping can't be passed to single type function. User is expected to \
1748 map values manually."
1749 ));
1750 }
1751
1752 Ok(())
1753}
1754
/// Used to indicate whether or not a retry should be attempted.
///
/// Returned by a single query attempt; distinguishes the retryable
/// "payload too large" response from every other failure.
#[derive(Debug, thiserror::Error)]
pub enum HyperSyncResponseError {
    /// Means that the client should retry with a smaller block range.
    #[error("hypersync responded with 'payload too large' error")]
    PayloadTooLarge,
    /// Any other server error.
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}
1765
#[cfg(test)]
mod tests {
    use super::*;

    /// Verifies the reconnect backoff schedule: zero delay first, then powers
    /// of two starting at 200ms, capped at 30 seconds.
    #[test]
    fn test_get_delay() {
        assert_eq!(
            Client::get_delay(0),
            Duration::from_millis(0),
            "starts with 0 delay"
        );
        // powers of 2 backoff
        for (failures, millis) in [(1u32, 200u64), (2, 400), (3, 800)] {
            assert_eq!(Client::get_delay(failures), Duration::from_millis(millis));
        }
        // maxes out at 30s
        for failures in [9u32, 10] {
            assert_eq!(
                Client::get_delay(failures),
                Duration::from_secs(30),
                "max delay is 30s"
            );
        }
    }

    /// Connects to a live server and checks the height stream yields a
    /// `Connected` event followed by strictly increasing heights, then ends
    /// gracefully once the receiver is dropped.
    #[tokio::test]
    #[ignore = "integration test with live hs server for height stream"]
    async fn test_stream_height_events() -> anyhow::Result<()> {
        let (tx, mut rx) = mpsc::channel(16);
        let producer = tokio::spawn(async move {
            let client = Client::builder()
                .url("https://monad-testnet.hypersync.xyz")
                .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
                .build()?;
            let mut es = client.get_es_stream().context("get es stream")?;
            Client::stream_height_events(&mut es, &tx).await
        });

        // The very first event must announce the connection.
        assert_eq!(rx.recv().await, Some(HeightStreamEvent::Connected));

        // Two subsequent height updates should be strictly increasing.
        let Some(HeightStreamEvent::Height(first)) = rx.recv().await else {
            panic!("should have received height")
        };
        let Some(HeightStreamEvent::Height(second)) = rx.recv().await else {
            panic!("should have received height")
        };
        assert!(second > first);
        drop(rx);

        let res = producer.await.expect("should have joined");
        let received_an_event =
            res.expect("should have ended the stream gracefully after dropping rx");
        assert!(received_an_event, "should have received an event");

        Ok(())
    }
}