hypersync_client/lib.rs
1#![deny(missing_docs)]
2// README.md is a symlink (ln -s ../README.md README.md) to the root workspace
3// README. This is needed so that both `cargo doc` and `cargo package` work:
4// - `cargo doc`: include_str! resolves the symlink to the root README.
5// - `cargo package`: cargo copies the symlink target into the tarball, so
6// include_str! finds README.md at the package root during verification.
7#![doc = include_str!("../README.md")]
8use std::time::Instant;
9use std::{sync::Arc, time::Duration};
10
11use anyhow::{anyhow, Context, Result};
12use futures::StreamExt;
13use hypersync_net_types::{hypersync_net_types_capnp, ArchiveHeight, ChainId, Query};
14use reqwest::{Method, StatusCode};
15use reqwest_eventsource::retry::ExponentialBackoff;
16use reqwest_eventsource::{Event, EventSource};
17
18pub mod arrow_reader;
19mod column_mapping;
20mod config;
21mod decode;
22mod decode_call;
23mod from_arrow;
24mod parquet_out;
25mod parse_response;
26pub mod preset_query;
27mod rate_limit;
28mod rayon_async;
29pub mod simple_types;
30mod stream;
31mod types;
32mod util;
33
34pub use hypersync_format as format;
35pub use hypersync_net_types as net_types;
36pub use hypersync_schema as schema;
37
38use parse_response::parse_query_response;
39use tokio::sync::mpsc;
40use types::ResponseData;
41use url::Url;
42
43pub use column_mapping::{ColumnMapping, DataType};
44pub use config::HexOutput;
45pub use config::{ClientConfig, SerializationFormat, StreamConfig};
46pub use decode::Decoder;
47pub use decode_call::CallDecoder;
48pub use rate_limit::RateLimitInfo;
49pub use types::{
50 ArrowResponse, ArrowResponseData, EventResponse, QueryResponse, QueryResponseWithRateLimit,
51};
52
53use crate::parse_response::read_query_response;
54use crate::simple_types::InternalEventJoinStrategy;
55
#[derive(Debug)]
struct HttpClientWrapper {
    /// Mutable state (the reqwest client and its creation time) that is
    /// refreshed periodically; see `get_client`.
    inner: std::sync::Mutex<HttpClientWrapperInner>,
    /// HyperSync server api token, sent as a bearer auth header on every request.
    api_token: String,
    /// Standard timeout for http requests.
    timeout: Duration,
    /// Pools are never idle since polling often happens on the client,
    /// so connections never time out and DNS lookups never happen again.
    /// This is problematic if the underlying IP address changes during
    /// failovers on HyperSync. Since reqwest doesn't implement a max
    /// connection age, we simply recreate the client once it is older
    /// than this duration.
    max_connection_age: Duration,
    /// User agent used when (re)building the inner client.
    user_agent: String,
}
73
#[derive(Debug)]
struct HttpClientWrapperInner {
    /// Initialized reqwest instance for the client url.
    client: reqwest::Client,
    /// Timestamp when `client` was created; compared against
    /// `HttpClientWrapper::max_connection_age` to decide when to rebuild.
    created_at: Instant,
}
81
82impl HttpClientWrapper {
83 fn new(user_agent: String, api_token: String, timeout: Duration) -> Self {
84 let client = reqwest::Client::builder()
85 .no_gzip()
86 .user_agent(&user_agent)
87 .build()
88 .unwrap();
89
90 Self {
91 inner: std::sync::Mutex::new(HttpClientWrapperInner {
92 client,
93 created_at: Instant::now(),
94 }),
95 api_token,
96 timeout,
97 max_connection_age: Duration::from_secs(60),
98 user_agent,
99 }
100 }
101
102 fn get_client(&self) -> reqwest::Client {
103 let mut inner = self.inner.lock().expect("HttpClientWrapper mutex poisoned");
104
105 // Check if client needs refresh due to age
106 if inner.created_at.elapsed() > self.max_connection_age {
107 // Recreate client to force new DNS lookup for failover scenarios
108 inner.client = reqwest::Client::builder()
109 .no_gzip()
110 .user_agent(&self.user_agent)
111 .build()
112 .unwrap();
113 inner.created_at = Instant::now();
114 }
115
116 // Clone is cheap for reqwest::Client (it's Arc-wrapped internally)
117 inner.client.clone()
118 }
119
120 fn request(&self, method: Method, url: Url) -> reqwest::RequestBuilder {
121 let client = self.get_client();
122 client
123 .request(method, url)
124 .timeout(self.timeout)
125 .bearer_auth(&self.api_token)
126 }
127
128 fn request_no_timeout(&self, method: Method, url: Url) -> reqwest::RequestBuilder {
129 let client = self.get_client();
130 client.request(method, url).bearer_auth(&self.api_token)
131 }
132}
133
/// Internal client state to handle http requests and retries.
#[derive(Debug)]
struct ClientInner {
    /// Wrapper owning the periodically-refreshed reqwest client.
    http_client: HttpClientWrapper,
    /// HyperSync server URL.
    url: Url,
    /// Number of retries to attempt before returning error.
    max_num_retries: usize,
    /// Milliseconds added to the backoff after each failed attempt; also the
    /// upper bound for the random jitter added to each sleep.
    retry_backoff_ms: u64,
    /// Initial wait time for request backoff.
    retry_base_ms: u64,
    /// Ceiling time for request backoff.
    retry_ceiling_ms: u64,
    /// Query serialization format to use for HTTP requests.
    serialization_format: SerializationFormat,
    /// Most recently observed rate limit info from the server, paired with the
    /// instant it was captured so elapsed time can be subtracted from `reset_secs`.
    rate_limit_state: std::sync::Mutex<Option<(RateLimitInfo, Instant)>>,
    /// Whether to proactively sleep when the rate limit is exhausted.
    proactive_rate_limit_sleep: bool,
}
156
/// Client to handle http requests and retries.
///
/// Cloning is cheap: all state lives behind a shared `Arc`.
#[derive(Clone, Debug)]
pub struct Client {
    inner: Arc<ClientInner>,
}
162
163impl Client {
164 /// Creates a new client with the given configuration.
165 ///
166 /// Configuration must include the `url` and `api_token` fields.
167 /// # Example
168 /// ```
169 /// use hypersync_client::{Client, ClientConfig};
170 ///
171 /// let config = ClientConfig {
172 /// url: "https://eth.hypersync.xyz".to_string(),
173 /// api_token: std::env::var("ENVIO_API_TOKEN")?,
174 /// ..Default::default()
175 /// };
176 /// let client = Client::new(config)?;
177 /// # Ok::<(), anyhow::Error>(())
178 /// ```
179 ///
180 /// # Errors
181 /// This method fails if the config is invalid.
182 pub fn new(cfg: ClientConfig) -> Result<Self> {
183 // hscr stands for hypersync client rust
184 cfg.validate().context("invalid ClientConfig")?;
185 let user_agent = format!("hscr/{}", env!("CARGO_PKG_VERSION"));
186 Self::new_internal(cfg, user_agent)
187 }
188
    /// Creates a new client builder.
    ///
    /// Convenience for `ClientBuilder::new()`; see [`ClientBuilder`] for the
    /// available options.
    ///
    /// # Example
    /// ```
    /// use hypersync_client::Client;
    ///
    /// let client = Client::builder()
    ///     .chain_id(1)
    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
    ///     .build()
    ///     .unwrap();
    /// ```
    pub fn builder() -> ClientBuilder {
        ClientBuilder::new()
    }
204
    /// Creates a new client with the given configuration and custom user agent.
    ///
    /// Intended for use by language bindings (Python, Node.js) and HyperIndex,
    /// hence hidden from public docs. Note that unlike [`Client::new`] this
    /// does not call `cfg.validate()`.
    #[doc(hidden)]
    pub fn new_with_agent(cfg: ClientConfig, user_agent: impl Into<String>) -> Result<Self> {
        Self::new_internal(cfg, user_agent.into())
    }
211
212 /// Internal constructor that takes both config and user agent.
213 fn new_internal(cfg: ClientConfig, user_agent: String) -> Result<Self> {
214 let http_client = HttpClientWrapper::new(
215 user_agent,
216 cfg.api_token,
217 Duration::from_millis(cfg.http_req_timeout_millis),
218 );
219
220 let url = Url::parse(&cfg.url).context("url is malformed")?;
221
222 Ok(Self {
223 inner: Arc::new(ClientInner {
224 http_client,
225 url,
226 max_num_retries: cfg.max_num_retries,
227 retry_backoff_ms: cfg.retry_backoff_ms,
228 retry_base_ms: cfg.retry_base_ms,
229 retry_ceiling_ms: cfg.retry_ceiling_ms,
230 serialization_format: cfg.serialization_format,
231 rate_limit_state: std::sync::Mutex::new(None),
232 proactive_rate_limit_sleep: cfg.proactive_rate_limit_sleep,
233 }),
234 })
235 }
236
237 /// Retrieves blocks, transactions, traces, and logs through a stream using the provided
238 /// query and stream configuration.
239 ///
240 /// ### Implementation
241 /// Runs multiple queries simultaneously based on config.concurrency.
242 ///
243 /// Each query runs until it reaches query.to, server height, any max_num_* query param,
244 /// or execution timed out by server.
245 ///
246 /// # ⚠️ Important Warning
247 ///
248 /// This method will continue executing until the query has run to completion from beginning
249 /// to the end of the block range defined in the query. For heavy queries with large block
250 /// ranges or high data volumes, consider:
251 ///
252 /// - Use [`stream()`](Self::stream) to interact with each streamed chunk individually
253 /// - Use [`get()`](Self::get) which returns a `next_block` that can be paginated for the next query
254 /// - Break large queries into smaller block ranges
255 ///
256 /// # Example
257 /// ```no_run
258 /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField}, StreamConfig};
259 ///
260 /// # async fn example() -> anyhow::Result<()> {
261 /// let client = Client::builder()
262 /// .chain_id(1)
263 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
264 /// .build()?;
265 ///
266 /// // Query ERC20 transfer events
267 /// let query = Query::new()
268 /// .from_block(19000000)
269 /// .to_block_excl(19000010)
270 /// .where_logs(
271 /// LogFilter::all()
272 /// .and_topic0(["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"])?
273 /// )
274 /// .select_log_fields([LogField::Address, LogField::Data]);
275 /// let response = client.collect(query, StreamConfig::default()).await?;
276 ///
277 /// println!("Collected {} events", response.data.logs.len());
278 /// # Ok(())
279 /// # }
280 /// ```
281 pub async fn collect(&self, query: Query, config: StreamConfig) -> Result<QueryResponse> {
282 check_simple_stream_params(&config)?;
283
284 let mut recv = stream::stream_arrow(self, query, config)
285 .await
286 .context("start stream")?;
287
288 let mut data = ResponseData::default();
289 let mut archive_height = None;
290 let mut next_block = 0;
291 let mut total_execution_time = 0;
292
293 while let Some(res) = recv.recv().await {
294 let res = res.context("get response")?;
295 let res: QueryResponse =
296 QueryResponse::try_from(&res).context("convert arrow response")?;
297
298 for batch in res.data.blocks {
299 data.blocks.push(batch);
300 }
301 for batch in res.data.transactions {
302 data.transactions.push(batch);
303 }
304 for batch in res.data.logs {
305 data.logs.push(batch);
306 }
307 for batch in res.data.traces {
308 data.traces.push(batch);
309 }
310
311 archive_height = res.archive_height;
312 next_block = res.next_block;
313 total_execution_time += res.total_execution_time
314 }
315
316 Ok(QueryResponse {
317 archive_height,
318 next_block,
319 total_execution_time,
320 data,
321 rollback_guard: None,
322 })
323 }
324
325 /// Retrieves events through a stream using the provided query and stream configuration.
326 ///
327 /// # ⚠️ Important Warning
328 ///
329 /// This method will continue executing until the query has run to completion from beginning
330 /// to the end of the block range defined in the query. For heavy queries with large block
331 /// ranges or high data volumes, consider:
332 ///
333 /// - Use [`stream_events()`](Self::stream_events) to interact with each streamed chunk individually
334 /// - Use [`get_events()`](Self::get_events) which returns a `next_block` that can be paginated for the next query
335 /// - Break large queries into smaller block ranges
336 ///
337 /// # Example
338 /// ```no_run
339 /// use hypersync_client::{Client, net_types::{Query, TransactionFilter, TransactionField}, StreamConfig};
340 ///
341 /// # async fn example() -> anyhow::Result<()> {
342 /// let client = Client::builder()
343 /// .chain_id(1)
344 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
345 /// .build()?;
346 ///
347 /// // Query transactions to a specific address
348 /// let query = Query::new()
349 /// .from_block(19000000)
350 /// .to_block_excl(19000100)
351 /// .where_transactions(
352 /// TransactionFilter::all()
353 /// .and_to(["0xA0b86a33E6411b87Fd9D3DF822C8698FC06BBe4c"])?
354 /// )
355 /// .select_transaction_fields([TransactionField::Hash, TransactionField::From, TransactionField::Value]);
356 /// let response = client.collect_events(query, StreamConfig::default()).await?;
357 ///
358 /// println!("Collected {} events", response.data.len());
359 /// # Ok(())
360 /// # }
361 /// ```
362 pub async fn collect_events(
363 &self,
364 mut query: Query,
365 config: StreamConfig,
366 ) -> Result<EventResponse> {
367 check_simple_stream_params(&config)?;
368
369 let event_join_strategy = InternalEventJoinStrategy::from(&query.field_selection);
370 event_join_strategy.add_join_fields_to_selection(&mut query.field_selection);
371
372 let mut recv = stream::stream_arrow(self, query, config)
373 .await
374 .context("start stream")?;
375
376 let mut data = Vec::new();
377 let mut archive_height = None;
378 let mut next_block = 0;
379 let mut total_execution_time = 0;
380
381 while let Some(res) = recv.recv().await {
382 let res = res.context("get response")?;
383 let res: QueryResponse =
384 QueryResponse::try_from(&res).context("convert arrow response")?;
385 let events = event_join_strategy.join_from_response_data(res.data);
386
387 data.extend(events);
388
389 archive_height = res.archive_height;
390 next_block = res.next_block;
391 total_execution_time += res.total_execution_time
392 }
393
394 Ok(EventResponse {
395 archive_height,
396 next_block,
397 total_execution_time,
398 data,
399 rollback_guard: None,
400 })
401 }
402
403 /// Retrieves blocks, transactions, traces, and logs in Arrow format through a stream using
404 /// the provided query and stream configuration.
405 ///
406 /// Returns data in Apache Arrow format for high-performance columnar processing.
407 /// Useful for analytics workloads or when working with Arrow-compatible tools.
408 ///
409 /// # ⚠️ Important Warning
410 ///
411 /// This method will continue executing until the query has run to completion from beginning
412 /// to the end of the block range defined in the query. For heavy queries with large block
413 /// ranges or high data volumes, consider:
414 ///
415 /// - Use [`stream_arrow()`](Self::stream_arrow) to interact with each streamed chunk individually
416 /// - Use [`get_arrow()`](Self::get_arrow) which returns a `next_block` that can be paginated for the next query
417 /// - Break large queries into smaller block ranges
418 ///
419 /// # Example
420 /// ```no_run
421 /// use hypersync_client::{Client, net_types::{Query, BlockFilter, BlockField}, StreamConfig};
422 ///
423 /// # async fn example() -> anyhow::Result<()> {
424 /// let client = Client::builder()
425 /// .chain_id(1)
426 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
427 /// .build()?;
428 ///
429 /// // Get block data in Arrow format for analytics
430 /// let query = Query::new()
431 /// .from_block(19000000)
432 /// .to_block_excl(19000100)
433 /// .include_all_blocks()
434 /// .select_block_fields([BlockField::Number, BlockField::Timestamp, BlockField::GasUsed]);
435 /// let response = client.collect_arrow(query, StreamConfig::default()).await?;
436 ///
437 /// println!("Retrieved {} Arrow batches for blocks", response.data.blocks.len());
438 /// # Ok(())
439 /// # }
440 /// ```
441 pub async fn collect_arrow(&self, query: Query, config: StreamConfig) -> Result<ArrowResponse> {
442 let mut recv = stream::stream_arrow(self, query, config)
443 .await
444 .context("start stream")?;
445
446 let mut data = ArrowResponseData::default();
447 let mut archive_height = None;
448 let mut next_block = 0;
449 let mut total_execution_time = 0;
450
451 while let Some(res) = recv.recv().await {
452 let res = res.context("get response")?;
453
454 for batch in res.data.blocks {
455 data.blocks.push(batch);
456 }
457 for batch in res.data.transactions {
458 data.transactions.push(batch);
459 }
460 for batch in res.data.logs {
461 data.logs.push(batch);
462 }
463 for batch in res.data.traces {
464 data.traces.push(batch);
465 }
466 for batch in res.data.decoded_logs {
467 data.decoded_logs.push(batch);
468 }
469
470 archive_height = res.archive_height;
471 next_block = res.next_block;
472 total_execution_time += res.total_execution_time
473 }
474
475 Ok(ArrowResponse {
476 archive_height,
477 next_block,
478 total_execution_time,
479 data,
480 rollback_guard: None,
481 })
482 }
483
    /// Writes parquet file getting data through a stream using the provided path, query,
    /// and stream configuration.
    ///
    /// Streams data directly to a Parquet file for efficient storage and later analysis.
    /// Perfect for data exports or ETL pipelines. Internally this just forwards to the
    /// crate's Parquet writer.
    ///
    /// # ⚠️ Important Warning
    ///
    /// This method will continue executing until the query has run to completion from beginning
    /// to the end of the block range defined in the query. For heavy queries with large block
    /// ranges or high data volumes, consider:
    ///
    /// - Use [`stream_arrow()`](Self::stream_arrow) and write to Parquet incrementally
    /// - Use [`get_arrow()`](Self::get_arrow) with pagination and append to Parquet files
    /// - Break large queries into smaller block ranges
    ///
    /// # Example
    /// ```no_run
    /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField}, StreamConfig};
    ///
    /// # async fn example() -> anyhow::Result<()> {
    /// let client = Client::builder()
    ///     .chain_id(1)
    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
    ///     .build()?;
    ///
    /// // Export all DEX trades to Parquet for analysis
    /// let query = Query::new()
    ///     .from_block(19000000)
    ///     .to_block_excl(19010000)
    ///     .where_logs(
    ///         LogFilter::all()
    ///             .and_topic0(["0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822"])?
    ///     )
    ///     .select_log_fields([LogField::Address, LogField::Data, LogField::BlockNumber]);
    /// client.collect_parquet("./trades.parquet", query, StreamConfig::default()).await?;
    ///
    /// println!("Trade data exported to trades.parquet");
    /// # Ok(())
    /// # }
    /// ```
    pub async fn collect_parquet(
        &self,
        path: &str,
        query: Query,
        config: StreamConfig,
    ) -> Result<()> {
        parquet_out::collect_parquet(self, path, query, config).await
    }
533
534 /// Internal implementation of getting chain_id from server
535 async fn get_chain_id_impl(&self) -> Result<u64> {
536 let mut url = self.inner.url.clone();
537 let mut segments = url.path_segments_mut().ok().context("get path segments")?;
538 segments.push("chain_id");
539 std::mem::drop(segments);
540 let req = self.inner.http_client.request(Method::GET, url);
541
542 let res = req.send().await.context("execute http req")?;
543
544 let status = res.status();
545 if !status.is_success() {
546 return Err(anyhow!("http response status code {}", status));
547 }
548
549 let chain_id: ChainId = res.json().await.context("read response body json")?;
550
551 Ok(chain_id.chain_id)
552 }
553
554 /// Internal implementation of getting height from server
555 async fn get_height_impl(&self, http_timeout_override: Option<Duration>) -> Result<u64> {
556 let mut url = self.inner.url.clone();
557 let mut segments = url.path_segments_mut().ok().context("get path segments")?;
558 segments.push("height");
559 std::mem::drop(segments);
560 let mut req = self.inner.http_client.request(Method::GET, url);
561 if let Some(http_timeout_override) = http_timeout_override {
562 req = req.timeout(http_timeout_override);
563 }
564
565 let res = req.send().await.context("execute http req")?;
566
567 let status = res.status();
568 if !status.is_success() {
569 return Err(anyhow!("http response status code {}", status));
570 }
571
572 let height: ArchiveHeight = res.json().await.context("read response body json")?;
573
574 Ok(height.height.unwrap_or(0))
575 }
576
577 /// Get the chain_id from the server with retries.
578 ///
579 /// # Example
580 /// ```no_run
581 /// use hypersync_client::Client;
582 ///
583 /// # async fn example() -> anyhow::Result<()> {
584 /// let client = Client::builder()
585 /// .chain_id(1)
586 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
587 /// .build()?;
588 ///
589 /// let chain_id = client.get_chain_id().await?;
590 /// println!("Connected to chain ID: {}", chain_id);
591 /// # Ok(())
592 /// # }
593 /// ```
594 pub async fn get_chain_id(&self) -> Result<u64> {
595 let mut base = self.inner.retry_base_ms;
596
597 let mut err = anyhow!("");
598
599 for _ in 0..self.inner.max_num_retries + 1 {
600 match self.get_chain_id_impl().await {
601 Ok(res) => return Ok(res),
602 Err(e) => {
603 log::error!(
604 "failed to get chain_id from server, retrying... The error was: {e:?}"
605 );
606 err = err.context(format!("{e:?}"));
607 }
608 }
609
610 let base_ms = Duration::from_millis(base);
611 let jitter = Duration::from_millis(fastrange_rs::fastrange_64(
612 rand::random(),
613 self.inner.retry_backoff_ms,
614 ));
615
616 tokio::time::sleep(base_ms + jitter).await;
617
618 base = std::cmp::min(
619 base + self.inner.retry_backoff_ms,
620 self.inner.retry_ceiling_ms,
621 );
622 }
623
624 Err(err)
625 }
626
627 /// Get the height of from server with retries.
628 ///
629 /// # Example
630 /// ```no_run
631 /// use hypersync_client::Client;
632 ///
633 /// # async fn example() -> anyhow::Result<()> {
634 /// let client = Client::builder()
635 /// .chain_id(1)
636 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
637 /// .build()?;
638 ///
639 /// let height = client.get_height().await?;
640 /// println!("Current block height: {}", height);
641 /// # Ok(())
642 /// # }
643 /// ```
644 pub async fn get_height(&self) -> Result<u64> {
645 let mut base = self.inner.retry_base_ms;
646
647 let mut err = anyhow!("");
648
649 for _ in 0..self.inner.max_num_retries + 1 {
650 match self.get_height_impl(None).await {
651 Ok(res) => return Ok(res),
652 Err(e) => {
653 log::error!(
654 "failed to get height from server, retrying... The error was: {e:?}"
655 );
656 err = err.context(format!("{e:?}"));
657 }
658 }
659
660 let base_ms = Duration::from_millis(base);
661 let jitter = Duration::from_millis(fastrange_rs::fastrange_64(
662 rand::random(),
663 self.inner.retry_backoff_ms,
664 ));
665
666 tokio::time::sleep(base_ms + jitter).await;
667
668 base = std::cmp::min(
669 base + self.inner.retry_backoff_ms,
670 self.inner.retry_ceiling_ms,
671 );
672 }
673
674 Err(err)
675 }
676
    /// Get the current server height, intended for health checks.
    ///
    /// Doesn't do any retries, and the `http_req_timeout` parameter will override the http
    /// timeout config set when creating the client.
    ///
    /// # Example
    /// ```no_run
    /// use hypersync_client::Client;
    /// use std::time::Duration;
    ///
    /// # async fn example() -> anyhow::Result<()> {
    /// let client = Client::builder()
    ///     .chain_id(1)
    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
    ///     .build()?;
    ///
    /// // Quick health check with 5 second timeout
    /// let height = client.health_check(Some(Duration::from_secs(5))).await?;
    /// println!("Server is healthy at block: {}", height);
    /// # Ok(())
    /// # }
    /// ```
    pub async fn health_check(&self, http_req_timeout: Option<Duration>) -> Result<u64> {
        self.get_height_impl(http_req_timeout).await
    }
701
702 /// Executes query with retries and returns the response.
703 ///
704 /// # Example
705 /// ```no_run
706 /// use hypersync_client::{Client, net_types::{Query, BlockFilter, BlockField}};
707 ///
708 /// # async fn example() -> anyhow::Result<()> {
709 /// let client = Client::builder()
710 /// .chain_id(1)
711 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
712 /// .build()?;
713 ///
714 /// // Query all blocks from a specific range
715 /// let query = Query::new()
716 /// .from_block(19000000)
717 /// .to_block_excl(19000010)
718 /// .include_all_blocks()
719 /// .select_block_fields([BlockField::Number, BlockField::Hash, BlockField::Timestamp]);
720 /// let response = client.get(&query).await?;
721 ///
722 /// println!("Retrieved {} blocks", response.data.blocks.len());
723 /// # Ok(())
724 /// # }
725 /// ```
726 pub async fn get(&self, query: &Query) -> Result<QueryResponse> {
727 let arrow_response = self.get_arrow(query).await.context("get data")?;
728 let converted =
729 QueryResponse::try_from(&arrow_response).context("convert arrow response")?;
730 Ok(converted)
731 }
732
733 /// Add block, transaction and log fields selection to the query, executes it with retries
734 /// and returns the response.
735 ///
736 /// This method automatically joins blocks, transactions, and logs into unified events,
737 /// making it easier to work with related blockchain data.
738 ///
739 /// # Example
740 /// ```no_run
741 /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField, TransactionField}};
742 ///
743 /// # async fn example() -> anyhow::Result<()> {
744 /// let client = Client::builder()
745 /// .chain_id(1)
746 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
747 /// .build()?;
748 ///
749 /// // Query ERC20 transfers with transaction context
750 /// let query = Query::new()
751 /// .from_block(19000000)
752 /// .to_block_excl(19000010)
753 /// .where_logs(
754 /// LogFilter::all()
755 /// .and_topic0(["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"])?
756 /// )
757 /// .select_log_fields([LogField::Address, LogField::Data])
758 /// .select_transaction_fields([TransactionField::Hash, TransactionField::From]);
759 /// let response = client.get_events(query).await?;
760 ///
761 /// println!("Retrieved {} joined events", response.data.len());
762 /// # Ok(())
763 /// # }
764 /// ```
765 pub async fn get_events(&self, mut query: Query) -> Result<EventResponse> {
766 let event_join_strategy = InternalEventJoinStrategy::from(&query.field_selection);
767 event_join_strategy.add_join_fields_to_selection(&mut query.field_selection);
768 let arrow_response = self.get_arrow(&query).await.context("get data")?;
769 EventResponse::try_from_arrow_response(&arrow_response, &event_join_strategy)
770 }
771
772 /// Executes query once and returns the result using JSON serialization.
773 async fn get_arrow_impl_json(
774 &self,
775 query: &Query,
776 ) -> std::result::Result<ArrowImplResponse, HyperSyncResponseError> {
777 let mut url = self.inner.url.clone();
778 let mut segments = url.path_segments_mut().ok().context("get path segments")?;
779 segments.push("query");
780 segments.push("arrow-ipc");
781 std::mem::drop(segments);
782 let req = self.inner.http_client.request(Method::POST, url);
783
784 let res = req.json(&query).send().await.context("execute http req")?;
785
786 let status = res.status();
787 let rate_limit = RateLimitInfo::from_response(&res);
788
789 if status == StatusCode::PAYLOAD_TOO_LARGE {
790 return Err(HyperSyncResponseError::PayloadTooLarge);
791 }
792 if status == StatusCode::TOO_MANY_REQUESTS {
793 return Err(HyperSyncResponseError::RateLimited { rate_limit });
794 }
795 if !status.is_success() {
796 let text = res.text().await.context("read text to see error")?;
797
798 return Err(HyperSyncResponseError::Other(anyhow!(
799 "http response status code {}, err body: {}",
800 status,
801 text
802 )));
803 }
804
805 let bytes = res.bytes().await.context("read response body bytes")?;
806
807 let res = tokio::task::block_in_place(|| {
808 parse_query_response(&bytes).context("parse query response")
809 })?;
810
811 Ok(ArrowImplResponse {
812 response: res,
813 response_bytes: bytes.len().try_into().unwrap(),
814 rate_limit,
815 })
816 }
817
818 fn should_cache_queries(&self) -> bool {
819 matches!(
820 self.inner.serialization_format,
821 SerializationFormat::CapnProto {
822 should_cache_queries: true
823 }
824 )
825 }
826
    /// Executes query once and returns the result using Cap'n Proto serialization.
    ///
    /// Two-phase protocol when caching is enabled: first send only a query id
    /// (cheap); if the server has the full query cached it answers directly,
    /// otherwise we fall through and send the full query. When caching is
    /// disabled, only the full-query request is made.
    async fn get_arrow_impl_capnp(
        &self,
        query: &Query,
    ) -> std::result::Result<ArrowImplResponse, HyperSyncResponseError> {
        // Build `<base>/query/arrow-ipc/capnp`.
        let mut url = self.inner.url.clone();
        let mut segments = url.path_segments_mut().ok().context("get path segments")?;
        segments.push("query");
        segments.push("arrow-ipc");
        segments.push("capnp");
        // End the mutable borrow so `url` can be used below.
        std::mem::drop(segments);

        let should_cache = self.should_cache_queries();

        if should_cache {
            // Phase 1: serialize just the query id and probe the cache.
            let query_with_id = {
                let mut message = capnp::message::Builder::new_default();
                let mut request_builder =
                    message.init_root::<hypersync_net_types_capnp::request::Builder>();

                request_builder
                    .build_query_id_from_query(query)
                    .context("build query id")?;
                let mut query_with_id = Vec::new();
                capnp::serialize_packed::write_message(&mut query_with_id, &message)
                    .context("write capnp message")?;
                query_with_id
            };

            // `url.clone()`: the url is needed again for the full-query request.
            let req = self.inner.http_client.request(Method::POST, url.clone());

            let res = req
                .body(query_with_id)
                .send()
                .await
                .context("execute http req")?;

            let status = res.status();
            // Capture rate-limit headers before the body is consumed.
            let rate_limit = RateLimitInfo::from_response(&res);

            if status == StatusCode::PAYLOAD_TOO_LARGE {
                return Err(HyperSyncResponseError::PayloadTooLarge);
            }
            if status == StatusCode::TOO_MANY_REQUESTS {
                return Err(HyperSyncResponseError::RateLimited { rate_limit });
            }
            if status.is_success() {
                let bytes = res.bytes().await.context("read response body bytes")?;

                // Raise capnp's default read limits: responses can be large.
                let mut opts = capnp::message::ReaderOptions::new();
                opts.nesting_limit(i32::MAX).traversal_limit_in_words(None);
                let message_reader = capnp::serialize_packed::read_message(bytes.as_ref(), opts)
                    .context("create message reader")?;
                let query_response = message_reader
                    .get_root::<hypersync_net_types_capnp::cached_query_response::Reader>()
                    .context("get cached_query_response root")?;
                match query_response.get_either().which().context("get either")? {
                    // Cache hit: parse and return the cached response.
                    hypersync_net_types_capnp::cached_query_response::either::Which::QueryResponse(
                        query_response,
                    ) => {
                        // Parsing is CPU-bound; keep it off the async reactor.
                        let res = tokio::task::block_in_place(|| {
                            let res = query_response?;
                            read_query_response(&res).context("parse query response cached")
                        })?;
                        return Ok(ArrowImplResponse {
                            response: res,
                            response_bytes: bytes.len().try_into().unwrap(),
                            rate_limit,
                        });
                    }
                    // Cache miss: fall through to the full-query request.
                    hypersync_net_types_capnp::cached_query_response::either::Which::NotCached(()) => {
                        log::trace!("query was not cached, retrying with full query");
                    }
                }
            } else {
                // The probe failing is non-fatal; log and retry with the full query.
                let text = res.text().await.context("read text to see error")?;
                log::warn!(
                    "Failed cache query, will retry full query. {}, err body: {}",
                    status,
                    text
                );
            }
        };

        // Phase 2 (or the only phase when caching is off): send the full query.
        let full_query_bytes = {
            let mut message = capnp::message::Builder::new_default();
            let mut query_builder =
                message.init_root::<hypersync_net_types_capnp::request::Builder>();

            query_builder
                .build_full_query_from_query(query, should_cache)
                .context("build full query")?;
            let mut bytes = Vec::new();
            capnp::serialize_packed::write_message(&mut bytes, &message)
                .context("write full query capnp message")?;
            bytes
        };

        let req = self.inner.http_client.request(Method::POST, url);

        let res = req
            .body(full_query_bytes)
            .send()
            .await
            .context("execute http req")?;

        let status = res.status();
        // Capture rate-limit headers before the body is consumed.
        let rate_limit = RateLimitInfo::from_response(&res);

        if status == StatusCode::PAYLOAD_TOO_LARGE {
            return Err(HyperSyncResponseError::PayloadTooLarge);
        }
        if status == StatusCode::TOO_MANY_REQUESTS {
            return Err(HyperSyncResponseError::RateLimited { rate_limit });
        }
        if !status.is_success() {
            let text = res.text().await.context("read text to see error")?;

            return Err(HyperSyncResponseError::Other(anyhow!(
                "http response status code {}, err body: {}",
                status,
                text
            )));
        }

        let bytes = res.bytes().await.context("read response body bytes")?;

        // Parsing is CPU-bound; keep it off the async reactor.
        let res = tokio::task::block_in_place(|| {
            parse_query_response(&bytes).context("parse query response")
        })?;

        Ok(ArrowImplResponse {
            response: res,
            response_bytes: bytes.len().try_into().unwrap(),
            rate_limit,
        })
    }
964
965 /// Executes query once and returns the result, handling payload-too-large by halving block range.
966 async fn get_arrow_impl(
967 &self,
968 query: &Query,
969 ) -> std::result::Result<ArrowImplResponse, HyperSyncResponseError> {
970 let mut query = query.clone();
971 loop {
972 let res = match self.inner.serialization_format {
973 SerializationFormat::Json => self.get_arrow_impl_json(&query).await,
974 SerializationFormat::CapnProto { .. } => self.get_arrow_impl_capnp(&query).await,
975 };
976 match res {
977 Ok(res) => return Ok(res),
978 Err(
979 e @ (HyperSyncResponseError::Other(_)
980 | HyperSyncResponseError::RateLimited { .. }),
981 ) => return Err(e),
982 Err(HyperSyncResponseError::PayloadTooLarge) => {
983 let block_range = if let Some(to_block) = query.to_block {
984 let current = to_block - query.from_block;
985 if current < 2 {
986 return Err(HyperSyncResponseError::Other(anyhow!(
987 "Payload is too large and query is using the minimum block range."
988 )));
989 }
990 // Half the current block range
991 current / 2
992 } else {
993 200
994 };
995 let to_block = query.from_block + block_range;
996 query.to_block = Some(to_block);
997
998 log::trace!(
999 "Payload is too large, retrying with block range from: {} to: {}",
1000 query.from_block,
1001 to_block
1002 );
1003 }
1004 }
1005 }
1006 }
1007
1008 /// Executes query with retries and returns the response in Arrow format.
1009 pub async fn get_arrow(&self, query: &Query) -> Result<ArrowResponse> {
1010 self.get_arrow_with_size(query)
1011 .await
1012 .map(|res| res.response)
1013 }
1014
1015 /// Internal implementation for get_arrow.
1016 async fn get_arrow_with_size(&self, query: &Query) -> Result<ArrowImplResponse> {
1017 let mut base = self.inner.retry_base_ms;
1018
1019 let mut err = anyhow!("");
1020
1021 // Proactive throttling: if we know we're rate limited, wait before sending
1022 if self.inner.proactive_rate_limit_sleep {
1023 self.wait_for_rate_limit().await;
1024 }
1025
1026 for _ in 0..self.inner.max_num_retries + 1 {
1027 match self.get_arrow_impl(query).await {
1028 Ok(res) => {
1029 self.update_rate_limit_state(&res.rate_limit);
1030 return Ok(res);
1031 }
1032 Err(HyperSyncResponseError::RateLimited { rate_limit }) => {
1033 self.update_rate_limit_state(&rate_limit);
1034 let wait_secs = rate_limit.suggested_wait_secs().unwrap_or(1) + 1;
1035 log::warn!(
1036 "rate limited by server ({rate_limit}), waiting {wait_secs}s before retry. To increase your rate limits, upgrade your plan at https://app.envio.dev/api-tokens. For more info: https://docs.envio.dev/docs/HyperSync/api-tokens"
1037 );
1038 err = err.context(format!("rate limited by server ({rate_limit}). To increase your rate limits, upgrade your plan at https://app.envio.dev/api-tokens"));
1039 tokio::time::sleep(Duration::from_secs(wait_secs)).await;
1040 continue;
1041 }
1042 Err(HyperSyncResponseError::Other(e)) => {
1043 log::error!(
1044 "failed to get arrow data from server, retrying... The error was: {e:?}"
1045 );
1046 err = err.context(format!("{e:?}"));
1047 }
1048 Err(HyperSyncResponseError::PayloadTooLarge) => {
1049 // This shouldn't happen since get_arrow_impl handles it, but just in case
1050 log::error!("unexpected PayloadTooLarge from get_arrow_impl, retrying...");
1051 err = err.context("unexpected PayloadTooLarge");
1052 }
1053 }
1054
1055 let base_ms = Duration::from_millis(base);
1056 let jitter = Duration::from_millis(fastrange_rs::fastrange_64(
1057 rand::random(),
1058 self.inner.retry_backoff_ms,
1059 ));
1060
1061 tokio::time::sleep(base_ms + jitter).await;
1062
1063 base = std::cmp::min(
1064 base + self.inner.retry_backoff_ms,
1065 self.inner.retry_ceiling_ms,
1066 );
1067 }
1068
1069 Err(err)
1070 }
1071
1072 /// Spawns task to execute query and return data via a channel.
1073 ///
1074 /// # Example
1075 /// ```no_run
1076 /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField}, StreamConfig};
1077 ///
1078 /// # async fn example() -> anyhow::Result<()> {
1079 /// let client = Client::builder()
1080 /// .chain_id(1)
1081 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
1082 /// .build()?;
1083 ///
1084 /// // Stream all ERC20 transfer events
1085 /// let query = Query::new()
1086 /// .from_block(19000000)
1087 /// .where_logs(
1088 /// LogFilter::all()
1089 /// .and_topic0(["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"])?
1090 /// )
1091 /// .select_log_fields([LogField::Address, LogField::Topic1, LogField::Topic2, LogField::Data]);
1092 /// let mut receiver = client.stream(query, StreamConfig::default()).await?;
1093 ///
1094 /// while let Some(response) = receiver.recv().await {
1095 /// let response = response?;
1096 /// println!("Got {} events up to block: {}", response.data.logs.len(), response.next_block);
1097 /// }
1098 /// # Ok(())
1099 /// # }
1100 /// ```
1101 pub async fn stream(
1102 &self,
1103 query: Query,
1104 config: StreamConfig,
1105 ) -> Result<mpsc::Receiver<Result<QueryResponse>>> {
1106 check_simple_stream_params(&config)?;
1107
1108 let (tx, rx): (_, mpsc::Receiver<Result<QueryResponse>>) =
1109 mpsc::channel(config.concurrency);
1110
1111 let mut inner_rx = self
1112 .stream_arrow(query, config)
1113 .await
1114 .context("start inner stream")?;
1115
1116 tokio::spawn(async move {
1117 while let Some(resp) = inner_rx.recv().await {
1118 let msg = resp
1119 .context("inner receiver")
1120 .and_then(|r| QueryResponse::try_from(&r));
1121 let is_err = msg.is_err();
1122 if tx.send(msg).await.is_err() || is_err {
1123 return;
1124 }
1125 }
1126 });
1127
1128 Ok(rx)
1129 }
1130
1131 /// Add block, transaction and log fields selection to the query and spawns task to execute it,
1132 /// returning data via a channel.
1133 ///
1134 /// This method automatically joins blocks, transactions, and logs into unified events,
1135 /// then streams them via a channel for real-time processing.
1136 ///
1137 /// # Example
1138 /// ```no_run
1139 /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField, TransactionField}, StreamConfig};
1140 ///
1141 /// # async fn example() -> anyhow::Result<()> {
1142 /// let client = Client::builder()
1143 /// .chain_id(1)
1144 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
1145 /// .build()?;
1146 ///
1147 /// // Stream NFT transfer events with transaction context
1148 /// let query = Query::new()
1149 /// .from_block(19000000)
1150 /// .where_logs(
1151 /// LogFilter::all()
1152 /// .and_topic0(["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"])?
1153 /// )
1154 /// .select_log_fields([LogField::Address, LogField::Topic1, LogField::Topic2])
1155 /// .select_transaction_fields([TransactionField::Hash, TransactionField::From]);
1156 /// let mut receiver = client.stream_events(query, StreamConfig::default()).await?;
1157 ///
1158 /// while let Some(response) = receiver.recv().await {
1159 /// let response = response?;
1160 /// println!("Got {} joined events up to block: {}", response.data.len(), response.next_block);
1161 /// }
1162 /// # Ok(())
1163 /// # }
1164 /// ```
1165 pub async fn stream_events(
1166 &self,
1167 mut query: Query,
1168 config: StreamConfig,
1169 ) -> Result<mpsc::Receiver<Result<EventResponse>>> {
1170 check_simple_stream_params(&config)?;
1171
1172 let event_join_strategy = InternalEventJoinStrategy::from(&query.field_selection);
1173
1174 event_join_strategy.add_join_fields_to_selection(&mut query.field_selection);
1175
1176 let (tx, rx): (_, mpsc::Receiver<Result<EventResponse>>) =
1177 mpsc::channel(config.concurrency);
1178
1179 let mut inner_rx = self
1180 .stream_arrow(query, config)
1181 .await
1182 .context("start inner stream")?;
1183
1184 tokio::spawn(async move {
1185 while let Some(resp) = inner_rx.recv().await {
1186 let msg = resp
1187 .context("inner receiver")
1188 .and_then(|r| EventResponse::try_from_arrow_response(&r, &event_join_strategy));
1189 let is_err = msg.is_err();
1190 if tx.send(msg).await.is_err() || is_err {
1191 return;
1192 }
1193 }
1194 });
1195
1196 Ok(rx)
1197 }
1198
1199 /// Spawns task to execute query and return data via a channel in Arrow format.
1200 ///
1201 /// Returns raw Apache Arrow data via a channel for high-performance processing.
1202 /// Ideal for applications that need to work directly with columnar data.
1203 ///
1204 /// # Example
1205 /// ```no_run
1206 /// use hypersync_client::{Client, net_types::{Query, TransactionFilter, TransactionField}, StreamConfig};
1207 ///
1208 /// # async fn example() -> anyhow::Result<()> {
1209 /// let client = Client::builder()
1210 /// .chain_id(1)
1211 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
1212 /// .build()?;
1213 ///
1214 /// // Stream transaction data in Arrow format for analytics
1215 /// let query = Query::new()
1216 /// .from_block(19000000)
1217 /// .to_block_excl(19000100)
1218 /// .where_transactions(
1219 /// TransactionFilter::all()
1220 /// .and_contract_address(["0xA0b86a33E6411b87Fd9D3DF822C8698FC06BBe4c"])?
1221 /// )
1222 /// .select_transaction_fields([TransactionField::Hash, TransactionField::From, TransactionField::Value]);
1223 /// let mut receiver = client.stream_arrow(query, StreamConfig::default()).await?;
1224 ///
1225 /// while let Some(response) = receiver.recv().await {
1226 /// let response = response?;
1227 /// println!("Got {} Arrow batches for transactions", response.data.transactions.len());
1228 /// }
1229 /// # Ok(())
1230 /// # }
1231 /// ```
1232 pub async fn stream_arrow(
1233 &self,
1234 query: Query,
1235 config: StreamConfig,
1236 ) -> Result<mpsc::Receiver<Result<ArrowResponse>>> {
1237 stream::stream_arrow(self, query, config).await
1238 }
1239
1240 /// Executes query with retries and returns the response in Arrow format along with
1241 /// rate limit information from the server.
1242 ///
1243 /// This is useful for consumers that want to inspect rate limit headers and implement
1244 /// their own rate limiting logic in external systems.
1245 pub async fn get_arrow_with_rate_limit(
1246 &self,
1247 query: &Query,
1248 ) -> Result<QueryResponseWithRateLimit<ArrowResponseData>> {
1249 let result = self.get_arrow_with_size(query).await?;
1250 Ok(QueryResponseWithRateLimit {
1251 response: result.response,
1252 rate_limit: result.rate_limit,
1253 })
1254 }
1255
1256 /// Executes query with retries and returns the response along with
1257 /// rate limit information from the server.
1258 ///
1259 /// This is useful for consumers that want to inspect rate limit headers and implement
1260 /// their own rate limiting logic in external systems.
1261 pub async fn get_with_rate_limit(
1262 &self,
1263 query: &Query,
1264 ) -> Result<QueryResponseWithRateLimit<ResponseData>> {
1265 let result = self.get_arrow_with_rate_limit(query).await?;
1266 let converted =
1267 QueryResponse::try_from(&result.response).context("convert arrow response")?;
1268 Ok(QueryResponseWithRateLimit {
1269 response: converted,
1270 rate_limit: result.rate_limit,
1271 })
1272 }
1273
1274 /// Returns the most recently observed rate limit information, if any.
1275 ///
1276 /// Updated after every request (including inside streams). Returns `None`
1277 /// if no requests have been made yet or the server hasn't returned rate limit headers.
1278 pub fn rate_limit_info(&self) -> Option<RateLimitInfo> {
1279 self.inner
1280 .rate_limit_state
1281 .lock()
1282 .expect("rate_limit_state mutex poisoned")
1283 .as_ref()
1284 .map(|(info, _captured_at)| info.clone())
1285 }
1286
1287 /// Waits until the current rate limit window resets, if the client is rate limited.
1288 ///
1289 /// Returns immediately if:
1290 /// - No rate limit information has been observed yet
1291 /// - There is remaining quota in the current window
1292 ///
1293 /// This method is useful for consumers who want to explicitly wait before making
1294 /// requests, for example when coordinating rate limits across multiple systems.
1295 pub async fn wait_for_rate_limit(&self) {
1296 let wait_info = {
1297 let state = self
1298 .inner
1299 .rate_limit_state
1300 .lock()
1301 .expect("rate_limit_state mutex poisoned");
1302 match state.as_ref() {
1303 Some((info, captured_at)) if info.is_rate_limited() => {
1304 info.suggested_wait_secs().map(|secs| {
1305 let elapsed = captured_at.elapsed().as_secs();
1306 let remaining_wait = secs.saturating_sub(elapsed);
1307 (remaining_wait, info.clone())
1308 })
1309 }
1310 _ => None,
1311 }
1312 };
1313 if let Some((secs, info)) = wait_info {
1314 if secs > 0 {
1315 log::warn!(
1316 "rate limit exhausted ({info}), proactively waiting {secs}s for window reset. To increase your rate limits, upgrade your plan at https://app.envio.dev/api-tokens. For more info: https://docs.envio.dev/docs/HyperSync/api-tokens"
1317 );
1318 tokio::time::sleep(Duration::from_secs(secs)).await;
1319 }
1320 }
1321 }
1322
1323 /// Updates the internally tracked rate limit state with the current timestamp.
1324 fn update_rate_limit_state(&self, rate_limit: &RateLimitInfo) {
1325 // Only update if the response actually contained rate limit headers
1326 if rate_limit.limit.is_some()
1327 || rate_limit.remaining.is_some()
1328 || rate_limit.reset_secs.is_some()
1329 {
1330 let mut state = self
1331 .inner
1332 .rate_limit_state
1333 .lock()
1334 .expect("rate_limit_state mutex poisoned");
1335 *state = Some((rate_limit.clone(), Instant::now()));
1336 }
1337 }
1338
    /// Returns the hypersync server URL this client was configured with.
    ///
    /// # Example
    /// ```
    /// use hypersync_client::Client;
    ///
    /// let client = Client::builder()
    ///     .chain_id(1)
    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
    ///     .build()
    ///     .unwrap();
    ///
    /// println!("Client URL: {}", client.url());
    /// ```
    pub fn url(&self) -> &Url {
        &self.inner.url
    }
1356}
1357
/// Builder for creating a hypersync client with configuration options.
///
/// This builder provides a fluent API for configuring client settings like URL,
/// authentication, timeouts, and retry behavior.
///
/// The `Default` configuration has no endpoint set; `build` requires either
/// `chain_id` or `url` to be called first.
///
/// # Example
/// ```
/// use hypersync_client::{Client, SerializationFormat};
///
/// let client = Client::builder()
///     .chain_id(1)
///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
///     .http_req_timeout_millis(30000)
///     .max_num_retries(3)
///     .build()
///     .unwrap();
/// ```
#[derive(Debug, Clone, Default)]
pub struct ClientBuilder(ClientConfig);
1377
1378impl ClientBuilder {
1379 /// Creates a new ClientBuilder with default configuration.
1380 pub fn new() -> Self {
1381 Self::default()
1382 }
1383
1384 /// Sets the chain ID and automatically configures the URL for the hypersync endpoint.
1385 ///
1386 /// This is a convenience method that sets the URL to `https://{chain_id}.hypersync.xyz`.
1387 ///
1388 /// # Arguments
1389 /// * `chain_id` - The blockchain chain ID (e.g., 1 for Ethereum mainnet)
1390 ///
1391 /// # Example
1392 /// ```
1393 /// use hypersync_client::Client;
1394 ///
1395 /// let client = Client::builder()
1396 /// .chain_id(1) // Ethereum mainnet
1397 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1398 /// .build()
1399 /// .unwrap();
1400 /// ```
1401 pub fn chain_id(mut self, chain_id: u64) -> Self {
1402 self.0.url = format!("https://{chain_id}.hypersync.xyz");
1403 self
1404 }
1405
1406 /// Sets a custom URL for the hypersync server.
1407 ///
1408 /// Use this method when you need to connect to a custom hypersync endpoint
1409 /// instead of the default public endpoints.
1410 ///
1411 /// # Arguments
1412 /// * `url` - The hypersync server URL
1413 ///
1414 /// # Example
1415 /// ```
1416 /// use hypersync_client::Client;
1417 ///
1418 /// let client = Client::builder()
1419 /// .url("https://my-custom-hypersync.example.com")
1420 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1421 /// .build()
1422 /// .unwrap();
1423 /// ```
1424 pub fn url<S: ToString>(mut self, url: S) -> Self {
1425 self.0.url = url.to_string();
1426 self
1427 }
1428
1429 /// Sets the api token for authentication.
1430 ///
1431 /// Required for accessing authenticated hypersync endpoints.
1432 ///
1433 /// # Arguments
1434 /// * `api_token` - The authentication token
1435 ///
1436 /// # Example
1437 /// ```
1438 /// use hypersync_client::Client;
1439 ///
1440 /// let client = Client::builder()
1441 /// .chain_id(1)
1442 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1443 /// .build()
1444 /// .unwrap();
1445 /// ```
1446 pub fn api_token<S: ToString>(mut self, api_token: S) -> Self {
1447 self.0.api_token = api_token.to_string();
1448 self
1449 }
1450
1451 /// Sets the HTTP request timeout in milliseconds.
1452 ///
1453 /// # Arguments
1454 /// * `http_req_timeout_millis` - Timeout in milliseconds (default: 30000)
1455 ///
1456 /// # Example
1457 /// ```
1458 /// use hypersync_client::Client;
1459 ///
1460 /// let client = Client::builder()
1461 /// .chain_id(1)
1462 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1463 /// .http_req_timeout_millis(60000) // 60 second timeout
1464 /// .build()
1465 /// .unwrap();
1466 /// ```
1467 pub fn http_req_timeout_millis(mut self, http_req_timeout_millis: u64) -> Self {
1468 self.0.http_req_timeout_millis = http_req_timeout_millis;
1469 self
1470 }
1471
1472 /// Sets the maximum number of retries for failed requests.
1473 ///
1474 /// # Arguments
1475 /// * `max_num_retries` - Maximum number of retries (default: 10)
1476 ///
1477 /// # Example
1478 /// ```
1479 /// use hypersync_client::Client;
1480 ///
1481 /// let client = Client::builder()
1482 /// .chain_id(1)
1483 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1484 /// .max_num_retries(5)
1485 /// .build()
1486 /// .unwrap();
1487 /// ```
1488 pub fn max_num_retries(mut self, max_num_retries: usize) -> Self {
1489 self.0.max_num_retries = max_num_retries;
1490 self
1491 }
1492
1493 /// Sets the backoff increment for retry delays.
1494 ///
1495 /// This value is added to the base delay on each retry attempt.
1496 ///
1497 /// # Arguments
1498 /// * `retry_backoff_ms` - Backoff increment in milliseconds (default: 500)
1499 ///
1500 /// # Example
1501 /// ```
1502 /// use hypersync_client::Client;
1503 ///
1504 /// let client = Client::builder()
1505 /// .chain_id(1)
1506 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1507 /// .retry_backoff_ms(1000) // 1 second backoff increment
1508 /// .build()
1509 /// .unwrap();
1510 /// ```
1511 pub fn retry_backoff_ms(mut self, retry_backoff_ms: u64) -> Self {
1512 self.0.retry_backoff_ms = retry_backoff_ms;
1513 self
1514 }
1515
1516 /// Sets the initial delay for retry attempts.
1517 ///
1518 /// # Arguments
1519 /// * `retry_base_ms` - Initial retry delay in milliseconds (default: 500)
1520 ///
1521 /// # Example
1522 /// ```
1523 /// use hypersync_client::Client;
1524 ///
1525 /// let client = Client::builder()
1526 /// .chain_id(1)
1527 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1528 /// .retry_base_ms(1000) // Start with 1 second delay
1529 /// .build()
1530 /// .unwrap();
1531 /// ```
1532 pub fn retry_base_ms(mut self, retry_base_ms: u64) -> Self {
1533 self.0.retry_base_ms = retry_base_ms;
1534 self
1535 }
1536
1537 /// Sets the maximum delay for retry attempts.
1538 ///
1539 /// The retry delay will not exceed this value, even with backoff increments.
1540 ///
1541 /// # Arguments
1542 /// * `retry_ceiling_ms` - Maximum retry delay in milliseconds (default: 10000)
1543 ///
1544 /// # Example
1545 /// ```
1546 /// use hypersync_client::Client;
1547 ///
1548 /// let client = Client::builder()
1549 /// .chain_id(1)
1550 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1551 /// .retry_ceiling_ms(30000) // Cap at 30 seconds
1552 /// .build()
1553 /// .unwrap();
1554 /// ```
1555 pub fn retry_ceiling_ms(mut self, retry_ceiling_ms: u64) -> Self {
1556 self.0.retry_ceiling_ms = retry_ceiling_ms;
1557 self
1558 }
1559
1560 /// Sets whether to proactively sleep when rate limited.
1561 ///
1562 /// When enabled (default), the client will wait for the rate limit window to reset
1563 /// before sending requests it knows will be rejected. Set to `false` to disable
1564 /// this behavior and handle rate limits yourself.
1565 ///
1566 /// # Arguments
1567 /// * `proactive_rate_limit_sleep` - Whether to enable proactive rate limit sleeping (default: true)
1568 pub fn proactive_rate_limit_sleep(mut self, proactive_rate_limit_sleep: bool) -> Self {
1569 self.0.proactive_rate_limit_sleep = proactive_rate_limit_sleep;
1570 self
1571 }
1572
1573 /// Sets the serialization format for client-server communication.
1574 ///
1575 /// # Arguments
1576 /// * `serialization_format` - The format to use (JSON or CapnProto)
1577 ///
1578 /// # Example
1579 /// ```
1580 /// use hypersync_client::{Client, SerializationFormat};
1581 ///
1582 /// let client = Client::builder()
1583 /// .chain_id(1)
1584 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1585 /// .serialization_format(SerializationFormat::Json)
1586 /// .build()
1587 /// .unwrap();
1588 /// ```
1589 pub fn serialization_format(mut self, serialization_format: SerializationFormat) -> Self {
1590 self.0.serialization_format = serialization_format;
1591 self
1592 }
1593
1594 /// Builds the client with the configured settings.
1595 ///
1596 /// # Returns
1597 /// * `Result<Client>` - The configured client or an error if configuration is invalid
1598 ///
1599 /// # Errors
1600 /// Returns an error if:
1601 /// * The URL is malformed
1602 /// * Required configuration is missing
1603 ///
1604 /// # Example
1605 /// ```
1606 /// use hypersync_client::Client;
1607 ///
1608 /// let client = Client::builder()
1609 /// .chain_id(1)
1610 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1611 /// .build()
1612 /// .unwrap();
1613 /// ```
1614 pub fn build(self) -> Result<Client> {
1615 if self.0.url.is_empty() {
1616 anyhow::bail!(
1617 "endpoint needs to be set, try using builder.chain_id(1) or\
1618 builder.url(\"https://eth.hypersync.xyz\") to set the endpoint"
1619 )
1620 }
1621 Client::new(self.0)
1622 }
1623}
1624
/// Initial delay (200ms) before the first SSE reconnect attempt; doubles on
/// each consecutive failure.
const INITIAL_RECONNECT_DELAY: Duration = Duration::from_millis(200);
/// Upper bound for the SSE reconnect backoff delay.
const MAX_RECONNECT_DELAY: Duration = Duration::from_secs(30);
/// Timeout for detecting dead connections. Server sends keepalive pings every 5s,
/// so we timeout after 15s (3x the ping interval).
const READ_TIMEOUT: Duration = Duration::from_secs(15);
1631
/// Events emitted by the height stream.
///
/// Produced by [`Client::stream_height`], which forwards connection state
/// changes as well as height updates so consumers can surface stream status.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum HeightStreamEvent {
    /// Successfully connected or reconnected to the SSE stream.
    Connected,
    /// Received a height update from the server.
    Height(u64),
    /// Connection lost, will attempt to reconnect after the specified delay.
    Reconnecting {
        /// Duration to wait before attempting reconnection.
        delay: Duration,
        /// The error that caused the reconnection.
        error_msg: String,
    },
}
1647
/// Internal classification of raw SSE events before they are surfaced to consumers.
enum InternalStreamEvent {
    /// Event that should be forwarded to the subscriber channel.
    Publish(HeightStreamEvent),
    /// Server keepalive; swallowed by the stream loop.
    Ping,
    /// Unrecognized event type (debug description attached); ignored by the stream loop.
    Unknown(String),
}
1653
1654impl Client {
1655 fn get_es_stream(&self) -> Result<EventSource> {
1656 // Build the GET /height/sse request
1657 let mut url = self.inner.url.clone();
1658 url.path_segments_mut()
1659 .ok()
1660 .context("invalid base URL")?
1661 .extend(&["height", "sse"]);
1662
1663 let req = self
1664 .inner
1665 .http_client
1666 // Don't set timeout for SSE stream
1667 .request_no_timeout(Method::GET, url);
1668
1669 // Configure exponential backoff for library-level retries
1670 let retry_policy = ExponentialBackoff::new(
1671 INITIAL_RECONNECT_DELAY,
1672 2.0,
1673 Some(MAX_RECONNECT_DELAY),
1674 None, // unlimited retries
1675 );
1676
1677 // Turn the request into an EventSource stream with retries
1678 let mut es = reqwest_eventsource::EventSource::new(req)
1679 .context("unexpected error creating EventSource")?;
1680 es.set_retry_policy(Box::new(retry_policy));
1681 Ok(es)
1682 }
1683
1684 async fn next_height(event_source: &mut EventSource) -> Result<Option<InternalStreamEvent>> {
1685 let Some(res) = tokio::time::timeout(READ_TIMEOUT, event_source.next())
1686 .await
1687 .map_err(|d| anyhow::anyhow!("stream timed out after {d}"))?
1688 else {
1689 return Ok(None);
1690 };
1691
1692 let e = match res.context("failed response")? {
1693 Event::Open => InternalStreamEvent::Publish(HeightStreamEvent::Connected),
1694 Event::Message(event) => match event.event.as_str() {
1695 "height" => {
1696 let height = event
1697 .data
1698 .trim()
1699 .parse::<u64>()
1700 .context("parsing height from event data")?;
1701 InternalStreamEvent::Publish(HeightStreamEvent::Height(height))
1702 }
1703 "ping" => InternalStreamEvent::Ping,
1704 _ => InternalStreamEvent::Unknown(format!("unknown event: {:?}", event)),
1705 },
1706 };
1707
1708 Ok(Some(e))
1709 }
1710
1711 async fn stream_height_events(
1712 es: &mut EventSource,
1713 tx: &mpsc::Sender<HeightStreamEvent>,
1714 ) -> Result<bool> {
1715 let mut received_an_event = false;
1716 while let Some(event) = Self::next_height(es).await.context("failed next height")? {
1717 match event {
1718 InternalStreamEvent::Publish(event) => {
1719 received_an_event = true;
1720 if tx.send(event).await.is_err() {
1721 return Ok(received_an_event); // Receiver dropped, exit task
1722 }
1723 }
1724 InternalStreamEvent::Ping => (), // ignore pings
1725 InternalStreamEvent::Unknown(_event) => (), // ignore unknown events
1726 }
1727 }
1728 Ok(received_an_event)
1729 }
1730
1731 fn get_delay(consecutive_failures: u32) -> Duration {
1732 if consecutive_failures > 0 {
1733 /// helper function to calculate 2^x
1734 /// optimization using bit shifting
1735 const fn two_to_pow(x: u32) -> u32 {
1736 1 << x
1737 }
1738 // Exponential backoff: 200ms, 400ms, 800ms, ... up to 30s
1739 INITIAL_RECONNECT_DELAY
1740 .saturating_mul(two_to_pow(consecutive_failures - 1))
1741 .min(MAX_RECONNECT_DELAY)
1742 } else {
1743 // On zero consecutive failures, 0 delay
1744 Duration::from_millis(0)
1745 }
1746 }
1747
1748 async fn stream_height_events_with_retry(
1749 &self,
1750 tx: &mpsc::Sender<HeightStreamEvent>,
1751 ) -> Result<()> {
1752 let mut consecutive_failures = 0u32;
1753
1754 loop {
1755 // should always be able to creat a new es stream
1756 // something is wrong with the req builder otherwise
1757 let mut es = self.get_es_stream().context("get es stream")?;
1758
1759 let mut error = anyhow!("");
1760
1761 match Self::stream_height_events(&mut es, tx).await {
1762 Ok(received_an_event) => {
1763 if received_an_event {
1764 consecutive_failures = 0; // Reset after successful connection that then failed
1765 }
1766 log::trace!("Stream height exited");
1767 }
1768 Err(e) => {
1769 log::trace!("Stream height failed: {e:?}");
1770 error = e;
1771 }
1772 }
1773
1774 es.close();
1775
1776 // If the receiver is closed, exit the task
1777 if tx.is_closed() {
1778 break;
1779 }
1780
1781 let delay = Self::get_delay(consecutive_failures);
1782 log::trace!("Reconnecting in {:?}...", delay);
1783
1784 let error_msg = format!("{error:?}");
1785
1786 if tx
1787 .send(HeightStreamEvent::Reconnecting { delay, error_msg })
1788 .await
1789 .is_err()
1790 {
1791 return Ok(()); // Receiver dropped, exit task
1792 }
1793 tokio::time::sleep(delay).await;
1794
1795 // increment consecutive failures so that on the next try
1796 // it will start using back offs
1797 consecutive_failures += 1;
1798 }
1799
1800 Ok(())
1801 }
1802
1803 /// Streams archive height updates from the server via Server-Sent Events.
1804 ///
1805 /// Establishes a long-lived SSE connection to `/height/sse` that automatically reconnects
1806 /// on disconnection with exponential backoff (200ms → 400ms → ... → max 30s).
1807 ///
1808 /// The stream emits [`HeightStreamEvent`] to notify consumers of connection state changes
1809 /// and height updates. This allows applications to display connection status to users.
1810 ///
1811 /// # Returns
1812 /// Channel receiver yielding [`HeightStreamEvent`]s. The background task handles connection
1813 /// lifecycle and sends events through this channel.
1814 ///
1815 /// # Example
1816 /// ```
1817 /// # use hypersync_client::{Client, HeightStreamEvent};
1818 /// # async fn example() -> anyhow::Result<()> {
1819 /// let client = Client::builder()
1820 /// .url("https://eth.hypersync.xyz")
1821 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1822 /// .build()?;
1823 ///
1824 /// let mut rx = client.stream_height();
1825 ///
1826 /// while let Some(event) = rx.recv().await {
1827 /// match event {
1828 /// HeightStreamEvent::Connected => println!("Connected to stream"),
1829 /// HeightStreamEvent::Height(h) => println!("Height: {}", h),
1830 /// HeightStreamEvent::Reconnecting { delay, error_msg } => {
1831 /// println!("Reconnecting in {delay:?} due to error: {error_msg}")
1832 /// }
1833 /// }
1834 /// }
1835 /// # Ok(())
1836 /// # }
1837 /// ```
1838 pub fn stream_height(&self) -> mpsc::Receiver<HeightStreamEvent> {
1839 let (tx, rx) = mpsc::channel(16);
1840 let client = self.clone();
1841
1842 tokio::spawn(async move {
1843 if let Err(e) = client.stream_height_events_with_retry(&tx).await {
1844 log::error!("Stream height failed unexpectedly: {e:?}");
1845 }
1846 });
1847
1848 rx
1849 }
1850}
1851
1852fn check_simple_stream_params(config: &StreamConfig) -> Result<()> {
1853 if config.event_signature.is_some() {
1854 return Err(anyhow!(
1855 "config.event_signature can't be passed to simple type function. User is expected to \
1856 decode the logs using Decoder."
1857 ));
1858 }
1859 if config.column_mapping.is_some() {
1860 return Err(anyhow!(
1861 "config.column_mapping can't be passed to single type function. User is expected to \
1862 map values manually."
1863 ));
1864 }
1865
1866 Ok(())
1867}
1868
/// Used to indicate whether or not a retry should be attempted.
///
/// Returned by the low-level query execution path; `get_arrow_impl` converts
/// `PayloadTooLarge` into automatic block-range halving, while the retry loop
/// in `get_arrow_with_size` handles `RateLimited` and `Other`.
#[derive(Debug, thiserror::Error)]
pub enum HyperSyncResponseError {
    /// Means that the client should retry with a smaller block range.
    #[error("hypersync responded with 'payload too large' error")]
    PayloadTooLarge,
    /// Server responded with 429 Too Many Requests.
    #[error("rate limited by server. To increase your rate limits, upgrade your plan at https://app.envio.dev/api-tokens. For more info: https://docs.envio.dev/docs/HyperSync/api-tokens")]
    RateLimited {
        /// Rate limit information from the 429 response headers.
        rate_limit: RateLimitInfo,
    },
    /// Any other server error.
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}
1885
/// Result of a single Arrow query execution, before retry logic.
struct ArrowImplResponse {
    /// The parsed Arrow response data.
    response: ArrowResponse,
    /// Size of the raw HTTP response body in bytes.
    response_bytes: u64,
    /// Rate limit information parsed from response headers.
    rate_limit: RateLimitInfo,
}
1895
#[cfg(test)]
mod tests {
    use super::*;

    /// Verifies the reconnect backoff schedule: zero delay for zero failures,
    /// powers-of-two growth from 200ms, and the 30s ceiling.
    #[test]
    fn test_get_delay() {
        assert_eq!(
            Client::get_delay(0),
            Duration::from_millis(0),
            "starts with 0 delay"
        );
        // powers of 2 backoff
        assert_eq!(Client::get_delay(1), Duration::from_millis(200));
        assert_eq!(Client::get_delay(2), Duration::from_millis(400));
        assert_eq!(Client::get_delay(3), Duration::from_millis(800));
        // maxes out at 30s
        assert_eq!(
            Client::get_delay(9),
            Duration::from_secs(30),
            "max delay is 30s"
        );
        assert_eq!(
            Client::get_delay(10),
            Duration::from_secs(30),
            "max delay is 30s"
        );
    }

    /// Sanity-checks that the live endpoint negotiates HTTP/2, which the
    /// client relies on for multiplexed connections.
    #[tokio::test]
    #[ignore = "integration test requiring live hs server"]
    async fn test_http2_is_used() -> anyhow::Result<()> {
        let api_token = std::env::var("ENVIO_API_TOKEN")?;
        let client = reqwest::Client::builder()
            .no_gzip()
            .user_agent("hscr-test")
            .build()?;

        let res = client
            .get("https://eth.hypersync.xyz/height")
            .bearer_auth(&api_token)
            .send()
            .await?;

        assert_eq!(
            res.version(),
            reqwest::Version::HTTP_2,
            "expected HTTP/2 but got {:?}",
            res.version()
        );
        assert!(res.status().is_success());
        Ok(())
    }

    /// End-to-end check of the SSE height stream: expects Connected first,
    /// then monotonically increasing heights, and a graceful exit once the
    /// receiver is dropped.
    #[tokio::test]
    #[ignore = "integration test with live hs server for height stream"]
    async fn test_stream_height_events() -> anyhow::Result<()> {
        let (tx, mut rx) = mpsc::channel(16);
        let handle = tokio::spawn(async move {
            let client = Client::builder()
                .url("https://monad-testnet.hypersync.xyz")
                .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
                .build()?;
            let mut es = client.get_es_stream().context("get es stream")?;
            Client::stream_height_events(&mut es, &tx).await
        });

        // First event after opening the stream must be Connected.
        let val = rx.recv().await;
        assert!(val.is_some());
        assert_eq!(val.unwrap(), HeightStreamEvent::Connected);
        let Some(HeightStreamEvent::Height(height)) = rx.recv().await else {
            panic!("should have received height")
        };
        let Some(HeightStreamEvent::Height(height2)) = rx.recv().await else {
            panic!("should have received height")
        };
        assert!(height2 > height);
        // Dropping the receiver should make the sender task exit cleanly.
        drop(rx);

        let res = handle.await.expect("should have joined");
        let received_an_event =
            res.expect("should have ended the stream gracefully after dropping rx");
        assert!(received_an_event, "should have received an event");

        Ok(())
    }
}