// hypersync_client/lib.rs
1#![deny(missing_docs)]
2//! # HyperSync Client
3//!
4//! A high-performance Rust client for the HyperSync protocol, enabling efficient retrieval
5//! of blockchain data including blocks, transactions, logs, and traces.
6//!
7//! ## Features
8//!
9//! - **High-performance streaming**: Parallel data fetching with automatic retries
10//! - **Flexible querying**: Rich query builder API for precise data selection
11//! - **Multiple data formats**: Support for Arrow, Parquet, and simple Rust types
12//! - **Event decoding**: Automatic ABI decoding for smart contract events
13//! - **Real-time updates**: Live height streaming via Server-Sent Events
14//! - **Production ready**: Built-in rate limiting, retries, and error handling
15//!
16//! ## Quick Start
17//!
18//! ```no_run
19//! use hypersync_client::{Client, net_types::{Query, LogFilter, LogField}, StreamConfig};
20//!
21//! #[tokio::main]
22//! async fn main() -> anyhow::Result<()> {
23//! // Create a client for Ethereum mainnet
24//! let client = Client::builder()
25//! .chain_id(1)
26//! .api_token(std::env::var("ENVIO_API_TOKEN")?)
27//! .build()?;
28//!
29//! // Query ERC20 transfer events from USDC contract
30//! let query = Query::new()
31//! .from_block(19000000)
32//! .to_block_excl(19001000)
33//! .where_logs(
34//! LogFilter::all()
35//! .and_address(["0xA0b86a33E6411b87Fd9D3DF822C8698FC06BBe4c"])?
36//! .and_topic0(["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"])?
37//! )
38//! .select_log_fields([LogField::Address, LogField::Topic1, LogField::Topic2, LogField::Data]);
39//!
40//! // Get all data in one response
41//! let response = client.get(&query).await?;
42//! println!("Retrieved {} blocks", response.data.blocks.len());
43//!
44//! // Or stream data for large ranges
45//! let mut receiver = client.stream(query, StreamConfig::default()).await?;
46//! while let Some(response) = receiver.recv().await {
47//! let response = response?;
48//! println!("Streaming: got blocks up to {}", response.next_block);
49//! }
50//!
51//! Ok(())
52//! }
53//! ```
54//!
55//! ## Main Types
56//!
57//! - [`Client`] - Main client for interacting with HyperSync servers
58//! - [`net_types::Query`] - Query builder for specifying what data to fetch
59//! - [`StreamConfig`] - Configuration for streaming operations
60//! - [`QueryResponse`] - Response containing blocks, transactions, logs, and traces
61//! - [`ArrowResponse`] - Response in Apache Arrow format for high-performance processing
62//!
63//! ## Authentication
64//!
65//! You'll need a HyperSync API token to access the service. Get one from
66//! [https://envio.dev/app/api-tokens](https://envio.dev/app/api-tokens).
67//!
68//! ## Examples
69//!
70//! See the `examples/` directory for more detailed usage patterns including:
71//! - ERC20 token transfers
72//! - Wallet transaction history
73//! - Event decoding and filtering
74//! - Real-time data streaming
75use std::time::Instant;
76use std::{sync::Arc, time::Duration};
77
78use anyhow::{anyhow, Context, Result};
79use futures::StreamExt;
80use hypersync_net_types::{hypersync_net_types_capnp, ArchiveHeight, ChainId, Query};
81use reqwest::{Method, StatusCode};
82use reqwest_eventsource::retry::ExponentialBackoff;
83use reqwest_eventsource::{Event, EventSource};
84
85pub mod arrow_reader;
86mod column_mapping;
87mod config;
88mod decode;
89mod decode_call;
90mod from_arrow;
91mod parquet_out;
92mod parse_response;
93pub mod preset_query;
94mod rate_limit;
95mod rayon_async;
96pub mod simple_types;
97mod stream;
98mod types;
99mod util;
100
101pub use hypersync_format as format;
102pub use hypersync_net_types as net_types;
103pub use hypersync_schema as schema;
104
105use parse_response::parse_query_response;
106use tokio::sync::mpsc;
107use types::ResponseData;
108use url::Url;
109
110pub use column_mapping::{ColumnMapping, DataType};
111pub use config::HexOutput;
112pub use config::{ClientConfig, SerializationFormat, StreamConfig};
113pub use decode::Decoder;
114pub use decode_call::CallDecoder;
115pub use rate_limit::RateLimitInfo;
116pub use types::{
117 ArrowResponse, ArrowResponseData, EventResponse, QueryResponse, QueryResponseWithRateLimit,
118};
119
120use crate::parse_response::read_query_response;
121use crate::simple_types::InternalEventJoinStrategy;
122
/// Wrapper around a `reqwest::Client` that periodically rebuilds the client
/// so DNS is re-resolved (see `max_connection_age` below), and that attaches
/// auth and timeouts uniformly to every request.
#[derive(Debug)]
struct HttpClientWrapper {
    /// Mutable state that needs to be refreshed periodically.
    /// Guarded by a std mutex; the lock is only held for the short,
    /// non-async body of `get_client`.
    inner: std::sync::Mutex<HttpClientWrapperInner>,
    /// HyperSync server api token, sent as a bearer token on every request.
    api_token: String,
    /// Standard timeout for http requests.
    timeout: Duration,
    /// Pools are never idle since polling often happens on the client
    /// so connections never timeout and dns lookups never happen again.
    /// This is problematic if the underlying ip address changes during
    /// failovers on hypersync. Since reqwest doesn't implement this we
    /// simply recreate the client every time this age is exceeded.
    max_connection_age: Duration,
    /// Kept so the client can be rebuilt with the same user agent.
    user_agent: String,
}
140
/// Mutable portion of the HTTP wrapper: the live client plus its age.
#[derive(Debug)]
struct HttpClientWrapperInner {
    /// Initialized reqwest instance for client url.
    client: reqwest::Client,
    /// Timestamp when the client was created; compared against the
    /// configured max connection age to decide when to rebuild.
    created_at: Instant,
}
148
149impl HttpClientWrapper {
150 fn new(user_agent: String, api_token: String, timeout: Duration) -> Self {
151 let client = reqwest::Client::builder()
152 .no_gzip()
153 .user_agent(&user_agent)
154 .build()
155 .unwrap();
156
157 Self {
158 inner: std::sync::Mutex::new(HttpClientWrapperInner {
159 client,
160 created_at: Instant::now(),
161 }),
162 api_token,
163 timeout,
164 max_connection_age: Duration::from_secs(60),
165 user_agent,
166 }
167 }
168
169 fn get_client(&self) -> reqwest::Client {
170 let mut inner = self.inner.lock().expect("HttpClientWrapper mutex poisoned");
171
172 // Check if client needs refresh due to age
173 if inner.created_at.elapsed() > self.max_connection_age {
174 // Recreate client to force new DNS lookup for failover scenarios
175 inner.client = reqwest::Client::builder()
176 .no_gzip()
177 .user_agent(&self.user_agent)
178 .build()
179 .unwrap();
180 inner.created_at = Instant::now();
181 }
182
183 // Clone is cheap for reqwest::Client (it's Arc-wrapped internally)
184 inner.client.clone()
185 }
186
187 fn request(&self, method: Method, url: Url) -> reqwest::RequestBuilder {
188 let client = self.get_client();
189 client
190 .request(method, url)
191 .timeout(self.timeout)
192 .bearer_auth(&self.api_token)
193 }
194
195 fn request_no_timeout(&self, method: Method, url: Url) -> reqwest::RequestBuilder {
196 let client = self.get_client();
197 client.request(method, url).bearer_auth(&self.api_token)
198 }
199}
200
/// Internal client state to handle http requests and retries.
#[derive(Debug)]
struct ClientInner {
    /// HTTP wrapper that owns and periodically refreshes the reqwest client.
    http_client: HttpClientWrapper,
    /// HyperSync server URL.
    url: Url,
    /// Number of retries to attempt before returning error.
    max_num_retries: usize,
    /// Milliseconds added to the backoff after each failed attempt; also
    /// used as the upper bound for the random jitter.
    retry_backoff_ms: u64,
    /// Initial wait time for request backoff.
    retry_base_ms: u64,
    /// Ceiling time for request backoff.
    retry_ceiling_ms: u64,
    /// Query serialization format to use for HTTP requests.
    serialization_format: SerializationFormat,
    /// Most recently observed rate limit info from the server, paired with the
    /// instant it was captured so elapsed time can be subtracted from `reset_secs`.
    rate_limit_state: std::sync::Mutex<Option<(RateLimitInfo, Instant)>>,
    /// Whether to proactively sleep when the rate limit is exhausted.
    proactive_rate_limit_sleep: bool,
}
223
/// Client to handle http requests and retries.
///
/// Cheap to clone: all state lives behind an `Arc`, so clones share the
/// same connection pool and rate-limit bookkeeping.
#[derive(Clone, Debug)]
pub struct Client {
    // Shared state; `Clone` only bumps the refcount.
    inner: Arc<ClientInner>,
}
229
230impl Client {
231 /// Creates a new client with the given configuration.
232 ///
233 /// Configuration must include the `url` and `api_token` fields.
234 /// # Example
    /// ```no_run
236 /// use hypersync_client::{Client, ClientConfig};
237 ///
238 /// let config = ClientConfig {
239 /// url: "https://eth.hypersync.xyz".to_string(),
240 /// api_token: std::env::var("ENVIO_API_TOKEN")?,
241 /// ..Default::default()
242 /// };
243 /// let client = Client::new(config)?;
244 /// # Ok::<(), anyhow::Error>(())
245 /// ```
246 ///
247 /// # Errors
248 /// This method fails if the config is invalid.
    pub fn new(cfg: ClientConfig) -> Result<Self> {
        cfg.validate().context("invalid ClientConfig")?;
        // "hscr" stands for hypersync client rust
        let user_agent = format!("hscr/{}", env!("CARGO_PKG_VERSION"));
        Self::new_internal(cfg, user_agent)
    }
255
256 /// Creates a new client builder.
257 ///
258 /// # Example
    /// ```no_run
260 /// use hypersync_client::Client;
261 ///
262 /// let client = Client::builder()
263 /// .chain_id(1)
264 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
265 /// .build()
266 /// .unwrap();
267 /// ```
    pub fn builder() -> ClientBuilder {
        // Entry point for the fluent construction API; see `ClientBuilder`.
        ClientBuilder::new()
    }
271
    #[doc(hidden)]
    pub fn new_with_agent(cfg: ClientConfig, user_agent: impl Into<String>) -> Result<Self> {
        // Creates a new client with the given configuration and custom user agent.
        // This is intended for use by language bindings (Python, Node.js) and HyperIndex.
        // NOTE(review): unlike `new`, this path does not call `cfg.validate()` —
        // confirm callers validate the config on their side.
        Self::new_internal(cfg, user_agent.into())
    }
278
279 /// Internal constructor that takes both config and user agent.
280 fn new_internal(cfg: ClientConfig, user_agent: String) -> Result<Self> {
281 let http_client = HttpClientWrapper::new(
282 user_agent,
283 cfg.api_token,
284 Duration::from_millis(cfg.http_req_timeout_millis),
285 );
286
287 let url = Url::parse(&cfg.url).context("url is malformed")?;
288
289 Ok(Self {
290 inner: Arc::new(ClientInner {
291 http_client,
292 url,
293 max_num_retries: cfg.max_num_retries,
294 retry_backoff_ms: cfg.retry_backoff_ms,
295 retry_base_ms: cfg.retry_base_ms,
296 retry_ceiling_ms: cfg.retry_ceiling_ms,
297 serialization_format: cfg.serialization_format,
298 rate_limit_state: std::sync::Mutex::new(None),
299 proactive_rate_limit_sleep: cfg.proactive_rate_limit_sleep,
300 }),
301 })
302 }
303
304 /// Retrieves blocks, transactions, traces, and logs through a stream using the provided
305 /// query and stream configuration.
306 ///
307 /// ### Implementation
308 /// Runs multiple queries simultaneously based on config.concurrency.
309 ///
310 /// Each query runs until it reaches query.to, server height, any max_num_* query param,
311 /// or execution timed out by server.
312 ///
313 /// # ⚠️ Important Warning
314 ///
315 /// This method will continue executing until the query has run to completion from beginning
316 /// to the end of the block range defined in the query. For heavy queries with large block
317 /// ranges or high data volumes, consider:
318 ///
319 /// - Use [`stream()`](Self::stream) to interact with each streamed chunk individually
320 /// - Use [`get()`](Self::get) which returns a `next_block` that can be paginated for the next query
321 /// - Break large queries into smaller block ranges
322 ///
323 /// # Example
324 /// ```no_run
325 /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField}, StreamConfig};
326 ///
327 /// # async fn example() -> anyhow::Result<()> {
328 /// let client = Client::builder()
329 /// .chain_id(1)
330 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
331 /// .build()?;
332 ///
333 /// // Query ERC20 transfer events
334 /// let query = Query::new()
335 /// .from_block(19000000)
336 /// .to_block_excl(19000010)
337 /// .where_logs(
338 /// LogFilter::all()
339 /// .and_topic0(["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"])?
340 /// )
341 /// .select_log_fields([LogField::Address, LogField::Data]);
342 /// let response = client.collect(query, StreamConfig::default()).await?;
343 ///
344 /// println!("Collected {} events", response.data.logs.len());
345 /// # Ok(())
346 /// # }
347 /// ```
348 pub async fn collect(&self, query: Query, config: StreamConfig) -> Result<QueryResponse> {
349 check_simple_stream_params(&config)?;
350
351 let mut recv = stream::stream_arrow(self, query, config)
352 .await
353 .context("start stream")?;
354
355 let mut data = ResponseData::default();
356 let mut archive_height = None;
357 let mut next_block = 0;
358 let mut total_execution_time = 0;
359
360 while let Some(res) = recv.recv().await {
361 let res = res.context("get response")?;
362 let res: QueryResponse =
363 QueryResponse::try_from(&res).context("convert arrow response")?;
364
365 for batch in res.data.blocks {
366 data.blocks.push(batch);
367 }
368 for batch in res.data.transactions {
369 data.transactions.push(batch);
370 }
371 for batch in res.data.logs {
372 data.logs.push(batch);
373 }
374 for batch in res.data.traces {
375 data.traces.push(batch);
376 }
377
378 archive_height = res.archive_height;
379 next_block = res.next_block;
380 total_execution_time += res.total_execution_time
381 }
382
383 Ok(QueryResponse {
384 archive_height,
385 next_block,
386 total_execution_time,
387 data,
388 rollback_guard: None,
389 })
390 }
391
392 /// Retrieves events through a stream using the provided query and stream configuration.
393 ///
394 /// # ⚠️ Important Warning
395 ///
396 /// This method will continue executing until the query has run to completion from beginning
397 /// to the end of the block range defined in the query. For heavy queries with large block
398 /// ranges or high data volumes, consider:
399 ///
400 /// - Use [`stream_events()`](Self::stream_events) to interact with each streamed chunk individually
401 /// - Use [`get_events()`](Self::get_events) which returns a `next_block` that can be paginated for the next query
402 /// - Break large queries into smaller block ranges
403 ///
404 /// # Example
405 /// ```no_run
406 /// use hypersync_client::{Client, net_types::{Query, TransactionFilter, TransactionField}, StreamConfig};
407 ///
408 /// # async fn example() -> anyhow::Result<()> {
409 /// let client = Client::builder()
410 /// .chain_id(1)
411 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
412 /// .build()?;
413 ///
414 /// // Query transactions to a specific address
415 /// let query = Query::new()
416 /// .from_block(19000000)
417 /// .to_block_excl(19000100)
418 /// .where_transactions(
419 /// TransactionFilter::all()
420 /// .and_to(["0xA0b86a33E6411b87Fd9D3DF822C8698FC06BBe4c"])?
421 /// )
422 /// .select_transaction_fields([TransactionField::Hash, TransactionField::From, TransactionField::Value]);
423 /// let response = client.collect_events(query, StreamConfig::default()).await?;
424 ///
425 /// println!("Collected {} events", response.data.len());
426 /// # Ok(())
427 /// # }
428 /// ```
429 pub async fn collect_events(
430 &self,
431 mut query: Query,
432 config: StreamConfig,
433 ) -> Result<EventResponse> {
434 check_simple_stream_params(&config)?;
435
436 let event_join_strategy = InternalEventJoinStrategy::from(&query.field_selection);
437 event_join_strategy.add_join_fields_to_selection(&mut query.field_selection);
438
439 let mut recv = stream::stream_arrow(self, query, config)
440 .await
441 .context("start stream")?;
442
443 let mut data = Vec::new();
444 let mut archive_height = None;
445 let mut next_block = 0;
446 let mut total_execution_time = 0;
447
448 while let Some(res) = recv.recv().await {
449 let res = res.context("get response")?;
450 let res: QueryResponse =
451 QueryResponse::try_from(&res).context("convert arrow response")?;
452 let events = event_join_strategy.join_from_response_data(res.data);
453
454 data.extend(events);
455
456 archive_height = res.archive_height;
457 next_block = res.next_block;
458 total_execution_time += res.total_execution_time
459 }
460
461 Ok(EventResponse {
462 archive_height,
463 next_block,
464 total_execution_time,
465 data,
466 rollback_guard: None,
467 })
468 }
469
470 /// Retrieves blocks, transactions, traces, and logs in Arrow format through a stream using
471 /// the provided query and stream configuration.
472 ///
473 /// Returns data in Apache Arrow format for high-performance columnar processing.
474 /// Useful for analytics workloads or when working with Arrow-compatible tools.
475 ///
476 /// # ⚠️ Important Warning
477 ///
478 /// This method will continue executing until the query has run to completion from beginning
479 /// to the end of the block range defined in the query. For heavy queries with large block
480 /// ranges or high data volumes, consider:
481 ///
482 /// - Use [`stream_arrow()`](Self::stream_arrow) to interact with each streamed chunk individually
483 /// - Use [`get_arrow()`](Self::get_arrow) which returns a `next_block` that can be paginated for the next query
484 /// - Break large queries into smaller block ranges
485 ///
486 /// # Example
487 /// ```no_run
488 /// use hypersync_client::{Client, net_types::{Query, BlockFilter, BlockField}, StreamConfig};
489 ///
490 /// # async fn example() -> anyhow::Result<()> {
491 /// let client = Client::builder()
492 /// .chain_id(1)
493 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
494 /// .build()?;
495 ///
496 /// // Get block data in Arrow format for analytics
497 /// let query = Query::new()
498 /// .from_block(19000000)
499 /// .to_block_excl(19000100)
500 /// .include_all_blocks()
501 /// .select_block_fields([BlockField::Number, BlockField::Timestamp, BlockField::GasUsed]);
502 /// let response = client.collect_arrow(query, StreamConfig::default()).await?;
503 ///
504 /// println!("Retrieved {} Arrow batches for blocks", response.data.blocks.len());
505 /// # Ok(())
506 /// # }
507 /// ```
508 pub async fn collect_arrow(&self, query: Query, config: StreamConfig) -> Result<ArrowResponse> {
509 let mut recv = stream::stream_arrow(self, query, config)
510 .await
511 .context("start stream")?;
512
513 let mut data = ArrowResponseData::default();
514 let mut archive_height = None;
515 let mut next_block = 0;
516 let mut total_execution_time = 0;
517
518 while let Some(res) = recv.recv().await {
519 let res = res.context("get response")?;
520
521 for batch in res.data.blocks {
522 data.blocks.push(batch);
523 }
524 for batch in res.data.transactions {
525 data.transactions.push(batch);
526 }
527 for batch in res.data.logs {
528 data.logs.push(batch);
529 }
530 for batch in res.data.traces {
531 data.traces.push(batch);
532 }
533 for batch in res.data.decoded_logs {
534 data.decoded_logs.push(batch);
535 }
536
537 archive_height = res.archive_height;
538 next_block = res.next_block;
539 total_execution_time += res.total_execution_time
540 }
541
542 Ok(ArrowResponse {
543 archive_height,
544 next_block,
545 total_execution_time,
546 data,
547 rollback_guard: None,
548 })
549 }
550
551 /// Writes parquet file getting data through a stream using the provided path, query,
552 /// and stream configuration.
553 ///
554 /// Streams data directly to a Parquet file for efficient storage and later analysis.
555 /// Perfect for data exports or ETL pipelines.
556 ///
557 /// # ⚠️ Important Warning
558 ///
559 /// This method will continue executing until the query has run to completion from beginning
560 /// to the end of the block range defined in the query. For heavy queries with large block
561 /// ranges or high data volumes, consider:
562 ///
563 /// - Use [`stream_arrow()`](Self::stream_arrow) and write to Parquet incrementally
564 /// - Use [`get_arrow()`](Self::get_arrow) with pagination and append to Parquet files
565 /// - Break large queries into smaller block ranges
566 ///
567 /// # Example
568 /// ```no_run
569 /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField}, StreamConfig};
570 ///
571 /// # async fn example() -> anyhow::Result<()> {
572 /// let client = Client::builder()
573 /// .chain_id(1)
574 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
575 /// .build()?;
576 ///
577 /// // Export all DEX trades to Parquet for analysis
578 /// let query = Query::new()
579 /// .from_block(19000000)
580 /// .to_block_excl(19010000)
581 /// .where_logs(
582 /// LogFilter::all()
583 /// .and_topic0(["0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822"])?
584 /// )
585 /// .select_log_fields([LogField::Address, LogField::Data, LogField::BlockNumber]);
586 /// client.collect_parquet("./trades.parquet", query, StreamConfig::default()).await?;
587 ///
588 /// println!("Trade data exported to trades.parquet");
589 /// # Ok(())
590 /// # }
591 /// ```
    pub async fn collect_parquet(
        &self,
        path: &str,
        query: Query,
        config: StreamConfig,
    ) -> Result<()> {
        // Thin wrapper: all streaming and file-writing logic lives in the
        // parquet_out module.
        parquet_out::collect_parquet(self, path, query, config).await
    }
600
601 /// Internal implementation of getting chain_id from server
602 async fn get_chain_id_impl(&self) -> Result<u64> {
603 let mut url = self.inner.url.clone();
604 let mut segments = url.path_segments_mut().ok().context("get path segments")?;
605 segments.push("chain_id");
606 std::mem::drop(segments);
607 let req = self.inner.http_client.request(Method::GET, url);
608
609 let res = req.send().await.context("execute http req")?;
610
611 let status = res.status();
612 if !status.is_success() {
613 return Err(anyhow!("http response status code {}", status));
614 }
615
616 let chain_id: ChainId = res.json().await.context("read response body json")?;
617
618 Ok(chain_id.chain_id)
619 }
620
621 /// Internal implementation of getting height from server
622 async fn get_height_impl(&self, http_timeout_override: Option<Duration>) -> Result<u64> {
623 let mut url = self.inner.url.clone();
624 let mut segments = url.path_segments_mut().ok().context("get path segments")?;
625 segments.push("height");
626 std::mem::drop(segments);
627 let mut req = self.inner.http_client.request(Method::GET, url);
628 if let Some(http_timeout_override) = http_timeout_override {
629 req = req.timeout(http_timeout_override);
630 }
631
632 let res = req.send().await.context("execute http req")?;
633
634 let status = res.status();
635 if !status.is_success() {
636 return Err(anyhow!("http response status code {}", status));
637 }
638
639 let height: ArchiveHeight = res.json().await.context("read response body json")?;
640
641 Ok(height.height.unwrap_or(0))
642 }
643
644 /// Get the chain_id from the server with retries.
645 ///
646 /// # Example
647 /// ```no_run
648 /// use hypersync_client::Client;
649 ///
650 /// # async fn example() -> anyhow::Result<()> {
651 /// let client = Client::builder()
652 /// .chain_id(1)
653 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
654 /// .build()?;
655 ///
656 /// let chain_id = client.get_chain_id().await?;
657 /// println!("Connected to chain ID: {}", chain_id);
658 /// # Ok(())
659 /// # }
660 /// ```
661 pub async fn get_chain_id(&self) -> Result<u64> {
662 let mut base = self.inner.retry_base_ms;
663
664 let mut err = anyhow!("");
665
666 for _ in 0..self.inner.max_num_retries + 1 {
667 match self.get_chain_id_impl().await {
668 Ok(res) => return Ok(res),
669 Err(e) => {
670 log::error!(
671 "failed to get chain_id from server, retrying... The error was: {e:?}"
672 );
673 err = err.context(format!("{e:?}"));
674 }
675 }
676
677 let base_ms = Duration::from_millis(base);
678 let jitter = Duration::from_millis(fastrange_rs::fastrange_64(
679 rand::random(),
680 self.inner.retry_backoff_ms,
681 ));
682
683 tokio::time::sleep(base_ms + jitter).await;
684
685 base = std::cmp::min(
686 base + self.inner.retry_backoff_ms,
687 self.inner.retry_ceiling_ms,
688 );
689 }
690
691 Err(err)
692 }
693
    /// Get the height from the server with retries.
695 ///
696 /// # Example
697 /// ```no_run
698 /// use hypersync_client::Client;
699 ///
700 /// # async fn example() -> anyhow::Result<()> {
701 /// let client = Client::builder()
702 /// .chain_id(1)
703 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
704 /// .build()?;
705 ///
706 /// let height = client.get_height().await?;
707 /// println!("Current block height: {}", height);
708 /// # Ok(())
709 /// # }
710 /// ```
711 pub async fn get_height(&self) -> Result<u64> {
712 let mut base = self.inner.retry_base_ms;
713
714 let mut err = anyhow!("");
715
716 for _ in 0..self.inner.max_num_retries + 1 {
717 match self.get_height_impl(None).await {
718 Ok(res) => return Ok(res),
719 Err(e) => {
720 log::error!(
721 "failed to get height from server, retrying... The error was: {e:?}"
722 );
723 err = err.context(format!("{e:?}"));
724 }
725 }
726
727 let base_ms = Duration::from_millis(base);
728 let jitter = Duration::from_millis(fastrange_rs::fastrange_64(
729 rand::random(),
730 self.inner.retry_backoff_ms,
731 ));
732
733 tokio::time::sleep(base_ms + jitter).await;
734
735 base = std::cmp::min(
736 base + self.inner.retry_backoff_ms,
737 self.inner.retry_ceiling_ms,
738 );
739 }
740
741 Err(err)
742 }
743
744 /// Get the height of the Client instance for health checks.
745 ///
746 /// Doesn't do any retries and the `http_req_timeout` parameter will override the http timeout config set when creating the client.
747 ///
748 /// # Example
749 /// ```no_run
750 /// use hypersync_client::Client;
751 /// use std::time::Duration;
752 ///
753 /// # async fn example() -> anyhow::Result<()> {
754 /// let client = Client::builder()
755 /// .chain_id(1)
756 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
757 /// .build()?;
758 ///
759 /// // Quick health check with 5 second timeout
760 /// let height = client.health_check(Some(Duration::from_secs(5))).await?;
761 /// println!("Server is healthy at block: {}", height);
762 /// # Ok(())
763 /// # }
764 /// ```
    pub async fn health_check(&self, http_req_timeout: Option<Duration>) -> Result<u64> {
        // Single attempt, no retries; the optional timeout overrides the
        // http timeout configured at client construction.
        self.get_height_impl(http_req_timeout).await
    }
768
769 /// Executes query with retries and returns the response.
770 ///
771 /// # Example
772 /// ```no_run
773 /// use hypersync_client::{Client, net_types::{Query, BlockFilter, BlockField}};
774 ///
775 /// # async fn example() -> anyhow::Result<()> {
776 /// let client = Client::builder()
777 /// .chain_id(1)
778 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
779 /// .build()?;
780 ///
781 /// // Query all blocks from a specific range
782 /// let query = Query::new()
783 /// .from_block(19000000)
784 /// .to_block_excl(19000010)
785 /// .include_all_blocks()
786 /// .select_block_fields([BlockField::Number, BlockField::Hash, BlockField::Timestamp]);
787 /// let response = client.get(&query).await?;
788 ///
789 /// println!("Retrieved {} blocks", response.data.blocks.len());
790 /// # Ok(())
791 /// # }
792 /// ```
793 pub async fn get(&self, query: &Query) -> Result<QueryResponse> {
794 let arrow_response = self.get_arrow(query).await.context("get data")?;
795 let converted =
796 QueryResponse::try_from(&arrow_response).context("convert arrow response")?;
797 Ok(converted)
798 }
799
800 /// Add block, transaction and log fields selection to the query, executes it with retries
801 /// and returns the response.
802 ///
803 /// This method automatically joins blocks, transactions, and logs into unified events,
804 /// making it easier to work with related blockchain data.
805 ///
806 /// # Example
807 /// ```no_run
808 /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField, TransactionField}};
809 ///
810 /// # async fn example() -> anyhow::Result<()> {
811 /// let client = Client::builder()
812 /// .chain_id(1)
813 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
814 /// .build()?;
815 ///
816 /// // Query ERC20 transfers with transaction context
817 /// let query = Query::new()
818 /// .from_block(19000000)
819 /// .to_block_excl(19000010)
820 /// .where_logs(
821 /// LogFilter::all()
822 /// .and_topic0(["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"])?
823 /// )
824 /// .select_log_fields([LogField::Address, LogField::Data])
825 /// .select_transaction_fields([TransactionField::Hash, TransactionField::From]);
826 /// let response = client.get_events(query).await?;
827 ///
828 /// println!("Retrieved {} joined events", response.data.len());
829 /// # Ok(())
830 /// # }
831 /// ```
832 pub async fn get_events(&self, mut query: Query) -> Result<EventResponse> {
833 let event_join_strategy = InternalEventJoinStrategy::from(&query.field_selection);
834 event_join_strategy.add_join_fields_to_selection(&mut query.field_selection);
835 let arrow_response = self.get_arrow(&query).await.context("get data")?;
836 EventResponse::try_from_arrow_response(&arrow_response, &event_join_strategy)
837 }
838
839 /// Executes query once and returns the result using JSON serialization.
840 async fn get_arrow_impl_json(
841 &self,
842 query: &Query,
843 ) -> std::result::Result<ArrowImplResponse, HyperSyncResponseError> {
844 let mut url = self.inner.url.clone();
845 let mut segments = url.path_segments_mut().ok().context("get path segments")?;
846 segments.push("query");
847 segments.push("arrow-ipc");
848 std::mem::drop(segments);
849 let req = self.inner.http_client.request(Method::POST, url);
850
851 let res = req.json(&query).send().await.context("execute http req")?;
852
853 let status = res.status();
854 let rate_limit = RateLimitInfo::from_response(&res);
855
856 if status == StatusCode::PAYLOAD_TOO_LARGE {
857 return Err(HyperSyncResponseError::PayloadTooLarge);
858 }
859 if status == StatusCode::TOO_MANY_REQUESTS {
860 return Err(HyperSyncResponseError::RateLimited { rate_limit });
861 }
862 if !status.is_success() {
863 let text = res.text().await.context("read text to see error")?;
864
865 return Err(HyperSyncResponseError::Other(anyhow!(
866 "http response status code {}, err body: {}",
867 status,
868 text
869 )));
870 }
871
872 let bytes = res.bytes().await.context("read response body bytes")?;
873
874 let res = tokio::task::block_in_place(|| {
875 parse_query_response(&bytes).context("parse query response")
876 })?;
877
878 Ok(ArrowImplResponse {
879 response: res,
880 response_bytes: bytes.len().try_into().unwrap(),
881 rate_limit,
882 })
883 }
884
885 fn should_cache_queries(&self) -> bool {
886 matches!(
887 self.inner.serialization_format,
888 SerializationFormat::CapnProto {
889 should_cache_queries: true
890 }
891 )
892 }
893
    /// Executes query once and returns the result using Cap'n Proto serialization.
    ///
    /// When query caching is enabled (see `should_cache_queries`) this first sends
    /// only a query id to `POST /query/arrow-ipc/capnp`; on a cache hit the server
    /// answers directly. On a cache miss — or any error on the cache path — the
    /// full serialized query is sent as a second request.
    ///
    /// Returns [`HyperSyncResponseError::PayloadTooLarge`] on HTTP 413 and
    /// [`HyperSyncResponseError::RateLimited`] on HTTP 429 so the caller can react.
    async fn get_arrow_impl_capnp(
        &self,
        query: &Query,
    ) -> std::result::Result<ArrowImplResponse, HyperSyncResponseError> {
        let mut url = self.inner.url.clone();
        let mut segments = url.path_segments_mut().ok().context("get path segments")?;
        segments.push("query");
        segments.push("arrow-ipc");
        segments.push("capnp");
        // Release the mutable borrow on `url` so it can be used for requests below.
        std::mem::drop(segments);

        let should_cache = self.should_cache_queries();

        if should_cache {
            // First attempt: send only the query id, hoping the server still has
            // the full query cached from an earlier request.
            let query_with_id = {
                let mut message = capnp::message::Builder::new_default();
                let mut request_builder =
                    message.init_root::<hypersync_net_types_capnp::request::Builder>();

                request_builder
                    .build_query_id_from_query(query)
                    .context("build query id")?;
                let mut query_with_id = Vec::new();
                capnp::serialize_packed::write_message(&mut query_with_id, &message)
                    .context("write capnp message")?;
                query_with_id
            };

            let req = self.inner.http_client.request(Method::POST, url.clone());

            let res = req
                .body(query_with_id)
                .send()
                .await
                .context("execute http req")?;

            let status = res.status();
            let rate_limit = RateLimitInfo::from_response(&res);

            if status == StatusCode::PAYLOAD_TOO_LARGE {
                return Err(HyperSyncResponseError::PayloadTooLarge);
            }
            if status == StatusCode::TOO_MANY_REQUESTS {
                return Err(HyperSyncResponseError::RateLimited { rate_limit });
            }
            if status.is_success() {
                let bytes = res.bytes().await.context("read response body bytes")?;

                // Relax capnp's default read limits; responses can be large.
                let mut opts = capnp::message::ReaderOptions::new();
                opts.nesting_limit(i32::MAX).traversal_limit_in_words(None);
                let message_reader = capnp::serialize_packed::read_message(bytes.as_ref(), opts)
                    .context("create message reader")?;
                let query_response = message_reader
                    .get_root::<hypersync_net_types_capnp::cached_query_response::Reader>()
                    .context("get cached_query_response root")?;
                match query_response.get_either().which().context("get either")? {
                    hypersync_net_types_capnp::cached_query_response::either::Which::QueryResponse(
                        query_response,
                    ) => {
                        // Cache hit: decode on a blocking thread since Arrow
                        // parsing is CPU-bound.
                        let res = tokio::task::block_in_place(|| {
                            let res = query_response?;
                            read_query_response(&res).context("parse query response cached")
                        })?;
                        return Ok(ArrowImplResponse {
                            response: res,
                            response_bytes: bytes.len().try_into().unwrap(),
                            rate_limit,
                        });
                    }
                    hypersync_net_types_capnp::cached_query_response::either::Which::NotCached(()) => {
                        // Cache miss: fall through to the full-query request below.
                        log::trace!("query was not cached, retrying with full query");
                    }
                }
            } else {
                // Non-fatal: log the cache-path failure and fall back to the full query.
                let text = res.text().await.context("read text to see error")?;
                log::warn!(
                    "Failed cache query, will retry full query. {}, err body: {}",
                    status,
                    text
                );
            }
        };

        // Second (or only) attempt: send the fully serialized query.
        let full_query_bytes = {
            let mut message = capnp::message::Builder::new_default();
            let mut query_builder =
                message.init_root::<hypersync_net_types_capnp::request::Builder>();

            query_builder
                .build_full_query_from_query(query, should_cache)
                .context("build full query")?;
            let mut bytes = Vec::new();
            capnp::serialize_packed::write_message(&mut bytes, &message)
                .context("write full query capnp message")?;
            bytes
        };

        let req = self.inner.http_client.request(Method::POST, url);

        let res = req
            .body(full_query_bytes)
            .send()
            .await
            .context("execute http req")?;

        let status = res.status();
        let rate_limit = RateLimitInfo::from_response(&res);

        if status == StatusCode::PAYLOAD_TOO_LARGE {
            return Err(HyperSyncResponseError::PayloadTooLarge);
        }
        if status == StatusCode::TOO_MANY_REQUESTS {
            return Err(HyperSyncResponseError::RateLimited { rate_limit });
        }
        if !status.is_success() {
            let text = res.text().await.context("read text to see error")?;

            return Err(HyperSyncResponseError::Other(anyhow!(
                "http response status code {}, err body: {}",
                status,
                text
            )));
        }

        let bytes = res.bytes().await.context("read response body bytes")?;

        // Arrow decoding is CPU-bound; keep it off the async reactor threads.
        let res = tokio::task::block_in_place(|| {
            parse_query_response(&bytes).context("parse query response")
        })?;

        Ok(ArrowImplResponse {
            response: res,
            response_bytes: bytes.len().try_into().unwrap(),
            rate_limit,
        })
    }
1031
    /// Executes query once and returns the result, handling payload-too-large by halving block range.
    ///
    /// Dispatches to the JSON or Cap'n Proto implementation based on the configured
    /// serialization format. On [`HyperSyncResponseError::PayloadTooLarge`] the block
    /// range is repeatedly halved (or set to 200 blocks when the query has no
    /// `to_block`) until the response fits; all other errors propagate unchanged.
    async fn get_arrow_impl(
        &self,
        query: &Query,
    ) -> std::result::Result<ArrowImplResponse, HyperSyncResponseError> {
        let mut query = query.clone();
        loop {
            let res = match self.inner.serialization_format {
                SerializationFormat::Json => self.get_arrow_impl_json(&query).await,
                SerializationFormat::CapnProto { .. } => self.get_arrow_impl_capnp(&query).await,
            };
            match res {
                Ok(res) => return Ok(res),
                Err(
                    e @ (HyperSyncResponseError::Other(_)
                    | HyperSyncResponseError::RateLimited { .. }),
                ) => return Err(e),
                Err(HyperSyncResponseError::PayloadTooLarge) => {
                    let block_range = if let Some(to_block) = query.to_block {
                        let current = to_block - query.from_block;
                        if current < 2 {
                            // A single-block range can't shrink any further.
                            return Err(HyperSyncResponseError::Other(anyhow!(
                                "Payload is too large and query is using the minimum block range."
                            )));
                        }
                        // Halve the current block range
                        current / 2
                    } else {
                        // Open-ended query: bound it to 200 blocks and retry.
                        200
                    };
                    let to_block = query.from_block + block_range;
                    query.to_block = Some(to_block);

                    log::trace!(
                        "Payload is too large, retrying with block range from: {} to: {}",
                        query.from_block,
                        to_block
                    );
                }
            }
        }
    }
1074
1075 /// Executes query with retries and returns the response in Arrow format.
1076 pub async fn get_arrow(&self, query: &Query) -> Result<ArrowResponse> {
1077 self.get_arrow_with_size(query)
1078 .await
1079 .map(|res| res.response)
1080 }
1081
    /// Internal implementation for get_arrow.
    ///
    /// Wraps `get_arrow_impl` with up to `max_num_retries` retries: linear backoff
    /// (`retry_base_ms` + `retry_backoff_ms` per attempt, capped at
    /// `retry_ceiling_ms`) plus random jitter. Rate limit state is updated from
    /// every response; on HTTP 429 the server-suggested wait is honored instead
    /// of the generic backoff. Returns the accumulated error contexts when all
    /// attempts fail.
    async fn get_arrow_with_size(&self, query: &Query) -> Result<ArrowImplResponse> {
        let mut base = self.inner.retry_base_ms;

        // Accumulates context from each failed attempt for the final error.
        let mut err = anyhow!("");

        // Proactive throttling: if we know we're rate limited, wait before sending
        if self.inner.proactive_rate_limit_sleep {
            self.wait_for_rate_limit().await;
        }

        for _ in 0..self.inner.max_num_retries + 1 {
            match self.get_arrow_impl(query).await {
                Ok(res) => {
                    self.update_rate_limit_state(&res.rate_limit);
                    return Ok(res);
                }
                Err(HyperSyncResponseError::RateLimited { rate_limit }) => {
                    self.update_rate_limit_state(&rate_limit);
                    // One extra second of padding on top of the server's suggestion.
                    let wait_secs = rate_limit.suggested_wait_secs().unwrap_or(1) + 1;
                    log::warn!(
                        "rate limited by server ({rate_limit}), waiting {wait_secs}s before retry"
                    );
                    err = err.context(format!("rate limited by server ({rate_limit})"));
                    tokio::time::sleep(Duration::from_secs(wait_secs)).await;
                    // Skip the generic backoff sleep below; we already waited.
                    continue;
                }
                Err(HyperSyncResponseError::Other(e)) => {
                    log::error!(
                        "failed to get arrow data from server, retrying... The error was: {e:?}"
                    );
                    err = err.context(format!("{e:?}"));
                }
                Err(HyperSyncResponseError::PayloadTooLarge) => {
                    // This shouldn't happen since get_arrow_impl handles it, but just in case
                    log::error!("unexpected PayloadTooLarge from get_arrow_impl, retrying...");
                    err = err.context("unexpected PayloadTooLarge");
                }
            }

            // Linear backoff plus jitter uniformly drawn from [0, retry_backoff_ms).
            let base_ms = Duration::from_millis(base);
            let jitter = Duration::from_millis(fastrange_rs::fastrange_64(
                rand::random(),
                self.inner.retry_backoff_ms,
            ));

            tokio::time::sleep(base_ms + jitter).await;

            base = std::cmp::min(
                base + self.inner.retry_backoff_ms,
                self.inner.retry_ceiling_ms,
            );
        }

        Err(err)
    }
1138
1139 /// Spawns task to execute query and return data via a channel.
1140 ///
1141 /// # Example
1142 /// ```no_run
1143 /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField}, StreamConfig};
1144 ///
1145 /// # async fn example() -> anyhow::Result<()> {
1146 /// let client = Client::builder()
1147 /// .chain_id(1)
1148 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
1149 /// .build()?;
1150 ///
1151 /// // Stream all ERC20 transfer events
1152 /// let query = Query::new()
1153 /// .from_block(19000000)
1154 /// .where_logs(
1155 /// LogFilter::all()
1156 /// .and_topic0(["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"])?
1157 /// )
1158 /// .select_log_fields([LogField::Address, LogField::Topic1, LogField::Topic2, LogField::Data]);
1159 /// let mut receiver = client.stream(query, StreamConfig::default()).await?;
1160 ///
1161 /// while let Some(response) = receiver.recv().await {
1162 /// let response = response?;
1163 /// println!("Got {} events up to block: {}", response.data.logs.len(), response.next_block);
1164 /// }
1165 /// # Ok(())
1166 /// # }
1167 /// ```
1168 pub async fn stream(
1169 &self,
1170 query: Query,
1171 config: StreamConfig,
1172 ) -> Result<mpsc::Receiver<Result<QueryResponse>>> {
1173 check_simple_stream_params(&config)?;
1174
1175 let (tx, rx): (_, mpsc::Receiver<Result<QueryResponse>>) =
1176 mpsc::channel(config.concurrency);
1177
1178 let mut inner_rx = self
1179 .stream_arrow(query, config)
1180 .await
1181 .context("start inner stream")?;
1182
1183 tokio::spawn(async move {
1184 while let Some(resp) = inner_rx.recv().await {
1185 let msg = resp
1186 .context("inner receiver")
1187 .and_then(|r| QueryResponse::try_from(&r));
1188 let is_err = msg.is_err();
1189 if tx.send(msg).await.is_err() || is_err {
1190 return;
1191 }
1192 }
1193 });
1194
1195 Ok(rx)
1196 }
1197
1198 /// Add block, transaction and log fields selection to the query and spawns task to execute it,
1199 /// returning data via a channel.
1200 ///
1201 /// This method automatically joins blocks, transactions, and logs into unified events,
1202 /// then streams them via a channel for real-time processing.
1203 ///
1204 /// # Example
1205 /// ```no_run
1206 /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField, TransactionField}, StreamConfig};
1207 ///
1208 /// # async fn example() -> anyhow::Result<()> {
1209 /// let client = Client::builder()
1210 /// .chain_id(1)
1211 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
1212 /// .build()?;
1213 ///
1214 /// // Stream NFT transfer events with transaction context
1215 /// let query = Query::new()
1216 /// .from_block(19000000)
1217 /// .where_logs(
1218 /// LogFilter::all()
1219 /// .and_topic0(["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"])?
1220 /// )
1221 /// .select_log_fields([LogField::Address, LogField::Topic1, LogField::Topic2])
1222 /// .select_transaction_fields([TransactionField::Hash, TransactionField::From]);
1223 /// let mut receiver = client.stream_events(query, StreamConfig::default()).await?;
1224 ///
1225 /// while let Some(response) = receiver.recv().await {
1226 /// let response = response?;
1227 /// println!("Got {} joined events up to block: {}", response.data.len(), response.next_block);
1228 /// }
1229 /// # Ok(())
1230 /// # }
1231 /// ```
1232 pub async fn stream_events(
1233 &self,
1234 mut query: Query,
1235 config: StreamConfig,
1236 ) -> Result<mpsc::Receiver<Result<EventResponse>>> {
1237 check_simple_stream_params(&config)?;
1238
1239 let event_join_strategy = InternalEventJoinStrategy::from(&query.field_selection);
1240
1241 event_join_strategy.add_join_fields_to_selection(&mut query.field_selection);
1242
1243 let (tx, rx): (_, mpsc::Receiver<Result<EventResponse>>) =
1244 mpsc::channel(config.concurrency);
1245
1246 let mut inner_rx = self
1247 .stream_arrow(query, config)
1248 .await
1249 .context("start inner stream")?;
1250
1251 tokio::spawn(async move {
1252 while let Some(resp) = inner_rx.recv().await {
1253 let msg = resp
1254 .context("inner receiver")
1255 .and_then(|r| EventResponse::try_from_arrow_response(&r, &event_join_strategy));
1256 let is_err = msg.is_err();
1257 if tx.send(msg).await.is_err() || is_err {
1258 return;
1259 }
1260 }
1261 });
1262
1263 Ok(rx)
1264 }
1265
    /// Spawns task to execute query and return data via a channel in Arrow format.
    ///
    /// Returns raw Apache Arrow data via a channel for high-performance processing.
    /// Ideal for applications that need to work directly with columnar data.
    /// Delegates to the internal `stream` module; no conversion is applied here.
    ///
    /// # Example
    /// ```no_run
    /// use hypersync_client::{Client, net_types::{Query, TransactionFilter, TransactionField}, StreamConfig};
    ///
    /// # async fn example() -> anyhow::Result<()> {
    /// let client = Client::builder()
    ///     .chain_id(1)
    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
    ///     .build()?;
    ///
    /// // Stream transaction data in Arrow format for analytics
    /// let query = Query::new()
    ///     .from_block(19000000)
    ///     .to_block_excl(19000100)
    ///     .where_transactions(
    ///         TransactionFilter::all()
    ///             .and_contract_address(["0xA0b86a33E6411b87Fd9D3DF822C8698FC06BBe4c"])?
    ///     )
    ///     .select_transaction_fields([TransactionField::Hash, TransactionField::From, TransactionField::Value]);
    /// let mut receiver = client.stream_arrow(query, StreamConfig::default()).await?;
    ///
    /// while let Some(response) = receiver.recv().await {
    ///     let response = response?;
    ///     println!("Got {} Arrow batches for transactions", response.data.transactions.len());
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub async fn stream_arrow(
        &self,
        query: Query,
        config: StreamConfig,
    ) -> Result<mpsc::Receiver<Result<ArrowResponse>>> {
        stream::stream_arrow(self, query, config).await
    }
1306
1307 /// Executes query with retries and returns the response in Arrow format along with
1308 /// rate limit information from the server.
1309 ///
1310 /// This is useful for consumers that want to inspect rate limit headers and implement
1311 /// their own rate limiting logic in external systems.
1312 pub async fn get_arrow_with_rate_limit(
1313 &self,
1314 query: &Query,
1315 ) -> Result<QueryResponseWithRateLimit<ArrowResponseData>> {
1316 let result = self.get_arrow_with_size(query).await?;
1317 Ok(QueryResponseWithRateLimit {
1318 response: result.response,
1319 rate_limit: result.rate_limit,
1320 })
1321 }
1322
1323 /// Executes query with retries and returns the response along with
1324 /// rate limit information from the server.
1325 ///
1326 /// This is useful for consumers that want to inspect rate limit headers and implement
1327 /// their own rate limiting logic in external systems.
1328 pub async fn get_with_rate_limit(
1329 &self,
1330 query: &Query,
1331 ) -> Result<QueryResponseWithRateLimit<ResponseData>> {
1332 let result = self.get_arrow_with_rate_limit(query).await?;
1333 let converted =
1334 QueryResponse::try_from(&result.response).context("convert arrow response")?;
1335 Ok(QueryResponseWithRateLimit {
1336 response: converted,
1337 rate_limit: result.rate_limit,
1338 })
1339 }
1340
1341 /// Returns the most recently observed rate limit information, if any.
1342 ///
1343 /// Updated after every request (including inside streams). Returns `None`
1344 /// if no requests have been made yet or the server hasn't returned rate limit headers.
1345 pub fn rate_limit_info(&self) -> Option<RateLimitInfo> {
1346 self.inner
1347 .rate_limit_state
1348 .lock()
1349 .expect("rate_limit_state mutex poisoned")
1350 .as_ref()
1351 .map(|(info, _captured_at)| info.clone())
1352 }
1353
1354 /// Waits until the current rate limit window resets, if the client is rate limited.
1355 ///
1356 /// Returns immediately if:
1357 /// - No rate limit information has been observed yet
1358 /// - There is remaining quota in the current window
1359 ///
1360 /// This method is useful for consumers who want to explicitly wait before making
1361 /// requests, for example when coordinating rate limits across multiple systems.
1362 pub async fn wait_for_rate_limit(&self) {
1363 let wait_info = {
1364 let state = self
1365 .inner
1366 .rate_limit_state
1367 .lock()
1368 .expect("rate_limit_state mutex poisoned");
1369 match state.as_ref() {
1370 Some((info, captured_at)) if info.is_rate_limited() => {
1371 info.suggested_wait_secs().map(|secs| {
1372 let elapsed = captured_at.elapsed().as_secs();
1373 let remaining_wait = secs.saturating_sub(elapsed);
1374 (remaining_wait, info.clone())
1375 })
1376 }
1377 _ => None,
1378 }
1379 };
1380 if let Some((secs, info)) = wait_info {
1381 if secs > 0 {
1382 log::warn!(
1383 "rate limit exhausted ({info}), proactively waiting {secs}s for window reset"
1384 );
1385 tokio::time::sleep(Duration::from_secs(secs)).await;
1386 }
1387 }
1388 }
1389
1390 /// Updates the internally tracked rate limit state with the current timestamp.
1391 fn update_rate_limit_state(&self, rate_limit: &RateLimitInfo) {
1392 // Only update if the response actually contained rate limit headers
1393 if rate_limit.limit.is_some()
1394 || rate_limit.remaining.is_some()
1395 || rate_limit.reset_secs.is_some()
1396 {
1397 let mut state = self
1398 .inner
1399 .rate_limit_state
1400 .lock()
1401 .expect("rate_limit_state mutex poisoned");
1402 *state = Some((rate_limit.clone(), Instant::now()));
1403 }
1404 }
1405
    /// Returns the base URL this client sends requests to.
    ///
    /// # Example
    /// ```
    /// use hypersync_client::Client;
    ///
    /// let client = Client::builder()
    ///     .chain_id(1)
    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
    ///     .build()
    ///     .unwrap();
    ///
    /// println!("Client URL: {}", client.url());
    /// ```
    pub fn url(&self) -> &Url {
        &self.inner.url
    }
1423}
1424
/// Builder for creating a hypersync client with configuration options.
///
/// This builder provides a fluent API for configuring client settings like URL,
/// authentication, timeouts, and retry behavior. Obtain one via
/// [`ClientBuilder::new`] (or `Client::builder()` as shown below) and finish
/// with [`ClientBuilder::build`].
///
/// # Example
/// ```
/// use hypersync_client::{Client, SerializationFormat};
///
/// let client = Client::builder()
///     .chain_id(1)
///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
///     .http_req_timeout_millis(30000)
///     .max_num_retries(3)
///     .build()
///     .unwrap();
/// ```
#[derive(Debug, Clone, Default)]
pub struct ClientBuilder(ClientConfig);
1444
1445impl ClientBuilder {
1446 /// Creates a new ClientBuilder with default configuration.
1447 pub fn new() -> Self {
1448 Self::default()
1449 }
1450
1451 /// Sets the chain ID and automatically configures the URL for the hypersync endpoint.
1452 ///
1453 /// This is a convenience method that sets the URL to `https://{chain_id}.hypersync.xyz`.
1454 ///
1455 /// # Arguments
1456 /// * `chain_id` - The blockchain chain ID (e.g., 1 for Ethereum mainnet)
1457 ///
1458 /// # Example
1459 /// ```
1460 /// use hypersync_client::Client;
1461 ///
1462 /// let client = Client::builder()
1463 /// .chain_id(1) // Ethereum mainnet
1464 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1465 /// .build()
1466 /// .unwrap();
1467 /// ```
1468 pub fn chain_id(mut self, chain_id: u64) -> Self {
1469 self.0.url = format!("https://{chain_id}.hypersync.xyz");
1470 self
1471 }
1472
1473 /// Sets a custom URL for the hypersync server.
1474 ///
1475 /// Use this method when you need to connect to a custom hypersync endpoint
1476 /// instead of the default public endpoints.
1477 ///
1478 /// # Arguments
1479 /// * `url` - The hypersync server URL
1480 ///
1481 /// # Example
1482 /// ```
1483 /// use hypersync_client::Client;
1484 ///
1485 /// let client = Client::builder()
1486 /// .url("https://my-custom-hypersync.example.com")
1487 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1488 /// .build()
1489 /// .unwrap();
1490 /// ```
1491 pub fn url<S: ToString>(mut self, url: S) -> Self {
1492 self.0.url = url.to_string();
1493 self
1494 }
1495
1496 /// Sets the api token for authentication.
1497 ///
1498 /// Required for accessing authenticated hypersync endpoints.
1499 ///
1500 /// # Arguments
1501 /// * `api_token` - The authentication token
1502 ///
1503 /// # Example
1504 /// ```
1505 /// use hypersync_client::Client;
1506 ///
1507 /// let client = Client::builder()
1508 /// .chain_id(1)
1509 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1510 /// .build()
1511 /// .unwrap();
1512 /// ```
1513 pub fn api_token<S: ToString>(mut self, api_token: S) -> Self {
1514 self.0.api_token = api_token.to_string();
1515 self
1516 }
1517
1518 /// Sets the HTTP request timeout in milliseconds.
1519 ///
1520 /// # Arguments
1521 /// * `http_req_timeout_millis` - Timeout in milliseconds (default: 30000)
1522 ///
1523 /// # Example
1524 /// ```
1525 /// use hypersync_client::Client;
1526 ///
1527 /// let client = Client::builder()
1528 /// .chain_id(1)
1529 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1530 /// .http_req_timeout_millis(60000) // 60 second timeout
1531 /// .build()
1532 /// .unwrap();
1533 /// ```
1534 pub fn http_req_timeout_millis(mut self, http_req_timeout_millis: u64) -> Self {
1535 self.0.http_req_timeout_millis = http_req_timeout_millis;
1536 self
1537 }
1538
1539 /// Sets the maximum number of retries for failed requests.
1540 ///
1541 /// # Arguments
1542 /// * `max_num_retries` - Maximum number of retries (default: 10)
1543 ///
1544 /// # Example
1545 /// ```
1546 /// use hypersync_client::Client;
1547 ///
1548 /// let client = Client::builder()
1549 /// .chain_id(1)
1550 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1551 /// .max_num_retries(5)
1552 /// .build()
1553 /// .unwrap();
1554 /// ```
1555 pub fn max_num_retries(mut self, max_num_retries: usize) -> Self {
1556 self.0.max_num_retries = max_num_retries;
1557 self
1558 }
1559
1560 /// Sets the backoff increment for retry delays.
1561 ///
1562 /// This value is added to the base delay on each retry attempt.
1563 ///
1564 /// # Arguments
1565 /// * `retry_backoff_ms` - Backoff increment in milliseconds (default: 500)
1566 ///
1567 /// # Example
1568 /// ```
1569 /// use hypersync_client::Client;
1570 ///
1571 /// let client = Client::builder()
1572 /// .chain_id(1)
1573 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1574 /// .retry_backoff_ms(1000) // 1 second backoff increment
1575 /// .build()
1576 /// .unwrap();
1577 /// ```
1578 pub fn retry_backoff_ms(mut self, retry_backoff_ms: u64) -> Self {
1579 self.0.retry_backoff_ms = retry_backoff_ms;
1580 self
1581 }
1582
1583 /// Sets the initial delay for retry attempts.
1584 ///
1585 /// # Arguments
1586 /// * `retry_base_ms` - Initial retry delay in milliseconds (default: 500)
1587 ///
1588 /// # Example
1589 /// ```
1590 /// use hypersync_client::Client;
1591 ///
1592 /// let client = Client::builder()
1593 /// .chain_id(1)
1594 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1595 /// .retry_base_ms(1000) // Start with 1 second delay
1596 /// .build()
1597 /// .unwrap();
1598 /// ```
1599 pub fn retry_base_ms(mut self, retry_base_ms: u64) -> Self {
1600 self.0.retry_base_ms = retry_base_ms;
1601 self
1602 }
1603
1604 /// Sets the maximum delay for retry attempts.
1605 ///
1606 /// The retry delay will not exceed this value, even with backoff increments.
1607 ///
1608 /// # Arguments
1609 /// * `retry_ceiling_ms` - Maximum retry delay in milliseconds (default: 10000)
1610 ///
1611 /// # Example
1612 /// ```
1613 /// use hypersync_client::Client;
1614 ///
1615 /// let client = Client::builder()
1616 /// .chain_id(1)
1617 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1618 /// .retry_ceiling_ms(30000) // Cap at 30 seconds
1619 /// .build()
1620 /// .unwrap();
1621 /// ```
1622 pub fn retry_ceiling_ms(mut self, retry_ceiling_ms: u64) -> Self {
1623 self.0.retry_ceiling_ms = retry_ceiling_ms;
1624 self
1625 }
1626
1627 /// Sets whether to proactively sleep when rate limited.
1628 ///
1629 /// When enabled (default), the client will wait for the rate limit window to reset
1630 /// before sending requests it knows will be rejected. Set to `false` to disable
1631 /// this behavior and handle rate limits yourself.
1632 ///
1633 /// # Arguments
1634 /// * `proactive_rate_limit_sleep` - Whether to enable proactive rate limit sleeping (default: true)
1635 pub fn proactive_rate_limit_sleep(mut self, proactive_rate_limit_sleep: bool) -> Self {
1636 self.0.proactive_rate_limit_sleep = proactive_rate_limit_sleep;
1637 self
1638 }
1639
1640 /// Sets the serialization format for client-server communication.
1641 ///
1642 /// # Arguments
1643 /// * `serialization_format` - The format to use (JSON or CapnProto)
1644 ///
1645 /// # Example
1646 /// ```
1647 /// use hypersync_client::{Client, SerializationFormat};
1648 ///
1649 /// let client = Client::builder()
1650 /// .chain_id(1)
1651 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1652 /// .serialization_format(SerializationFormat::Json)
1653 /// .build()
1654 /// .unwrap();
1655 /// ```
1656 pub fn serialization_format(mut self, serialization_format: SerializationFormat) -> Self {
1657 self.0.serialization_format = serialization_format;
1658 self
1659 }
1660
1661 /// Builds the client with the configured settings.
1662 ///
1663 /// # Returns
1664 /// * `Result<Client>` - The configured client or an error if configuration is invalid
1665 ///
1666 /// # Errors
1667 /// Returns an error if:
1668 /// * The URL is malformed
1669 /// * Required configuration is missing
1670 ///
1671 /// # Example
1672 /// ```
1673 /// use hypersync_client::Client;
1674 ///
1675 /// let client = Client::builder()
1676 /// .chain_id(1)
1677 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1678 /// .build()
1679 /// .unwrap();
1680 /// ```
1681 pub fn build(self) -> Result<Client> {
1682 if self.0.url.is_empty() {
1683 anyhow::bail!(
1684 "endpoint needs to be set, try using builder.chain_id(1) or\
1685 builder.url(\"https://eth.hypersync.xyz\") to set the endpoint"
1686 )
1687 }
1688 Client::new(self.0)
1689 }
1690}
1691
/// Initial delay (200ms) before the first SSE reconnect attempt; doubles on
/// each consecutive failure.
const INITIAL_RECONNECT_DELAY: Duration = Duration::from_millis(200);
/// Upper bound for the exponential reconnect backoff.
const MAX_RECONNECT_DELAY: Duration = Duration::from_secs(30);
/// Timeout for detecting dead connections. Server sends keepalive pings every 5s,
/// so we timeout after 15s (3x the ping interval).
const READ_TIMEOUT: Duration = Duration::from_secs(15);
1698
/// Events emitted by the height stream.
///
/// Produced by [`Client::stream_height`]; lets consumers observe connection
/// state changes as well as height updates.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum HeightStreamEvent {
    /// Successfully connected or reconnected to the SSE stream.
    Connected,
    /// Received a height update from the server.
    Height(u64),
    /// Connection lost, will attempt to reconnect after the specified delay.
    Reconnecting {
        /// Duration to wait before attempting reconnection.
        delay: Duration,
        /// The error that caused the reconnection.
        error_msg: String,
    },
}
1714
/// Internal classification of raw SSE events before they are surfaced to consumers.
enum InternalStreamEvent {
    /// An event that should be forwarded to the consumer channel.
    Publish(HeightStreamEvent),
    /// A server keepalive; swallowed by the stream loop.
    Ping,
    /// An unrecognized event type; ignored (payload kept for debugging).
    Unknown(String),
}
1720
1721impl Client {
1722 fn get_es_stream(&self) -> Result<EventSource> {
1723 // Build the GET /height/sse request
1724 let mut url = self.inner.url.clone();
1725 url.path_segments_mut()
1726 .ok()
1727 .context("invalid base URL")?
1728 .extend(&["height", "sse"]);
1729
1730 let req = self
1731 .inner
1732 .http_client
1733 // Don't set timeout for SSE stream
1734 .request_no_timeout(Method::GET, url);
1735
1736 // Configure exponential backoff for library-level retries
1737 let retry_policy = ExponentialBackoff::new(
1738 INITIAL_RECONNECT_DELAY,
1739 2.0,
1740 Some(MAX_RECONNECT_DELAY),
1741 None, // unlimited retries
1742 );
1743
1744 // Turn the request into an EventSource stream with retries
1745 let mut es = reqwest_eventsource::EventSource::new(req)
1746 .context("unexpected error creating EventSource")?;
1747 es.set_retry_policy(Box::new(retry_policy));
1748 Ok(es)
1749 }
1750
1751 async fn next_height(event_source: &mut EventSource) -> Result<Option<InternalStreamEvent>> {
1752 let Some(res) = tokio::time::timeout(READ_TIMEOUT, event_source.next())
1753 .await
1754 .map_err(|d| anyhow::anyhow!("stream timed out after {d}"))?
1755 else {
1756 return Ok(None);
1757 };
1758
1759 let e = match res.context("failed response")? {
1760 Event::Open => InternalStreamEvent::Publish(HeightStreamEvent::Connected),
1761 Event::Message(event) => match event.event.as_str() {
1762 "height" => {
1763 let height = event
1764 .data
1765 .trim()
1766 .parse::<u64>()
1767 .context("parsing height from event data")?;
1768 InternalStreamEvent::Publish(HeightStreamEvent::Height(height))
1769 }
1770 "ping" => InternalStreamEvent::Ping,
1771 _ => InternalStreamEvent::Unknown(format!("unknown event: {:?}", event)),
1772 },
1773 };
1774
1775 Ok(Some(e))
1776 }
1777
1778 async fn stream_height_events(
1779 es: &mut EventSource,
1780 tx: &mpsc::Sender<HeightStreamEvent>,
1781 ) -> Result<bool> {
1782 let mut received_an_event = false;
1783 while let Some(event) = Self::next_height(es).await.context("failed next height")? {
1784 match event {
1785 InternalStreamEvent::Publish(event) => {
1786 received_an_event = true;
1787 if tx.send(event).await.is_err() {
1788 return Ok(received_an_event); // Receiver dropped, exit task
1789 }
1790 }
1791 InternalStreamEvent::Ping => (), // ignore pings
1792 InternalStreamEvent::Unknown(_event) => (), // ignore unknown events
1793 }
1794 }
1795 Ok(received_an_event)
1796 }
1797
1798 fn get_delay(consecutive_failures: u32) -> Duration {
1799 if consecutive_failures > 0 {
1800 /// helper function to calculate 2^x
1801 /// optimization using bit shifting
1802 const fn two_to_pow(x: u32) -> u32 {
1803 1 << x
1804 }
1805 // Exponential backoff: 200ms, 400ms, 800ms, ... up to 30s
1806 INITIAL_RECONNECT_DELAY
1807 .saturating_mul(two_to_pow(consecutive_failures - 1))
1808 .min(MAX_RECONNECT_DELAY)
1809 } else {
1810 // On zero consecutive failures, 0 delay
1811 Duration::from_millis(0)
1812 }
1813 }
1814
1815 async fn stream_height_events_with_retry(
1816 &self,
1817 tx: &mpsc::Sender<HeightStreamEvent>,
1818 ) -> Result<()> {
1819 let mut consecutive_failures = 0u32;
1820
1821 loop {
1822 // should always be able to creat a new es stream
1823 // something is wrong with the req builder otherwise
1824 let mut es = self.get_es_stream().context("get es stream")?;
1825
1826 let mut error = anyhow!("");
1827
1828 match Self::stream_height_events(&mut es, tx).await {
1829 Ok(received_an_event) => {
1830 if received_an_event {
1831 consecutive_failures = 0; // Reset after successful connection that then failed
1832 }
1833 log::trace!("Stream height exited");
1834 }
1835 Err(e) => {
1836 log::trace!("Stream height failed: {e:?}");
1837 error = e;
1838 }
1839 }
1840
1841 es.close();
1842
1843 // If the receiver is closed, exit the task
1844 if tx.is_closed() {
1845 break;
1846 }
1847
1848 let delay = Self::get_delay(consecutive_failures);
1849 log::trace!("Reconnecting in {:?}...", delay);
1850
1851 let error_msg = format!("{error:?}");
1852
1853 if tx
1854 .send(HeightStreamEvent::Reconnecting { delay, error_msg })
1855 .await
1856 .is_err()
1857 {
1858 return Ok(()); // Receiver dropped, exit task
1859 }
1860 tokio::time::sleep(delay).await;
1861
1862 // increment consecutive failures so that on the next try
1863 // it will start using back offs
1864 consecutive_failures += 1;
1865 }
1866
1867 Ok(())
1868 }
1869
1870 /// Streams archive height updates from the server via Server-Sent Events.
1871 ///
1872 /// Establishes a long-lived SSE connection to `/height/sse` that automatically reconnects
1873 /// on disconnection with exponential backoff (200ms → 400ms → ... → max 30s).
1874 ///
1875 /// The stream emits [`HeightStreamEvent`] to notify consumers of connection state changes
1876 /// and height updates. This allows applications to display connection status to users.
1877 ///
1878 /// # Returns
1879 /// Channel receiver yielding [`HeightStreamEvent`]s. The background task handles connection
1880 /// lifecycle and sends events through this channel.
1881 ///
1882 /// # Example
1883 /// ```
1884 /// # use hypersync_client::{Client, HeightStreamEvent};
1885 /// # async fn example() -> anyhow::Result<()> {
1886 /// let client = Client::builder()
1887 /// .url("https://eth.hypersync.xyz")
1888 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1889 /// .build()?;
1890 ///
1891 /// let mut rx = client.stream_height();
1892 ///
1893 /// while let Some(event) = rx.recv().await {
1894 /// match event {
1895 /// HeightStreamEvent::Connected => println!("Connected to stream"),
1896 /// HeightStreamEvent::Height(h) => println!("Height: {}", h),
1897 /// HeightStreamEvent::Reconnecting { delay, error_msg } => {
1898 /// println!("Reconnecting in {delay:?} due to error: {error_msg}")
1899 /// }
1900 /// }
1901 /// }
1902 /// # Ok(())
1903 /// # }
1904 /// ```
1905 pub fn stream_height(&self) -> mpsc::Receiver<HeightStreamEvent> {
1906 let (tx, rx) = mpsc::channel(16);
1907 let client = self.clone();
1908
1909 tokio::spawn(async move {
1910 if let Err(e) = client.stream_height_events_with_retry(&tx).await {
1911 log::error!("Stream height failed unexpectedly: {e:?}");
1912 }
1913 });
1914
1915 rx
1916 }
1917}
1918
1919fn check_simple_stream_params(config: &StreamConfig) -> Result<()> {
1920 if config.event_signature.is_some() {
1921 return Err(anyhow!(
1922 "config.event_signature can't be passed to simple type function. User is expected to \
1923 decode the logs using Decoder."
1924 ));
1925 }
1926 if config.column_mapping.is_some() {
1927 return Err(anyhow!(
1928 "config.column_mapping can't be passed to single type function. User is expected to \
1929 map values manually."
1930 ));
1931 }
1932
1933 Ok(())
1934}
1935
/// Used to indicate whether or not a retry should be attempted.
///
/// Distinguishes server responses that call for a specific retry strategy
/// (shrinking the block range, backing off per rate-limit info) from generic
/// failures wrapped as [`anyhow::Error`].
#[derive(Debug, thiserror::Error)]
pub enum HyperSyncResponseError {
    /// Means that the client should retry with a smaller block range.
    #[error("hypersync responded with 'payload too large' error")]
    PayloadTooLarge,
    /// Server responded with 429 Too Many Requests.
    #[error("rate limited by server")]
    RateLimited {
        /// Rate limit information from the 429 response headers.
        rate_limit: RateLimitInfo,
    },
    /// Any other server error.
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}
1952
/// Result of a single Arrow query execution, before retry logic.
///
/// Internal (crate-private) carrier bundling the parsed data with the
/// response metadata the retry layer needs.
struct ArrowImplResponse {
    /// The parsed Arrow response data.
    response: ArrowResponse,
    /// Size of the response body in bytes.
    response_bytes: u64,
    /// Rate limit information parsed from response headers.
    rate_limit: RateLimitInfo,
}
1962
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_get_delay() {
        // Attempt 0 fires immediately.
        assert_eq!(
            Client::get_delay(0),
            Duration::from_millis(0),
            "starts with 0 delay"
        );
        // Powers-of-2 backoff: 200ms doubling with each attempt.
        for (attempt, expected_ms) in [(1, 200), (2, 400), (3, 800)] {
            assert_eq!(Client::get_delay(attempt), Duration::from_millis(expected_ms));
        }
        // Delay saturates at 30s for large attempt counts.
        for attempt in [9, 10] {
            assert_eq!(
                Client::get_delay(attempt),
                Duration::from_secs(30),
                "max delay is 30s"
            );
        }
    }

    #[tokio::test]
    #[ignore = "integration test requiring live hs server"]
    async fn test_http2_is_used() -> anyhow::Result<()> {
        let api_token = std::env::var("ENVIO_API_TOKEN")?;
        let http = reqwest::Client::builder()
            .no_gzip()
            .user_agent("hscr-test")
            .build()?;

        let response = http
            .get("https://eth.hypersync.xyz/height")
            .bearer_auth(&api_token)
            .send()
            .await?;

        let version = response.version();
        assert_eq!(
            version,
            reqwest::Version::HTTP_2,
            "expected HTTP/2 but got {:?}",
            version
        );
        assert!(response.status().is_success());
        Ok(())
    }

    #[tokio::test]
    #[ignore = "integration test with live hs server for height stream"]
    async fn test_stream_height_events() -> anyhow::Result<()> {
        let (tx, mut rx) = mpsc::channel(16);
        let producer = tokio::spawn(async move {
            let client = Client::builder()
                .url("https://monad-testnet.hypersync.xyz")
                .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
                .build()?;
            let mut event_source = client.get_es_stream().context("get es stream")?;
            Client::stream_height_events(&mut event_source, &tx).await
        });

        // First event must signal a successful connection.
        let first = rx.recv().await;
        assert!(first.is_some());
        assert_eq!(first.unwrap(), HeightStreamEvent::Connected);

        // Then two height updates; on a live chain they should be increasing.
        let Some(HeightStreamEvent::Height(first_height)) = rx.recv().await else {
            panic!("should have received height")
        };
        let Some(HeightStreamEvent::Height(second_height)) = rx.recv().await else {
            panic!("should have received height")
        };
        assert!(second_height > first_height);
        drop(rx);

        // Dropping the receiver should end the producer task gracefully.
        let outcome = producer.await.expect("should have joined");
        let received_an_event =
            outcome.expect("should have ended the stream gracefully after dropping rx");
        assert!(received_an_event, "should have received an event");

        Ok(())
    }
}