hypersync_client/lib.rs
1#![deny(missing_docs)]
2// README.md is a symlink (ln -s ../README.md README.md) to the root workspace
3// README. This is needed so that both `cargo doc` and `cargo package` work:
4// - `cargo doc`: include_str! resolves the symlink to the root README.
5// - `cargo package`: cargo copies the symlink target into the tarball, so
6// include_str! finds README.md at the package root during verification.
7#![doc = include_str!("../README.md")]
8use std::time::Instant;
9use std::{sync::Arc, time::Duration};
10
11use anyhow::{anyhow, Context, Result};
12use futures::StreamExt;
13use hypersync_net_types::{hypersync_net_types_capnp, ArchiveHeight, ChainId, Query};
14use reqwest::{Method, StatusCode};
15use reqwest_eventsource::retry::ExponentialBackoff;
16use reqwest_eventsource::{Event, EventSource};
17
18pub mod arrow_reader;
19mod column_mapping;
20mod config;
21mod decode;
22mod decode_call;
23mod from_arrow;
24pub mod metrics;
25mod parquet_out;
26mod parse_response;
27pub mod preset_query;
28mod rate_limit;
29mod rayon_async;
30pub mod simple_types;
31mod stream;
32mod types;
33mod util;
34
35pub use hypersync_format as format;
36pub use hypersync_net_types as net_types;
37pub use hypersync_schema as schema;
38
39use parse_response::parse_query_response;
40use tokio::sync::mpsc;
41use types::ResponseData;
42use url::Url;
43
44pub use column_mapping::{ColumnMapping, DataType};
45pub use config::HexOutput;
46pub use config::{ClientConfig, SerializationFormat, StreamConfig};
47pub use decode::Decoder;
48pub use decode_call::CallDecoder;
49pub use metrics::{
50 RequestKind, RequestStats, StreamMetrics, StreamObserver, StreamSummary, NUM_SIZE_BUCKETS,
51 SIZE_BUCKET_LABELS,
52};
53pub use rate_limit::RateLimitInfo;
54pub use types::{
55 ArrowResponse, ArrowResponseData, EventResponse, QueryResponse, RateLimitResponse,
56};
57
58use crate::parse_response::read_query_response;
59use crate::simple_types::InternalEventJoinStrategy;
60
61#[derive(Debug)]
62struct HttpClientWrapper {
63 /// Mutable state that needs to be refreshed periodically
64 inner: std::sync::Mutex<HttpClientWrapperInner>,
65 /// HyperSync server api token.
66 api_token: String,
67 /// Standard timeout for http requests.
68 timeout: Duration,
69 /// Pools are never idle since polling often happens on the client
70 /// so connections never timeout and dns lookups never happen again
71 /// this is problematic if the underlying ip address changes during
72 /// failovers on hypersync. Since reqwest doesn't implement this we
73 /// simply recreate the client every time
74 max_connection_age: Duration,
75 /// For refreshing the client
76 user_agent: String,
77}
78
79#[derive(Debug)]
80struct HttpClientWrapperInner {
81 /// Initialized reqwest instance for client url.
82 client: reqwest::Client,
83 /// Timestamp when the client was created
84 created_at: Instant,
85}
86
87impl HttpClientWrapper {
88 fn new(user_agent: String, api_token: String, timeout: Duration) -> Self {
89 let client = reqwest::Client::builder()
90 .no_gzip()
91 .user_agent(&user_agent)
92 .build()
93 .unwrap();
94
95 Self {
96 inner: std::sync::Mutex::new(HttpClientWrapperInner {
97 client,
98 created_at: Instant::now(),
99 }),
100 api_token,
101 timeout,
102 max_connection_age: Duration::from_secs(60),
103 user_agent,
104 }
105 }
106
107 fn get_client(&self) -> reqwest::Client {
108 let mut inner = self.inner.lock().expect("HttpClientWrapper mutex poisoned");
109
110 // Check if client needs refresh due to age
111 if inner.created_at.elapsed() > self.max_connection_age {
112 // Recreate client to force new DNS lookup for failover scenarios
113 inner.client = reqwest::Client::builder()
114 .no_gzip()
115 .user_agent(&self.user_agent)
116 .build()
117 .unwrap();
118 inner.created_at = Instant::now();
119 }
120
121 // Clone is cheap for reqwest::Client (it's Arc-wrapped internally)
122 inner.client.clone()
123 }
124
125 fn request(&self, method: Method, url: Url) -> reqwest::RequestBuilder {
126 let client = self.get_client();
127 client
128 .request(method, url)
129 .timeout(self.timeout)
130 .bearer_auth(&self.api_token)
131 }
132
133 fn request_no_timeout(&self, method: Method, url: Url) -> reqwest::RequestBuilder {
134 let client = self.get_client();
135 client.request(method, url).bearer_auth(&self.api_token)
136 }
137}
138
/// Internal client state to handle http requests and retries.
///
/// Shared by every clone of [`Client`] through an `Arc`.
#[derive(Debug)]
struct ClientInner {
    /// HTTP client wrapper that attaches the api token (and, for normal requests, the default
    /// timeout) to each request.
    http_client: HttpClientWrapper,
    /// HyperSync server URL.
    url: Url,
    /// Number of retries to attempt before returning error.
    ///
    /// `get_chain_id` / `get_height` make up to `max_num_retries + 1` attempts in total.
    max_num_retries: usize,
    /// Backoff step in milliseconds.
    ///
    /// It is added to the delay after every failed attempt. It is also the exclusive upper
    /// bound of the random jitter added to each delay.
    retry_backoff_ms: u64,
    /// Initial wait time for request backoff, in milliseconds.
    retry_base_ms: u64,
    /// Ceiling for the backoff delay in milliseconds (jitter is added on top of it).
    retry_ceiling_ms: u64,
    /// Query serialization format to use for HTTP requests.
    serialization_format: SerializationFormat,
    /// Most recently observed rate limit info from the server, paired with the
    /// instant it was captured so elapsed time can be subtracted from `reset_secs`.
    rate_limit_state: std::sync::Mutex<Option<(RateLimitInfo, Instant)>>,
    /// Whether to proactively sleep when the rate limit is exhausted.
    proactive_rate_limit_sleep: bool,
}
161
/// Client to handle http requests and retries.
///
/// Cloning is cheap. All clones share the same inner state (HTTP client, configuration and
/// rate limit state) through an `Arc`.
#[derive(Clone, Debug)]
pub struct Client {
    inner: Arc<ClientInner>,
}
167
168impl Client {
169 /// Creates a new client with the given configuration.
170 ///
171 /// Configuration must include the `url` and `api_token` fields.
172 /// # Example
173 /// ```
174 /// use hypersync_client::{Client, ClientConfig};
175 ///
176 /// let config = ClientConfig {
177 /// url: "https://eth.hypersync.xyz".to_string(),
178 /// api_token: std::env::var("ENVIO_API_TOKEN")?,
179 /// ..Default::default()
180 /// };
181 /// let client = Client::new(config)?;
182 /// # Ok::<(), anyhow::Error>(())
183 /// ```
184 ///
185 /// # Errors
186 /// This method fails if the config is invalid.
187 pub fn new(cfg: ClientConfig) -> Result<Self> {
188 // hscr stands for hypersync client rust
189 cfg.validate().context("invalid ClientConfig")?;
190 let user_agent = format!("hscr/{}", env!("CARGO_PKG_VERSION"));
191 Self::new_internal(cfg, user_agent)
192 }
193
194 /// Creates a new client builder.
195 ///
196 /// # Example
197 /// ```
198 /// use hypersync_client::Client;
199 ///
200 /// let client = Client::builder()
201 /// .chain_id(1)
202 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
203 /// .build()
204 /// .unwrap();
205 /// ```
206 pub fn builder() -> ClientBuilder {
207 ClientBuilder::new()
208 }
209
210 #[doc(hidden)]
211 pub fn new_with_agent(cfg: ClientConfig, user_agent: impl Into<String>) -> Result<Self> {
212 // Creates a new client with the given configuration and custom user agent.
213 // This is intended for use by language bindings (Python, Node.js) and HyperIndex.
214 Self::new_internal(cfg, user_agent.into())
215 }
216
217 /// Internal constructor that takes both config and user agent.
218 fn new_internal(cfg: ClientConfig, user_agent: String) -> Result<Self> {
219 let http_client = HttpClientWrapper::new(
220 user_agent,
221 cfg.api_token,
222 Duration::from_millis(cfg.http_req_timeout_millis),
223 );
224
225 let url = Url::parse(&cfg.url).context("url is malformed")?;
226
227 Ok(Self {
228 inner: Arc::new(ClientInner {
229 http_client,
230 url,
231 max_num_retries: cfg.max_num_retries,
232 retry_backoff_ms: cfg.retry_backoff_ms,
233 retry_base_ms: cfg.retry_base_ms,
234 retry_ceiling_ms: cfg.retry_ceiling_ms,
235 serialization_format: cfg.serialization_format,
236 rate_limit_state: std::sync::Mutex::new(None),
237 proactive_rate_limit_sleep: cfg.proactive_rate_limit_sleep,
238 }),
239 })
240 }
241
242 /// Retrieves blocks, transactions, traces, and logs through a stream using the provided
243 /// query and stream configuration.
244 ///
245 /// ### Implementation
246 /// Runs multiple queries simultaneously based on config.concurrency.
247 ///
248 /// Each query runs until it reaches query.to, server height, any max_num_* query param,
249 /// or execution timed out by server.
250 ///
251 /// # ⚠️ Important Warning
252 ///
253 /// This method will continue executing until the query has run to completion from beginning
254 /// to the end of the block range defined in the query. For heavy queries with large block
255 /// ranges or high data volumes, consider:
256 ///
257 /// - Use [`stream()`](Self::stream) to interact with each streamed chunk individually
258 /// - Use [`get()`](Self::get) which returns a `next_block` that can be paginated for the next query
259 /// - Break large queries into smaller block ranges
260 ///
261 /// # Example
262 /// ```no_run
263 /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField}, StreamConfig};
264 ///
265 /// # async fn example() -> anyhow::Result<()> {
266 /// let client = Client::builder()
267 /// .chain_id(1)
268 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
269 /// .build()?;
270 ///
271 /// // Query ERC20 transfer events
272 /// let query = Query::new()
273 /// .from_block(19000000)
274 /// .to_block_excl(19000010)
275 /// .where_logs(
276 /// LogFilter::all()
277 /// .and_topic0(["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"])?
278 /// )
279 /// .select_log_fields([LogField::Address, LogField::Data]);
280 /// let response = client.collect(query, StreamConfig::default()).await?;
281 ///
282 /// println!("Collected {} events", response.data.logs.len());
283 /// # Ok(())
284 /// # }
285 /// ```
286 pub async fn collect(&self, query: Query, config: StreamConfig) -> Result<QueryResponse> {
287 check_simple_stream_params(&config)?;
288
289 let mut recv = stream::stream_arrow(self, query, config)
290 .await
291 .context("start stream")?;
292
293 let mut data = ResponseData::default();
294 let mut archive_height = None;
295 let mut next_block = 0;
296 let mut total_execution_time = 0;
297
298 while let Some(res) = recv.recv().await {
299 let res = res.context("get response")?;
300 let res: QueryResponse =
301 QueryResponse::try_from(&res).context("convert arrow response")?;
302
303 for batch in res.data.blocks {
304 data.blocks.push(batch);
305 }
306 for batch in res.data.transactions {
307 data.transactions.push(batch);
308 }
309 for batch in res.data.logs {
310 data.logs.push(batch);
311 }
312 for batch in res.data.traces {
313 data.traces.push(batch);
314 }
315
316 archive_height = res.archive_height;
317 next_block = res.next_block;
318 total_execution_time += res.total_execution_time
319 }
320
321 Ok(QueryResponse {
322 archive_height,
323 next_block,
324 total_execution_time,
325 data,
326 rollback_guard: None,
327 })
328 }
329
330 /// Retrieves events through a stream using the provided query and stream configuration.
331 ///
332 /// # ⚠️ Important Warning
333 ///
334 /// This method will continue executing until the query has run to completion from beginning
335 /// to the end of the block range defined in the query. For heavy queries with large block
336 /// ranges or high data volumes, consider:
337 ///
338 /// - Use [`stream_events()`](Self::stream_events) to interact with each streamed chunk individually
339 /// - Use [`get_events()`](Self::get_events) which returns a `next_block` that can be paginated for the next query
340 /// - Break large queries into smaller block ranges
341 ///
342 /// # Example
343 /// ```no_run
344 /// use hypersync_client::{Client, net_types::{Query, TransactionFilter, TransactionField}, StreamConfig};
345 ///
346 /// # async fn example() -> anyhow::Result<()> {
347 /// let client = Client::builder()
348 /// .chain_id(1)
349 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
350 /// .build()?;
351 ///
352 /// // Query transactions to a specific address
353 /// let query = Query::new()
354 /// .from_block(19000000)
355 /// .to_block_excl(19000100)
356 /// .where_transactions(
357 /// TransactionFilter::all()
358 /// .and_to(["0xA0b86a33E6411b87Fd9D3DF822C8698FC06BBe4c"])?
359 /// )
360 /// .select_transaction_fields([TransactionField::Hash, TransactionField::From, TransactionField::Value]);
361 /// let response = client.collect_events(query, StreamConfig::default()).await?;
362 ///
363 /// println!("Collected {} events", response.data.len());
364 /// # Ok(())
365 /// # }
366 /// ```
367 pub async fn collect_events(
368 &self,
369 mut query: Query,
370 config: StreamConfig,
371 ) -> Result<EventResponse> {
372 check_simple_stream_params(&config)?;
373
374 let event_join_strategy = InternalEventJoinStrategy::from(&query.field_selection);
375 event_join_strategy.add_join_fields_to_selection(&mut query.field_selection);
376
377 let mut recv = stream::stream_arrow(self, query, config)
378 .await
379 .context("start stream")?;
380
381 let mut data = Vec::new();
382 let mut archive_height = None;
383 let mut next_block = 0;
384 let mut total_execution_time = 0;
385
386 while let Some(res) = recv.recv().await {
387 let res = res.context("get response")?;
388 let res: QueryResponse =
389 QueryResponse::try_from(&res).context("convert arrow response")?;
390 let events = event_join_strategy.join_from_response_data(res.data);
391
392 data.extend(events);
393
394 archive_height = res.archive_height;
395 next_block = res.next_block;
396 total_execution_time += res.total_execution_time
397 }
398
399 Ok(EventResponse {
400 archive_height,
401 next_block,
402 total_execution_time,
403 data,
404 rollback_guard: None,
405 })
406 }
407
408 /// Retrieves blocks, transactions, traces, and logs in Arrow format through a stream using
409 /// the provided query and stream configuration.
410 ///
411 /// Returns data in Apache Arrow format for high-performance columnar processing.
412 /// Useful for analytics workloads or when working with Arrow-compatible tools.
413 ///
414 /// # ⚠️ Important Warning
415 ///
416 /// This method will continue executing until the query has run to completion from beginning
417 /// to the end of the block range defined in the query. For heavy queries with large block
418 /// ranges or high data volumes, consider:
419 ///
420 /// - Use [`stream_arrow()`](Self::stream_arrow) to interact with each streamed chunk individually
421 /// - Use [`get_arrow()`](Self::get_arrow) which returns a `next_block` that can be paginated for the next query
422 /// - Break large queries into smaller block ranges
423 ///
424 /// # Example
425 /// ```no_run
426 /// use hypersync_client::{Client, net_types::{Query, BlockFilter, BlockField}, StreamConfig};
427 ///
428 /// # async fn example() -> anyhow::Result<()> {
429 /// let client = Client::builder()
430 /// .chain_id(1)
431 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
432 /// .build()?;
433 ///
434 /// // Get block data in Arrow format for analytics
435 /// let query = Query::new()
436 /// .from_block(19000000)
437 /// .to_block_excl(19000100)
438 /// .include_all_blocks()
439 /// .select_block_fields([BlockField::Number, BlockField::Timestamp, BlockField::GasUsed]);
440 /// let response = client.collect_arrow(query, StreamConfig::default()).await?;
441 ///
442 /// println!("Retrieved {} Arrow batches for blocks", response.data.blocks.len());
443 /// # Ok(())
444 /// # }
445 /// ```
446 pub async fn collect_arrow(&self, query: Query, config: StreamConfig) -> Result<ArrowResponse> {
447 let mut recv = stream::stream_arrow(self, query, config)
448 .await
449 .context("start stream")?;
450
451 let mut data = ArrowResponseData::default();
452 let mut archive_height = None;
453 let mut next_block = 0;
454 let mut total_execution_time = 0;
455
456 while let Some(res) = recv.recv().await {
457 let res = res.context("get response")?;
458
459 for batch in res.data.blocks {
460 data.blocks.push(batch);
461 }
462 for batch in res.data.transactions {
463 data.transactions.push(batch);
464 }
465 for batch in res.data.logs {
466 data.logs.push(batch);
467 }
468 for batch in res.data.traces {
469 data.traces.push(batch);
470 }
471 for batch in res.data.decoded_logs {
472 data.decoded_logs.push(batch);
473 }
474
475 archive_height = res.archive_height;
476 next_block = res.next_block;
477 total_execution_time += res.total_execution_time
478 }
479
480 Ok(ArrowResponse {
481 archive_height,
482 next_block,
483 total_execution_time,
484 data,
485 rollback_guard: None,
486 })
487 }
488
489 /// Writes parquet file getting data through a stream using the provided path, query,
490 /// and stream configuration.
491 ///
492 /// Streams data directly to a Parquet file for efficient storage and later analysis.
493 /// Perfect for data exports or ETL pipelines.
494 ///
495 /// # ⚠️ Important Warning
496 ///
497 /// This method will continue executing until the query has run to completion from beginning
498 /// to the end of the block range defined in the query. For heavy queries with large block
499 /// ranges or high data volumes, consider:
500 ///
501 /// - Use [`stream_arrow()`](Self::stream_arrow) and write to Parquet incrementally
502 /// - Use [`get_arrow()`](Self::get_arrow) with pagination and append to Parquet files
503 /// - Break large queries into smaller block ranges
504 ///
505 /// # Example
506 /// ```no_run
507 /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField}, StreamConfig};
508 ///
509 /// # async fn example() -> anyhow::Result<()> {
510 /// let client = Client::builder()
511 /// .chain_id(1)
512 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
513 /// .build()?;
514 ///
515 /// // Export all DEX trades to Parquet for analysis
516 /// let query = Query::new()
517 /// .from_block(19000000)
518 /// .to_block_excl(19010000)
519 /// .where_logs(
520 /// LogFilter::all()
521 /// .and_topic0(["0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822"])?
522 /// )
523 /// .select_log_fields([LogField::Address, LogField::Data, LogField::BlockNumber]);
524 /// client.collect_parquet("./trades.parquet", query, StreamConfig::default()).await?;
525 ///
526 /// println!("Trade data exported to trades.parquet");
527 /// # Ok(())
528 /// # }
529 /// ```
530 pub async fn collect_parquet(
531 &self,
532 path: &str,
533 query: Query,
534 config: StreamConfig,
535 ) -> Result<()> {
536 parquet_out::collect_parquet(self, path, query, config).await
537 }
538
539 /// Internal implementation of getting chain_id from server
540 async fn get_chain_id_impl(&self) -> Result<u64> {
541 let mut url = self.inner.url.clone();
542 let mut segments = url.path_segments_mut().ok().context("get path segments")?;
543 segments.push("chain_id");
544 std::mem::drop(segments);
545 let req = self.inner.http_client.request(Method::GET, url);
546
547 let res = req.send().await.context("execute http req")?;
548
549 let status = res.status();
550 if !status.is_success() {
551 return Err(anyhow!("http response status code {}", status));
552 }
553
554 let chain_id: ChainId = res.json().await.context("read response body json")?;
555
556 Ok(chain_id.chain_id)
557 }
558
559 /// Internal implementation of getting height from server
560 async fn get_height_impl(&self, http_timeout_override: Option<Duration>) -> Result<u64> {
561 let mut url = self.inner.url.clone();
562 let mut segments = url.path_segments_mut().ok().context("get path segments")?;
563 segments.push("height");
564 std::mem::drop(segments);
565 let mut req = self.inner.http_client.request(Method::GET, url);
566 if let Some(http_timeout_override) = http_timeout_override {
567 req = req.timeout(http_timeout_override);
568 }
569
570 let res = req.send().await.context("execute http req")?;
571
572 let status = res.status();
573 if !status.is_success() {
574 return Err(anyhow!("http response status code {}", status));
575 }
576
577 let height: ArchiveHeight = res.json().await.context("read response body json")?;
578
579 Ok(height.height.unwrap_or(0))
580 }
581
582 /// Get the chain_id from the server with retries.
583 ///
584 /// # Example
585 /// ```no_run
586 /// use hypersync_client::Client;
587 ///
588 /// # async fn example() -> anyhow::Result<()> {
589 /// let client = Client::builder()
590 /// .chain_id(1)
591 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
592 /// .build()?;
593 ///
594 /// let chain_id = client.get_chain_id().await?;
595 /// println!("Connected to chain ID: {}", chain_id);
596 /// # Ok(())
597 /// # }
598 /// ```
599 pub async fn get_chain_id(&self) -> Result<u64> {
600 let mut base = self.inner.retry_base_ms;
601
602 let mut err = anyhow!("");
603
604 for _ in 0..self.inner.max_num_retries + 1 {
605 match self.get_chain_id_impl().await {
606 Ok(res) => return Ok(res),
607 Err(e) => {
608 log::error!(
609 "failed to get chain_id from server, retrying... The error was: {e:?}"
610 );
611 err = err.context(format!("{e:?}"));
612 }
613 }
614
615 let base_ms = Duration::from_millis(base);
616 let jitter = Duration::from_millis(fastrange_rs::fastrange_64(
617 rand::random(),
618 self.inner.retry_backoff_ms,
619 ));
620
621 tokio::time::sleep(base_ms + jitter).await;
622
623 base = std::cmp::min(
624 base + self.inner.retry_backoff_ms,
625 self.inner.retry_ceiling_ms,
626 );
627 }
628
629 Err(err)
630 }
631
632 /// Get the height of from server with retries.
633 ///
634 /// # Example
635 /// ```no_run
636 /// use hypersync_client::Client;
637 ///
638 /// # async fn example() -> anyhow::Result<()> {
639 /// let client = Client::builder()
640 /// .chain_id(1)
641 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
642 /// .build()?;
643 ///
644 /// let height = client.get_height().await?;
645 /// println!("Current block height: {}", height);
646 /// # Ok(())
647 /// # }
648 /// ```
649 pub async fn get_height(&self) -> Result<u64> {
650 let mut base = self.inner.retry_base_ms;
651
652 let mut err = anyhow!("");
653
654 for _ in 0..self.inner.max_num_retries + 1 {
655 match self.get_height_impl(None).await {
656 Ok(res) => return Ok(res),
657 Err(e) => {
658 log::error!(
659 "failed to get height from server, retrying... The error was: {e:?}"
660 );
661 err = err.context(format!("{e:?}"));
662 }
663 }
664
665 let base_ms = Duration::from_millis(base);
666 let jitter = Duration::from_millis(fastrange_rs::fastrange_64(
667 rand::random(),
668 self.inner.retry_backoff_ms,
669 ));
670
671 tokio::time::sleep(base_ms + jitter).await;
672
673 base = std::cmp::min(
674 base + self.inner.retry_backoff_ms,
675 self.inner.retry_ceiling_ms,
676 );
677 }
678
679 Err(err)
680 }
681
682 /// Get the height of the Client instance for health checks.
683 ///
684 /// Doesn't do any retries and the `http_req_timeout` parameter will override the http timeout config set when creating the client.
685 ///
686 /// # Example
687 /// ```no_run
688 /// use hypersync_client::Client;
689 /// use std::time::Duration;
690 ///
691 /// # async fn example() -> anyhow::Result<()> {
692 /// let client = Client::builder()
693 /// .chain_id(1)
694 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
695 /// .build()?;
696 ///
697 /// // Quick health check with 5 second timeout
698 /// let height = client.health_check(Some(Duration::from_secs(5))).await?;
699 /// println!("Server is healthy at block: {}", height);
700 /// # Ok(())
701 /// # }
702 /// ```
703 pub async fn health_check(&self, http_req_timeout: Option<Duration>) -> Result<u64> {
704 self.get_height_impl(http_req_timeout).await
705 }
706
707 /// Executes query with retries and returns the response.
708 ///
709 /// # Example
710 /// ```no_run
711 /// use hypersync_client::{Client, net_types::{Query, BlockFilter, BlockField}};
712 ///
713 /// # async fn example() -> anyhow::Result<()> {
714 /// let client = Client::builder()
715 /// .chain_id(1)
716 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
717 /// .build()?;
718 ///
719 /// // Query all blocks from a specific range
720 /// let query = Query::new()
721 /// .from_block(19000000)
722 /// .to_block_excl(19000010)
723 /// .include_all_blocks()
724 /// .select_block_fields([BlockField::Number, BlockField::Hash, BlockField::Timestamp]);
725 /// let response = client.get(&query).await?;
726 ///
727 /// println!("Retrieved {} blocks", response.data.blocks.len());
728 /// # Ok(())
729 /// # }
730 /// ```
731 pub async fn get(&self, query: &Query) -> Result<QueryResponse> {
732 let arrow_response = self.get_arrow(query).await.context("get data")?;
733 let converted =
734 QueryResponse::try_from(&arrow_response).context("convert arrow response")?;
735 Ok(converted)
736 }
737
738 /// Add block, transaction and log fields selection to the query, executes it with retries
739 /// and returns the response.
740 ///
741 /// This method automatically joins blocks, transactions, and logs into unified events,
742 /// making it easier to work with related blockchain data.
743 ///
744 /// # Example
745 /// ```no_run
746 /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField, TransactionField}};
747 ///
748 /// # async fn example() -> anyhow::Result<()> {
749 /// let client = Client::builder()
750 /// .chain_id(1)
751 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
752 /// .build()?;
753 ///
754 /// // Query ERC20 transfers with transaction context
755 /// let query = Query::new()
756 /// .from_block(19000000)
757 /// .to_block_excl(19000010)
758 /// .where_logs(
759 /// LogFilter::all()
760 /// .and_topic0(["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"])?
761 /// )
762 /// .select_log_fields([LogField::Address, LogField::Data])
763 /// .select_transaction_fields([TransactionField::Hash, TransactionField::From]);
764 /// let response = client.get_events(query).await?;
765 ///
766 /// println!("Retrieved {} joined events", response.data.len());
767 /// # Ok(())
768 /// # }
769 /// ```
770 pub async fn get_events(&self, mut query: Query) -> Result<EventResponse> {
771 let event_join_strategy = InternalEventJoinStrategy::from(&query.field_selection);
772 event_join_strategy.add_join_fields_to_selection(&mut query.field_selection);
773 let arrow_response = self.get_arrow(&query).await.context("get data")?;
774 EventResponse::try_from_arrow_response(&arrow_response, &event_join_strategy)
775 }
776
777 /// Executes query once and returns the result using JSON serialization.
778 async fn get_arrow_impl_json(
779 &self,
780 query: &Query,
781 ) -> std::result::Result<ArrowImplResponse, HyperSyncResponseError> {
782 let mut url = self.inner.url.clone();
783 let mut segments = url.path_segments_mut().ok().context("get path segments")?;
784 segments.push("query");
785 segments.push("arrow-ipc");
786 std::mem::drop(segments);
787 let req = self.inner.http_client.request(Method::POST, url);
788
789 let res = req.json(&query).send().await.context("execute http req")?;
790
791 let status = res.status();
792 let rate_limit = RateLimitInfo::from_response(&res);
793
794 if status == StatusCode::PAYLOAD_TOO_LARGE {
795 return Err(HyperSyncResponseError::PayloadTooLarge);
796 }
797 if status == StatusCode::TOO_MANY_REQUESTS {
798 return Err(HyperSyncResponseError::RateLimited { rate_limit });
799 }
800 if !status.is_success() {
801 let text = res.text().await.context("read text to see error")?;
802
803 return Err(HyperSyncResponseError::Other(anyhow!(
804 "http response status code {}, err body: {}",
805 status,
806 text
807 )));
808 }
809
810 let bytes = res.bytes().await.context("read response body bytes")?;
811
812 let res = tokio::task::block_in_place(|| {
813 parse_query_response(&bytes).context("parse query response")
814 })?;
815
816 Ok(ArrowImplResponse {
817 response: res,
818 response_bytes: bytes.len().try_into().unwrap(),
819 rate_limit,
820 })
821 }
822
823 fn should_cache_queries(&self) -> bool {
824 matches!(
825 self.inner.serialization_format,
826 SerializationFormat::CapnProto {
827 should_cache_queries: true
828 }
829 )
830 }
831
    /// Executes query once and returns the result using Cap'n Proto serialization.
    ///
    /// No retries happen here. The request goes to `{url}/query/arrow-ipc/capnp`.
    ///
    /// When query caching is enabled (`should_cache_queries`), a first request carrying only
    /// a query id (built by `build_query_id_from_query`) is sent:
    /// - if the server answers with a cached `QueryResponse`, it is decoded and returned;
    /// - if the server answers `NotCached`, or with a non-success status other than 413/429,
    ///   the method falls through and sends the full query in a second request.
    ///
    /// # Errors
    /// - `PayloadTooLarge` on HTTP 413 from either request.
    /// - `RateLimited` (with the parsed `RateLimitInfo`) on HTTP 429 from either request.
    /// - `Other` on a non-success status of the full-query request (message includes the body).
    /// - Failures propagated with `?` (path, serialization, transport, parsing) are converted
    ///   into `HyperSyncResponseError` by its `From` impl (not visible here).
    async fn get_arrow_impl_capnp(
        &self,
        query: &Query,
    ) -> std::result::Result<ArrowImplResponse, HyperSyncResponseError> {
        let mut url = self.inner.url.clone();
        let mut segments = url.path_segments_mut().ok().context("get path segments")?;
        segments.push("query");
        segments.push("arrow-ipc");
        segments.push("capnp");
        // Dropping the segments guard ends the mutable borrow of `url` so it can be used below.
        std::mem::drop(segments);

        let should_cache = self.should_cache_queries();

        if should_cache {
            // Cache attempt: serialize a request containing only the query id.
            let query_with_id = {
                let mut message = capnp::message::Builder::new_default();
                let mut request_builder =
                    message.init_root::<hypersync_net_types_capnp::request::Builder>();

                request_builder
                    .build_query_id_from_query(query)
                    .context("build query id")?;
                let mut query_with_id = Vec::new();
                capnp::serialize_packed::write_message(&mut query_with_id, &message)
                    .context("write capnp message")?;
                query_with_id
            };

            // `url` is cloned because the full-query fallback below needs it again.
            let req = self.inner.http_client.request(Method::POST, url.clone());

            let res = req
                .body(query_with_id)
                .send()
                .await
                .context("execute http req")?;

            let status = res.status();
            let rate_limit = RateLimitInfo::from_response(&res);

            if status == StatusCode::PAYLOAD_TOO_LARGE {
                return Err(HyperSyncResponseError::PayloadTooLarge);
            }
            if status == StatusCode::TOO_MANY_REQUESTS {
                return Err(HyperSyncResponseError::RateLimited { rate_limit });
            }
            if status.is_success() {
                let bytes = res.bytes().await.context("read response body bytes")?;

                // Raise the nesting limit to its maximum and disable the traversal limit.
                // Presumably this is so large responses are not rejected by the reader's default
                // limits (TODO confirm).
                let mut opts = capnp::message::ReaderOptions::new();
                opts.nesting_limit(i32::MAX).traversal_limit_in_words(None);
                let message_reader = capnp::serialize_packed::read_message(bytes.as_ref(), opts)
                    .context("create message reader")?;
                let query_response = message_reader
                    .get_root::<hypersync_net_types_capnp::cached_query_response::Reader>()
                    .context("get cached_query_response root")?;
                match query_response.get_either().which().context("get either")? {
                    hypersync_net_types_capnp::cached_query_response::either::Which::QueryResponse(
                        query_response,
                    ) => {
                        // Cache hit: decode the embedded response under `block_in_place`.
                        // Note that tokio panics if this is called from a current_thread runtime.
                        let res = tokio::task::block_in_place(|| {
                            let res = query_response?;
                            read_query_response(&res).context("parse query response cached")
                        })?;
                        return Ok(ArrowImplResponse {
                            response: res,
                            response_bytes: bytes.len().try_into().unwrap(),
                            rate_limit,
                        });
                    }
                    hypersync_net_types_capnp::cached_query_response::either::Which::NotCached(()) => {
                        // Cache miss: fall through to the full query below.
                        log::trace!("query was not cached, retrying with full query");
                    }
                }
            } else {
                // A failed cache request is not fatal: log it and fall through to the full query.
                let text = res.text().await.context("read text to see error")?;
                log::warn!(
                    "Failed cache query, will retry full query. {}, err body: {}",
                    status,
                    text
                );
            }
        };

        // Full query. `should_cache` is passed to the builder; presumably it asks the server to
        // cache this query for later id-only requests (TODO confirm).
        let full_query_bytes = {
            let mut message = capnp::message::Builder::new_default();
            let mut query_builder =
                message.init_root::<hypersync_net_types_capnp::request::Builder>();

            query_builder
                .build_full_query_from_query(query, should_cache)
                .context("build full query")?;
            let mut bytes = Vec::new();
            capnp::serialize_packed::write_message(&mut bytes, &message)
                .context("write full query capnp message")?;
            bytes
        };

        let req = self.inner.http_client.request(Method::POST, url);

        let res = req
            .body(full_query_bytes)
            .send()
            .await
            .context("execute http req")?;

        let status = res.status();
        let rate_limit = RateLimitInfo::from_response(&res);

        if status == StatusCode::PAYLOAD_TOO_LARGE {
            return Err(HyperSyncResponseError::PayloadTooLarge);
        }
        if status == StatusCode::TOO_MANY_REQUESTS {
            return Err(HyperSyncResponseError::RateLimited { rate_limit });
        }
        if !status.is_success() {
            let text = res.text().await.context("read text to see error")?;

            return Err(HyperSyncResponseError::Other(anyhow!(
                "http response status code {}, err body: {}",
                status,
                text
            )));
        }

        let bytes = res.bytes().await.context("read response body bytes")?;

        // The full-query response is parsed with `parse_query_response`, unlike the cached path
        // above, which uses `read_query_response` on the embedded reader.
        let res = tokio::task::block_in_place(|| {
            parse_query_response(&bytes).context("parse query response")
        })?;

        Ok(ArrowImplResponse {
            response: res,
            response_bytes: bytes.len().try_into().unwrap(),
            rate_limit,
        })
    }
969
970 /// Executes query once and returns the result, handling payload-too-large by halving block range.
971 async fn get_arrow_impl(
972 &self,
973 query: &Query,
974 ) -> std::result::Result<ArrowImplResponse, HyperSyncResponseError> {
975 let mut query = query.clone();
976 loop {
977 let res = match self.inner.serialization_format {
978 SerializationFormat::Json => self.get_arrow_impl_json(&query).await,
979 SerializationFormat::CapnProto { .. } => self.get_arrow_impl_capnp(&query).await,
980 };
981 match res {
982 Ok(res) => return Ok(res),
983 Err(
984 e @ (HyperSyncResponseError::Other(_)
985 | HyperSyncResponseError::RateLimited { .. }),
986 ) => return Err(e),
987 Err(HyperSyncResponseError::PayloadTooLarge) => {
988 let block_range = if let Some(to_block) = query.to_block {
989 let current = to_block - query.from_block;
990 if current < 2 {
991 return Err(HyperSyncResponseError::Other(anyhow!(
992 "Payload is too large and query is using the minimum block range."
993 )));
994 }
995 // Half the current block range
996 current / 2
997 } else {
998 200
999 };
1000 let to_block = query.from_block + block_range;
1001 query.to_block = Some(to_block);
1002
1003 log::trace!(
1004 "Payload is too large, retrying with block range from: {} to: {}",
1005 query.from_block,
1006 to_block
1007 );
1008 }
1009 }
1010 }
1011 }
1012
1013 /// Executes query with retries and returns the response in Arrow format.
1014 pub async fn get_arrow(&self, query: &Query) -> Result<ArrowResponse> {
1015 const WAIT_ON_RATE_LIMIT: bool = true;
1016 self.get_arrow_with_size(query, WAIT_ON_RATE_LIMIT)
1017 .await
1018 .map(|res| res.response)
1019 }
1020
1021 /// Internal implementation for get_arrow.
1022 ///
1023 /// When `wait_on_rate_limit` is `false`, a 429 response is returned
1024 /// immediately with the rate limit info instead of being retried.
1025 async fn get_arrow_with_size(
1026 &self,
1027 query: &Query,
1028 wait_on_rate_limit: bool,
1029 ) -> Result<ArrowImplResponse> {
1030 let mut base = self.inner.retry_base_ms;
1031
1032 let mut err = anyhow!("");
1033
1034 if self.inner.proactive_rate_limit_sleep {
1035 if wait_on_rate_limit {
1036 self.wait_for_rate_limit().await;
1037 } else if let Some(rate_limit) = self.get_proactive_rate_limit_info() {
1038 return Err(anyhow::anyhow!(HyperSyncResponseError::RateLimited {
1039 rate_limit
1040 }));
1041 }
1042 }
1043
1044 for _ in 0..self.inner.max_num_retries + 1 {
1045 match self.get_arrow_impl(query).await {
1046 Ok(res) => {
1047 self.update_rate_limit_state(&res.rate_limit);
1048 return Ok(res);
1049 }
1050 Err(HyperSyncResponseError::RateLimited { rate_limit }) => {
1051 self.update_rate_limit_state(&rate_limit);
1052 if !wait_on_rate_limit {
1053 return Err(anyhow::anyhow!(HyperSyncResponseError::RateLimited {
1054 rate_limit
1055 }));
1056 }
1057 let wait_secs = rate_limit.suggested_wait_secs().unwrap_or(1) + 1;
1058 log::warn!(
1059 "rate limited by server ({rate_limit}), waiting {wait_secs}s before retry. To increase your rate limits, upgrade your plan at https://envio.dev/app/api-tokens. For more info: https://docs.envio.dev/docs/HyperSync/api-tokens"
1060 );
1061 err = err.context(format!("rate limited by server ({rate_limit}). To increase your rate limits, upgrade your plan at https://envio.dev/app/api-tokens"));
1062 tokio::time::sleep(Duration::from_secs(wait_secs)).await;
1063 continue;
1064 }
1065 Err(HyperSyncResponseError::Other(e)) => {
1066 log::error!(
1067 "failed to get arrow data from server, retrying... The error was: {e:?}"
1068 );
1069 err = err.context(format!("{e:?}"));
1070 }
1071 Err(HyperSyncResponseError::PayloadTooLarge) => {
1072 // This shouldn't happen since get_arrow_impl handles it, but just in case
1073 log::error!("unexpected PayloadTooLarge from get_arrow_impl, retrying...");
1074 err = err.context("unexpected PayloadTooLarge");
1075 }
1076 }
1077
1078 let base_ms = Duration::from_millis(base);
1079 let jitter = Duration::from_millis(fastrange_rs::fastrange_64(
1080 rand::random(),
1081 self.inner.retry_backoff_ms,
1082 ));
1083
1084 tokio::time::sleep(base_ms + jitter).await;
1085
1086 base = std::cmp::min(
1087 base + self.inner.retry_backoff_ms,
1088 self.inner.retry_ceiling_ms,
1089 );
1090 }
1091
1092 Err(err)
1093 }
1094
1095 /// Spawns task to execute query and return data via a channel.
1096 ///
1097 /// # Example
1098 /// ```no_run
1099 /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField}, StreamConfig};
1100 ///
1101 /// # async fn example() -> anyhow::Result<()> {
1102 /// let client = Client::builder()
1103 /// .chain_id(1)
1104 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
1105 /// .build()?;
1106 ///
1107 /// // Stream all ERC20 transfer events
1108 /// let query = Query::new()
1109 /// .from_block(19000000)
1110 /// .where_logs(
1111 /// LogFilter::all()
1112 /// .and_topic0(["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"])?
1113 /// )
1114 /// .select_log_fields([LogField::Address, LogField::Topic1, LogField::Topic2, LogField::Data]);
1115 /// let mut receiver = client.stream(query, StreamConfig::default()).await?;
1116 ///
1117 /// while let Some(response) = receiver.recv().await {
1118 /// let response = response?;
1119 /// println!("Got {} events up to block: {}", response.data.logs.len(), response.next_block);
1120 /// }
1121 /// # Ok(())
1122 /// # }
1123 /// ```
1124 pub async fn stream(
1125 &self,
1126 query: Query,
1127 config: StreamConfig,
1128 ) -> Result<mpsc::Receiver<Result<QueryResponse>>> {
1129 check_simple_stream_params(&config)?;
1130
1131 let (tx, rx): (_, mpsc::Receiver<Result<QueryResponse>>) =
1132 mpsc::channel(config.concurrency);
1133
1134 let mut inner_rx = self
1135 .stream_arrow(query, config)
1136 .await
1137 .context("start inner stream")?;
1138
1139 tokio::spawn(async move {
1140 while let Some(resp) = inner_rx.recv().await {
1141 let msg = resp
1142 .context("inner receiver")
1143 .and_then(|r| QueryResponse::try_from(&r));
1144 let is_err = msg.is_err();
1145 if tx.send(msg).await.is_err() || is_err {
1146 return;
1147 }
1148 }
1149 });
1150
1151 Ok(rx)
1152 }
1153
1154 /// Add block, transaction and log fields selection to the query and spawns task to execute it,
1155 /// returning data via a channel.
1156 ///
1157 /// This method automatically joins blocks, transactions, and logs into unified events,
1158 /// then streams them via a channel for real-time processing.
1159 ///
1160 /// # Example
1161 /// ```no_run
1162 /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField, TransactionField}, StreamConfig};
1163 ///
1164 /// # async fn example() -> anyhow::Result<()> {
1165 /// let client = Client::builder()
1166 /// .chain_id(1)
1167 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
1168 /// .build()?;
1169 ///
1170 /// // Stream NFT transfer events with transaction context
1171 /// let query = Query::new()
1172 /// .from_block(19000000)
1173 /// .where_logs(
1174 /// LogFilter::all()
1175 /// .and_topic0(["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"])?
1176 /// )
1177 /// .select_log_fields([LogField::Address, LogField::Topic1, LogField::Topic2])
1178 /// .select_transaction_fields([TransactionField::Hash, TransactionField::From]);
1179 /// let mut receiver = client.stream_events(query, StreamConfig::default()).await?;
1180 ///
1181 /// while let Some(response) = receiver.recv().await {
1182 /// let response = response?;
1183 /// println!("Got {} joined events up to block: {}", response.data.len(), response.next_block);
1184 /// }
1185 /// # Ok(())
1186 /// # }
1187 /// ```
1188 pub async fn stream_events(
1189 &self,
1190 mut query: Query,
1191 config: StreamConfig,
1192 ) -> Result<mpsc::Receiver<Result<EventResponse>>> {
1193 check_simple_stream_params(&config)?;
1194
1195 let event_join_strategy = InternalEventJoinStrategy::from(&query.field_selection);
1196
1197 event_join_strategy.add_join_fields_to_selection(&mut query.field_selection);
1198
1199 let (tx, rx): (_, mpsc::Receiver<Result<EventResponse>>) =
1200 mpsc::channel(config.concurrency);
1201
1202 let mut inner_rx = self
1203 .stream_arrow(query, config)
1204 .await
1205 .context("start inner stream")?;
1206
1207 tokio::spawn(async move {
1208 while let Some(resp) = inner_rx.recv().await {
1209 let msg = resp
1210 .context("inner receiver")
1211 .and_then(|r| EventResponse::try_from_arrow_response(&r, &event_join_strategy));
1212 let is_err = msg.is_err();
1213 if tx.send(msg).await.is_err() || is_err {
1214 return;
1215 }
1216 }
1217 });
1218
1219 Ok(rx)
1220 }
1221
1222 /// Spawns task to execute query and return data via a channel in Arrow format.
1223 ///
1224 /// Returns raw Apache Arrow data via a channel for high-performance processing.
1225 /// Ideal for applications that need to work directly with columnar data.
1226 ///
1227 /// # Example
1228 /// ```no_run
1229 /// use hypersync_client::{Client, net_types::{Query, TransactionFilter, TransactionField}, StreamConfig};
1230 ///
1231 /// # async fn example() -> anyhow::Result<()> {
1232 /// let client = Client::builder()
1233 /// .chain_id(1)
1234 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
1235 /// .build()?;
1236 ///
1237 /// // Stream transaction data in Arrow format for analytics
1238 /// let query = Query::new()
1239 /// .from_block(19000000)
1240 /// .to_block_excl(19000100)
1241 /// .where_transactions(
1242 /// TransactionFilter::all()
1243 /// .and_contract_address(["0xA0b86a33E6411b87Fd9D3DF822C8698FC06BBe4c"])?
1244 /// )
1245 /// .select_transaction_fields([TransactionField::Hash, TransactionField::From, TransactionField::Value]);
1246 /// let mut receiver = client.stream_arrow(query, StreamConfig::default()).await?;
1247 ///
1248 /// while let Some(response) = receiver.recv().await {
1249 /// let response = response?;
1250 /// println!("Got {} Arrow batches for transactions", response.data.transactions.len());
1251 /// }
1252 /// # Ok(())
1253 /// # }
1254 /// ```
1255 pub async fn stream_arrow(
1256 &self,
1257 query: Query,
1258 config: StreamConfig,
1259 ) -> Result<mpsc::Receiver<Result<ArrowResponse>>> {
1260 stream::stream_arrow(self, query, config).await
1261 }
1262
1263 /// Like [`stream_arrow`](Self::stream_arrow), but reports streaming metrics to
1264 /// the given [`StreamObserver`].
1265 ///
1266 /// The engine calls [`StreamObserver::on_request`] for every completed HTTP
1267 /// request, [`StreamObserver::on_progress`] once per scheduler iteration, and
1268 /// [`StreamObserver::on_finish`] with the final [`StreamSummary`]. Pass an
1269 /// `Arc<`[`StreamMetrics`]`>` for a ready-made aggregate handle you can read
1270 /// during or after the stream, or implement the trait for custom exporters.
1271 ///
1272 /// This is the only entry point that does any metrics work; the plain
1273 /// `stream*` methods have zero observability overhead.
1274 ///
1275 /// [`StreamMetrics`]: crate::StreamMetrics
1276 /// [`StreamObserver`]: crate::StreamObserver
1277 /// [`StreamSummary`]: crate::StreamSummary
1278 pub async fn stream_arrow_with_observer(
1279 &self,
1280 query: Query,
1281 config: StreamConfig,
1282 observer: Arc<dyn crate::StreamObserver>,
1283 ) -> Result<mpsc::Receiver<Result<ArrowResponse>>> {
1284 stream::stream_arrow_with_observer(self, query, config, observer).await
1285 }
1286
1287 /// Executes query and returns the response in Arrow format along with
1288 /// rate limit information from the server.
1289 ///
1290 /// Unlike [`get_arrow`](Self::get_arrow), this method does **not** retry on
1291 /// HTTP 429 responses. Instead it returns
1292 /// [`RateLimitResponse::RateLimited`] so the caller can implement their own
1293 /// back-off. Other transient errors are still retried normally.
1294 pub async fn get_arrow_with_rate_limit(
1295 &self,
1296 query: &Query,
1297 ) -> Result<RateLimitResponse<ArrowResponseData>> {
1298 const WAIT_ON_RATE_LIMIT: bool = false;
1299 match self.get_arrow_with_size(query, WAIT_ON_RATE_LIMIT).await {
1300 Ok(result) => Ok(RateLimitResponse::Success {
1301 response: result.response,
1302 rate_limit: result.rate_limit,
1303 }),
1304 Err(e) => match e.downcast::<HyperSyncResponseError>() {
1305 Ok(HyperSyncResponseError::RateLimited { rate_limit }) => {
1306 Ok(RateLimitResponse::RateLimited(rate_limit))
1307 }
1308 Ok(other) => Err(other.into()),
1309 Err(e) => Err(e),
1310 },
1311 }
1312 }
1313
1314 /// Executes query and returns the response along with
1315 /// rate limit information from the server.
1316 ///
1317 /// Unlike [`get`](Self::get), this method does **not** retry on HTTP 429
1318 /// responses. Instead it returns
1319 /// [`RateLimitResponse::RateLimited`] so the caller can implement their own
1320 /// back-off. Other transient errors are still retried normally.
1321 pub async fn get_with_rate_limit(
1322 &self,
1323 query: &Query,
1324 ) -> Result<RateLimitResponse<ResponseData>> {
1325 match self.get_arrow_with_rate_limit(query).await? {
1326 RateLimitResponse::Success {
1327 response,
1328 rate_limit,
1329 } => {
1330 let converted =
1331 QueryResponse::try_from(&response).context("convert arrow response")?;
1332 Ok(RateLimitResponse::Success {
1333 response: converted,
1334 rate_limit,
1335 })
1336 }
1337 RateLimitResponse::RateLimited(info) => Ok(RateLimitResponse::RateLimited(info)),
1338 }
1339 }
1340
1341 /// Executes query and returns joined events along with rate limit
1342 /// information from the server.
1343 ///
1344 /// Unlike [`get_events`](Self::get_events), this method does **not** retry
1345 /// on HTTP 429 responses. Instead it returns
1346 /// [`RateLimitResponse::RateLimited`] so the caller can implement their own
1347 /// back-off. Other transient errors are still retried normally.
1348 pub async fn get_events_with_rate_limit(
1349 &self,
1350 mut query: Query,
1351 ) -> Result<RateLimitResponse<Vec<simple_types::Event>>> {
1352 let event_join_strategy = InternalEventJoinStrategy::from(&query.field_selection);
1353 event_join_strategy.add_join_fields_to_selection(&mut query.field_selection);
1354 match self.get_arrow_with_rate_limit(&query).await? {
1355 RateLimitResponse::Success {
1356 response,
1357 rate_limit,
1358 } => {
1359 let converted =
1360 EventResponse::try_from_arrow_response(&response, &event_join_strategy)?;
1361 Ok(RateLimitResponse::Success {
1362 response: converted,
1363 rate_limit,
1364 })
1365 }
1366 RateLimitResponse::RateLimited(info) => Ok(RateLimitResponse::RateLimited(info)),
1367 }
1368 }
1369
1370 /// Returns the most recently observed rate limit information, if any.
1371 ///
1372 /// Updated after every request (including inside streams). Returns `None`
1373 /// if no requests have been made yet or the server hasn't returned rate limit headers.
1374 pub fn rate_limit_info(&self) -> Option<RateLimitInfo> {
1375 self.inner
1376 .rate_limit_state
1377 .lock()
1378 .expect("rate_limit_state mutex poisoned")
1379 .as_ref()
1380 .map(|(info, _captured_at)| info.clone())
1381 }
1382
1383 /// Returns the current rate limit info if the client is known to be rate-limited
1384 /// and the reset window has not yet elapsed.
1385 fn get_proactive_rate_limit_info(&self) -> Option<RateLimitInfo> {
1386 let state = self
1387 .inner
1388 .rate_limit_state
1389 .lock()
1390 .expect("rate_limit_state mutex poisoned");
1391 match state.as_ref() {
1392 Some((info, captured_at)) if info.is_rate_limited() => {
1393 let remaining_wait = info.suggested_wait_secs().map(|secs| {
1394 let elapsed = captured_at.elapsed().as_secs();
1395 secs.saturating_sub(elapsed)
1396 });
1397 if remaining_wait.unwrap_or(0) > 0 {
1398 Some(info.clone())
1399 } else {
1400 None
1401 }
1402 }
1403 _ => None,
1404 }
1405 }
1406
1407 /// Waits until the current rate limit window resets, if the client is rate limited.
1408 ///
1409 /// Returns immediately if:
1410 /// - No rate limit information has been observed yet
1411 /// - There is remaining quota in the current window
1412 ///
1413 /// This method is useful for consumers who want to explicitly wait before making
1414 /// requests, for example when coordinating rate limits across multiple systems.
1415 pub async fn wait_for_rate_limit(&self) {
1416 if let Some(info) = self.get_proactive_rate_limit_info() {
1417 let secs = info.suggested_wait_secs().unwrap_or(0);
1418 if secs > 0 {
1419 log::warn!(
1420 "rate limit exhausted ({info}), proactively waiting {secs}s for window reset. To increase your rate limits, upgrade your plan at https://envio.dev/app/api-tokens. For more info: https://docs.envio.dev/docs/HyperSync/api-tokens"
1421 );
1422 tokio::time::sleep(Duration::from_secs(secs)).await;
1423 }
1424 }
1425 }
1426
1427 /// Updates the internally tracked rate limit state with the current timestamp.
1428 fn update_rate_limit_state(&self, rate_limit: &RateLimitInfo) {
1429 // Only update if the response actually contained rate limit headers
1430 if rate_limit.limit.is_some()
1431 || rate_limit.remaining.is_some()
1432 || rate_limit.reset_secs.is_some()
1433 {
1434 let mut state = self
1435 .inner
1436 .rate_limit_state
1437 .lock()
1438 .expect("rate_limit_state mutex poisoned");
1439 *state = Some((rate_limit.clone(), Instant::now()));
1440 }
1441 }
1442
    /// Returns the hypersync server URL this client was configured with.
    ///
    /// Request paths are derived from this base URL. For example,
    /// [`stream_height`](Self::stream_height) appends `/height/sse` to it.
    ///
    /// # Example
    /// ```
    /// use hypersync_client::Client;
    ///
    /// let client = Client::builder()
    ///     .chain_id(1)
    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
    ///     .build()
    ///     .unwrap();
    ///
    /// println!("Client URL: {}", client.url());
    /// ```
    pub fn url(&self) -> &Url {
        &self.inner.url
    }
1460}
1461
1462/// Builder for creating a hypersync client with configuration options.
1463///
1464/// This builder provides a fluent API for configuring client settings like URL,
1465/// authentication, timeouts, and retry behavior.
1466///
1467/// # Example
1468/// ```
1469/// use hypersync_client::{Client, SerializationFormat};
1470///
1471/// let client = Client::builder()
1472/// .chain_id(1)
1473/// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1474/// .http_req_timeout_millis(30000)
1475/// .max_num_retries(3)
1476/// .build()
1477/// .unwrap();
1478/// ```
1479#[derive(Debug, Clone, Default)]
1480pub struct ClientBuilder(ClientConfig);
1481
1482impl ClientBuilder {
1483 /// Creates a new ClientBuilder with default configuration.
1484 pub fn new() -> Self {
1485 Self::default()
1486 }
1487
1488 /// Sets the chain ID and automatically configures the URL for the hypersync endpoint.
1489 ///
1490 /// This is a convenience method that sets the URL to `https://{chain_id}.hypersync.xyz`.
1491 ///
1492 /// # Arguments
1493 /// * `chain_id` - The blockchain chain ID (e.g., 1 for Ethereum mainnet)
1494 ///
1495 /// # Example
1496 /// ```
1497 /// use hypersync_client::Client;
1498 ///
1499 /// let client = Client::builder()
1500 /// .chain_id(1) // Ethereum mainnet
1501 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1502 /// .build()
1503 /// .unwrap();
1504 /// ```
1505 pub fn chain_id(mut self, chain_id: u64) -> Self {
1506 self.0.url = format!("https://{chain_id}.hypersync.xyz");
1507 self
1508 }
1509
1510 /// Sets a custom URL for the hypersync server.
1511 ///
1512 /// Use this method when you need to connect to a custom hypersync endpoint
1513 /// instead of the default public endpoints.
1514 ///
1515 /// # Arguments
1516 /// * `url` - The hypersync server URL
1517 ///
1518 /// # Example
1519 /// ```
1520 /// use hypersync_client::Client;
1521 ///
1522 /// let client = Client::builder()
1523 /// .url("https://my-custom-hypersync.example.com")
1524 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1525 /// .build()
1526 /// .unwrap();
1527 /// ```
1528 pub fn url<S: ToString>(mut self, url: S) -> Self {
1529 self.0.url = url.to_string();
1530 self
1531 }
1532
1533 /// Sets the api token for authentication.
1534 ///
1535 /// Required for accessing authenticated hypersync endpoints.
1536 ///
1537 /// # Arguments
1538 /// * `api_token` - The authentication token
1539 ///
1540 /// # Example
1541 /// ```
1542 /// use hypersync_client::Client;
1543 ///
1544 /// let client = Client::builder()
1545 /// .chain_id(1)
1546 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1547 /// .build()
1548 /// .unwrap();
1549 /// ```
1550 pub fn api_token<S: ToString>(mut self, api_token: S) -> Self {
1551 self.0.api_token = api_token.to_string();
1552 self
1553 }
1554
1555 /// Sets the HTTP request timeout in milliseconds.
1556 ///
1557 /// # Arguments
1558 /// * `http_req_timeout_millis` - Timeout in milliseconds (default: 30000)
1559 ///
1560 /// # Example
1561 /// ```
1562 /// use hypersync_client::Client;
1563 ///
1564 /// let client = Client::builder()
1565 /// .chain_id(1)
1566 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1567 /// .http_req_timeout_millis(60000) // 60 second timeout
1568 /// .build()
1569 /// .unwrap();
1570 /// ```
1571 pub fn http_req_timeout_millis(mut self, http_req_timeout_millis: u64) -> Self {
1572 self.0.http_req_timeout_millis = http_req_timeout_millis;
1573 self
1574 }
1575
1576 /// Sets the maximum number of retries for failed requests.
1577 ///
1578 /// # Arguments
1579 /// * `max_num_retries` - Maximum number of retries (default: 10)
1580 ///
1581 /// # Example
1582 /// ```
1583 /// use hypersync_client::Client;
1584 ///
1585 /// let client = Client::builder()
1586 /// .chain_id(1)
1587 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1588 /// .max_num_retries(5)
1589 /// .build()
1590 /// .unwrap();
1591 /// ```
1592 pub fn max_num_retries(mut self, max_num_retries: usize) -> Self {
1593 self.0.max_num_retries = max_num_retries;
1594 self
1595 }
1596
1597 /// Sets the backoff increment for retry delays.
1598 ///
1599 /// This value is added to the base delay on each retry attempt.
1600 ///
1601 /// # Arguments
1602 /// * `retry_backoff_ms` - Backoff increment in milliseconds (default: 500)
1603 ///
1604 /// # Example
1605 /// ```
1606 /// use hypersync_client::Client;
1607 ///
1608 /// let client = Client::builder()
1609 /// .chain_id(1)
1610 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1611 /// .retry_backoff_ms(1000) // 1 second backoff increment
1612 /// .build()
1613 /// .unwrap();
1614 /// ```
1615 pub fn retry_backoff_ms(mut self, retry_backoff_ms: u64) -> Self {
1616 self.0.retry_backoff_ms = retry_backoff_ms;
1617 self
1618 }
1619
1620 /// Sets the initial delay for retry attempts.
1621 ///
1622 /// # Arguments
1623 /// * `retry_base_ms` - Initial retry delay in milliseconds (default: 500)
1624 ///
1625 /// # Example
1626 /// ```
1627 /// use hypersync_client::Client;
1628 ///
1629 /// let client = Client::builder()
1630 /// .chain_id(1)
1631 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1632 /// .retry_base_ms(1000) // Start with 1 second delay
1633 /// .build()
1634 /// .unwrap();
1635 /// ```
1636 pub fn retry_base_ms(mut self, retry_base_ms: u64) -> Self {
1637 self.0.retry_base_ms = retry_base_ms;
1638 self
1639 }
1640
1641 /// Sets the maximum delay for retry attempts.
1642 ///
1643 /// The retry delay will not exceed this value, even with backoff increments.
1644 ///
1645 /// # Arguments
1646 /// * `retry_ceiling_ms` - Maximum retry delay in milliseconds (default: 10000)
1647 ///
1648 /// # Example
1649 /// ```
1650 /// use hypersync_client::Client;
1651 ///
1652 /// let client = Client::builder()
1653 /// .chain_id(1)
1654 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1655 /// .retry_ceiling_ms(30000) // Cap at 30 seconds
1656 /// .build()
1657 /// .unwrap();
1658 /// ```
1659 pub fn retry_ceiling_ms(mut self, retry_ceiling_ms: u64) -> Self {
1660 self.0.retry_ceiling_ms = retry_ceiling_ms;
1661 self
1662 }
1663
1664 /// Sets whether to proactively sleep when rate limited.
1665 ///
1666 /// When enabled (default), the client will wait for the rate limit window to reset
1667 /// before sending requests it knows will be rejected. Set to `false` to disable
1668 /// this behavior and handle rate limits yourself.
1669 ///
1670 /// # Arguments
1671 /// * `proactive_rate_limit_sleep` - Whether to enable proactive rate limit sleeping (default: true)
1672 pub fn proactive_rate_limit_sleep(mut self, proactive_rate_limit_sleep: bool) -> Self {
1673 self.0.proactive_rate_limit_sleep = proactive_rate_limit_sleep;
1674 self
1675 }
1676
1677 /// Sets the serialization format for client-server communication.
1678 ///
1679 /// # Arguments
1680 /// * `serialization_format` - The format to use (JSON or CapnProto)
1681 ///
1682 /// # Example
1683 /// ```
1684 /// use hypersync_client::{Client, SerializationFormat};
1685 ///
1686 /// let client = Client::builder()
1687 /// .chain_id(1)
1688 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1689 /// .serialization_format(SerializationFormat::Json)
1690 /// .build()
1691 /// .unwrap();
1692 /// ```
1693 pub fn serialization_format(mut self, serialization_format: SerializationFormat) -> Self {
1694 self.0.serialization_format = serialization_format;
1695 self
1696 }
1697
1698 /// Builds the client with the configured settings.
1699 ///
1700 /// # Returns
1701 /// * `Result<Client>` - The configured client or an error if configuration is invalid
1702 ///
1703 /// # Errors
1704 /// Returns an error if:
1705 /// * The URL is malformed
1706 /// * Required configuration is missing
1707 ///
1708 /// # Example
1709 /// ```
1710 /// use hypersync_client::Client;
1711 ///
1712 /// let client = Client::builder()
1713 /// .chain_id(1)
1714 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1715 /// .build()
1716 /// .unwrap();
1717 /// ```
1718 pub fn build(self) -> Result<Client> {
1719 if self.0.url.is_empty() {
1720 anyhow::bail!(
1721 "endpoint needs to be set, try using builder.chain_id(1) or\
1722 builder.url(\"https://eth.hypersync.xyz\") to set the endpoint"
1723 )
1724 }
1725 Client::new(self.0)
1726 }
1727}
1728
/// Reconnection delay of the height SSE stream after its first failure (200ms).
/// It doubles with each further consecutive failure.
const INITIAL_RECONNECT_DELAY: Duration = Duration::from_millis(200);
/// Upper bound on the height SSE stream's reconnection delay (30s).
const MAX_RECONNECT_DELAY: Duration = Duration::from_millis(30_000);
/// Read timeout used to detect dead height SSE connections.
///
/// The server sends a keepalive ping every 5s. Going three ping intervals (15s)
/// without any event therefore means the connection is dead.
const READ_TIMEOUT: Duration = Duration::from_secs(3 * 5);
1735
/// Events emitted by the height stream returned from `Client::stream_height`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum HeightStreamEvent {
    /// Successfully connected or reconnected to the SSE stream.
    ///
    /// Sent each time the underlying SSE connection reports that it has opened.
    Connected,
    /// Received a height update from the server.
    ///
    /// Carries the value parsed from the data of a `height` SSE event.
    Height(u64),
    /// Connection lost, will attempt to reconnect after the specified delay.
    Reconnecting {
        /// Duration to wait before attempting reconnection.
        delay: Duration,
        /// The error that caused the reconnection, formatted for display.
        ///
        /// This may be empty when the stream ended without an error.
        error_msg: String,
    },
}
1751
/// Classification of a single SSE event read from the `/height/sse` stream.
enum InternalStreamEvent {
    /// An event that is forwarded to the `stream_height` consumer.
    Publish(HeightStreamEvent),
    /// A server keepalive ping. It is not forwarded, but receiving it before the
    /// read timeout keeps the connection from being treated as dead.
    Ping,
    /// An SSE event with an unrecognized event name. It carries a debug
    /// description of the event and is ignored by the consumer loop.
    Unknown(String),
}
1757
1758impl Client {
1759 fn get_es_stream(&self) -> Result<EventSource> {
1760 // Build the GET /height/sse request
1761 let mut url = self.inner.url.clone();
1762 url.path_segments_mut()
1763 .ok()
1764 .context("invalid base URL")?
1765 .extend(&["height", "sse"]);
1766
1767 let req = self
1768 .inner
1769 .http_client
1770 // Don't set timeout for SSE stream
1771 .request_no_timeout(Method::GET, url);
1772
1773 // Configure exponential backoff for library-level retries
1774 let retry_policy = ExponentialBackoff::new(
1775 INITIAL_RECONNECT_DELAY,
1776 2.0,
1777 Some(MAX_RECONNECT_DELAY),
1778 None, // unlimited retries
1779 );
1780
1781 // Turn the request into an EventSource stream with retries
1782 let mut es = reqwest_eventsource::EventSource::new(req)
1783 .context("unexpected error creating EventSource")?;
1784 es.set_retry_policy(Box::new(retry_policy));
1785 Ok(es)
1786 }
1787
1788 async fn next_height(event_source: &mut EventSource) -> Result<Option<InternalStreamEvent>> {
1789 let Some(res) = tokio::time::timeout(READ_TIMEOUT, event_source.next())
1790 .await
1791 .map_err(|d| anyhow::anyhow!("stream timed out after {d}"))?
1792 else {
1793 return Ok(None);
1794 };
1795
1796 let e = match res.context("failed response")? {
1797 Event::Open => InternalStreamEvent::Publish(HeightStreamEvent::Connected),
1798 Event::Message(event) => match event.event.as_str() {
1799 "height" => {
1800 let height = event
1801 .data
1802 .trim()
1803 .parse::<u64>()
1804 .context("parsing height from event data")?;
1805 InternalStreamEvent::Publish(HeightStreamEvent::Height(height))
1806 }
1807 "ping" => InternalStreamEvent::Ping,
1808 _ => InternalStreamEvent::Unknown(format!("unknown event: {:?}", event)),
1809 },
1810 };
1811
1812 Ok(Some(e))
1813 }
1814
1815 async fn stream_height_events(
1816 es: &mut EventSource,
1817 tx: &mpsc::Sender<HeightStreamEvent>,
1818 ) -> Result<bool> {
1819 let mut received_an_event = false;
1820 while let Some(event) = Self::next_height(es).await.context("failed next height")? {
1821 match event {
1822 InternalStreamEvent::Publish(event) => {
1823 received_an_event = true;
1824 if tx.send(event).await.is_err() {
1825 return Ok(received_an_event); // Receiver dropped, exit task
1826 }
1827 }
1828 InternalStreamEvent::Ping => (), // ignore pings
1829 InternalStreamEvent::Unknown(_event) => (), // ignore unknown events
1830 }
1831 }
1832 Ok(received_an_event)
1833 }
1834
1835 fn get_delay(consecutive_failures: u32) -> Duration {
1836 if consecutive_failures > 0 {
1837 /// helper function to calculate 2^x
1838 /// optimization using bit shifting
1839 const fn two_to_pow(x: u32) -> u32 {
1840 1 << x
1841 }
1842 // Exponential backoff: 200ms, 400ms, 800ms, ... up to 30s
1843 INITIAL_RECONNECT_DELAY
1844 .saturating_mul(two_to_pow(consecutive_failures - 1))
1845 .min(MAX_RECONNECT_DELAY)
1846 } else {
1847 // On zero consecutive failures, 0 delay
1848 Duration::from_millis(0)
1849 }
1850 }
1851
1852 async fn stream_height_events_with_retry(
1853 &self,
1854 tx: &mpsc::Sender<HeightStreamEvent>,
1855 ) -> Result<()> {
1856 let mut consecutive_failures = 0u32;
1857
1858 loop {
1859 // should always be able to creat a new es stream
1860 // something is wrong with the req builder otherwise
1861 let mut es = self.get_es_stream().context("get es stream")?;
1862
1863 let mut error = anyhow!("");
1864
1865 match Self::stream_height_events(&mut es, tx).await {
1866 Ok(received_an_event) => {
1867 if received_an_event {
1868 consecutive_failures = 0; // Reset after successful connection that then failed
1869 }
1870 log::trace!("Stream height exited");
1871 }
1872 Err(e) => {
1873 log::trace!("Stream height failed: {e:?}");
1874 error = e;
1875 }
1876 }
1877
1878 es.close();
1879
1880 // If the receiver is closed, exit the task
1881 if tx.is_closed() {
1882 break;
1883 }
1884
1885 let delay = Self::get_delay(consecutive_failures);
1886 log::trace!("Reconnecting in {:?}...", delay);
1887
1888 let error_msg = format!("{error:?}");
1889
1890 if tx
1891 .send(HeightStreamEvent::Reconnecting { delay, error_msg })
1892 .await
1893 .is_err()
1894 {
1895 return Ok(()); // Receiver dropped, exit task
1896 }
1897 tokio::time::sleep(delay).await;
1898
1899 // increment consecutive failures so that on the next try
1900 // it will start using back offs
1901 consecutive_failures += 1;
1902 }
1903
1904 Ok(())
1905 }
1906
1907 /// Streams archive height updates from the server via Server-Sent Events.
1908 ///
1909 /// Establishes a long-lived SSE connection to `/height/sse` that automatically reconnects
1910 /// on disconnection with exponential backoff (200ms → 400ms → ... → max 30s).
1911 ///
1912 /// The stream emits [`HeightStreamEvent`] to notify consumers of connection state changes
1913 /// and height updates. This allows applications to display connection status to users.
1914 ///
1915 /// # Returns
1916 /// Channel receiver yielding [`HeightStreamEvent`]s. The background task handles connection
1917 /// lifecycle and sends events through this channel.
1918 ///
1919 /// # Example
1920 /// ```
1921 /// # use hypersync_client::{Client, HeightStreamEvent};
1922 /// # async fn example() -> anyhow::Result<()> {
1923 /// let client = Client::builder()
1924 /// .url("https://eth.hypersync.xyz")
1925 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1926 /// .build()?;
1927 ///
1928 /// let mut rx = client.stream_height();
1929 ///
1930 /// while let Some(event) = rx.recv().await {
1931 /// match event {
1932 /// HeightStreamEvent::Connected => println!("Connected to stream"),
1933 /// HeightStreamEvent::Height(h) => println!("Height: {}", h),
1934 /// HeightStreamEvent::Reconnecting { delay, error_msg } => {
1935 /// println!("Reconnecting in {delay:?} due to error: {error_msg}")
1936 /// }
1937 /// }
1938 /// }
1939 /// # Ok(())
1940 /// # }
1941 /// ```
1942 pub fn stream_height(&self) -> mpsc::Receiver<HeightStreamEvent> {
1943 let (tx, rx) = mpsc::channel(16);
1944 let client = self.clone();
1945
1946 tokio::spawn(async move {
1947 if let Err(e) = client.stream_height_events_with_retry(&tx).await {
1948 log::error!("Stream height failed unexpectedly: {e:?}");
1949 }
1950 });
1951
1952 rx
1953 }
1954}
1955
1956fn check_simple_stream_params(config: &StreamConfig) -> Result<()> {
1957 if config.event_signature.is_some() {
1958 return Err(anyhow!(
1959 "config.event_signature can't be passed to simple type function. User is expected to \
1960 decode the logs using Decoder."
1961 ));
1962 }
1963 if config.column_mapping.is_some() {
1964 return Err(anyhow!(
1965 "config.column_mapping can't be passed to single type function. User is expected to \
1966 map values manually."
1967 ));
1968 }
1969
1970 Ok(())
1971}
1972
1973/// Used to indicate whether or not a retry should be attempted.
1974#[derive(Debug, thiserror::Error)]
1975pub enum HyperSyncResponseError {
1976 /// Means that the client should retry with a smaller block range.
1977 #[error("hypersync responded with 'payload too large' error")]
1978 PayloadTooLarge,
1979 /// Server responded with 429 Too Many Requests.
1980 #[error("rate limited by server. To increase your rate limits, upgrade your plan at https://envio.dev/app/api-tokens. For more info: https://docs.envio.dev/docs/HyperSync/api-tokens")]
1981 RateLimited {
1982 /// Rate limit information from the 429 response headers.
1983 rate_limit: RateLimitInfo,
1984 },
1985 /// Any other server error.
1986 #[error(transparent)]
1987 Other(#[from] anyhow::Error),
1988}
1989
1990/// Result of a single Arrow query execution, before retry logic.
1991struct ArrowImplResponse {
1992 /// The parsed Arrow response data.
1993 response: ArrowResponse,
1994 /// Size of the response body in bytes.
1995 response_bytes: u64,
1996 /// Rate limit information parsed from response headers.
1997 rate_limit: RateLimitInfo,
1998}
1999
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_get_delay() {
        assert_eq!(
            Client::get_delay(0),
            Duration::from_millis(0),
            "starts with 0 delay"
        );
        // powers of 2 backoff
        assert_eq!(Client::get_delay(1), Duration::from_millis(200));
        assert_eq!(Client::get_delay(2), Duration::from_millis(400));
        assert_eq!(Client::get_delay(3), Duration::from_millis(800));
        // maxes out at 30s
        assert_eq!(
            Client::get_delay(9),
            Duration::from_secs(30),
            "max delay is 30s"
        );
        assert_eq!(
            Client::get_delay(10),
            Duration::from_secs(30),
            "max delay is 30s"
        );
    }

    #[test]
    fn test_get_delay_is_monotonic_and_capped() {
        // The reconnect backoff must never shrink as failures accumulate and must
        // never exceed the 30s ceiling.
        let delays: Vec<Duration> = (0..=16u32).map(Client::get_delay).collect();
        assert!(
            delays.windows(2).all(|w| w[0] <= w[1]),
            "backoff must be non-decreasing: {delays:?}"
        );
        assert!(
            delays.iter().all(|d| *d <= Duration::from_secs(30)),
            "backoff must be capped at 30s: {delays:?}"
        );
    }

    #[tokio::test]
    #[ignore = "integration test requiring live hs server"]
    async fn test_http2_is_used() -> anyhow::Result<()> {
        let api_token = std::env::var("ENVIO_API_TOKEN")?;
        let client = reqwest::Client::builder()
            .no_gzip()
            .user_agent("hscr-test")
            .build()?;

        let res = client
            .get("https://eth.hypersync.xyz/height")
            .bearer_auth(&api_token)
            .send()
            .await?;

        assert_eq!(
            res.version(),
            reqwest::Version::HTTP_2,
            "expected HTTP/2 but got {:?}",
            res.version()
        );
        assert!(res.status().is_success());
        Ok(())
    }

    #[tokio::test]
    #[ignore = "integration test with live hs server for height stream"]
    async fn test_stream_height_events() -> anyhow::Result<()> {
        let (tx, mut rx) = mpsc::channel(16);
        let handle = tokio::spawn(async move {
            let client = Client::builder()
                .url("https://monad-testnet.hypersync.xyz")
                .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
                .build()?;
            let mut es = client.get_es_stream().context("get es stream")?;
            Client::stream_height_events(&mut es, &tx).await
        });

        assert_eq!(rx.recv().await, Some(HeightStreamEvent::Connected));
        let Some(HeightStreamEvent::Height(height)) = rx.recv().await else {
            panic!("should have received height")
        };
        let Some(HeightStreamEvent::Height(height2)) = rx.recv().await else {
            panic!("should have received height")
        };
        assert!(height2 > height);

        // Dropping the receiver should make the streaming task end gracefully.
        drop(rx);

        let res = handle.await.expect("should have joined");
        let received_an_event =
            res.expect("should have ended the stream gracefully after dropping rx");
        assert!(received_an_event, "should have received an event");

        Ok(())
    }
}