hypersync_client/lib.rs
1#![deny(missing_docs)]
2//! # HyperSync Client
3//!
4//! A high-performance Rust client for the HyperSync protocol, enabling efficient retrieval
5//! of blockchain data including blocks, transactions, logs, and traces.
6//!
7//! ## Features
8//!
9//! - **High-performance streaming**: Parallel data fetching with automatic retries
10//! - **Flexible querying**: Rich query builder API for precise data selection
11//! - **Multiple data formats**: Support for Arrow, Parquet, and simple Rust types
12//! - **Event decoding**: Automatic ABI decoding for smart contract events
13//! - **Real-time updates**: Live height streaming via Server-Sent Events
14//! - **Production ready**: Built-in rate limiting, retries, and error handling
15//!
16//! ## Quick Start
17//!
18//! ```no_run
19//! use hypersync_client::{Client, net_types::{Query, LogFilter, LogField}, StreamConfig};
20//!
21//! #[tokio::main]
22//! async fn main() -> anyhow::Result<()> {
23//! // Create a client for Ethereum mainnet
24//! let client = Client::builder()
25//! .chain_id(1)
26//! .api_token(std::env::var("ENVIO_API_TOKEN")?)
27//! .build()?;
28//!
29//! // Query ERC20 transfer events from USDC contract
30//! let query = Query::new()
31//! .from_block(19000000)
32//! .to_block_excl(19001000)
33//! .where_logs(
34//! LogFilter::all()
//!             .and_address(["0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"])?
36//! .and_topic0(["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"])?
37//! )
38//! .select_log_fields([LogField::Address, LogField::Topic1, LogField::Topic2, LogField::Data]);
39//!
40//! // Get all data in one response
41//! let response = client.get(&query).await?;
42//! println!("Retrieved {} blocks", response.data.blocks.len());
43//!
44//! // Or stream data for large ranges
45//! let mut receiver = client.stream(query, StreamConfig::default()).await?;
46//! while let Some(response) = receiver.recv().await {
47//! let response = response?;
48//! println!("Streaming: got blocks up to {}", response.next_block);
49//! }
50//!
51//! Ok(())
52//! }
53//! ```
54//!
55//! ## Main Types
56//!
57//! - [`Client`] - Main client for interacting with HyperSync servers
58//! - [`net_types::Query`] - Query builder for specifying what data to fetch
59//! - [`StreamConfig`] - Configuration for streaming operations
60//! - [`QueryResponse`] - Response containing blocks, transactions, logs, and traces
61//! - [`ArrowResponse`] - Response in Apache Arrow format for high-performance processing
62//!
63//! ## Authentication
64//!
65//! You'll need a HyperSync API token to access the service. Get one from
66//! [https://envio.dev/app/api-tokens](https://envio.dev/app/api-tokens).
67//!
68//! ## Examples
69//!
70//! See the `examples/` directory for more detailed usage patterns including:
71//! - ERC20 token transfers
72//! - Wallet transaction history
73//! - Event decoding and filtering
74//! - Real-time data streaming
75use std::{sync::Arc, time::Duration};
76
77use anyhow::{anyhow, Context, Result};
78use futures::StreamExt;
79use hypersync_net_types::{hypersync_net_types_capnp, ArchiveHeight, ChainId, Query};
80use reqwest::{Method, StatusCode};
81use reqwest_eventsource::retry::ExponentialBackoff;
82use reqwest_eventsource::{Event, EventSource};
83
84pub mod arrow_reader;
85mod column_mapping;
86mod config;
87mod decode;
88mod decode_call;
89mod from_arrow;
90mod parquet_out;
91mod parse_response;
92pub mod preset_query;
93mod rayon_async;
94pub mod simple_types;
95mod stream;
96mod types;
97mod util;
98
99pub use hypersync_format as format;
100pub use hypersync_net_types as net_types;
101pub use hypersync_schema as schema;
102
103use parse_response::parse_query_response;
104use tokio::sync::mpsc;
105use types::{EventResponse, ResponseData};
106use url::Url;
107
108pub use column_mapping::{ColumnMapping, DataType};
109pub use config::HexOutput;
110pub use config::{ClientConfig, SerializationFormat, StreamConfig};
111pub use decode::Decoder;
112pub use decode_call::CallDecoder;
113pub use types::{ArrowResponse, ArrowResponseData, QueryResponse};
114
115use crate::parse_response::read_query_response;
116use crate::simple_types::InternalEventJoinStrategy;
117
/// Thin wrapper around a `reqwest::Client` that attaches the bearer token and,
/// where requested, the standard timeout to every outgoing request.
#[derive(Debug)]
struct HttpClientWrapper {
    /// Initialized reqwest instance for client url.
    client: reqwest::Client,
    /// HyperSync server api token.
    api_token: String,
    /// Standard timeout for http requests.
    timeout: Duration,
}
127
128impl HttpClientWrapper {
129 fn request(&self, method: Method, url: Url) -> reqwest::RequestBuilder {
130 self.client
131 .request(method, url)
132 .timeout(self.timeout)
133 .bearer_auth(&self.api_token)
134 }
135
136 fn request_no_timeout(&self, method: Method, url: Url) -> reqwest::RequestBuilder {
137 self.client
138 .request(method, url)
139 .bearer_auth(&self.api_token)
140 }
141}
142
/// Internal client state to handle http requests and retries.
///
/// Shared behind an `Arc` so that cloning a [`Client`] is cheap.
#[derive(Debug)]
struct ClientInner {
    /// HTTP transport with auth token and default timeout baked in.
    http_client: HttpClientWrapper,
    /// HyperSync server URL.
    url: Url,
    /// Number of retries to attempt before returning error.
    max_num_retries: usize,
    /// Milliseconds that would be used for retry backoff increasing.
    /// Also serves as the upper bound for the random jitter added per retry.
    retry_backoff_ms: u64,
    /// Initial wait time for request backoff.
    retry_base_ms: u64,
    /// Ceiling time for request backoff.
    retry_ceiling_ms: u64,
    /// Query serialization format to use for HTTP requests.
    serialization_format: SerializationFormat,
}
160
/// Client to handle http requests and retries.
///
/// Cheap to clone: all state lives behind an `Arc`, so clones share the same
/// connection pool and configuration.
#[derive(Clone, Debug)]
pub struct Client {
    // Shared state; cloning only bumps the refcount.
    inner: Arc<ClientInner>,
}
166
167impl Client {
168 /// Creates a new client with the given configuration.
169 ///
170 /// Configuration must include the `url` and `api_token` fields.
171 /// # Example
172 /// ```
173 /// use hypersync_client::{Client, ClientConfig};
174 ///
175 /// let config = ClientConfig {
176 /// url: "https://eth.hypersync.xyz".to_string(),
177 /// api_token: std::env::var("ENVIO_API_TOKEN")?,
178 /// ..Default::default()
179 /// };
180 /// let client = Client::new(config)?;
181 /// # Ok::<(), anyhow::Error>(())
182 /// ```
183 ///
184 /// # Errors
185 /// This method fails if the config is invalid.
186 pub fn new(cfg: ClientConfig) -> Result<Self> {
187 // hscr stands for hypersync client rust
188 cfg.validate().context("invalid ClientConfig")?;
189 let user_agent = format!("hscr/{}", env!("CARGO_PKG_VERSION"));
190 Self::new_internal(cfg, user_agent)
191 }
192
    /// Creates a new client builder.
    ///
    /// Convenient alternative to [`Client::new`] when configuring by chain id.
    ///
    /// # Example
    /// ```no_run
    /// use hypersync_client::Client;
    ///
    /// let client = Client::builder()
    ///     .chain_id(1)
    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
    ///     .build()
    ///     .unwrap();
    /// ```
    pub fn builder() -> ClientBuilder {
        ClientBuilder::new()
    }
208
    #[doc(hidden)]
    pub fn new_with_agent(cfg: ClientConfig, user_agent: impl Into<String>) -> Result<Self> {
        // Creates a new client with the given configuration and custom user agent.
        // This is intended for use by language bindings (Python, Node.js) and HyperIndex.
        // NOTE(review): unlike `Client::new`, this does not call `cfg.validate()` —
        // callers are trusted to pass a valid config.
        Self::new_internal(cfg, user_agent.into())
    }
215
216 /// Internal constructor that takes both config and user agent.
217 fn new_internal(cfg: ClientConfig, user_agent: String) -> Result<Self> {
218 let http_client = HttpClientWrapper {
219 client: reqwest::Client::builder()
220 .no_gzip()
221 .user_agent(user_agent)
222 .build()
223 .unwrap(),
224 api_token: cfg.api_token,
225 timeout: Duration::from_millis(cfg.http_req_timeout_millis),
226 };
227
228 let url = Url::parse(&cfg.url).context("url is malformed")?;
229
230 Ok(Self {
231 inner: Arc::new(ClientInner {
232 http_client,
233 url,
234 max_num_retries: cfg.max_num_retries,
235 retry_backoff_ms: cfg.retry_backoff_ms,
236 retry_base_ms: cfg.retry_base_ms,
237 retry_ceiling_ms: cfg.retry_ceiling_ms,
238 serialization_format: cfg.serialization_format,
239 }),
240 })
241 }
242
243 /// Retrieves blocks, transactions, traces, and logs through a stream using the provided
244 /// query and stream configuration.
245 ///
246 /// ### Implementation
247 /// Runs multiple queries simultaneously based on config.concurrency.
248 ///
249 /// Each query runs until it reaches query.to, server height, any max_num_* query param,
250 /// or execution timed out by server.
251 ///
252 /// # ⚠️ Important Warning
253 ///
254 /// This method will continue executing until the query has run to completion from beginning
255 /// to the end of the block range defined in the query. For heavy queries with large block
256 /// ranges or high data volumes, consider:
257 ///
258 /// - Use [`stream()`](Self::stream) to interact with each streamed chunk individually
259 /// - Use [`get()`](Self::get) which returns a `next_block` that can be paginated for the next query
260 /// - Break large queries into smaller block ranges
261 ///
262 /// # Example
263 /// ```no_run
264 /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField}, StreamConfig};
265 ///
266 /// # async fn example() -> anyhow::Result<()> {
267 /// let client = Client::builder()
268 /// .chain_id(1)
269 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
270 /// .build()?;
271 ///
272 /// // Query ERC20 transfer events
273 /// let query = Query::new()
274 /// .from_block(19000000)
275 /// .to_block_excl(19000010)
276 /// .where_logs(
277 /// LogFilter::all()
278 /// .and_topic0(["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"])?
279 /// )
280 /// .select_log_fields([LogField::Address, LogField::Data]);
281 /// let response = client.collect(query, StreamConfig::default()).await?;
282 ///
283 /// println!("Collected {} events", response.data.logs.len());
284 /// # Ok(())
285 /// # }
286 /// ```
287 pub async fn collect(&self, query: Query, config: StreamConfig) -> Result<QueryResponse> {
288 check_simple_stream_params(&config)?;
289
290 let mut recv = stream::stream_arrow(self, query, config)
291 .await
292 .context("start stream")?;
293
294 let mut data = ResponseData::default();
295 let mut archive_height = None;
296 let mut next_block = 0;
297 let mut total_execution_time = 0;
298
299 while let Some(res) = recv.recv().await {
300 let res = res.context("get response")?;
301 let res: QueryResponse =
302 QueryResponse::try_from(&res).context("convert arrow response")?;
303
304 for batch in res.data.blocks {
305 data.blocks.push(batch);
306 }
307 for batch in res.data.transactions {
308 data.transactions.push(batch);
309 }
310 for batch in res.data.logs {
311 data.logs.push(batch);
312 }
313 for batch in res.data.traces {
314 data.traces.push(batch);
315 }
316
317 archive_height = res.archive_height;
318 next_block = res.next_block;
319 total_execution_time += res.total_execution_time
320 }
321
322 Ok(QueryResponse {
323 archive_height,
324 next_block,
325 total_execution_time,
326 data,
327 rollback_guard: None,
328 })
329 }
330
331 /// Retrieves events through a stream using the provided query and stream configuration.
332 ///
333 /// # ⚠️ Important Warning
334 ///
335 /// This method will continue executing until the query has run to completion from beginning
336 /// to the end of the block range defined in the query. For heavy queries with large block
337 /// ranges or high data volumes, consider:
338 ///
339 /// - Use [`stream_events()`](Self::stream_events) to interact with each streamed chunk individually
340 /// - Use [`get_events()`](Self::get_events) which returns a `next_block` that can be paginated for the next query
341 /// - Break large queries into smaller block ranges
342 ///
343 /// # Example
344 /// ```no_run
345 /// use hypersync_client::{Client, net_types::{Query, TransactionFilter, TransactionField}, StreamConfig};
346 ///
347 /// # async fn example() -> anyhow::Result<()> {
348 /// let client = Client::builder()
349 /// .chain_id(1)
350 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
351 /// .build()?;
352 ///
353 /// // Query transactions to a specific address
354 /// let query = Query::new()
355 /// .from_block(19000000)
356 /// .to_block_excl(19000100)
357 /// .where_transactions(
358 /// TransactionFilter::all()
359 /// .and_to(["0xA0b86a33E6411b87Fd9D3DF822C8698FC06BBe4c"])?
360 /// )
361 /// .select_transaction_fields([TransactionField::Hash, TransactionField::From, TransactionField::Value]);
362 /// let response = client.collect_events(query, StreamConfig::default()).await?;
363 ///
364 /// println!("Collected {} events", response.data.len());
365 /// # Ok(())
366 /// # }
367 /// ```
368 pub async fn collect_events(
369 &self,
370 mut query: Query,
371 config: StreamConfig,
372 ) -> Result<EventResponse> {
373 check_simple_stream_params(&config)?;
374
375 let event_join_strategy = InternalEventJoinStrategy::from(&query.field_selection);
376 event_join_strategy.add_join_fields_to_selection(&mut query.field_selection);
377
378 let mut recv = stream::stream_arrow(self, query, config)
379 .await
380 .context("start stream")?;
381
382 let mut data = Vec::new();
383 let mut archive_height = None;
384 let mut next_block = 0;
385 let mut total_execution_time = 0;
386
387 while let Some(res) = recv.recv().await {
388 let res = res.context("get response")?;
389 let res: QueryResponse =
390 QueryResponse::try_from(&res).context("convert arrow response")?;
391 let events = event_join_strategy.join_from_response_data(res.data);
392
393 data.extend(events);
394
395 archive_height = res.archive_height;
396 next_block = res.next_block;
397 total_execution_time += res.total_execution_time
398 }
399
400 Ok(EventResponse {
401 archive_height,
402 next_block,
403 total_execution_time,
404 data,
405 rollback_guard: None,
406 })
407 }
408
409 /// Retrieves blocks, transactions, traces, and logs in Arrow format through a stream using
410 /// the provided query and stream configuration.
411 ///
412 /// Returns data in Apache Arrow format for high-performance columnar processing.
413 /// Useful for analytics workloads or when working with Arrow-compatible tools.
414 ///
415 /// # ⚠️ Important Warning
416 ///
417 /// This method will continue executing until the query has run to completion from beginning
418 /// to the end of the block range defined in the query. For heavy queries with large block
419 /// ranges or high data volumes, consider:
420 ///
421 /// - Use [`stream_arrow()`](Self::stream_arrow) to interact with each streamed chunk individually
422 /// - Use [`get_arrow()`](Self::get_arrow) which returns a `next_block` that can be paginated for the next query
423 /// - Break large queries into smaller block ranges
424 ///
425 /// # Example
426 /// ```no_run
427 /// use hypersync_client::{Client, net_types::{Query, BlockFilter, BlockField}, StreamConfig};
428 ///
429 /// # async fn example() -> anyhow::Result<()> {
430 /// let client = Client::builder()
431 /// .chain_id(1)
432 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
433 /// .build()?;
434 ///
435 /// // Get block data in Arrow format for analytics
436 /// let query = Query::new()
437 /// .from_block(19000000)
438 /// .to_block_excl(19000100)
439 /// .include_all_blocks()
440 /// .select_block_fields([BlockField::Number, BlockField::Timestamp, BlockField::GasUsed]);
441 /// let response = client.collect_arrow(query, StreamConfig::default()).await?;
442 ///
443 /// println!("Retrieved {} Arrow batches for blocks", response.data.blocks.len());
444 /// # Ok(())
445 /// # }
446 /// ```
447 pub async fn collect_arrow(&self, query: Query, config: StreamConfig) -> Result<ArrowResponse> {
448 let mut recv = stream::stream_arrow(self, query, config)
449 .await
450 .context("start stream")?;
451
452 let mut data = ArrowResponseData::default();
453 let mut archive_height = None;
454 let mut next_block = 0;
455 let mut total_execution_time = 0;
456
457 while let Some(res) = recv.recv().await {
458 let res = res.context("get response")?;
459
460 for batch in res.data.blocks {
461 data.blocks.push(batch);
462 }
463 for batch in res.data.transactions {
464 data.transactions.push(batch);
465 }
466 for batch in res.data.logs {
467 data.logs.push(batch);
468 }
469 for batch in res.data.traces {
470 data.traces.push(batch);
471 }
472 for batch in res.data.decoded_logs {
473 data.decoded_logs.push(batch);
474 }
475
476 archive_height = res.archive_height;
477 next_block = res.next_block;
478 total_execution_time += res.total_execution_time
479 }
480
481 Ok(ArrowResponse {
482 archive_height,
483 next_block,
484 total_execution_time,
485 data,
486 rollback_guard: None,
487 })
488 }
489
    /// Writes parquet file getting data through a stream using the provided path, query,
    /// and stream configuration.
    ///
    /// Streams data directly to a Parquet file for efficient storage and later analysis.
    /// Perfect for data exports or ETL pipelines.
    ///
    /// # ⚠️ Important Warning
    ///
    /// This method will continue executing until the query has run to completion from beginning
    /// to the end of the block range defined in the query. For heavy queries with large block
    /// ranges or high data volumes, consider:
    ///
    /// - Use [`stream_arrow()`](Self::stream_arrow) and write to Parquet incrementally
    /// - Use [`get_arrow()`](Self::get_arrow) with pagination and append to Parquet files
    /// - Break large queries into smaller block ranges
    ///
    /// # Errors
    /// Fails if the stream cannot be started or the file cannot be written.
    ///
    /// # Example
    /// ```no_run
    /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField}, StreamConfig};
    ///
    /// # async fn example() -> anyhow::Result<()> {
    /// let client = Client::builder()
    ///     .chain_id(1)
    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
    ///     .build()?;
    ///
    /// // Export all DEX trades to Parquet for analysis
    /// let query = Query::new()
    ///     .from_block(19000000)
    ///     .to_block_excl(19010000)
    ///     .where_logs(
    ///         LogFilter::all()
    ///             .and_topic0(["0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822"])?
    ///     )
    ///     .select_log_fields([LogField::Address, LogField::Data, LogField::BlockNumber]);
    /// client.collect_parquet("./trades.parquet", query, StreamConfig::default()).await?;
    ///
    /// println!("Trade data exported to trades.parquet");
    /// # Ok(())
    /// # }
    /// ```
    pub async fn collect_parquet(
        &self,
        path: &str,
        query: Query,
        config: StreamConfig,
    ) -> Result<()> {
        // Delegates to the parquet_out module, which owns the write logic.
        parquet_out::collect_parquet(self, path, query, config).await
    }
539
540 /// Internal implementation of getting chain_id from server
541 async fn get_chain_id_impl(&self) -> Result<u64> {
542 let mut url = self.inner.url.clone();
543 let mut segments = url.path_segments_mut().ok().context("get path segments")?;
544 segments.push("chain_id");
545 std::mem::drop(segments);
546 let req = self.inner.http_client.request(Method::GET, url);
547
548 let res = req.send().await.context("execute http req")?;
549
550 let status = res.status();
551 if !status.is_success() {
552 return Err(anyhow!("http response status code {}", status));
553 }
554
555 let chain_id: ChainId = res.json().await.context("read response body json")?;
556
557 Ok(chain_id.chain_id)
558 }
559
560 /// Internal implementation of getting height from server
561 async fn get_height_impl(&self, http_timeout_override: Option<Duration>) -> Result<u64> {
562 let mut url = self.inner.url.clone();
563 let mut segments = url.path_segments_mut().ok().context("get path segments")?;
564 segments.push("height");
565 std::mem::drop(segments);
566 let mut req = self.inner.http_client.request(Method::GET, url);
567 if let Some(http_timeout_override) = http_timeout_override {
568 req = req.timeout(http_timeout_override);
569 }
570
571 let res = req.send().await.context("execute http req")?;
572
573 let status = res.status();
574 if !status.is_success() {
575 return Err(anyhow!("http response status code {}", status));
576 }
577
578 let height: ArchiveHeight = res.json().await.context("read response body json")?;
579
580 Ok(height.height.unwrap_or(0))
581 }
582
583 /// Get the chain_id from the server with retries.
584 ///
585 /// # Example
586 /// ```no_run
587 /// use hypersync_client::Client;
588 ///
589 /// # async fn example() -> anyhow::Result<()> {
590 /// let client = Client::builder()
591 /// .chain_id(1)
592 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
593 /// .build()?;
594 ///
595 /// let chain_id = client.get_chain_id().await?;
596 /// println!("Connected to chain ID: {}", chain_id);
597 /// # Ok(())
598 /// # }
599 /// ```
600 pub async fn get_chain_id(&self) -> Result<u64> {
601 let mut base = self.inner.retry_base_ms;
602
603 let mut err = anyhow!("");
604
605 for _ in 0..self.inner.max_num_retries + 1 {
606 match self.get_chain_id_impl().await {
607 Ok(res) => return Ok(res),
608 Err(e) => {
609 log::error!(
610 "failed to get chain_id from server, retrying... The error was: {e:?}"
611 );
612 err = err.context(format!("{e:?}"));
613 }
614 }
615
616 let base_ms = Duration::from_millis(base);
617 let jitter = Duration::from_millis(fastrange_rs::fastrange_64(
618 rand::random(),
619 self.inner.retry_backoff_ms,
620 ));
621
622 tokio::time::sleep(base_ms + jitter).await;
623
624 base = std::cmp::min(
625 base + self.inner.retry_backoff_ms,
626 self.inner.retry_ceiling_ms,
627 );
628 }
629
630 Err(err)
631 }
632
633 /// Get the height of from server with retries.
634 ///
635 /// # Example
636 /// ```no_run
637 /// use hypersync_client::Client;
638 ///
639 /// # async fn example() -> anyhow::Result<()> {
640 /// let client = Client::builder()
641 /// .chain_id(1)
642 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
643 /// .build()?;
644 ///
645 /// let height = client.get_height().await?;
646 /// println!("Current block height: {}", height);
647 /// # Ok(())
648 /// # }
649 /// ```
650 pub async fn get_height(&self) -> Result<u64> {
651 let mut base = self.inner.retry_base_ms;
652
653 let mut err = anyhow!("");
654
655 for _ in 0..self.inner.max_num_retries + 1 {
656 match self.get_height_impl(None).await {
657 Ok(res) => return Ok(res),
658 Err(e) => {
659 log::error!(
660 "failed to get height from server, retrying... The error was: {e:?}"
661 );
662 err = err.context(format!("{e:?}"));
663 }
664 }
665
666 let base_ms = Duration::from_millis(base);
667 let jitter = Duration::from_millis(fastrange_rs::fastrange_64(
668 rand::random(),
669 self.inner.retry_backoff_ms,
670 ));
671
672 tokio::time::sleep(base_ms + jitter).await;
673
674 base = std::cmp::min(
675 base + self.inner.retry_backoff_ms,
676 self.inner.retry_ceiling_ms,
677 );
678 }
679
680 Err(err)
681 }
682
    /// Get the height of the Client instance for health checks.
    ///
    /// Doesn't do any retries and the `http_req_timeout` parameter will override the http timeout config set when creating the client.
    ///
    /// # Errors
    /// Fails on any HTTP/transport error; there is no retry, so a single failed
    /// request surfaces immediately (useful for liveness probes).
    ///
    /// # Example
    /// ```no_run
    /// use hypersync_client::Client;
    /// use std::time::Duration;
    ///
    /// # async fn example() -> anyhow::Result<()> {
    /// let client = Client::builder()
    ///     .chain_id(1)
    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
    ///     .build()?;
    ///
    /// // Quick health check with 5 second timeout
    /// let height = client.health_check(Some(Duration::from_secs(5))).await?;
    /// println!("Server is healthy at block: {}", height);
    /// # Ok(())
    /// # }
    /// ```
    pub async fn health_check(&self, http_req_timeout: Option<Duration>) -> Result<u64> {
        // Single-attempt height fetch with an optional per-call timeout override.
        self.get_height_impl(http_req_timeout).await
    }
707
708 /// Executes query with retries and returns the response.
709 ///
710 /// # Example
711 /// ```no_run
712 /// use hypersync_client::{Client, net_types::{Query, BlockFilter, BlockField}};
713 ///
714 /// # async fn example() -> anyhow::Result<()> {
715 /// let client = Client::builder()
716 /// .chain_id(1)
717 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
718 /// .build()?;
719 ///
720 /// // Query all blocks from a specific range
721 /// let query = Query::new()
722 /// .from_block(19000000)
723 /// .to_block_excl(19000010)
724 /// .include_all_blocks()
725 /// .select_block_fields([BlockField::Number, BlockField::Hash, BlockField::Timestamp]);
726 /// let response = client.get(&query).await?;
727 ///
728 /// println!("Retrieved {} blocks", response.data.blocks.len());
729 /// # Ok(())
730 /// # }
731 /// ```
732 pub async fn get(&self, query: &Query) -> Result<QueryResponse> {
733 let arrow_response = self.get_arrow(query).await.context("get data")?;
734 let converted =
735 QueryResponse::try_from(&arrow_response).context("convert arrow response")?;
736 Ok(converted)
737 }
738
739 /// Add block, transaction and log fields selection to the query, executes it with retries
740 /// and returns the response.
741 ///
742 /// This method automatically joins blocks, transactions, and logs into unified events,
743 /// making it easier to work with related blockchain data.
744 ///
745 /// # Example
746 /// ```no_run
747 /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField, TransactionField}};
748 ///
749 /// # async fn example() -> anyhow::Result<()> {
750 /// let client = Client::builder()
751 /// .chain_id(1)
752 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
753 /// .build()?;
754 ///
755 /// // Query ERC20 transfers with transaction context
756 /// let query = Query::new()
757 /// .from_block(19000000)
758 /// .to_block_excl(19000010)
759 /// .where_logs(
760 /// LogFilter::all()
761 /// .and_topic0(["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"])?
762 /// )
763 /// .select_log_fields([LogField::Address, LogField::Data])
764 /// .select_transaction_fields([TransactionField::Hash, TransactionField::From]);
765 /// let response = client.get_events(query).await?;
766 ///
767 /// println!("Retrieved {} joined events", response.data.len());
768 /// # Ok(())
769 /// # }
770 /// ```
771 pub async fn get_events(&self, mut query: Query) -> Result<EventResponse> {
772 let event_join_strategy = InternalEventJoinStrategy::from(&query.field_selection);
773 event_join_strategy.add_join_fields_to_selection(&mut query.field_selection);
774 let arrow_response = self.get_arrow(&query).await.context("get data")?;
775 EventResponse::try_from_arrow_response(&arrow_response, &event_join_strategy)
776 }
777
778 /// Executes query once and returns the result in (Arrow, size) format using JSON serialization.
779 async fn get_arrow_impl_json(
780 &self,
781 query: &Query,
782 ) -> std::result::Result<(ArrowResponse, u64), HyperSyncResponseError> {
783 let mut url = self.inner.url.clone();
784 let mut segments = url.path_segments_mut().ok().context("get path segments")?;
785 segments.push("query");
786 segments.push("arrow-ipc");
787 std::mem::drop(segments);
788 let req = self.inner.http_client.request(Method::POST, url);
789
790 let res = req.json(&query).send().await.context("execute http req")?;
791
792 let status = res.status();
793 if status == StatusCode::PAYLOAD_TOO_LARGE {
794 return Err(HyperSyncResponseError::PayloadTooLarge);
795 }
796 if !status.is_success() {
797 let text = res.text().await.context("read text to see error")?;
798
799 return Err(HyperSyncResponseError::Other(anyhow!(
800 "http response status code {}, err body: {}",
801 status,
802 text
803 )));
804 }
805
806 let bytes = res.bytes().await.context("read response body bytes")?;
807
808 let res = tokio::task::block_in_place(|| {
809 parse_query_response(&bytes).context("parse query response")
810 })?;
811
812 Ok((res, bytes.len().try_into().unwrap()))
813 }
814
815 fn should_cache_queries(&self) -> bool {
816 matches!(
817 self.inner.serialization_format,
818 SerializationFormat::CapnProto {
819 should_cache_queries: true
820 }
821 )
822 }
823
824 /// Executes query once and returns the result in (Arrow, size) format using Cap'n Proto serialization.
825 async fn get_arrow_impl_capnp(
826 &self,
827 query: &Query,
828 ) -> std::result::Result<(ArrowResponse, u64), HyperSyncResponseError> {
829 let mut url = self.inner.url.clone();
830 let mut segments = url.path_segments_mut().ok().context("get path segments")?;
831 segments.push("query");
832 segments.push("arrow-ipc");
833 segments.push("capnp");
834 std::mem::drop(segments);
835
836 let should_cache = self.should_cache_queries();
837
838 if should_cache {
839 let query_with_id = {
840 let mut message = capnp::message::Builder::new_default();
841 let mut request_builder =
842 message.init_root::<hypersync_net_types_capnp::request::Builder>();
843
844 request_builder
845 .build_query_id_from_query(query)
846 .context("build query id")?;
847 let mut query_with_id = Vec::new();
848 capnp::serialize_packed::write_message(&mut query_with_id, &message)
849 .context("write capnp message")?;
850 query_with_id
851 };
852
853 let mut req = self.inner.http_client.request(Method::POST, url.clone());
854 req = req.header("content-type", "application/x-capnp");
855
856 let res = req
857 .body(query_with_id)
858 .send()
859 .await
860 .context("execute http req")?;
861
862 let status = res.status();
863 if status == StatusCode::PAYLOAD_TOO_LARGE {
864 return Err(HyperSyncResponseError::PayloadTooLarge);
865 }
866 if status.is_success() {
867 let bytes = res.bytes().await.context("read response body bytes")?;
868
869 let mut opts = capnp::message::ReaderOptions::new();
870 opts.nesting_limit(i32::MAX).traversal_limit_in_words(None);
871 let message_reader = capnp::serialize_packed::read_message(bytes.as_ref(), opts)
872 .context("create message reader")?;
873 let query_response = message_reader
874 .get_root::<hypersync_net_types_capnp::cached_query_response::Reader>()
875 .context("get cached_query_response root")?;
876 match query_response.get_either().which().context("get either")? {
877 hypersync_net_types_capnp::cached_query_response::either::Which::QueryResponse(
878 query_response,
879 ) => {
880 let res = tokio::task::block_in_place(|| {
881 let res = query_response?;
882 read_query_response(&res).context("parse query response cached")
883 })?;
884 return Ok((res, bytes.len().try_into().unwrap()));
885 }
886 hypersync_net_types_capnp::cached_query_response::either::Which::NotCached(()) => {
887 log::trace!("query was not cached, retrying with full query");
888 }
889 }
890 } else {
891 let text = res.text().await.context("read text to see error")?;
892 log::warn!(
893 "Failed cache query, will retry full query. {}, err body: {}",
894 status,
895 text
896 );
897 }
898 };
899
900 let full_query_bytes = {
901 let mut message = capnp::message::Builder::new_default();
902 let mut query_builder =
903 message.init_root::<hypersync_net_types_capnp::request::Builder>();
904
905 query_builder
906 .build_full_query_from_query(query, should_cache)
907 .context("build full query")?;
908 let mut bytes = Vec::new();
909 capnp::serialize_packed::write_message(&mut bytes, &message)
910 .context("write full query capnp message")?;
911 bytes
912 };
913
914 let mut req = self.inner.http_client.request(Method::POST, url);
915 req = req.header("content-type", "application/x-capnp");
916
917 let res = req
918 .header("content-type", "application/x-capnp")
919 .body(full_query_bytes)
920 .send()
921 .await
922 .context("execute http req")?;
923
924 let status = res.status();
925 if status == StatusCode::PAYLOAD_TOO_LARGE {
926 return Err(HyperSyncResponseError::PayloadTooLarge);
927 }
928 if !status.is_success() {
929 let text = res.text().await.context("read text to see error")?;
930
931 return Err(HyperSyncResponseError::Other(anyhow!(
932 "http response status code {}, err body: {}",
933 status,
934 text
935 )));
936 }
937
938 let bytes = res.bytes().await.context("read response body bytes")?;
939
940 let res = tokio::task::block_in_place(|| {
941 parse_query_response(&bytes).context("parse query response")
942 })?;
943
944 Ok((res, bytes.len().try_into().unwrap()))
945 }
946
947 /// Executes query once and returns the result in (Arrow, size) format.
948 async fn get_arrow_impl(&self, query: &Query) -> Result<(ArrowResponse, u64)> {
949 let mut query = query.clone();
950 loop {
951 let res = match self.inner.serialization_format {
952 SerializationFormat::Json => self.get_arrow_impl_json(&query).await,
953 SerializationFormat::CapnProto { .. } => self.get_arrow_impl_capnp(&query).await,
954 };
955 match res {
956 Ok(res) => return Ok(res),
957 Err(HyperSyncResponseError::Other(e)) => return Err(e),
958 Err(HyperSyncResponseError::PayloadTooLarge) => {
959 let block_range = if let Some(to_block) = query.to_block {
960 let current = to_block - query.from_block;
961 if current < 2 {
962 anyhow::bail!(
963 "Payload is too large and query is using the minimum block range."
964 )
965 }
966 // Half the current block range
967 current / 2
968 } else {
969 200
970 };
971 let to_block = query.from_block + block_range;
972 query.to_block = Some(to_block);
973
974 log::trace!(
975 "Payload is too large, retrying with block range from: {} to: {}",
976 query.from_block,
977 to_block
978 );
979 }
980 }
981 }
982 }
983
984 /// Executes query with retries and returns the response in Arrow format.
985 pub async fn get_arrow(&self, query: &Query) -> Result<ArrowResponse> {
986 self.get_arrow_with_size(query).await.map(|res| res.0)
987 }
988
989 /// Internal implementation for get_arrow.
990 async fn get_arrow_with_size(&self, query: &Query) -> Result<(ArrowResponse, u64)> {
991 let mut base = self.inner.retry_base_ms;
992
993 let mut err = anyhow!("");
994
995 for _ in 0..self.inner.max_num_retries + 1 {
996 match self.get_arrow_impl(query).await {
997 Ok(res) => return Ok(res),
998 Err(e) => {
999 log::error!(
1000 "failed to get arrow data from server, retrying... The error was: {e:?}"
1001 );
1002 err = err.context(format!("{e:?}"));
1003 }
1004 }
1005
1006 let base_ms = Duration::from_millis(base);
1007 let jitter = Duration::from_millis(fastrange_rs::fastrange_64(
1008 rand::random(),
1009 self.inner.retry_backoff_ms,
1010 ));
1011
1012 tokio::time::sleep(base_ms + jitter).await;
1013
1014 base = std::cmp::min(
1015 base + self.inner.retry_backoff_ms,
1016 self.inner.retry_ceiling_ms,
1017 );
1018 }
1019
1020 Err(err)
1021 }
1022
1023 /// Spawns task to execute query and return data via a channel.
1024 ///
1025 /// # Example
1026 /// ```no_run
1027 /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField}, StreamConfig};
1028 ///
1029 /// # async fn example() -> anyhow::Result<()> {
1030 /// let client = Client::builder()
1031 /// .chain_id(1)
1032 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
1033 /// .build()?;
1034 ///
1035 /// // Stream all ERC20 transfer events
1036 /// let query = Query::new()
1037 /// .from_block(19000000)
1038 /// .where_logs(
1039 /// LogFilter::all()
1040 /// .and_topic0(["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"])?
1041 /// )
1042 /// .select_log_fields([LogField::Address, LogField::Topic1, LogField::Topic2, LogField::Data]);
1043 /// let mut receiver = client.stream(query, StreamConfig::default()).await?;
1044 ///
1045 /// while let Some(response) = receiver.recv().await {
1046 /// let response = response?;
1047 /// println!("Got {} events up to block: {}", response.data.logs.len(), response.next_block);
1048 /// }
1049 /// # Ok(())
1050 /// # }
1051 /// ```
1052 pub async fn stream(
1053 &self,
1054 query: Query,
1055 config: StreamConfig,
1056 ) -> Result<mpsc::Receiver<Result<QueryResponse>>> {
1057 check_simple_stream_params(&config)?;
1058
1059 let (tx, rx): (_, mpsc::Receiver<Result<QueryResponse>>) =
1060 mpsc::channel(config.concurrency);
1061
1062 let mut inner_rx = self
1063 .stream_arrow(query, config)
1064 .await
1065 .context("start inner stream")?;
1066
1067 tokio::spawn(async move {
1068 while let Some(resp) = inner_rx.recv().await {
1069 let msg = resp
1070 .context("inner receiver")
1071 .and_then(|r| QueryResponse::try_from(&r));
1072 let is_err = msg.is_err();
1073 if tx.send(msg).await.is_err() || is_err {
1074 return;
1075 }
1076 }
1077 });
1078
1079 Ok(rx)
1080 }
1081
1082 /// Add block, transaction and log fields selection to the query and spawns task to execute it,
1083 /// returning data via a channel.
1084 ///
1085 /// This method automatically joins blocks, transactions, and logs into unified events,
1086 /// then streams them via a channel for real-time processing.
1087 ///
1088 /// # Example
1089 /// ```no_run
1090 /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField, TransactionField}, StreamConfig};
1091 ///
1092 /// # async fn example() -> anyhow::Result<()> {
1093 /// let client = Client::builder()
1094 /// .chain_id(1)
1095 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
1096 /// .build()?;
1097 ///
1098 /// // Stream NFT transfer events with transaction context
1099 /// let query = Query::new()
1100 /// .from_block(19000000)
1101 /// .where_logs(
1102 /// LogFilter::all()
1103 /// .and_topic0(["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"])?
1104 /// )
1105 /// .select_log_fields([LogField::Address, LogField::Topic1, LogField::Topic2])
1106 /// .select_transaction_fields([TransactionField::Hash, TransactionField::From]);
1107 /// let mut receiver = client.stream_events(query, StreamConfig::default()).await?;
1108 ///
1109 /// while let Some(response) = receiver.recv().await {
1110 /// let response = response?;
1111 /// println!("Got {} joined events up to block: {}", response.data.len(), response.next_block);
1112 /// }
1113 /// # Ok(())
1114 /// # }
1115 /// ```
1116 pub async fn stream_events(
1117 &self,
1118 mut query: Query,
1119 config: StreamConfig,
1120 ) -> Result<mpsc::Receiver<Result<EventResponse>>> {
1121 check_simple_stream_params(&config)?;
1122
1123 let event_join_strategy = InternalEventJoinStrategy::from(&query.field_selection);
1124
1125 event_join_strategy.add_join_fields_to_selection(&mut query.field_selection);
1126
1127 let (tx, rx): (_, mpsc::Receiver<Result<EventResponse>>) =
1128 mpsc::channel(config.concurrency);
1129
1130 let mut inner_rx = self
1131 .stream_arrow(query, config)
1132 .await
1133 .context("start inner stream")?;
1134
1135 tokio::spawn(async move {
1136 while let Some(resp) = inner_rx.recv().await {
1137 let msg = resp
1138 .context("inner receiver")
1139 .and_then(|r| EventResponse::try_from_arrow_response(&r, &event_join_strategy));
1140 let is_err = msg.is_err();
1141 if tx.send(msg).await.is_err() || is_err {
1142 return;
1143 }
1144 }
1145 });
1146
1147 Ok(rx)
1148 }
1149
    /// Spawns task to execute query and return data via a channel in Arrow format.
    ///
    /// Returns raw Apache Arrow data via a channel for high-performance processing.
    /// Ideal for applications that need to work directly with columnar data.
    ///
    /// # Example
    /// ```no_run
    /// use hypersync_client::{Client, net_types::{Query, TransactionFilter, TransactionField}, StreamConfig};
    ///
    /// # async fn example() -> anyhow::Result<()> {
    /// let client = Client::builder()
    ///     .chain_id(1)
    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
    ///     .build()?;
    ///
    /// // Stream transaction data in Arrow format for analytics
    /// let query = Query::new()
    ///     .from_block(19000000)
    ///     .to_block_excl(19000100)
    ///     .where_transactions(
    ///         TransactionFilter::all()
    ///             .and_contract_address(["0xA0b86a33E6411b87Fd9D3DF822C8698FC06BBe4c"])?
    ///     )
    ///     .select_transaction_fields([TransactionField::Hash, TransactionField::From, TransactionField::Value]);
    /// let mut receiver = client.stream_arrow(query, StreamConfig::default()).await?;
    ///
    /// while let Some(response) = receiver.recv().await {
    ///     let response = response?;
    ///     println!("Got {} Arrow batches for transactions", response.data.transactions.len());
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub async fn stream_arrow(
        &self,
        query: Query,
        config: StreamConfig,
    ) -> Result<mpsc::Receiver<Result<ArrowResponse>>> {
        // Thin delegation; the streaming machinery lives in the `stream` module.
        stream::stream_arrow(self, query, config).await
    }
1190
    /// Getter for url field.
    ///
    /// # Example
    /// ```
    /// use hypersync_client::Client;
    ///
    /// let client = Client::builder()
    ///     .chain_id(1)
    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
    ///     .build()
    ///     .unwrap();
    ///
    /// println!("Client URL: {}", client.url());
    /// ```
    pub fn url(&self) -> &Url {
        // Borrows from the shared inner state; callers clone if they need ownership.
        &self.inner.url
    }
1208}
1209
/// Builder for creating a hypersync client with configuration options.
///
/// This builder provides a fluent API for configuring client settings like URL,
/// authentication, timeouts, and retry behavior.
///
/// # Example
/// ```
/// use hypersync_client::{Client, SerializationFormat};
///
/// let client = Client::builder()
///     .chain_id(1)
///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
///     .http_req_timeout_millis(30000)
///     .max_num_retries(3)
///     .build()
///     .unwrap();
/// ```
#[derive(Debug, Clone, Default)]
pub struct ClientBuilder(ClientConfig); // newtype over ClientConfig; consumed by `build()`
1229
1230impl ClientBuilder {
1231 /// Creates a new ClientBuilder with default configuration.
1232 pub fn new() -> Self {
1233 Self::default()
1234 }
1235
1236 /// Sets the chain ID and automatically configures the URL for the hypersync endpoint.
1237 ///
1238 /// This is a convenience method that sets the URL to `https://{chain_id}.hypersync.xyz`.
1239 ///
1240 /// # Arguments
1241 /// * `chain_id` - The blockchain chain ID (e.g., 1 for Ethereum mainnet)
1242 ///
1243 /// # Example
1244 /// ```
1245 /// use hypersync_client::Client;
1246 ///
1247 /// let client = Client::builder()
1248 /// .chain_id(1) // Ethereum mainnet
1249 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1250 /// .build()
1251 /// .unwrap();
1252 /// ```
1253 pub fn chain_id(mut self, chain_id: u64) -> Self {
1254 self.0.url = format!("https://{chain_id}.hypersync.xyz");
1255 self
1256 }
1257
1258 /// Sets a custom URL for the hypersync server.
1259 ///
1260 /// Use this method when you need to connect to a custom hypersync endpoint
1261 /// instead of the default public endpoints.
1262 ///
1263 /// # Arguments
1264 /// * `url` - The hypersync server URL
1265 ///
1266 /// # Example
1267 /// ```
1268 /// use hypersync_client::Client;
1269 ///
1270 /// let client = Client::builder()
1271 /// .url("https://my-custom-hypersync.example.com")
1272 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1273 /// .build()
1274 /// .unwrap();
1275 /// ```
1276 pub fn url<S: ToString>(mut self, url: S) -> Self {
1277 self.0.url = url.to_string();
1278 self
1279 }
1280
1281 /// Sets the api token for authentication.
1282 ///
1283 /// Required for accessing authenticated hypersync endpoints.
1284 ///
1285 /// # Arguments
1286 /// * `api_token` - The authentication token
1287 ///
1288 /// # Example
1289 /// ```
1290 /// use hypersync_client::Client;
1291 ///
1292 /// let client = Client::builder()
1293 /// .chain_id(1)
1294 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1295 /// .build()
1296 /// .unwrap();
1297 /// ```
1298 pub fn api_token<S: ToString>(mut self, api_token: S) -> Self {
1299 self.0.api_token = api_token.to_string();
1300 self
1301 }
1302
1303 /// Sets the HTTP request timeout in milliseconds.
1304 ///
1305 /// # Arguments
1306 /// * `http_req_timeout_millis` - Timeout in milliseconds (default: 30000)
1307 ///
1308 /// # Example
1309 /// ```
1310 /// use hypersync_client::Client;
1311 ///
1312 /// let client = Client::builder()
1313 /// .chain_id(1)
1314 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1315 /// .http_req_timeout_millis(60000) // 60 second timeout
1316 /// .build()
1317 /// .unwrap();
1318 /// ```
1319 pub fn http_req_timeout_millis(mut self, http_req_timeout_millis: u64) -> Self {
1320 self.0.http_req_timeout_millis = http_req_timeout_millis;
1321 self
1322 }
1323
1324 /// Sets the maximum number of retries for failed requests.
1325 ///
1326 /// # Arguments
1327 /// * `max_num_retries` - Maximum number of retries (default: 10)
1328 ///
1329 /// # Example
1330 /// ```
1331 /// use hypersync_client::Client;
1332 ///
1333 /// let client = Client::builder()
1334 /// .chain_id(1)
1335 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1336 /// .max_num_retries(5)
1337 /// .build()
1338 /// .unwrap();
1339 /// ```
1340 pub fn max_num_retries(mut self, max_num_retries: usize) -> Self {
1341 self.0.max_num_retries = max_num_retries;
1342 self
1343 }
1344
1345 /// Sets the backoff increment for retry delays.
1346 ///
1347 /// This value is added to the base delay on each retry attempt.
1348 ///
1349 /// # Arguments
1350 /// * `retry_backoff_ms` - Backoff increment in milliseconds (default: 500)
1351 ///
1352 /// # Example
1353 /// ```
1354 /// use hypersync_client::Client;
1355 ///
1356 /// let client = Client::builder()
1357 /// .chain_id(1)
1358 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1359 /// .retry_backoff_ms(1000) // 1 second backoff increment
1360 /// .build()
1361 /// .unwrap();
1362 /// ```
1363 pub fn retry_backoff_ms(mut self, retry_backoff_ms: u64) -> Self {
1364 self.0.retry_backoff_ms = retry_backoff_ms;
1365 self
1366 }
1367
1368 /// Sets the initial delay for retry attempts.
1369 ///
1370 /// # Arguments
1371 /// * `retry_base_ms` - Initial retry delay in milliseconds (default: 500)
1372 ///
1373 /// # Example
1374 /// ```
1375 /// use hypersync_client::Client;
1376 ///
1377 /// let client = Client::builder()
1378 /// .chain_id(1)
1379 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1380 /// .retry_base_ms(1000) // Start with 1 second delay
1381 /// .build()
1382 /// .unwrap();
1383 /// ```
1384 pub fn retry_base_ms(mut self, retry_base_ms: u64) -> Self {
1385 self.0.retry_base_ms = retry_base_ms;
1386 self
1387 }
1388
1389 /// Sets the maximum delay for retry attempts.
1390 ///
1391 /// The retry delay will not exceed this value, even with backoff increments.
1392 ///
1393 /// # Arguments
1394 /// * `retry_ceiling_ms` - Maximum retry delay in milliseconds (default: 10000)
1395 ///
1396 /// # Example
1397 /// ```
1398 /// use hypersync_client::Client;
1399 ///
1400 /// let client = Client::builder()
1401 /// .chain_id(1)
1402 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1403 /// .retry_ceiling_ms(30000) // Cap at 30 seconds
1404 /// .build()
1405 /// .unwrap();
1406 /// ```
1407 pub fn retry_ceiling_ms(mut self, retry_ceiling_ms: u64) -> Self {
1408 self.0.retry_ceiling_ms = retry_ceiling_ms;
1409 self
1410 }
1411
1412 /// Sets the serialization format for client-server communication.
1413 ///
1414 /// # Arguments
1415 /// * `serialization_format` - The format to use (JSON or CapnProto)
1416 ///
1417 /// # Example
1418 /// ```
1419 /// use hypersync_client::{Client, SerializationFormat};
1420 ///
1421 /// let client = Client::builder()
1422 /// .chain_id(1)
1423 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1424 /// .serialization_format(SerializationFormat::Json)
1425 /// .build()
1426 /// .unwrap();
1427 /// ```
1428 pub fn serialization_format(mut self, serialization_format: SerializationFormat) -> Self {
1429 self.0.serialization_format = serialization_format;
1430 self
1431 }
1432
1433 /// Builds the client with the configured settings.
1434 ///
1435 /// # Returns
1436 /// * `Result<Client>` - The configured client or an error if configuration is invalid
1437 ///
1438 /// # Errors
1439 /// Returns an error if:
1440 /// * The URL is malformed
1441 /// * Required configuration is missing
1442 ///
1443 /// # Example
1444 /// ```
1445 /// use hypersync_client::Client;
1446 ///
1447 /// let client = Client::builder()
1448 /// .chain_id(1)
1449 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1450 /// .build()
1451 /// .unwrap();
1452 /// ```
1453 pub fn build(self) -> Result<Client> {
1454 if self.0.url.is_empty() {
1455 anyhow::bail!(
1456 "endpoint needs to be set, try using builder.chain_id(1) or\
1457 builder.url(\"https://eth.hypersync.xyz\") to set the endpoint"
1458 )
1459 }
1460 Client::new(self.0)
1461 }
1462}
1463
/// Delay before the first SSE reconnect attempt (200ms); doubles with each consecutive failure.
const INITIAL_RECONNECT_DELAY: Duration = Duration::from_millis(200);
/// Upper bound on the exponential reconnect backoff.
const MAX_RECONNECT_DELAY: Duration = Duration::from_secs(30);
/// Timeout for detecting dead connections. Server sends keepalive pings every 5s,
/// so we timeout after 15s (3x the ping interval).
const READ_TIMEOUT: Duration = Duration::from_secs(15);
1470
/// Events emitted by the height stream.
///
/// Produced by [`Client::stream_height`]. `Connected` is emitted each time the SSE
/// connection is (re)established, so consumers may observe it more than once.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum HeightStreamEvent {
    /// Successfully connected or reconnected to the SSE stream.
    Connected,
    /// Received a height update from the server.
    Height(u64),
    /// Connection lost, will attempt to reconnect after the specified delay.
    Reconnecting {
        /// Duration to wait before attempting reconnection.
        delay: Duration,
        /// The error that caused the reconnection.
        error_msg: String,
    },
}
1486
/// Internal classification of raw SSE events before forwarding to the consumer.
enum InternalStreamEvent {
    /// An event that should be forwarded on the subscriber channel.
    Publish(HeightStreamEvent),
    /// Server keepalive; consumed internally, never forwarded.
    Ping,
    /// Unrecognized event type; carries a debug description, currently ignored.
    Unknown(String),
}
1492
1493impl Client {
1494 fn get_es_stream(&self) -> Result<EventSource> {
1495 // Build the GET /height/sse request
1496 let mut url = self.inner.url.clone();
1497 url.path_segments_mut()
1498 .ok()
1499 .context("invalid base URL")?
1500 .extend(&["height", "sse"]);
1501
1502 let req = self
1503 .inner
1504 .http_client
1505 // Don't set timeout for SSE stream
1506 .request_no_timeout(Method::GET, url);
1507
1508 // Configure exponential backoff for library-level retries
1509 let retry_policy = ExponentialBackoff::new(
1510 INITIAL_RECONNECT_DELAY,
1511 2.0,
1512 Some(MAX_RECONNECT_DELAY),
1513 None, // unlimited retries
1514 );
1515
1516 // Turn the request into an EventSource stream with retries
1517 let mut es = reqwest_eventsource::EventSource::new(req)
1518 .context("unexpected error creating EventSource")?;
1519 es.set_retry_policy(Box::new(retry_policy));
1520 Ok(es)
1521 }
1522
1523 async fn next_height(event_source: &mut EventSource) -> Result<Option<InternalStreamEvent>> {
1524 let Some(res) = tokio::time::timeout(READ_TIMEOUT, event_source.next())
1525 .await
1526 .map_err(|d| anyhow::anyhow!("stream timed out after {d}"))?
1527 else {
1528 return Ok(None);
1529 };
1530
1531 let e = match res.context("failed response")? {
1532 Event::Open => InternalStreamEvent::Publish(HeightStreamEvent::Connected),
1533 Event::Message(event) => match event.event.as_str() {
1534 "height" => {
1535 let height = event
1536 .data
1537 .trim()
1538 .parse::<u64>()
1539 .context("parsing height from event data")?;
1540 InternalStreamEvent::Publish(HeightStreamEvent::Height(height))
1541 }
1542 "ping" => InternalStreamEvent::Ping,
1543 _ => InternalStreamEvent::Unknown(format!("unknown event: {:?}", event)),
1544 },
1545 };
1546
1547 Ok(Some(e))
1548 }
1549
1550 async fn stream_height_events(
1551 es: &mut EventSource,
1552 tx: &mpsc::Sender<HeightStreamEvent>,
1553 ) -> Result<bool> {
1554 let mut received_an_event = false;
1555 while let Some(event) = Self::next_height(es).await.context("failed next height")? {
1556 match event {
1557 InternalStreamEvent::Publish(event) => {
1558 received_an_event = true;
1559 if tx.send(event).await.is_err() {
1560 return Ok(received_an_event); // Receiver dropped, exit task
1561 }
1562 }
1563 InternalStreamEvent::Ping => (), // ignore pings
1564 InternalStreamEvent::Unknown(_event) => (), // ignore unknown events
1565 }
1566 }
1567 Ok(received_an_event)
1568 }
1569
1570 fn get_delay(consecutive_failures: u32) -> Duration {
1571 if consecutive_failures > 0 {
1572 /// helper function to calculate 2^x
1573 /// optimization using bit shifting
1574 const fn two_to_pow(x: u32) -> u32 {
1575 1 << x
1576 }
1577 // Exponential backoff: 200ms, 400ms, 800ms, ... up to 30s
1578 INITIAL_RECONNECT_DELAY
1579 .saturating_mul(two_to_pow(consecutive_failures - 1))
1580 .min(MAX_RECONNECT_DELAY)
1581 } else {
1582 // On zero consecutive failures, 0 delay
1583 Duration::from_millis(0)
1584 }
1585 }
1586
    /// Runs the height event stream in a loop, reconnecting with backoff until the
    /// receiver side of `tx` is dropped.
    ///
    /// Only returns an error if a new EventSource cannot be constructed; stream-level
    /// failures are reported to the consumer via `HeightStreamEvent::Reconnecting`
    /// and retried.
    async fn stream_height_events_with_retry(
        &self,
        tx: &mpsc::Sender<HeightStreamEvent>,
    ) -> Result<()> {
        let mut consecutive_failures = 0u32;

        loop {
            // should always be able to create a new es stream
            // something is wrong with the req builder otherwise
            let mut es = self.get_es_stream().context("get es stream")?;

            // Placeholder error; only replaced when the stream actually fails, so a
            // graceful exit reports an empty error_msg in the Reconnecting event.
            let mut error = anyhow!("");

            match Self::stream_height_events(&mut es, tx).await {
                Ok(received_an_event) => {
                    if received_an_event {
                        consecutive_failures = 0; // Reset after successful connection that then failed
                    }
                    log::trace!("Stream height exited");
                }
                Err(e) => {
                    log::trace!("Stream height failed: {e:?}");
                    error = e;
                }
            }

            es.close();

            // If the receiver is closed, exit the task
            if tx.is_closed() {
                break;
            }

            let delay = Self::get_delay(consecutive_failures);
            log::trace!("Reconnecting in {:?}...", delay);

            let error_msg = format!("{error:?}");

            if tx
                .send(HeightStreamEvent::Reconnecting { delay, error_msg })
                .await
                .is_err()
            {
                return Ok(()); // Receiver dropped, exit task
            }
            tokio::time::sleep(delay).await;

            // increment consecutive failures so that on the next try
            // it will start using back offs
            consecutive_failures += 1;
        }

        Ok(())
    }
1641
    /// Streams archive height updates from the server via Server-Sent Events.
    ///
    /// Establishes a long-lived SSE connection to `/height/sse` that automatically reconnects
    /// on disconnection with exponential backoff (200ms → 400ms → ... → max 30s).
    ///
    /// The stream emits [`HeightStreamEvent`] to notify consumers of connection state changes
    /// and height updates. This allows applications to display connection status to users.
    ///
    /// # Returns
    /// Channel receiver yielding [`HeightStreamEvent`]s. The background task handles connection
    /// lifecycle and sends events through this channel.
    ///
    /// # Example
    /// ```
    /// # use hypersync_client::{Client, HeightStreamEvent};
    /// # async fn example() -> anyhow::Result<()> {
    /// let client = Client::builder()
    ///     .url("https://eth.hypersync.xyz")
    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
    ///     .build()?;
    ///
    /// let mut rx = client.stream_height();
    ///
    /// while let Some(event) = rx.recv().await {
    ///     match event {
    ///         HeightStreamEvent::Connected => println!("Connected to stream"),
    ///         HeightStreamEvent::Height(h) => println!("Height: {}", h),
    ///         HeightStreamEvent::Reconnecting { delay, error_msg } => {
    ///             println!("Reconnecting in {delay:?} due to error: {error_msg}")
    ///         }
    ///     }
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub fn stream_height(&self) -> mpsc::Receiver<HeightStreamEvent> {
        // Bounded channel: the background task's sends wait when the consumer lags.
        let (tx, rx) = mpsc::channel(16);
        // Clone so the spawned task owns its own client handle.
        let client = self.clone();

        tokio::spawn(async move {
            // Reconnects are handled inside; an error here means the request itself
            // could not be constructed.
            if let Err(e) = client.stream_height_events_with_retry(&tx).await {
                log::error!("Stream height failed unexpectedly: {e:?}");
            }
        });

        rx
    }
1689}
1690
1691fn check_simple_stream_params(config: &StreamConfig) -> Result<()> {
1692 if config.event_signature.is_some() {
1693 return Err(anyhow!(
1694 "config.event_signature can't be passed to simple type function. User is expected to \
1695 decode the logs using Decoder."
1696 ));
1697 }
1698 if config.column_mapping.is_some() {
1699 return Err(anyhow!(
1700 "config.column_mapping can't be passed to single type function. User is expected to \
1701 map values manually."
1702 ));
1703 }
1704
1705 Ok(())
1706}
1707
/// Used to indicate whether or not a retry should be attempted.
#[derive(Debug, thiserror::Error)]
pub enum HyperSyncResponseError {
    /// Means that the client should retry with a smaller block range.
    #[error("hypersync responded with 'payload too large' error")]
    PayloadTooLarge,
    /// Any other server error.
    ///
    /// `#[from]` lets `?` convert any `anyhow::Error` into this variant.
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}
1718
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_get_delay() {
        // No failures yet -> reconnect immediately.
        assert_eq!(
            Client::get_delay(0),
            Duration::from_millis(0),
            "starts with 0 delay"
        );
        // Backoff doubles with each consecutive failure.
        for (failures, expected_ms) in [(1, 200), (2, 400), (3, 800)] {
            assert_eq!(Client::get_delay(failures), Duration::from_millis(expected_ms));
        }
        // The delay is capped at 30 seconds.
        for failures in [9, 10] {
            assert_eq!(
                Client::get_delay(failures),
                Duration::from_secs(30),
                "max delay is 30s"
            );
        }
    }

    #[tokio::test]
    #[ignore = "integration test with live hs server for height stream"]
    async fn test_stream_height_events() -> anyhow::Result<()> {
        let (tx, mut rx) = mpsc::channel(16);
        let stream_task = tokio::spawn(async move {
            let client = Client::builder()
                .url("https://monad-testnet.hypersync.xyz")
                .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
                .build()?;
            let mut es = client.get_es_stream().context("get es stream")?;
            Client::stream_height_events(&mut es, &tx).await
        });

        // The first event must be the connection notification.
        let first = rx.recv().await;
        assert!(first.is_some());
        assert_eq!(first.unwrap(), HeightStreamEvent::Connected);

        // Heights should arrive and strictly increase.
        let Some(HeightStreamEvent::Height(first_height)) = rx.recv().await else {
            panic!("should have received height")
        };
        let Some(HeightStreamEvent::Height(second_height)) = rx.recv().await else {
            panic!("should have received height")
        };
        assert!(second_height > first_height);

        // Dropping the receiver should end the stream task gracefully.
        drop(rx);
        let res = stream_task.await.expect("should have joined");
        let received_an_event =
            res.expect("should have ended the stream gracefully after dropping rx");
        assert!(received_an_event, "should have received an event");

        Ok(())
    }
}