hypersync_client/lib.rs
1#![deny(missing_docs)]
2//! # HyperSync Client
3//!
4//! A high-performance Rust client for the HyperSync protocol, enabling efficient retrieval
5//! of blockchain data including blocks, transactions, logs, and traces.
6//!
7//! ## Features
8//!
9//! - **High-performance streaming**: Parallel data fetching with automatic retries
10//! - **Flexible querying**: Rich query builder API for precise data selection
11//! - **Multiple data formats**: Support for Arrow, Parquet, and simple Rust types
12//! - **Event decoding**: Automatic ABI decoding for smart contract events
13//! - **Real-time updates**: Live height streaming via Server-Sent Events
14//! - **Production ready**: Built-in rate limiting, retries, and error handling
15//!
16//! ## Quick Start
17//!
18//! ```no_run
19//! use hypersync_client::{Client, net_types::{Query, LogFilter, LogField}, StreamConfig};
20//!
21//! #[tokio::main]
22//! async fn main() -> anyhow::Result<()> {
23//! // Create a client for Ethereum mainnet
24//! let client = Client::builder()
25//! .chain_id(1)
26//! .api_token(std::env::var("ENVIO_API_TOKEN")?)
27//! .build()?;
28//!
//! // Query ERC20 transfer events from a token contract
30//! let query = Query::new()
31//! .from_block(19000000)
32//! .to_block_excl(19001000)
33//! .where_logs(
34//! LogFilter::all()
35//! .and_address(["0xA0b86a33E6411b87Fd9D3DF822C8698FC06BBe4c"])?
36//! .and_topic0(["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"])?
37//! )
38//! .select_log_fields([LogField::Address, LogField::Topic1, LogField::Topic2, LogField::Data]);
39//!
40//! // Get all data in one response
41//! let response = client.get(&query).await?;
42//! println!("Retrieved {} blocks", response.data.blocks.len());
43//!
44//! // Or stream data for large ranges
45//! let mut receiver = client.stream(query, StreamConfig::default()).await?;
46//! while let Some(response) = receiver.recv().await {
47//! let response = response?;
48//! println!("Streaming: got blocks up to {}", response.next_block);
49//! }
50//!
51//! Ok(())
52//! }
53//! ```
54//!
55//! ## Main Types
56//!
57//! - [`Client`] - Main client for interacting with HyperSync servers
58//! - [`net_types::Query`] - Query builder for specifying what data to fetch
59//! - [`StreamConfig`] - Configuration for streaming operations
60//! - [`QueryResponse`] - Response containing blocks, transactions, logs, and traces
61//! - [`ArrowResponse`] - Response in Apache Arrow format for high-performance processing
62//!
63//! ## Authentication
64//!
65//! You'll need a HyperSync API token to access the service. Get one from
66//! [https://envio.dev/app/api-tokens](https://envio.dev/app/api-tokens).
67//!
68//! ## Examples
69//!
70//! See the `examples/` directory for more detailed usage patterns including:
71//! - ERC20 token transfers
72//! - Wallet transaction history
73//! - Event decoding and filtering
74//! - Real-time data streaming
75use std::{sync::Arc, time::Duration};
76
77use anyhow::{anyhow, Context, Result};
78use futures::StreamExt;
79use hypersync_net_types::{hypersync_net_types_capnp, ArchiveHeight, ChainId, Query};
80use reqwest::Method;
81use reqwest_eventsource::retry::ExponentialBackoff;
82use reqwest_eventsource::{Event, EventSource};
83
84pub mod arrow_reader;
85mod column_mapping;
86mod config;
87mod decode;
88mod decode_call;
89mod from_arrow;
90mod parquet_out;
91mod parse_response;
92pub mod preset_query;
93mod rayon_async;
94pub mod simple_types;
95mod stream;
96mod types;
97mod util;
98
99pub use hypersync_format as format;
100pub use hypersync_net_types as net_types;
101pub use hypersync_schema as schema;
102
103use parse_response::parse_query_response;
104use tokio::sync::mpsc;
105use types::{EventResponse, ResponseData};
106use url::Url;
107
108pub use column_mapping::{ColumnMapping, DataType};
109pub use config::HexOutput;
110pub use config::{ClientConfig, SerializationFormat, StreamConfig};
111pub use decode::Decoder;
112pub use decode_call::CallDecoder;
113pub use types::{ArrowResponse, ArrowResponseData, QueryResponse};
114
115use crate::parse_response::read_query_response;
116use crate::simple_types::InternalEventJoinStrategy;
117
/// Thin wrapper around a `reqwest::Client` that attaches bearer-token auth
/// (and, for standard requests, the configured timeout) to every request it
/// builds.
#[derive(Debug)]
struct HttpClientWrapper {
    /// Initialized reqwest instance for client url.
    client: reqwest::Client,
    /// HyperSync server api token, sent as a bearer token on every request.
    api_token: String,
    /// Standard timeout for http requests.
    timeout: Duration,
}
127
128impl HttpClientWrapper {
129 fn request(&self, method: Method, url: Url) -> reqwest::RequestBuilder {
130 self.client
131 .request(method, url)
132 .timeout(self.timeout)
133 .bearer_auth(&self.api_token)
134 }
135
136 fn request_no_timeout(&self, method: Method, url: Url) -> reqwest::RequestBuilder {
137 self.client
138 .request(method, url)
139 .bearer_auth(&self.api_token)
140 }
141}
142
/// Internal client state to handle http requests and retries.
#[derive(Debug)]
struct ClientInner {
    /// HTTP client wrapper that applies auth and timeout settings.
    http_client: HttpClientWrapper,
    /// HyperSync server URL.
    url: Url,
    /// Number of retries to attempt before returning error.
    max_num_retries: usize,
    /// Milliseconds used as both the per-attempt backoff increment and the
    /// random jitter range when retrying failed requests.
    retry_backoff_ms: u64,
    /// Initial wait time for request backoff.
    retry_base_ms: u64,
    /// Ceiling time for request backoff.
    retry_ceiling_ms: u64,
    /// Query serialization format to use for HTTP requests.
    serialization_format: SerializationFormat,
}
160
/// Client to handle http requests and retries.
///
/// Cloning is cheap: all state lives behind an [`Arc`], so clones share the
/// same underlying HTTP client and configuration.
#[derive(Clone, Debug)]
pub struct Client {
    inner: Arc<ClientInner>,
}
166
167impl Client {
168 /// Creates a new client with the given configuration.
169 ///
170 /// Configuration must include the `url` and `api_token` fields.
171 /// # Example
172 /// ```
173 /// use hypersync_client::{Client, ClientConfig};
174 ///
175 /// let config = ClientConfig {
176 /// url: "https://eth.hypersync.xyz".to_string(),
177 /// api_token: std::env::var("ENVIO_API_TOKEN")?,
178 /// ..Default::default()
179 /// };
180 /// let client = Client::new(config)?;
181 /// # Ok::<(), anyhow::Error>(())
182 /// ```
183 ///
184 /// # Errors
185 /// This method fails if the config is invalid.
186 pub fn new(cfg: ClientConfig) -> Result<Self> {
187 // hscr stands for hypersync client rust
188 cfg.validate().context("invalid ClientConfig")?;
189 let user_agent = format!("hscr/{}", env!("CARGO_PKG_VERSION"));
190 Self::new_internal(cfg, user_agent)
191 }
192
/// Creates a new client builder.
///
/// See [`ClientBuilder`] for the available configuration methods.
///
/// # Example
/// ```
/// use hypersync_client::Client;
///
/// let client = Client::builder()
///     .chain_id(1)
///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
///     .build()
///     .unwrap();
/// ```
pub fn builder() -> ClientBuilder {
    ClientBuilder::new()
}
208
209 #[doc(hidden)]
210 pub fn new_with_agent(cfg: ClientConfig, user_agent: impl Into<String>) -> Result<Self> {
211 // Creates a new client with the given configuration and custom user agent.
212 // This is intended for use by language bindings (Python, Node.js) and HyperIndex.
213 Self::new_internal(cfg, user_agent.into())
214 }
215
216 /// Internal constructor that takes both config and user agent.
217 fn new_internal(cfg: ClientConfig, user_agent: String) -> Result<Self> {
218 let http_client = HttpClientWrapper {
219 client: reqwest::Client::builder()
220 .no_gzip()
221 .user_agent(user_agent)
222 .build()
223 .unwrap(),
224 api_token: cfg.api_token,
225 timeout: Duration::from_millis(cfg.http_req_timeout_millis),
226 };
227
228 let url = Url::parse(&cfg.url).context("url is malformed")?;
229
230 Ok(Self {
231 inner: Arc::new(ClientInner {
232 http_client,
233 url,
234 max_num_retries: cfg.max_num_retries,
235 retry_backoff_ms: cfg.retry_backoff_ms,
236 retry_base_ms: cfg.retry_base_ms,
237 retry_ceiling_ms: cfg.retry_ceiling_ms,
238 serialization_format: cfg.serialization_format,
239 }),
240 })
241 }
242
243 /// Retrieves blocks, transactions, traces, and logs through a stream using the provided
244 /// query and stream configuration.
245 ///
246 /// ### Implementation
247 /// Runs multiple queries simultaneously based on config.concurrency.
248 ///
249 /// Each query runs until it reaches query.to, server height, any max_num_* query param,
250 /// or execution timed out by server.
251 ///
252 /// # ⚠️ Important Warning
253 ///
254 /// This method will continue executing until the query has run to completion from beginning
255 /// to the end of the block range defined in the query. For heavy queries with large block
256 /// ranges or high data volumes, consider:
257 ///
258 /// - Use [`stream()`](Self::stream) to interact with each streamed chunk individually
259 /// - Use [`get()`](Self::get) which returns a `next_block` that can be paginated for the next query
260 /// - Break large queries into smaller block ranges
261 ///
262 /// # Example
263 /// ```no_run
264 /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField}, StreamConfig};
265 ///
266 /// # async fn example() -> anyhow::Result<()> {
267 /// let client = Client::builder()
268 /// .chain_id(1)
269 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
270 /// .build()?;
271 ///
272 /// // Query ERC20 transfer events
273 /// let query = Query::new()
274 /// .from_block(19000000)
275 /// .to_block_excl(19000010)
276 /// .where_logs(
277 /// LogFilter::all()
278 /// .and_topic0(["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"])?
279 /// )
280 /// .select_log_fields([LogField::Address, LogField::Data]);
281 /// let response = client.collect(query, StreamConfig::default()).await?;
282 ///
283 /// println!("Collected {} events", response.data.logs.len());
284 /// # Ok(())
285 /// # }
286 /// ```
287 pub async fn collect(&self, query: Query, config: StreamConfig) -> Result<QueryResponse> {
288 check_simple_stream_params(&config)?;
289
290 let mut recv = stream::stream_arrow(self, query, config)
291 .await
292 .context("start stream")?;
293
294 let mut data = ResponseData::default();
295 let mut archive_height = None;
296 let mut next_block = 0;
297 let mut total_execution_time = 0;
298
299 while let Some(res) = recv.recv().await {
300 let res = res.context("get response")?;
301 let res: QueryResponse =
302 QueryResponse::try_from(&res).context("convert arrow response")?;
303
304 for batch in res.data.blocks {
305 data.blocks.push(batch);
306 }
307 for batch in res.data.transactions {
308 data.transactions.push(batch);
309 }
310 for batch in res.data.logs {
311 data.logs.push(batch);
312 }
313 for batch in res.data.traces {
314 data.traces.push(batch);
315 }
316
317 archive_height = res.archive_height;
318 next_block = res.next_block;
319 total_execution_time += res.total_execution_time
320 }
321
322 Ok(QueryResponse {
323 archive_height,
324 next_block,
325 total_execution_time,
326 data,
327 rollback_guard: None,
328 })
329 }
330
331 /// Retrieves events through a stream using the provided query and stream configuration.
332 ///
333 /// # ⚠️ Important Warning
334 ///
335 /// This method will continue executing until the query has run to completion from beginning
336 /// to the end of the block range defined in the query. For heavy queries with large block
337 /// ranges or high data volumes, consider:
338 ///
339 /// - Use [`stream_events()`](Self::stream_events) to interact with each streamed chunk individually
340 /// - Use [`get_events()`](Self::get_events) which returns a `next_block` that can be paginated for the next query
341 /// - Break large queries into smaller block ranges
342 ///
343 /// # Example
344 /// ```no_run
345 /// use hypersync_client::{Client, net_types::{Query, TransactionFilter, TransactionField}, StreamConfig};
346 ///
347 /// # async fn example() -> anyhow::Result<()> {
348 /// let client = Client::builder()
349 /// .chain_id(1)
350 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
351 /// .build()?;
352 ///
353 /// // Query transactions to a specific address
354 /// let query = Query::new()
355 /// .from_block(19000000)
356 /// .to_block_excl(19000100)
357 /// .where_transactions(
358 /// TransactionFilter::all()
359 /// .and_to(["0xA0b86a33E6411b87Fd9D3DF822C8698FC06BBe4c"])?
360 /// )
361 /// .select_transaction_fields([TransactionField::Hash, TransactionField::From, TransactionField::Value]);
362 /// let response = client.collect_events(query, StreamConfig::default()).await?;
363 ///
364 /// println!("Collected {} events", response.data.len());
365 /// # Ok(())
366 /// # }
367 /// ```
368 pub async fn collect_events(
369 &self,
370 mut query: Query,
371 config: StreamConfig,
372 ) -> Result<EventResponse> {
373 check_simple_stream_params(&config)?;
374
375 let event_join_strategy = InternalEventJoinStrategy::from(&query.field_selection);
376 event_join_strategy.add_join_fields_to_selection(&mut query.field_selection);
377
378 let mut recv = stream::stream_arrow(self, query, config)
379 .await
380 .context("start stream")?;
381
382 let mut data = Vec::new();
383 let mut archive_height = None;
384 let mut next_block = 0;
385 let mut total_execution_time = 0;
386
387 while let Some(res) = recv.recv().await {
388 let res = res.context("get response")?;
389 let res: QueryResponse =
390 QueryResponse::try_from(&res).context("convert arrow response")?;
391 let events = event_join_strategy.join_from_response_data(res.data);
392
393 data.extend(events);
394
395 archive_height = res.archive_height;
396 next_block = res.next_block;
397 total_execution_time += res.total_execution_time
398 }
399
400 Ok(EventResponse {
401 archive_height,
402 next_block,
403 total_execution_time,
404 data,
405 rollback_guard: None,
406 })
407 }
408
409 /// Retrieves blocks, transactions, traces, and logs in Arrow format through a stream using
410 /// the provided query and stream configuration.
411 ///
412 /// Returns data in Apache Arrow format for high-performance columnar processing.
413 /// Useful for analytics workloads or when working with Arrow-compatible tools.
414 ///
415 /// # ⚠️ Important Warning
416 ///
417 /// This method will continue executing until the query has run to completion from beginning
418 /// to the end of the block range defined in the query. For heavy queries with large block
419 /// ranges or high data volumes, consider:
420 ///
421 /// - Use [`stream_arrow()`](Self::stream_arrow) to interact with each streamed chunk individually
422 /// - Use [`get_arrow()`](Self::get_arrow) which returns a `next_block` that can be paginated for the next query
423 /// - Break large queries into smaller block ranges
424 ///
425 /// # Example
426 /// ```no_run
427 /// use hypersync_client::{Client, net_types::{Query, BlockFilter, BlockField}, StreamConfig};
428 ///
429 /// # async fn example() -> anyhow::Result<()> {
430 /// let client = Client::builder()
431 /// .chain_id(1)
432 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
433 /// .build()?;
434 ///
435 /// // Get block data in Arrow format for analytics
436 /// let query = Query::new()
437 /// .from_block(19000000)
438 /// .to_block_excl(19000100)
439 /// .include_all_blocks()
440 /// .select_block_fields([BlockField::Number, BlockField::Timestamp, BlockField::GasUsed]);
441 /// let response = client.collect_arrow(query, StreamConfig::default()).await?;
442 ///
443 /// println!("Retrieved {} Arrow batches for blocks", response.data.blocks.len());
444 /// # Ok(())
445 /// # }
446 /// ```
447 pub async fn collect_arrow(&self, query: Query, config: StreamConfig) -> Result<ArrowResponse> {
448 let mut recv = stream::stream_arrow(self, query, config)
449 .await
450 .context("start stream")?;
451
452 let mut data = ArrowResponseData::default();
453 let mut archive_height = None;
454 let mut next_block = 0;
455 let mut total_execution_time = 0;
456
457 while let Some(res) = recv.recv().await {
458 let res = res.context("get response")?;
459
460 for batch in res.data.blocks {
461 data.blocks.push(batch);
462 }
463 for batch in res.data.transactions {
464 data.transactions.push(batch);
465 }
466 for batch in res.data.logs {
467 data.logs.push(batch);
468 }
469 for batch in res.data.traces {
470 data.traces.push(batch);
471 }
472 for batch in res.data.decoded_logs {
473 data.decoded_logs.push(batch);
474 }
475
476 archive_height = res.archive_height;
477 next_block = res.next_block;
478 total_execution_time += res.total_execution_time
479 }
480
481 Ok(ArrowResponse {
482 archive_height,
483 next_block,
484 total_execution_time,
485 data,
486 rollback_guard: None,
487 })
488 }
489
/// Writes parquet file getting data through a stream using the provided path, query,
/// and stream configuration.
///
/// Streams data directly to a Parquet file for efficient storage and later analysis.
/// Perfect for data exports or ETL pipelines.
///
/// # ⚠️ Important Warning
///
/// This method will continue executing until the query has run to completion from beginning
/// to the end of the block range defined in the query. For heavy queries with large block
/// ranges or high data volumes, consider:
///
/// - Use [`stream_arrow()`](Self::stream_arrow) and write to Parquet incrementally
/// - Use [`get_arrow()`](Self::get_arrow) with pagination and append to Parquet files
/// - Break large queries into smaller block ranges
///
/// # Example
/// ```no_run
/// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField}, StreamConfig};
///
/// # async fn example() -> anyhow::Result<()> {
/// let client = Client::builder()
///     .chain_id(1)
///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
///     .build()?;
///
/// // Export all DEX trades to Parquet for analysis
/// let query = Query::new()
///     .from_block(19000000)
///     .to_block_excl(19010000)
///     .where_logs(
///         LogFilter::all()
///             .and_topic0(["0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822"])?
///     )
///     .select_log_fields([LogField::Address, LogField::Data, LogField::BlockNumber]);
/// client.collect_parquet("./trades.parquet", query, StreamConfig::default()).await?;
///
/// println!("Trade data exported to trades.parquet");
/// # Ok(())
/// # }
/// ```
pub async fn collect_parquet(
    &self,
    path: &str,
    query: Query,
    config: StreamConfig,
) -> Result<()> {
    // Thin delegation: the parquet_out module owns streaming and file writing.
    parquet_out::collect_parquet(self, path, query, config).await
}
539
540 /// Internal implementation of getting chain_id from server
541 async fn get_chain_id_impl(&self) -> Result<u64> {
542 let mut url = self.inner.url.clone();
543 let mut segments = url.path_segments_mut().ok().context("get path segments")?;
544 segments.push("chain_id");
545 std::mem::drop(segments);
546 let req = self.inner.http_client.request(Method::GET, url);
547
548 let res = req.send().await.context("execute http req")?;
549
550 let status = res.status();
551 if !status.is_success() {
552 return Err(anyhow!("http response status code {}", status));
553 }
554
555 let chain_id: ChainId = res.json().await.context("read response body json")?;
556
557 Ok(chain_id.chain_id)
558 }
559
560 /// Internal implementation of getting height from server
561 async fn get_height_impl(&self, http_timeout_override: Option<Duration>) -> Result<u64> {
562 let mut url = self.inner.url.clone();
563 let mut segments = url.path_segments_mut().ok().context("get path segments")?;
564 segments.push("height");
565 std::mem::drop(segments);
566 let mut req = self.inner.http_client.request(Method::GET, url);
567 if let Some(http_timeout_override) = http_timeout_override {
568 req = req.timeout(http_timeout_override);
569 }
570
571 let res = req.send().await.context("execute http req")?;
572
573 let status = res.status();
574 if !status.is_success() {
575 return Err(anyhow!("http response status code {}", status));
576 }
577
578 let height: ArchiveHeight = res.json().await.context("read response body json")?;
579
580 Ok(height.height.unwrap_or(0))
581 }
582
583 /// Get the chain_id from the server with retries.
584 ///
585 /// # Example
586 /// ```no_run
587 /// use hypersync_client::Client;
588 ///
589 /// # async fn example() -> anyhow::Result<()> {
590 /// let client = Client::builder()
591 /// .chain_id(1)
592 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
593 /// .build()?;
594 ///
595 /// let chain_id = client.get_chain_id().await?;
596 /// println!("Connected to chain ID: {}", chain_id);
597 /// # Ok(())
598 /// # }
599 /// ```
600 pub async fn get_chain_id(&self) -> Result<u64> {
601 let mut base = self.inner.retry_base_ms;
602
603 let mut err = anyhow!("");
604
605 for _ in 0..self.inner.max_num_retries + 1 {
606 match self.get_chain_id_impl().await {
607 Ok(res) => return Ok(res),
608 Err(e) => {
609 log::error!(
610 "failed to get chain_id from server, retrying... The error was: {e:?}"
611 );
612 err = err.context(format!("{e:?}"));
613 }
614 }
615
616 let base_ms = Duration::from_millis(base);
617 let jitter = Duration::from_millis(fastrange_rs::fastrange_64(
618 rand::random(),
619 self.inner.retry_backoff_ms,
620 ));
621
622 tokio::time::sleep(base_ms + jitter).await;
623
624 base = std::cmp::min(
625 base + self.inner.retry_backoff_ms,
626 self.inner.retry_ceiling_ms,
627 );
628 }
629
630 Err(err)
631 }
632
633 /// Get the height of from server with retries.
634 ///
635 /// # Example
636 /// ```no_run
637 /// use hypersync_client::Client;
638 ///
639 /// # async fn example() -> anyhow::Result<()> {
640 /// let client = Client::builder()
641 /// .chain_id(1)
642 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
643 /// .build()?;
644 ///
645 /// let height = client.get_height().await?;
646 /// println!("Current block height: {}", height);
647 /// # Ok(())
648 /// # }
649 /// ```
650 pub async fn get_height(&self) -> Result<u64> {
651 let mut base = self.inner.retry_base_ms;
652
653 let mut err = anyhow!("");
654
655 for _ in 0..self.inner.max_num_retries + 1 {
656 match self.get_height_impl(None).await {
657 Ok(res) => return Ok(res),
658 Err(e) => {
659 log::error!(
660 "failed to get height from server, retrying... The error was: {e:?}"
661 );
662 err = err.context(format!("{e:?}"));
663 }
664 }
665
666 let base_ms = Duration::from_millis(base);
667 let jitter = Duration::from_millis(fastrange_rs::fastrange_64(
668 rand::random(),
669 self.inner.retry_backoff_ms,
670 ));
671
672 tokio::time::sleep(base_ms + jitter).await;
673
674 base = std::cmp::min(
675 base + self.inner.retry_backoff_ms,
676 self.inner.retry_ceiling_ms,
677 );
678 }
679
680 Err(err)
681 }
682
/// Get the height of the Client instance for health checks.
///
/// Doesn't do any retries and the `http_req_timeout` parameter will override the http timeout config set when creating the client.
///
/// # Errors
/// Fails if the request cannot be completed or the server returns a
/// non-success status code.
///
/// # Example
/// ```no_run
/// use hypersync_client::Client;
/// use std::time::Duration;
///
/// # async fn example() -> anyhow::Result<()> {
/// let client = Client::builder()
///     .chain_id(1)
///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
///     .build()?;
///
/// // Quick health check with 5 second timeout
/// let height = client.health_check(Some(Duration::from_secs(5))).await?;
/// println!("Server is healthy at block: {}", height);
/// # Ok(())
/// # }
/// ```
pub async fn health_check(&self, http_req_timeout: Option<Duration>) -> Result<u64> {
    // Single attempt by design: a health probe should fail fast, not retry.
    self.get_height_impl(http_req_timeout).await
}
707
708 /// Executes query with retries and returns the response.
709 ///
710 /// # Example
711 /// ```no_run
712 /// use hypersync_client::{Client, net_types::{Query, BlockFilter, BlockField}};
713 ///
714 /// # async fn example() -> anyhow::Result<()> {
715 /// let client = Client::builder()
716 /// .chain_id(1)
717 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
718 /// .build()?;
719 ///
720 /// // Query all blocks from a specific range
721 /// let query = Query::new()
722 /// .from_block(19000000)
723 /// .to_block_excl(19000010)
724 /// .include_all_blocks()
725 /// .select_block_fields([BlockField::Number, BlockField::Hash, BlockField::Timestamp]);
726 /// let response = client.get(&query).await?;
727 ///
728 /// println!("Retrieved {} blocks", response.data.blocks.len());
729 /// # Ok(())
730 /// # }
731 /// ```
732 pub async fn get(&self, query: &Query) -> Result<QueryResponse> {
733 let arrow_response = self.get_arrow(query).await.context("get data")?;
734 let converted =
735 QueryResponse::try_from(&arrow_response).context("convert arrow response")?;
736 Ok(converted)
737 }
738
739 /// Add block, transaction and log fields selection to the query, executes it with retries
740 /// and returns the response.
741 ///
742 /// This method automatically joins blocks, transactions, and logs into unified events,
743 /// making it easier to work with related blockchain data.
744 ///
745 /// # Example
746 /// ```no_run
747 /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField, TransactionField}};
748 ///
749 /// # async fn example() -> anyhow::Result<()> {
750 /// let client = Client::builder()
751 /// .chain_id(1)
752 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
753 /// .build()?;
754 ///
755 /// // Query ERC20 transfers with transaction context
756 /// let query = Query::new()
757 /// .from_block(19000000)
758 /// .to_block_excl(19000010)
759 /// .where_logs(
760 /// LogFilter::all()
761 /// .and_topic0(["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"])?
762 /// )
763 /// .select_log_fields([LogField::Address, LogField::Data])
764 /// .select_transaction_fields([TransactionField::Hash, TransactionField::From]);
765 /// let response = client.get_events(query).await?;
766 ///
767 /// println!("Retrieved {} joined events", response.data.len());
768 /// # Ok(())
769 /// # }
770 /// ```
771 pub async fn get_events(&self, mut query: Query) -> Result<EventResponse> {
772 let event_join_strategy = InternalEventJoinStrategy::from(&query.field_selection);
773 event_join_strategy.add_join_fields_to_selection(&mut query.field_selection);
774 let arrow_response = self.get_arrow(&query).await.context("get data")?;
775 EventResponse::try_from_arrow_response(&arrow_response, &event_join_strategy)
776 }
777
778 /// Executes query once and returns the result in (Arrow, size) format using JSON serialization.
779 async fn get_arrow_impl_json(&self, query: &Query) -> Result<(ArrowResponse, u64)> {
780 let mut url = self.inner.url.clone();
781 let mut segments = url.path_segments_mut().ok().context("get path segments")?;
782 segments.push("query");
783 segments.push("arrow-ipc");
784 std::mem::drop(segments);
785 let req = self.inner.http_client.request(Method::POST, url);
786
787 let res = req.json(&query).send().await.context("execute http req")?;
788
789 let status = res.status();
790 if !status.is_success() {
791 let text = res.text().await.context("read text to see error")?;
792
793 return Err(anyhow!(
794 "http response status code {}, err body: {}",
795 status,
796 text
797 ));
798 }
799
800 let bytes = res.bytes().await.context("read response body bytes")?;
801
802 let res = tokio::task::block_in_place(|| {
803 parse_query_response(&bytes).context("parse query response")
804 })?;
805
806 Ok((res, bytes.len().try_into().unwrap()))
807 }
808
809 fn should_cache_queries(&self) -> bool {
810 matches!(
811 self.inner.serialization_format,
812 SerializationFormat::CapnProto {
813 should_cache_queries: true
814 }
815 )
816 }
817
818 /// Executes query once and returns the result in (Arrow, size) format using Cap'n Proto serialization.
819 async fn get_arrow_impl_capnp(&self, query: &Query) -> Result<(ArrowResponse, u64)> {
820 let mut url = self.inner.url.clone();
821 let mut segments = url.path_segments_mut().ok().context("get path segments")?;
822 segments.push("query");
823 segments.push("arrow-ipc");
824 segments.push("capnp");
825 std::mem::drop(segments);
826
827 let should_cache = self.should_cache_queries();
828
829 if should_cache {
830 let query_with_id = {
831 let mut message = capnp::message::Builder::new_default();
832 let mut request_builder =
833 message.init_root::<hypersync_net_types_capnp::request::Builder>();
834
835 request_builder.build_query_id_from_query(query)?;
836 let mut query_with_id = Vec::new();
837 capnp::serialize_packed::write_message(&mut query_with_id, &message)?;
838 query_with_id
839 };
840
841 let mut req = self.inner.http_client.request(Method::POST, url.clone());
842 req = req.header("content-type", "application/x-capnp");
843
844 let res = req
845 .body(query_with_id)
846 .send()
847 .await
848 .context("execute http req")?;
849
850 let status = res.status();
851 if status.is_success() {
852 let bytes = res.bytes().await.context("read response body bytes")?;
853
854 let mut opts = capnp::message::ReaderOptions::new();
855 opts.nesting_limit(i32::MAX).traversal_limit_in_words(None);
856 let message_reader = capnp::serialize_packed::read_message(bytes.as_ref(), opts)
857 .context("create message reader")?;
858 let query_response = message_reader
859 .get_root::<hypersync_net_types_capnp::cached_query_response::Reader>()
860 .context("get cached_query_response root")?;
861 match query_response.get_either().which()? {
862 hypersync_net_types_capnp::cached_query_response::either::Which::QueryResponse(
863 query_response,
864 ) => {
865 let res = tokio::task::block_in_place(|| {
866 let res = query_response?;
867 read_query_response(&res).context("parse query response cached")
868 })?;
869 return Ok((res, bytes.len().try_into().unwrap()));
870 }
871 hypersync_net_types_capnp::cached_query_response::either::Which::NotCached(()) => {
872 log::trace!("query was not cached, retrying with full query");
873 }
874 }
875 } else {
876 let text = res.text().await.context("read text to see error")?;
877 log::warn!(
878 "Failed cache query, will retry full query. {}, err body: {}",
879 status,
880 text
881 );
882 }
883 };
884
885 let full_query_bytes = {
886 let mut message = capnp::message::Builder::new_default();
887 let mut query_builder =
888 message.init_root::<hypersync_net_types_capnp::request::Builder>();
889
890 query_builder.build_full_query_from_query(query, should_cache)?;
891 let mut bytes = Vec::new();
892 capnp::serialize_packed::write_message(&mut bytes, &message)?;
893 bytes
894 };
895
896 let mut req = self.inner.http_client.request(Method::POST, url);
897 req = req.header("content-type", "application/x-capnp");
898
899 let res = req
900 .header("content-type", "application/x-capnp")
901 .body(full_query_bytes)
902 .send()
903 .await
904 .context("execute http req")?;
905
906 let status = res.status();
907 if !status.is_success() {
908 let text = res.text().await.context("read text to see error")?;
909
910 return Err(anyhow!(
911 "http response status code {}, err body: {}",
912 status,
913 text
914 ));
915 }
916
917 let bytes = res.bytes().await.context("read response body bytes")?;
918
919 let res = tokio::task::block_in_place(|| {
920 parse_query_response(&bytes).context("parse query response")
921 })?;
922
923 Ok((res, bytes.len().try_into().unwrap()))
924 }
925
/// Executes query once and returns the result in (Arrow, size) format.
///
/// Dispatches to the JSON or Cap'n Proto implementation based on the
/// configured serialization format.
async fn get_arrow_impl(&self, query: &Query) -> Result<(ArrowResponse, u64)> {
    match self.inner.serialization_format {
        SerializationFormat::Json => self.get_arrow_impl_json(query).await,
        SerializationFormat::CapnProto { .. } => self.get_arrow_impl_capnp(query).await,
    }
}
933
934 /// Executes query with retries and returns the response in Arrow format.
935 pub async fn get_arrow(&self, query: &Query) -> Result<ArrowResponse> {
936 self.get_arrow_with_size(query).await.map(|res| res.0)
937 }
938
939 /// Internal implementation for get_arrow.
940 async fn get_arrow_with_size(&self, query: &Query) -> Result<(ArrowResponse, u64)> {
941 let mut base = self.inner.retry_base_ms;
942
943 let mut err = anyhow!("");
944
945 for _ in 0..self.inner.max_num_retries + 1 {
946 match self.get_arrow_impl(query).await {
947 Ok(res) => return Ok(res),
948 Err(e) => {
949 log::error!(
950 "failed to get arrow data from server, retrying... The error was: {e:?}"
951 );
952 err = err.context(format!("{e:?}"));
953 }
954 }
955
956 let base_ms = Duration::from_millis(base);
957 let jitter = Duration::from_millis(fastrange_rs::fastrange_64(
958 rand::random(),
959 self.inner.retry_backoff_ms,
960 ));
961
962 tokio::time::sleep(base_ms + jitter).await;
963
964 base = std::cmp::min(
965 base + self.inner.retry_backoff_ms,
966 self.inner.retry_ceiling_ms,
967 );
968 }
969
970 Err(err)
971 }
972
973 /// Spawns task to execute query and return data via a channel.
974 ///
975 /// # Example
976 /// ```no_run
977 /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField}, StreamConfig};
978 ///
979 /// # async fn example() -> anyhow::Result<()> {
980 /// let client = Client::builder()
981 /// .chain_id(1)
982 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
983 /// .build()?;
984 ///
985 /// // Stream all ERC20 transfer events
986 /// let query = Query::new()
987 /// .from_block(19000000)
988 /// .where_logs(
989 /// LogFilter::all()
990 /// .and_topic0(["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"])?
991 /// )
992 /// .select_log_fields([LogField::Address, LogField::Topic1, LogField::Topic2, LogField::Data]);
993 /// let mut receiver = client.stream(query, StreamConfig::default()).await?;
994 ///
995 /// while let Some(response) = receiver.recv().await {
996 /// let response = response?;
997 /// println!("Got {} events up to block: {}", response.data.logs.len(), response.next_block);
998 /// }
999 /// # Ok(())
1000 /// # }
1001 /// ```
1002 pub async fn stream(
1003 &self,
1004 query: Query,
1005 config: StreamConfig,
1006 ) -> Result<mpsc::Receiver<Result<QueryResponse>>> {
1007 check_simple_stream_params(&config)?;
1008
1009 let (tx, rx): (_, mpsc::Receiver<Result<QueryResponse>>) =
1010 mpsc::channel(config.concurrency);
1011
1012 let mut inner_rx = self
1013 .stream_arrow(query, config)
1014 .await
1015 .context("start inner stream")?;
1016
1017 tokio::spawn(async move {
1018 while let Some(resp) = inner_rx.recv().await {
1019 let msg = resp
1020 .context("inner receiver")
1021 .and_then(|r| QueryResponse::try_from(&r));
1022 let is_err = msg.is_err();
1023 if tx.send(msg).await.is_err() || is_err {
1024 return;
1025 }
1026 }
1027 });
1028
1029 Ok(rx)
1030 }
1031
1032 /// Add block, transaction and log fields selection to the query and spawns task to execute it,
1033 /// returning data via a channel.
1034 ///
1035 /// This method automatically joins blocks, transactions, and logs into unified events,
1036 /// then streams them via a channel for real-time processing.
1037 ///
1038 /// # Example
1039 /// ```no_run
1040 /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField, TransactionField}, StreamConfig};
1041 ///
1042 /// # async fn example() -> anyhow::Result<()> {
1043 /// let client = Client::builder()
1044 /// .chain_id(1)
1045 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
1046 /// .build()?;
1047 ///
1048 /// // Stream NFT transfer events with transaction context
1049 /// let query = Query::new()
1050 /// .from_block(19000000)
1051 /// .where_logs(
1052 /// LogFilter::all()
1053 /// .and_topic0(["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"])?
1054 /// )
1055 /// .select_log_fields([LogField::Address, LogField::Topic1, LogField::Topic2])
1056 /// .select_transaction_fields([TransactionField::Hash, TransactionField::From]);
1057 /// let mut receiver = client.stream_events(query, StreamConfig::default()).await?;
1058 ///
1059 /// while let Some(response) = receiver.recv().await {
1060 /// let response = response?;
1061 /// println!("Got {} joined events up to block: {}", response.data.len(), response.next_block);
1062 /// }
1063 /// # Ok(())
1064 /// # }
1065 /// ```
1066 pub async fn stream_events(
1067 &self,
1068 mut query: Query,
1069 config: StreamConfig,
1070 ) -> Result<mpsc::Receiver<Result<EventResponse>>> {
1071 check_simple_stream_params(&config)?;
1072
1073 let event_join_strategy = InternalEventJoinStrategy::from(&query.field_selection);
1074
1075 event_join_strategy.add_join_fields_to_selection(&mut query.field_selection);
1076
1077 let (tx, rx): (_, mpsc::Receiver<Result<EventResponse>>) =
1078 mpsc::channel(config.concurrency);
1079
1080 let mut inner_rx = self
1081 .stream_arrow(query, config)
1082 .await
1083 .context("start inner stream")?;
1084
1085 tokio::spawn(async move {
1086 while let Some(resp) = inner_rx.recv().await {
1087 let msg = resp
1088 .context("inner receiver")
1089 .and_then(|r| EventResponse::try_from_arrow_response(&r, &event_join_strategy));
1090 let is_err = msg.is_err();
1091 if tx.send(msg).await.is_err() || is_err {
1092 return;
1093 }
1094 }
1095 });
1096
1097 Ok(rx)
1098 }
1099
    /// Spawns task to execute query and return data via a channel in Arrow format.
    ///
    /// Returns raw Apache Arrow data via a channel for high-performance processing.
    /// Ideal for applications that need to work directly with columnar data.
    ///
    /// # Example
    /// ```no_run
    /// use hypersync_client::{Client, net_types::{Query, TransactionFilter, TransactionField}, StreamConfig};
    ///
    /// # async fn example() -> anyhow::Result<()> {
    /// let client = Client::builder()
    ///     .chain_id(1)
    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
    ///     .build()?;
    ///
    /// // Stream transaction data in Arrow format for analytics
    /// let query = Query::new()
    ///     .from_block(19000000)
    ///     .to_block_excl(19000100)
    ///     .where_transactions(
    ///         TransactionFilter::all()
    ///             .and_contract_address(["0xA0b86a33E6411b87Fd9D3DF822C8698FC06BBe4c"])?
    ///     )
    ///     .select_transaction_fields([TransactionField::Hash, TransactionField::From, TransactionField::Value]);
    /// let mut receiver = client.stream_arrow(query, StreamConfig::default()).await?;
    ///
    /// while let Some(response) = receiver.recv().await {
    ///     let response = response?;
    ///     println!("Got {} Arrow batches for transactions", response.data.transactions.len());
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub async fn stream_arrow(
        &self,
        query: Query,
        config: StreamConfig,
    ) -> Result<mpsc::Receiver<Result<ArrowResponse>>> {
        // Delegates to the stream module, which owns chunking/concurrency logic.
        stream::stream_arrow(self, query, config).await
    }
1140
    /// Getter for url field.
    ///
    /// # Example
    /// ```no_run
    /// use hypersync_client::Client;
    ///
    /// let client = Client::builder()
    ///     .chain_id(1)
    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
    ///     .build()
    ///     .unwrap();
    ///
    /// println!("Client URL: {}", client.url());
    /// ```
    pub fn url(&self) -> &Url {
        &self.inner.url
    }
1158}
1159
/// Builder for creating a hypersync client with configuration options.
///
/// This builder provides a fluent API for configuring client settings like URL,
/// authentication, timeouts, and retry behavior.
///
/// # Example
/// ```no_run
/// use hypersync_client::{Client, SerializationFormat};
///
/// let client = Client::builder()
///     .chain_id(1)
///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
///     .http_req_timeout_millis(30000)
///     .max_num_retries(3)
///     .build()
///     .unwrap();
/// ```
#[derive(Debug, Clone, Default)]
pub struct ClientBuilder(ClientConfig);
1179
1180impl ClientBuilder {
1181 /// Creates a new ClientBuilder with default configuration.
1182 pub fn new() -> Self {
1183 Self::default()
1184 }
1185
1186 /// Sets the chain ID and automatically configures the URL for the hypersync endpoint.
1187 ///
1188 /// This is a convenience method that sets the URL to `https://{chain_id}.hypersync.xyz`.
1189 ///
1190 /// # Arguments
1191 /// * `chain_id` - The blockchain chain ID (e.g., 1 for Ethereum mainnet)
1192 ///
1193 /// # Example
1194 /// ```
1195 /// use hypersync_client::Client;
1196 ///
1197 /// let client = Client::builder()
1198 /// .chain_id(1) // Ethereum mainnet
1199 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1200 /// .build()
1201 /// .unwrap();
1202 /// ```
1203 pub fn chain_id(mut self, chain_id: u64) -> Self {
1204 self.0.url = format!("https://{chain_id}.hypersync.xyz");
1205 self
1206 }
1207
1208 /// Sets a custom URL for the hypersync server.
1209 ///
1210 /// Use this method when you need to connect to a custom hypersync endpoint
1211 /// instead of the default public endpoints.
1212 ///
1213 /// # Arguments
1214 /// * `url` - The hypersync server URL
1215 ///
1216 /// # Example
1217 /// ```
1218 /// use hypersync_client::Client;
1219 ///
1220 /// let client = Client::builder()
1221 /// .url("https://my-custom-hypersync.example.com")
1222 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1223 /// .build()
1224 /// .unwrap();
1225 /// ```
1226 pub fn url<S: ToString>(mut self, url: S) -> Self {
1227 self.0.url = url.to_string();
1228 self
1229 }
1230
1231 /// Sets the api token for authentication.
1232 ///
1233 /// Required for accessing authenticated hypersync endpoints.
1234 ///
1235 /// # Arguments
1236 /// * `api_token` - The authentication token
1237 ///
1238 /// # Example
1239 /// ```
1240 /// use hypersync_client::Client;
1241 ///
1242 /// let client = Client::builder()
1243 /// .chain_id(1)
1244 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1245 /// .build()
1246 /// .unwrap();
1247 /// ```
1248 pub fn api_token<S: ToString>(mut self, api_token: S) -> Self {
1249 self.0.api_token = api_token.to_string();
1250 self
1251 }
1252
1253 /// Sets the HTTP request timeout in milliseconds.
1254 ///
1255 /// # Arguments
1256 /// * `http_req_timeout_millis` - Timeout in milliseconds (default: 30000)
1257 ///
1258 /// # Example
1259 /// ```
1260 /// use hypersync_client::Client;
1261 ///
1262 /// let client = Client::builder()
1263 /// .chain_id(1)
1264 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1265 /// .http_req_timeout_millis(60000) // 60 second timeout
1266 /// .build()
1267 /// .unwrap();
1268 /// ```
1269 pub fn http_req_timeout_millis(mut self, http_req_timeout_millis: u64) -> Self {
1270 self.0.http_req_timeout_millis = http_req_timeout_millis;
1271 self
1272 }
1273
1274 /// Sets the maximum number of retries for failed requests.
1275 ///
1276 /// # Arguments
1277 /// * `max_num_retries` - Maximum number of retries (default: 10)
1278 ///
1279 /// # Example
1280 /// ```
1281 /// use hypersync_client::Client;
1282 ///
1283 /// let client = Client::builder()
1284 /// .chain_id(1)
1285 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1286 /// .max_num_retries(5)
1287 /// .build()
1288 /// .unwrap();
1289 /// ```
1290 pub fn max_num_retries(mut self, max_num_retries: usize) -> Self {
1291 self.0.max_num_retries = max_num_retries;
1292 self
1293 }
1294
1295 /// Sets the backoff increment for retry delays.
1296 ///
1297 /// This value is added to the base delay on each retry attempt.
1298 ///
1299 /// # Arguments
1300 /// * `retry_backoff_ms` - Backoff increment in milliseconds (default: 500)
1301 ///
1302 /// # Example
1303 /// ```
1304 /// use hypersync_client::Client;
1305 ///
1306 /// let client = Client::builder()
1307 /// .chain_id(1)
1308 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1309 /// .retry_backoff_ms(1000) // 1 second backoff increment
1310 /// .build()
1311 /// .unwrap();
1312 /// ```
1313 pub fn retry_backoff_ms(mut self, retry_backoff_ms: u64) -> Self {
1314 self.0.retry_backoff_ms = retry_backoff_ms;
1315 self
1316 }
1317
1318 /// Sets the initial delay for retry attempts.
1319 ///
1320 /// # Arguments
1321 /// * `retry_base_ms` - Initial retry delay in milliseconds (default: 500)
1322 ///
1323 /// # Example
1324 /// ```
1325 /// use hypersync_client::Client;
1326 ///
1327 /// let client = Client::builder()
1328 /// .chain_id(1)
1329 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1330 /// .retry_base_ms(1000) // Start with 1 second delay
1331 /// .build()
1332 /// .unwrap();
1333 /// ```
1334 pub fn retry_base_ms(mut self, retry_base_ms: u64) -> Self {
1335 self.0.retry_base_ms = retry_base_ms;
1336 self
1337 }
1338
1339 /// Sets the maximum delay for retry attempts.
1340 ///
1341 /// The retry delay will not exceed this value, even with backoff increments.
1342 ///
1343 /// # Arguments
1344 /// * `retry_ceiling_ms` - Maximum retry delay in milliseconds (default: 10000)
1345 ///
1346 /// # Example
1347 /// ```
1348 /// use hypersync_client::Client;
1349 ///
1350 /// let client = Client::builder()
1351 /// .chain_id(1)
1352 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1353 /// .retry_ceiling_ms(30000) // Cap at 30 seconds
1354 /// .build()
1355 /// .unwrap();
1356 /// ```
1357 pub fn retry_ceiling_ms(mut self, retry_ceiling_ms: u64) -> Self {
1358 self.0.retry_ceiling_ms = retry_ceiling_ms;
1359 self
1360 }
1361
1362 /// Sets the serialization format for client-server communication.
1363 ///
1364 /// # Arguments
1365 /// * `serialization_format` - The format to use (JSON or CapnProto)
1366 ///
1367 /// # Example
1368 /// ```
1369 /// use hypersync_client::{Client, SerializationFormat};
1370 ///
1371 /// let client = Client::builder()
1372 /// .chain_id(1)
1373 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1374 /// .serialization_format(SerializationFormat::Json)
1375 /// .build()
1376 /// .unwrap();
1377 /// ```
1378 pub fn serialization_format(mut self, serialization_format: SerializationFormat) -> Self {
1379 self.0.serialization_format = serialization_format;
1380 self
1381 }
1382
1383 /// Builds the client with the configured settings.
1384 ///
1385 /// # Returns
1386 /// * `Result<Client>` - The configured client or an error if configuration is invalid
1387 ///
1388 /// # Errors
1389 /// Returns an error if:
1390 /// * The URL is malformed
1391 /// * Required configuration is missing
1392 ///
1393 /// # Example
1394 /// ```
1395 /// use hypersync_client::Client;
1396 ///
1397 /// let client = Client::builder()
1398 /// .chain_id(1)
1399 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1400 /// .build()
1401 /// .unwrap();
1402 /// ```
1403 pub fn build(self) -> Result<Client> {
1404 if self.0.url.is_empty() {
1405 anyhow::bail!(
1406 "endpoint needs to be set, try using builder.chain_id(1) or\
1407 builder.url(\"https://eth.hypersync.xyz\") to set the endpoint"
1408 )
1409 }
1410 Client::new(self.0)
1411 }
1412}
1413
/// Initial delay (200ms) before reconnecting the SSE height stream; doubles on
/// each consecutive failure up to [`MAX_RECONNECT_DELAY`].
const INITIAL_RECONNECT_DELAY: Duration = Duration::from_millis(200);
/// Upper bound for the SSE reconnect backoff delay.
const MAX_RECONNECT_DELAY: Duration = Duration::from_secs(30);
/// Timeout for detecting dead connections. Server sends keepalive pings every 5s,
/// so we timeout after 15s (3x the ping interval).
const READ_TIMEOUT: Duration = Duration::from_secs(15);
1420
/// Events emitted by the height stream.
///
/// Yielded by [`Client::stream_height`] so consumers can observe height updates
/// as well as connection lifecycle changes (connect / reconnect).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum HeightStreamEvent {
    /// Successfully connected or reconnected to the SSE stream.
    Connected,
    /// Received a height update from the server.
    Height(u64),
    /// Connection lost, will attempt to reconnect after the specified delay.
    /// Emitted before the backoff sleep begins.
    Reconnecting {
        /// Duration to wait before attempting reconnection.
        delay: Duration,
        /// The error that caused the reconnection.
        error_msg: String,
    },
}
1436
/// Internal classification of raw SSE events before they are published.
enum InternalStreamEvent {
    /// Event that should be forwarded to subscribers.
    Publish(HeightStreamEvent),
    /// Server keepalive; consumed internally, never published.
    Ping,
    /// Unrecognized event type, carried with a debug description.
    Unknown(String),
}
1442
1443impl Client {
1444 fn get_es_stream(&self) -> Result<EventSource> {
1445 // Build the GET /height/sse request
1446 let mut url = self.inner.url.clone();
1447 url.path_segments_mut()
1448 .ok()
1449 .context("invalid base URL")?
1450 .extend(&["height", "sse"]);
1451
1452 let req = self
1453 .inner
1454 .http_client
1455 // Don't set timeout for SSE stream
1456 .request_no_timeout(Method::GET, url);
1457
1458 // Configure exponential backoff for library-level retries
1459 let retry_policy = ExponentialBackoff::new(
1460 INITIAL_RECONNECT_DELAY,
1461 2.0,
1462 Some(MAX_RECONNECT_DELAY),
1463 None, // unlimited retries
1464 );
1465
1466 // Turn the request into an EventSource stream with retries
1467 let mut es = reqwest_eventsource::EventSource::new(req)
1468 .context("unexpected error creating EventSource")?;
1469 es.set_retry_policy(Box::new(retry_policy));
1470 Ok(es)
1471 }
1472
1473 async fn next_height(event_source: &mut EventSource) -> Result<Option<InternalStreamEvent>> {
1474 let Some(res) = tokio::time::timeout(READ_TIMEOUT, event_source.next())
1475 .await
1476 .map_err(|d| anyhow::anyhow!("stream timed out after {d}"))?
1477 else {
1478 return Ok(None);
1479 };
1480
1481 let e = match res.context("failed response")? {
1482 Event::Open => InternalStreamEvent::Publish(HeightStreamEvent::Connected),
1483 Event::Message(event) => match event.event.as_str() {
1484 "height" => {
1485 let height = event
1486 .data
1487 .trim()
1488 .parse::<u64>()
1489 .context("parsing height from event data")?;
1490 InternalStreamEvent::Publish(HeightStreamEvent::Height(height))
1491 }
1492 "ping" => InternalStreamEvent::Ping,
1493 _ => InternalStreamEvent::Unknown(format!("unknown event: {:?}", event)),
1494 },
1495 };
1496
1497 Ok(Some(e))
1498 }
1499
1500 async fn stream_height_events(
1501 es: &mut EventSource,
1502 tx: &mpsc::Sender<HeightStreamEvent>,
1503 ) -> Result<bool> {
1504 let mut received_an_event = false;
1505 while let Some(event) = Self::next_height(es).await.context("failed next height")? {
1506 match event {
1507 InternalStreamEvent::Publish(event) => {
1508 received_an_event = true;
1509 if tx.send(event).await.is_err() {
1510 return Ok(received_an_event); // Receiver dropped, exit task
1511 }
1512 }
1513 InternalStreamEvent::Ping => (), // ignore pings
1514 InternalStreamEvent::Unknown(_event) => (), // ignore unknown events
1515 }
1516 }
1517 Ok(received_an_event)
1518 }
1519
1520 fn get_delay(consecutive_failures: u32) -> Duration {
1521 if consecutive_failures > 0 {
1522 /// helper function to calculate 2^x
1523 /// optimization using bit shifting
1524 const fn two_to_pow(x: u32) -> u32 {
1525 1 << x
1526 }
1527 // Exponential backoff: 200ms, 400ms, 800ms, ... up to 30s
1528 INITIAL_RECONNECT_DELAY
1529 .saturating_mul(two_to_pow(consecutive_failures - 1))
1530 .min(MAX_RECONNECT_DELAY)
1531 } else {
1532 // On zero consecutive failures, 0 delay
1533 Duration::from_millis(0)
1534 }
1535 }
1536
1537 async fn stream_height_events_with_retry(
1538 &self,
1539 tx: &mpsc::Sender<HeightStreamEvent>,
1540 ) -> Result<()> {
1541 let mut consecutive_failures = 0u32;
1542
1543 loop {
1544 // should always be able to creat a new es stream
1545 // something is wrong with the req builder otherwise
1546 let mut es = self.get_es_stream().context("get es stream")?;
1547
1548 let mut error = anyhow!("");
1549
1550 match Self::stream_height_events(&mut es, tx).await {
1551 Ok(received_an_event) => {
1552 if received_an_event {
1553 consecutive_failures = 0; // Reset after successful connection that then failed
1554 }
1555 log::trace!("Stream height exited");
1556 }
1557 Err(e) => {
1558 log::trace!("Stream height failed: {e:?}");
1559 error = e;
1560 }
1561 }
1562
1563 es.close();
1564
1565 // If the receiver is closed, exit the task
1566 if tx.is_closed() {
1567 break;
1568 }
1569
1570 let delay = Self::get_delay(consecutive_failures);
1571 log::trace!("Reconnecting in {:?}...", delay);
1572
1573 let error_msg = format!("{error:?}");
1574
1575 if tx
1576 .send(HeightStreamEvent::Reconnecting { delay, error_msg })
1577 .await
1578 .is_err()
1579 {
1580 return Ok(()); // Receiver dropped, exit task
1581 }
1582 tokio::time::sleep(delay).await;
1583
1584 // increment consecutive failures so that on the next try
1585 // it will start using back offs
1586 consecutive_failures += 1;
1587 }
1588
1589 Ok(())
1590 }
1591
1592 /// Streams archive height updates from the server via Server-Sent Events.
1593 ///
1594 /// Establishes a long-lived SSE connection to `/height/sse` that automatically reconnects
1595 /// on disconnection with exponential backoff (200ms → 400ms → ... → max 30s).
1596 ///
1597 /// The stream emits [`HeightStreamEvent`] to notify consumers of connection state changes
1598 /// and height updates. This allows applications to display connection status to users.
1599 ///
1600 /// # Returns
1601 /// Channel receiver yielding [`HeightStreamEvent`]s. The background task handles connection
1602 /// lifecycle and sends events through this channel.
1603 ///
1604 /// # Example
1605 /// ```
1606 /// # use hypersync_client::{Client, HeightStreamEvent};
1607 /// # async fn example() -> anyhow::Result<()> {
1608 /// let client = Client::builder()
1609 /// .url("https://eth.hypersync.xyz")
1610 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1611 /// .build()?;
1612 ///
1613 /// let mut rx = client.stream_height();
1614 ///
1615 /// while let Some(event) = rx.recv().await {
1616 /// match event {
1617 /// HeightStreamEvent::Connected => println!("Connected to stream"),
1618 /// HeightStreamEvent::Height(h) => println!("Height: {}", h),
1619 /// HeightStreamEvent::Reconnecting { delay, error_msg } => {
1620 /// println!("Reconnecting in {delay:?} due to error: {error_msg}")
1621 /// }
1622 /// }
1623 /// }
1624 /// # Ok(())
1625 /// # }
1626 /// ```
1627 pub fn stream_height(&self) -> mpsc::Receiver<HeightStreamEvent> {
1628 let (tx, rx) = mpsc::channel(16);
1629 let client = self.clone();
1630
1631 tokio::spawn(async move {
1632 if let Err(e) = client.stream_height_events_with_retry(&tx).await {
1633 log::error!("Stream height failed unexpectedly: {e:?}");
1634 }
1635 });
1636
1637 rx
1638 }
1639}
1640
1641fn check_simple_stream_params(config: &StreamConfig) -> Result<()> {
1642 if config.event_signature.is_some() {
1643 return Err(anyhow!(
1644 "config.event_signature can't be passed to simple type function. User is expected to \
1645 decode the logs using Decoder."
1646 ));
1647 }
1648 if config.column_mapping.is_some() {
1649 return Err(anyhow!(
1650 "config.column_mapping can't be passed to single type function. User is expected to \
1651 map values manually."
1652 ));
1653 }
1654
1655 Ok(())
1656}
1657
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_get_delay() {
        // Table of (consecutive_failures, expected delay): zero failures means
        // no delay, then powers-of-two backoff capped at 30s.
        let cases = [
            (0u32, Duration::from_millis(0)),
            (1, Duration::from_millis(200)),
            (2, Duration::from_millis(400)),
            (3, Duration::from_millis(800)),
            (9, Duration::from_secs(30)),
            (10, Duration::from_secs(30)),
        ];
        for (failures, expected) in cases {
            assert_eq!(
                Client::get_delay(failures),
                expected,
                "unexpected delay for {failures} consecutive failure(s)"
            );
        }
    }

    #[tokio::test]
    #[ignore = "integration test with live hs server for height stream"]
    async fn test_stream_height_events() -> anyhow::Result<()> {
        let (tx, mut rx) = mpsc::channel(16);
        let pump = tokio::spawn(async move {
            let client = Client::builder()
                .url("https://monad-testnet.hypersync.xyz")
                .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
                .build()?;
            let mut es = client.get_es_stream().context("get es stream")?;
            Client::stream_height_events(&mut es, &tx).await
        });

        // First event must be the connection notification.
        let first = rx.recv().await;
        assert!(first.is_some());
        assert_eq!(first.unwrap(), HeightStreamEvent::Connected);

        // Then we expect strictly increasing height updates.
        let Some(HeightStreamEvent::Height(first_height)) = rx.recv().await else {
            panic!("should have received height")
        };
        let Some(HeightStreamEvent::Height(second_height)) = rx.recv().await else {
            panic!("should have received height")
        };
        assert!(second_height > first_height);

        // Dropping the receiver should end the pump gracefully.
        drop(rx);
        let res = pump.await.expect("should have joined");
        let received_an_event =
            res.expect("should have ended the stream gracefully after dropping rx");
        assert!(received_an_event, "should have received an event");

        Ok(())
    }
}