hypersync_client/lib.rs
1#![deny(missing_docs)]
2// README.md is a symlink (ln -s ../README.md README.md) to the root workspace
3// README. This is needed so that both `cargo doc` and `cargo package` work:
4// - `cargo doc`: include_str! resolves the symlink to the root README.
5// - `cargo package`: cargo copies the symlink target into the tarball, so
6// include_str! finds README.md at the package root during verification.
7#![doc = include_str!("../README.md")]
8use std::time::Instant;
9use std::{sync::Arc, time::Duration};
10
11use anyhow::{anyhow, Context, Result};
12use futures::StreamExt;
13use hypersync_net_types::{hypersync_net_types_capnp, ArchiveHeight, ChainId, Query};
14use reqwest::{Method, StatusCode};
15use reqwest_eventsource::retry::ExponentialBackoff;
16use reqwest_eventsource::{Event, EventSource};
17
18pub mod arrow_reader;
19mod column_mapping;
20mod config;
21mod decode;
22mod decode_call;
23mod from_arrow;
24mod parquet_out;
25mod parse_response;
26pub mod preset_query;
27mod rate_limit;
28mod rayon_async;
29pub mod simple_types;
30mod stream;
31mod types;
32mod util;
33
34pub use hypersync_format as format;
35pub use hypersync_net_types as net_types;
36pub use hypersync_schema as schema;
37
38use parse_response::parse_query_response;
39use tokio::sync::mpsc;
40use types::ResponseData;
41use url::Url;
42
43pub use column_mapping::{ColumnMapping, DataType};
44pub use config::HexOutput;
45pub use config::{ClientConfig, SerializationFormat, StreamConfig};
46pub use decode::Decoder;
47pub use decode_call::CallDecoder;
48pub use rate_limit::RateLimitInfo;
49pub use types::{
50 ArrowResponse, ArrowResponseData, EventResponse, QueryResponse, RateLimitResponse,
51};
52
53use crate::parse_response::read_query_response;
54use crate::simple_types::InternalEventJoinStrategy;
55
56#[derive(Debug)]
57struct HttpClientWrapper {
58 /// Mutable state that needs to be refreshed periodically
59 inner: std::sync::Mutex<HttpClientWrapperInner>,
60 /// HyperSync server api token.
61 api_token: String,
62 /// Standard timeout for http requests.
63 timeout: Duration,
64 /// Pools are never idle since polling often happens on the client
65 /// so connections never timeout and dns lookups never happen again
66 /// this is problematic if the underlying ip address changes during
67 /// failovers on hypersync. Since reqwest doesn't implement this we
68 /// simply recreate the client every time
69 max_connection_age: Duration,
70 /// For refreshing the client
71 user_agent: String,
72}
73
74#[derive(Debug)]
75struct HttpClientWrapperInner {
76 /// Initialized reqwest instance for client url.
77 client: reqwest::Client,
78 /// Timestamp when the client was created
79 created_at: Instant,
80}
81
82impl HttpClientWrapper {
83 fn new(user_agent: String, api_token: String, timeout: Duration) -> Self {
84 let client = reqwest::Client::builder()
85 .no_gzip()
86 .user_agent(&user_agent)
87 .build()
88 .unwrap();
89
90 Self {
91 inner: std::sync::Mutex::new(HttpClientWrapperInner {
92 client,
93 created_at: Instant::now(),
94 }),
95 api_token,
96 timeout,
97 max_connection_age: Duration::from_secs(60),
98 user_agent,
99 }
100 }
101
102 fn get_client(&self) -> reqwest::Client {
103 let mut inner = self.inner.lock().expect("HttpClientWrapper mutex poisoned");
104
105 // Check if client needs refresh due to age
106 if inner.created_at.elapsed() > self.max_connection_age {
107 // Recreate client to force new DNS lookup for failover scenarios
108 inner.client = reqwest::Client::builder()
109 .no_gzip()
110 .user_agent(&self.user_agent)
111 .build()
112 .unwrap();
113 inner.created_at = Instant::now();
114 }
115
116 // Clone is cheap for reqwest::Client (it's Arc-wrapped internally)
117 inner.client.clone()
118 }
119
120 fn request(&self, method: Method, url: Url) -> reqwest::RequestBuilder {
121 let client = self.get_client();
122 client
123 .request(method, url)
124 .timeout(self.timeout)
125 .bearer_auth(&self.api_token)
126 }
127
128 fn request_no_timeout(&self, method: Method, url: Url) -> reqwest::RequestBuilder {
129 let client = self.get_client();
130 client.request(method, url).bearer_auth(&self.api_token)
131 }
132}
133
134/// Internal client state to handle http requests and retries.
135#[derive(Debug)]
136struct ClientInner {
137 http_client: HttpClientWrapper,
138 /// HyperSync server URL.
139 url: Url,
140 /// Number of retries to attempt before returning error.
141 max_num_retries: usize,
142 /// Milliseconds that would be used for retry backoff increasing.
143 retry_backoff_ms: u64,
144 /// Initial wait time for request backoff.
145 retry_base_ms: u64,
146 /// Ceiling time for request backoff.
147 retry_ceiling_ms: u64,
148 /// Query serialization format to use for HTTP requests.
149 serialization_format: SerializationFormat,
150 /// Most recently observed rate limit info from the server, paired with the
151 /// instant it was captured so elapsed time can be subtracted from `reset_secs`.
152 rate_limit_state: std::sync::Mutex<Option<(RateLimitInfo, Instant)>>,
153 /// Whether to proactively sleep when the rate limit is exhausted.
154 proactive_rate_limit_sleep: bool,
155}
156
157/// Client to handle http requests and retries.
158#[derive(Clone, Debug)]
159pub struct Client {
160 inner: Arc<ClientInner>,
161}
162
163impl Client {
164 /// Creates a new client with the given configuration.
165 ///
166 /// Configuration must include the `url` and `api_token` fields.
167 /// # Example
168 /// ```
169 /// use hypersync_client::{Client, ClientConfig};
170 ///
171 /// let config = ClientConfig {
172 /// url: "https://eth.hypersync.xyz".to_string(),
173 /// api_token: std::env::var("ENVIO_API_TOKEN")?,
174 /// ..Default::default()
175 /// };
176 /// let client = Client::new(config)?;
177 /// # Ok::<(), anyhow::Error>(())
178 /// ```
179 ///
180 /// # Errors
181 /// This method fails if the config is invalid.
182 pub fn new(cfg: ClientConfig) -> Result<Self> {
183 // hscr stands for hypersync client rust
184 cfg.validate().context("invalid ClientConfig")?;
185 let user_agent = format!("hscr/{}", env!("CARGO_PKG_VERSION"));
186 Self::new_internal(cfg, user_agent)
187 }
188
189 /// Creates a new client builder.
190 ///
191 /// # Example
192 /// ```
193 /// use hypersync_client::Client;
194 ///
195 /// let client = Client::builder()
196 /// .chain_id(1)
197 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
198 /// .build()
199 /// .unwrap();
200 /// ```
201 pub fn builder() -> ClientBuilder {
202 ClientBuilder::new()
203 }
204
205 #[doc(hidden)]
206 pub fn new_with_agent(cfg: ClientConfig, user_agent: impl Into<String>) -> Result<Self> {
207 // Creates a new client with the given configuration and custom user agent.
208 // This is intended for use by language bindings (Python, Node.js) and HyperIndex.
209 Self::new_internal(cfg, user_agent.into())
210 }
211
212 /// Internal constructor that takes both config and user agent.
213 fn new_internal(cfg: ClientConfig, user_agent: String) -> Result<Self> {
214 let http_client = HttpClientWrapper::new(
215 user_agent,
216 cfg.api_token,
217 Duration::from_millis(cfg.http_req_timeout_millis),
218 );
219
220 let url = Url::parse(&cfg.url).context("url is malformed")?;
221
222 Ok(Self {
223 inner: Arc::new(ClientInner {
224 http_client,
225 url,
226 max_num_retries: cfg.max_num_retries,
227 retry_backoff_ms: cfg.retry_backoff_ms,
228 retry_base_ms: cfg.retry_base_ms,
229 retry_ceiling_ms: cfg.retry_ceiling_ms,
230 serialization_format: cfg.serialization_format,
231 rate_limit_state: std::sync::Mutex::new(None),
232 proactive_rate_limit_sleep: cfg.proactive_rate_limit_sleep,
233 }),
234 })
235 }
236
237 /// Retrieves blocks, transactions, traces, and logs through a stream using the provided
238 /// query and stream configuration.
239 ///
240 /// ### Implementation
241 /// Runs multiple queries simultaneously based on config.concurrency.
242 ///
243 /// Each query runs until it reaches query.to, server height, any max_num_* query param,
244 /// or execution timed out by server.
245 ///
246 /// # ⚠️ Important Warning
247 ///
248 /// This method will continue executing until the query has run to completion from beginning
249 /// to the end of the block range defined in the query. For heavy queries with large block
250 /// ranges or high data volumes, consider:
251 ///
252 /// - Use [`stream()`](Self::stream) to interact with each streamed chunk individually
253 /// - Use [`get()`](Self::get) which returns a `next_block` that can be paginated for the next query
254 /// - Break large queries into smaller block ranges
255 ///
256 /// # Example
257 /// ```no_run
258 /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField}, StreamConfig};
259 ///
260 /// # async fn example() -> anyhow::Result<()> {
261 /// let client = Client::builder()
262 /// .chain_id(1)
263 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
264 /// .build()?;
265 ///
266 /// // Query ERC20 transfer events
267 /// let query = Query::new()
268 /// .from_block(19000000)
269 /// .to_block_excl(19000010)
270 /// .where_logs(
271 /// LogFilter::all()
272 /// .and_topic0(["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"])?
273 /// )
274 /// .select_log_fields([LogField::Address, LogField::Data]);
275 /// let response = client.collect(query, StreamConfig::default()).await?;
276 ///
277 /// println!("Collected {} events", response.data.logs.len());
278 /// # Ok(())
279 /// # }
280 /// ```
281 pub async fn collect(&self, query: Query, config: StreamConfig) -> Result<QueryResponse> {
282 check_simple_stream_params(&config)?;
283
284 let mut recv = stream::stream_arrow(self, query, config)
285 .await
286 .context("start stream")?;
287
288 let mut data = ResponseData::default();
289 let mut archive_height = None;
290 let mut next_block = 0;
291 let mut total_execution_time = 0;
292
293 while let Some(res) = recv.recv().await {
294 let res = res.context("get response")?;
295 let res: QueryResponse =
296 QueryResponse::try_from(&res).context("convert arrow response")?;
297
298 for batch in res.data.blocks {
299 data.blocks.push(batch);
300 }
301 for batch in res.data.transactions {
302 data.transactions.push(batch);
303 }
304 for batch in res.data.logs {
305 data.logs.push(batch);
306 }
307 for batch in res.data.traces {
308 data.traces.push(batch);
309 }
310
311 archive_height = res.archive_height;
312 next_block = res.next_block;
313 total_execution_time += res.total_execution_time
314 }
315
316 Ok(QueryResponse {
317 archive_height,
318 next_block,
319 total_execution_time,
320 data,
321 rollback_guard: None,
322 })
323 }
324
325 /// Retrieves events through a stream using the provided query and stream configuration.
326 ///
327 /// # ⚠️ Important Warning
328 ///
329 /// This method will continue executing until the query has run to completion from beginning
330 /// to the end of the block range defined in the query. For heavy queries with large block
331 /// ranges or high data volumes, consider:
332 ///
333 /// - Use [`stream_events()`](Self::stream_events) to interact with each streamed chunk individually
334 /// - Use [`get_events()`](Self::get_events) which returns a `next_block` that can be paginated for the next query
335 /// - Break large queries into smaller block ranges
336 ///
337 /// # Example
338 /// ```no_run
339 /// use hypersync_client::{Client, net_types::{Query, TransactionFilter, TransactionField}, StreamConfig};
340 ///
341 /// # async fn example() -> anyhow::Result<()> {
342 /// let client = Client::builder()
343 /// .chain_id(1)
344 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
345 /// .build()?;
346 ///
347 /// // Query transactions to a specific address
348 /// let query = Query::new()
349 /// .from_block(19000000)
350 /// .to_block_excl(19000100)
351 /// .where_transactions(
352 /// TransactionFilter::all()
353 /// .and_to(["0xA0b86a33E6411b87Fd9D3DF822C8698FC06BBe4c"])?
354 /// )
355 /// .select_transaction_fields([TransactionField::Hash, TransactionField::From, TransactionField::Value]);
356 /// let response = client.collect_events(query, StreamConfig::default()).await?;
357 ///
358 /// println!("Collected {} events", response.data.len());
359 /// # Ok(())
360 /// # }
361 /// ```
362 pub async fn collect_events(
363 &self,
364 mut query: Query,
365 config: StreamConfig,
366 ) -> Result<EventResponse> {
367 check_simple_stream_params(&config)?;
368
369 let event_join_strategy = InternalEventJoinStrategy::from(&query.field_selection);
370 event_join_strategy.add_join_fields_to_selection(&mut query.field_selection);
371
372 let mut recv = stream::stream_arrow(self, query, config)
373 .await
374 .context("start stream")?;
375
376 let mut data = Vec::new();
377 let mut archive_height = None;
378 let mut next_block = 0;
379 let mut total_execution_time = 0;
380
381 while let Some(res) = recv.recv().await {
382 let res = res.context("get response")?;
383 let res: QueryResponse =
384 QueryResponse::try_from(&res).context("convert arrow response")?;
385 let events = event_join_strategy.join_from_response_data(res.data);
386
387 data.extend(events);
388
389 archive_height = res.archive_height;
390 next_block = res.next_block;
391 total_execution_time += res.total_execution_time
392 }
393
394 Ok(EventResponse {
395 archive_height,
396 next_block,
397 total_execution_time,
398 data,
399 rollback_guard: None,
400 })
401 }
402
403 /// Retrieves blocks, transactions, traces, and logs in Arrow format through a stream using
404 /// the provided query and stream configuration.
405 ///
406 /// Returns data in Apache Arrow format for high-performance columnar processing.
407 /// Useful for analytics workloads or when working with Arrow-compatible tools.
408 ///
409 /// # ⚠️ Important Warning
410 ///
411 /// This method will continue executing until the query has run to completion from beginning
412 /// to the end of the block range defined in the query. For heavy queries with large block
413 /// ranges or high data volumes, consider:
414 ///
415 /// - Use [`stream_arrow()`](Self::stream_arrow) to interact with each streamed chunk individually
416 /// - Use [`get_arrow()`](Self::get_arrow) which returns a `next_block` that can be paginated for the next query
417 /// - Break large queries into smaller block ranges
418 ///
419 /// # Example
420 /// ```no_run
421 /// use hypersync_client::{Client, net_types::{Query, BlockFilter, BlockField}, StreamConfig};
422 ///
423 /// # async fn example() -> anyhow::Result<()> {
424 /// let client = Client::builder()
425 /// .chain_id(1)
426 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
427 /// .build()?;
428 ///
429 /// // Get block data in Arrow format for analytics
430 /// let query = Query::new()
431 /// .from_block(19000000)
432 /// .to_block_excl(19000100)
433 /// .include_all_blocks()
434 /// .select_block_fields([BlockField::Number, BlockField::Timestamp, BlockField::GasUsed]);
435 /// let response = client.collect_arrow(query, StreamConfig::default()).await?;
436 ///
437 /// println!("Retrieved {} Arrow batches for blocks", response.data.blocks.len());
438 /// # Ok(())
439 /// # }
440 /// ```
441 pub async fn collect_arrow(&self, query: Query, config: StreamConfig) -> Result<ArrowResponse> {
442 let mut recv = stream::stream_arrow(self, query, config)
443 .await
444 .context("start stream")?;
445
446 let mut data = ArrowResponseData::default();
447 let mut archive_height = None;
448 let mut next_block = 0;
449 let mut total_execution_time = 0;
450
451 while let Some(res) = recv.recv().await {
452 let res = res.context("get response")?;
453
454 for batch in res.data.blocks {
455 data.blocks.push(batch);
456 }
457 for batch in res.data.transactions {
458 data.transactions.push(batch);
459 }
460 for batch in res.data.logs {
461 data.logs.push(batch);
462 }
463 for batch in res.data.traces {
464 data.traces.push(batch);
465 }
466 for batch in res.data.decoded_logs {
467 data.decoded_logs.push(batch);
468 }
469
470 archive_height = res.archive_height;
471 next_block = res.next_block;
472 total_execution_time += res.total_execution_time
473 }
474
475 Ok(ArrowResponse {
476 archive_height,
477 next_block,
478 total_execution_time,
479 data,
480 rollback_guard: None,
481 })
482 }
483
484 /// Writes parquet file getting data through a stream using the provided path, query,
485 /// and stream configuration.
486 ///
487 /// Streams data directly to a Parquet file for efficient storage and later analysis.
488 /// Perfect for data exports or ETL pipelines.
489 ///
490 /// # ⚠️ Important Warning
491 ///
492 /// This method will continue executing until the query has run to completion from beginning
493 /// to the end of the block range defined in the query. For heavy queries with large block
494 /// ranges or high data volumes, consider:
495 ///
496 /// - Use [`stream_arrow()`](Self::stream_arrow) and write to Parquet incrementally
497 /// - Use [`get_arrow()`](Self::get_arrow) with pagination and append to Parquet files
498 /// - Break large queries into smaller block ranges
499 ///
500 /// # Example
501 /// ```no_run
502 /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField}, StreamConfig};
503 ///
504 /// # async fn example() -> anyhow::Result<()> {
505 /// let client = Client::builder()
506 /// .chain_id(1)
507 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
508 /// .build()?;
509 ///
510 /// // Export all DEX trades to Parquet for analysis
511 /// let query = Query::new()
512 /// .from_block(19000000)
513 /// .to_block_excl(19010000)
514 /// .where_logs(
515 /// LogFilter::all()
516 /// .and_topic0(["0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822"])?
517 /// )
518 /// .select_log_fields([LogField::Address, LogField::Data, LogField::BlockNumber]);
519 /// client.collect_parquet("./trades.parquet", query, StreamConfig::default()).await?;
520 ///
521 /// println!("Trade data exported to trades.parquet");
522 /// # Ok(())
523 /// # }
524 /// ```
525 pub async fn collect_parquet(
526 &self,
527 path: &str,
528 query: Query,
529 config: StreamConfig,
530 ) -> Result<()> {
531 parquet_out::collect_parquet(self, path, query, config).await
532 }
533
534 /// Internal implementation of getting chain_id from server
535 async fn get_chain_id_impl(&self) -> Result<u64> {
536 let mut url = self.inner.url.clone();
537 let mut segments = url.path_segments_mut().ok().context("get path segments")?;
538 segments.push("chain_id");
539 std::mem::drop(segments);
540 let req = self.inner.http_client.request(Method::GET, url);
541
542 let res = req.send().await.context("execute http req")?;
543
544 let status = res.status();
545 if !status.is_success() {
546 return Err(anyhow!("http response status code {}", status));
547 }
548
549 let chain_id: ChainId = res.json().await.context("read response body json")?;
550
551 Ok(chain_id.chain_id)
552 }
553
554 /// Internal implementation of getting height from server
555 async fn get_height_impl(&self, http_timeout_override: Option<Duration>) -> Result<u64> {
556 let mut url = self.inner.url.clone();
557 let mut segments = url.path_segments_mut().ok().context("get path segments")?;
558 segments.push("height");
559 std::mem::drop(segments);
560 let mut req = self.inner.http_client.request(Method::GET, url);
561 if let Some(http_timeout_override) = http_timeout_override {
562 req = req.timeout(http_timeout_override);
563 }
564
565 let res = req.send().await.context("execute http req")?;
566
567 let status = res.status();
568 if !status.is_success() {
569 return Err(anyhow!("http response status code {}", status));
570 }
571
572 let height: ArchiveHeight = res.json().await.context("read response body json")?;
573
574 Ok(height.height.unwrap_or(0))
575 }
576
577 /// Get the chain_id from the server with retries.
578 ///
579 /// # Example
580 /// ```no_run
581 /// use hypersync_client::Client;
582 ///
583 /// # async fn example() -> anyhow::Result<()> {
584 /// let client = Client::builder()
585 /// .chain_id(1)
586 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
587 /// .build()?;
588 ///
589 /// let chain_id = client.get_chain_id().await?;
590 /// println!("Connected to chain ID: {}", chain_id);
591 /// # Ok(())
592 /// # }
593 /// ```
594 pub async fn get_chain_id(&self) -> Result<u64> {
595 let mut base = self.inner.retry_base_ms;
596
597 let mut err = anyhow!("");
598
599 for _ in 0..self.inner.max_num_retries + 1 {
600 match self.get_chain_id_impl().await {
601 Ok(res) => return Ok(res),
602 Err(e) => {
603 log::error!(
604 "failed to get chain_id from server, retrying... The error was: {e:?}"
605 );
606 err = err.context(format!("{e:?}"));
607 }
608 }
609
610 let base_ms = Duration::from_millis(base);
611 let jitter = Duration::from_millis(fastrange_rs::fastrange_64(
612 rand::random(),
613 self.inner.retry_backoff_ms,
614 ));
615
616 tokio::time::sleep(base_ms + jitter).await;
617
618 base = std::cmp::min(
619 base + self.inner.retry_backoff_ms,
620 self.inner.retry_ceiling_ms,
621 );
622 }
623
624 Err(err)
625 }
626
627 /// Get the height of from server with retries.
628 ///
629 /// # Example
630 /// ```no_run
631 /// use hypersync_client::Client;
632 ///
633 /// # async fn example() -> anyhow::Result<()> {
634 /// let client = Client::builder()
635 /// .chain_id(1)
636 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
637 /// .build()?;
638 ///
639 /// let height = client.get_height().await?;
640 /// println!("Current block height: {}", height);
641 /// # Ok(())
642 /// # }
643 /// ```
644 pub async fn get_height(&self) -> Result<u64> {
645 let mut base = self.inner.retry_base_ms;
646
647 let mut err = anyhow!("");
648
649 for _ in 0..self.inner.max_num_retries + 1 {
650 match self.get_height_impl(None).await {
651 Ok(res) => return Ok(res),
652 Err(e) => {
653 log::error!(
654 "failed to get height from server, retrying... The error was: {e:?}"
655 );
656 err = err.context(format!("{e:?}"));
657 }
658 }
659
660 let base_ms = Duration::from_millis(base);
661 let jitter = Duration::from_millis(fastrange_rs::fastrange_64(
662 rand::random(),
663 self.inner.retry_backoff_ms,
664 ));
665
666 tokio::time::sleep(base_ms + jitter).await;
667
668 base = std::cmp::min(
669 base + self.inner.retry_backoff_ms,
670 self.inner.retry_ceiling_ms,
671 );
672 }
673
674 Err(err)
675 }
676
677 /// Get the height of the Client instance for health checks.
678 ///
679 /// Doesn't do any retries and the `http_req_timeout` parameter will override the http timeout config set when creating the client.
680 ///
681 /// # Example
682 /// ```no_run
683 /// use hypersync_client::Client;
684 /// use std::time::Duration;
685 ///
686 /// # async fn example() -> anyhow::Result<()> {
687 /// let client = Client::builder()
688 /// .chain_id(1)
689 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
690 /// .build()?;
691 ///
692 /// // Quick health check with 5 second timeout
693 /// let height = client.health_check(Some(Duration::from_secs(5))).await?;
694 /// println!("Server is healthy at block: {}", height);
695 /// # Ok(())
696 /// # }
697 /// ```
698 pub async fn health_check(&self, http_req_timeout: Option<Duration>) -> Result<u64> {
699 self.get_height_impl(http_req_timeout).await
700 }
701
702 /// Executes query with retries and returns the response.
703 ///
704 /// # Example
705 /// ```no_run
706 /// use hypersync_client::{Client, net_types::{Query, BlockFilter, BlockField}};
707 ///
708 /// # async fn example() -> anyhow::Result<()> {
709 /// let client = Client::builder()
710 /// .chain_id(1)
711 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
712 /// .build()?;
713 ///
714 /// // Query all blocks from a specific range
715 /// let query = Query::new()
716 /// .from_block(19000000)
717 /// .to_block_excl(19000010)
718 /// .include_all_blocks()
719 /// .select_block_fields([BlockField::Number, BlockField::Hash, BlockField::Timestamp]);
720 /// let response = client.get(&query).await?;
721 ///
722 /// println!("Retrieved {} blocks", response.data.blocks.len());
723 /// # Ok(())
724 /// # }
725 /// ```
726 pub async fn get(&self, query: &Query) -> Result<QueryResponse> {
727 let arrow_response = self.get_arrow(query).await.context("get data")?;
728 let converted =
729 QueryResponse::try_from(&arrow_response).context("convert arrow response")?;
730 Ok(converted)
731 }
732
733 /// Add block, transaction and log fields selection to the query, executes it with retries
734 /// and returns the response.
735 ///
736 /// This method automatically joins blocks, transactions, and logs into unified events,
737 /// making it easier to work with related blockchain data.
738 ///
739 /// # Example
740 /// ```no_run
741 /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField, TransactionField}};
742 ///
743 /// # async fn example() -> anyhow::Result<()> {
744 /// let client = Client::builder()
745 /// .chain_id(1)
746 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
747 /// .build()?;
748 ///
749 /// // Query ERC20 transfers with transaction context
750 /// let query = Query::new()
751 /// .from_block(19000000)
752 /// .to_block_excl(19000010)
753 /// .where_logs(
754 /// LogFilter::all()
755 /// .and_topic0(["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"])?
756 /// )
757 /// .select_log_fields([LogField::Address, LogField::Data])
758 /// .select_transaction_fields([TransactionField::Hash, TransactionField::From]);
759 /// let response = client.get_events(query).await?;
760 ///
761 /// println!("Retrieved {} joined events", response.data.len());
762 /// # Ok(())
763 /// # }
764 /// ```
765 pub async fn get_events(&self, mut query: Query) -> Result<EventResponse> {
766 let event_join_strategy = InternalEventJoinStrategy::from(&query.field_selection);
767 event_join_strategy.add_join_fields_to_selection(&mut query.field_selection);
768 let arrow_response = self.get_arrow(&query).await.context("get data")?;
769 EventResponse::try_from_arrow_response(&arrow_response, &event_join_strategy)
770 }
771
772 /// Executes query once and returns the result using JSON serialization.
773 async fn get_arrow_impl_json(
774 &self,
775 query: &Query,
776 ) -> std::result::Result<ArrowImplResponse, HyperSyncResponseError> {
777 let mut url = self.inner.url.clone();
778 let mut segments = url.path_segments_mut().ok().context("get path segments")?;
779 segments.push("query");
780 segments.push("arrow-ipc");
781 std::mem::drop(segments);
782 let req = self.inner.http_client.request(Method::POST, url);
783
784 let res = req.json(&query).send().await.context("execute http req")?;
785
786 let status = res.status();
787 let rate_limit = RateLimitInfo::from_response(&res);
788
789 if status == StatusCode::PAYLOAD_TOO_LARGE {
790 return Err(HyperSyncResponseError::PayloadTooLarge);
791 }
792 if status == StatusCode::TOO_MANY_REQUESTS {
793 return Err(HyperSyncResponseError::RateLimited { rate_limit });
794 }
795 if !status.is_success() {
796 let text = res.text().await.context("read text to see error")?;
797
798 return Err(HyperSyncResponseError::Other(anyhow!(
799 "http response status code {}, err body: {}",
800 status,
801 text
802 )));
803 }
804
805 let bytes = res.bytes().await.context("read response body bytes")?;
806
807 let res = tokio::task::block_in_place(|| {
808 parse_query_response(&bytes).context("parse query response")
809 })?;
810
811 Ok(ArrowImplResponse {
812 response: res,
813 response_bytes: bytes.len().try_into().unwrap(),
814 rate_limit,
815 })
816 }
817
818 fn should_cache_queries(&self) -> bool {
819 matches!(
820 self.inner.serialization_format,
821 SerializationFormat::CapnProto {
822 should_cache_queries: true
823 }
824 )
825 }
826
827 /// Executes query once and returns the result using Cap'n Proto serialization.
828 async fn get_arrow_impl_capnp(
829 &self,
830 query: &Query,
831 ) -> std::result::Result<ArrowImplResponse, HyperSyncResponseError> {
832 let mut url = self.inner.url.clone();
833 let mut segments = url.path_segments_mut().ok().context("get path segments")?;
834 segments.push("query");
835 segments.push("arrow-ipc");
836 segments.push("capnp");
837 std::mem::drop(segments);
838
839 let should_cache = self.should_cache_queries();
840
841 if should_cache {
842 let query_with_id = {
843 let mut message = capnp::message::Builder::new_default();
844 let mut request_builder =
845 message.init_root::<hypersync_net_types_capnp::request::Builder>();
846
847 request_builder
848 .build_query_id_from_query(query)
849 .context("build query id")?;
850 let mut query_with_id = Vec::new();
851 capnp::serialize_packed::write_message(&mut query_with_id, &message)
852 .context("write capnp message")?;
853 query_with_id
854 };
855
856 let req = self.inner.http_client.request(Method::POST, url.clone());
857
858 let res = req
859 .body(query_with_id)
860 .send()
861 .await
862 .context("execute http req")?;
863
864 let status = res.status();
865 let rate_limit = RateLimitInfo::from_response(&res);
866
867 if status == StatusCode::PAYLOAD_TOO_LARGE {
868 return Err(HyperSyncResponseError::PayloadTooLarge);
869 }
870 if status == StatusCode::TOO_MANY_REQUESTS {
871 return Err(HyperSyncResponseError::RateLimited { rate_limit });
872 }
873 if status.is_success() {
874 let bytes = res.bytes().await.context("read response body bytes")?;
875
876 let mut opts = capnp::message::ReaderOptions::new();
877 opts.nesting_limit(i32::MAX).traversal_limit_in_words(None);
878 let message_reader = capnp::serialize_packed::read_message(bytes.as_ref(), opts)
879 .context("create message reader")?;
880 let query_response = message_reader
881 .get_root::<hypersync_net_types_capnp::cached_query_response::Reader>()
882 .context("get cached_query_response root")?;
883 match query_response.get_either().which().context("get either")? {
884 hypersync_net_types_capnp::cached_query_response::either::Which::QueryResponse(
885 query_response,
886 ) => {
887 let res = tokio::task::block_in_place(|| {
888 let res = query_response?;
889 read_query_response(&res).context("parse query response cached")
890 })?;
891 return Ok(ArrowImplResponse {
892 response: res,
893 response_bytes: bytes.len().try_into().unwrap(),
894 rate_limit,
895 });
896 }
897 hypersync_net_types_capnp::cached_query_response::either::Which::NotCached(()) => {
898 log::trace!("query was not cached, retrying with full query");
899 }
900 }
901 } else {
902 let text = res.text().await.context("read text to see error")?;
903 log::warn!(
904 "Failed cache query, will retry full query. {}, err body: {}",
905 status,
906 text
907 );
908 }
909 };
910
911 let full_query_bytes = {
912 let mut message = capnp::message::Builder::new_default();
913 let mut query_builder =
914 message.init_root::<hypersync_net_types_capnp::request::Builder>();
915
916 query_builder
917 .build_full_query_from_query(query, should_cache)
918 .context("build full query")?;
919 let mut bytes = Vec::new();
920 capnp::serialize_packed::write_message(&mut bytes, &message)
921 .context("write full query capnp message")?;
922 bytes
923 };
924
925 let req = self.inner.http_client.request(Method::POST, url);
926
927 let res = req
928 .body(full_query_bytes)
929 .send()
930 .await
931 .context("execute http req")?;
932
933 let status = res.status();
934 let rate_limit = RateLimitInfo::from_response(&res);
935
936 if status == StatusCode::PAYLOAD_TOO_LARGE {
937 return Err(HyperSyncResponseError::PayloadTooLarge);
938 }
939 if status == StatusCode::TOO_MANY_REQUESTS {
940 return Err(HyperSyncResponseError::RateLimited { rate_limit });
941 }
942 if !status.is_success() {
943 let text = res.text().await.context("read text to see error")?;
944
945 return Err(HyperSyncResponseError::Other(anyhow!(
946 "http response status code {}, err body: {}",
947 status,
948 text
949 )));
950 }
951
952 let bytes = res.bytes().await.context("read response body bytes")?;
953
954 let res = tokio::task::block_in_place(|| {
955 parse_query_response(&bytes).context("parse query response")
956 })?;
957
958 Ok(ArrowImplResponse {
959 response: res,
960 response_bytes: bytes.len().try_into().unwrap(),
961 rate_limit,
962 })
963 }
964
965 /// Executes query once and returns the result, handling payload-too-large by halving block range.
966 async fn get_arrow_impl(
967 &self,
968 query: &Query,
969 ) -> std::result::Result<ArrowImplResponse, HyperSyncResponseError> {
970 let mut query = query.clone();
971 loop {
972 let res = match self.inner.serialization_format {
973 SerializationFormat::Json => self.get_arrow_impl_json(&query).await,
974 SerializationFormat::CapnProto { .. } => self.get_arrow_impl_capnp(&query).await,
975 };
976 match res {
977 Ok(res) => return Ok(res),
978 Err(
979 e @ (HyperSyncResponseError::Other(_)
980 | HyperSyncResponseError::RateLimited { .. }),
981 ) => return Err(e),
982 Err(HyperSyncResponseError::PayloadTooLarge) => {
983 let block_range = if let Some(to_block) = query.to_block {
984 let current = to_block - query.from_block;
985 if current < 2 {
986 return Err(HyperSyncResponseError::Other(anyhow!(
987 "Payload is too large and query is using the minimum block range."
988 )));
989 }
990 // Half the current block range
991 current / 2
992 } else {
993 200
994 };
995 let to_block = query.from_block + block_range;
996 query.to_block = Some(to_block);
997
998 log::trace!(
999 "Payload is too large, retrying with block range from: {} to: {}",
1000 query.from_block,
1001 to_block
1002 );
1003 }
1004 }
1005 }
1006 }
1007
1008 /// Executes query with retries and returns the response in Arrow format.
1009 pub async fn get_arrow(&self, query: &Query) -> Result<ArrowResponse> {
1010 const WAIT_ON_RATE_LIMIT: bool = true;
1011 self.get_arrow_with_size(query, WAIT_ON_RATE_LIMIT)
1012 .await
1013 .map(|res| res.response)
1014 }
1015
1016 /// Internal implementation for get_arrow.
1017 ///
1018 /// When `wait_on_rate_limit` is `false`, a 429 response is returned
1019 /// immediately with the rate limit info instead of being retried.
1020 async fn get_arrow_with_size(
1021 &self,
1022 query: &Query,
1023 wait_on_rate_limit: bool,
1024 ) -> Result<ArrowImplResponse> {
1025 let mut base = self.inner.retry_base_ms;
1026
1027 let mut err = anyhow!("");
1028
1029 if self.inner.proactive_rate_limit_sleep {
1030 if wait_on_rate_limit {
1031 self.wait_for_rate_limit().await;
1032 } else if let Some(rate_limit) = self.get_proactive_rate_limit_info() {
1033 return Err(anyhow::anyhow!(HyperSyncResponseError::RateLimited {
1034 rate_limit
1035 }));
1036 }
1037 }
1038
1039 for _ in 0..self.inner.max_num_retries + 1 {
1040 match self.get_arrow_impl(query).await {
1041 Ok(res) => {
1042 self.update_rate_limit_state(&res.rate_limit);
1043 return Ok(res);
1044 }
1045 Err(HyperSyncResponseError::RateLimited { rate_limit }) => {
1046 self.update_rate_limit_state(&rate_limit);
1047 if !wait_on_rate_limit {
1048 return Err(anyhow::anyhow!(HyperSyncResponseError::RateLimited {
1049 rate_limit
1050 }));
1051 }
1052 let wait_secs = rate_limit.suggested_wait_secs().unwrap_or(1) + 1;
1053 log::warn!(
1054 "rate limited by server ({rate_limit}), waiting {wait_secs}s before retry. To increase your rate limits, upgrade your plan at https://envio.dev/app/api-tokens. For more info: https://docs.envio.dev/docs/HyperSync/api-tokens"
1055 );
1056 err = err.context(format!("rate limited by server ({rate_limit}). To increase your rate limits, upgrade your plan at https://envio.dev/app/api-tokens"));
1057 tokio::time::sleep(Duration::from_secs(wait_secs)).await;
1058 continue;
1059 }
1060 Err(HyperSyncResponseError::Other(e)) => {
1061 log::error!(
1062 "failed to get arrow data from server, retrying... The error was: {e:?}"
1063 );
1064 err = err.context(format!("{e:?}"));
1065 }
1066 Err(HyperSyncResponseError::PayloadTooLarge) => {
1067 // This shouldn't happen since get_arrow_impl handles it, but just in case
1068 log::error!("unexpected PayloadTooLarge from get_arrow_impl, retrying...");
1069 err = err.context("unexpected PayloadTooLarge");
1070 }
1071 }
1072
1073 let base_ms = Duration::from_millis(base);
1074 let jitter = Duration::from_millis(fastrange_rs::fastrange_64(
1075 rand::random(),
1076 self.inner.retry_backoff_ms,
1077 ));
1078
1079 tokio::time::sleep(base_ms + jitter).await;
1080
1081 base = std::cmp::min(
1082 base + self.inner.retry_backoff_ms,
1083 self.inner.retry_ceiling_ms,
1084 );
1085 }
1086
1087 Err(err)
1088 }
1089
1090 /// Spawns task to execute query and return data via a channel.
1091 ///
1092 /// # Example
1093 /// ```no_run
1094 /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField}, StreamConfig};
1095 ///
1096 /// # async fn example() -> anyhow::Result<()> {
1097 /// let client = Client::builder()
1098 /// .chain_id(1)
1099 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
1100 /// .build()?;
1101 ///
1102 /// // Stream all ERC20 transfer events
1103 /// let query = Query::new()
1104 /// .from_block(19000000)
1105 /// .where_logs(
1106 /// LogFilter::all()
1107 /// .and_topic0(["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"])?
1108 /// )
1109 /// .select_log_fields([LogField::Address, LogField::Topic1, LogField::Topic2, LogField::Data]);
1110 /// let mut receiver = client.stream(query, StreamConfig::default()).await?;
1111 ///
1112 /// while let Some(response) = receiver.recv().await {
1113 /// let response = response?;
1114 /// println!("Got {} events up to block: {}", response.data.logs.len(), response.next_block);
1115 /// }
1116 /// # Ok(())
1117 /// # }
1118 /// ```
1119 pub async fn stream(
1120 &self,
1121 query: Query,
1122 config: StreamConfig,
1123 ) -> Result<mpsc::Receiver<Result<QueryResponse>>> {
1124 check_simple_stream_params(&config)?;
1125
1126 let (tx, rx): (_, mpsc::Receiver<Result<QueryResponse>>) =
1127 mpsc::channel(config.concurrency);
1128
1129 let mut inner_rx = self
1130 .stream_arrow(query, config)
1131 .await
1132 .context("start inner stream")?;
1133
1134 tokio::spawn(async move {
1135 while let Some(resp) = inner_rx.recv().await {
1136 let msg = resp
1137 .context("inner receiver")
1138 .and_then(|r| QueryResponse::try_from(&r));
1139 let is_err = msg.is_err();
1140 if tx.send(msg).await.is_err() || is_err {
1141 return;
1142 }
1143 }
1144 });
1145
1146 Ok(rx)
1147 }
1148
1149 /// Add block, transaction and log fields selection to the query and spawns task to execute it,
1150 /// returning data via a channel.
1151 ///
1152 /// This method automatically joins blocks, transactions, and logs into unified events,
1153 /// then streams them via a channel for real-time processing.
1154 ///
1155 /// # Example
1156 /// ```no_run
1157 /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField, TransactionField}, StreamConfig};
1158 ///
1159 /// # async fn example() -> anyhow::Result<()> {
1160 /// let client = Client::builder()
1161 /// .chain_id(1)
1162 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
1163 /// .build()?;
1164 ///
1165 /// // Stream NFT transfer events with transaction context
1166 /// let query = Query::new()
1167 /// .from_block(19000000)
1168 /// .where_logs(
1169 /// LogFilter::all()
1170 /// .and_topic0(["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"])?
1171 /// )
1172 /// .select_log_fields([LogField::Address, LogField::Topic1, LogField::Topic2])
1173 /// .select_transaction_fields([TransactionField::Hash, TransactionField::From]);
1174 /// let mut receiver = client.stream_events(query, StreamConfig::default()).await?;
1175 ///
1176 /// while let Some(response) = receiver.recv().await {
1177 /// let response = response?;
1178 /// println!("Got {} joined events up to block: {}", response.data.len(), response.next_block);
1179 /// }
1180 /// # Ok(())
1181 /// # }
1182 /// ```
1183 pub async fn stream_events(
1184 &self,
1185 mut query: Query,
1186 config: StreamConfig,
1187 ) -> Result<mpsc::Receiver<Result<EventResponse>>> {
1188 check_simple_stream_params(&config)?;
1189
1190 let event_join_strategy = InternalEventJoinStrategy::from(&query.field_selection);
1191
1192 event_join_strategy.add_join_fields_to_selection(&mut query.field_selection);
1193
1194 let (tx, rx): (_, mpsc::Receiver<Result<EventResponse>>) =
1195 mpsc::channel(config.concurrency);
1196
1197 let mut inner_rx = self
1198 .stream_arrow(query, config)
1199 .await
1200 .context("start inner stream")?;
1201
1202 tokio::spawn(async move {
1203 while let Some(resp) = inner_rx.recv().await {
1204 let msg = resp
1205 .context("inner receiver")
1206 .and_then(|r| EventResponse::try_from_arrow_response(&r, &event_join_strategy));
1207 let is_err = msg.is_err();
1208 if tx.send(msg).await.is_err() || is_err {
1209 return;
1210 }
1211 }
1212 });
1213
1214 Ok(rx)
1215 }
1216
1217 /// Spawns task to execute query and return data via a channel in Arrow format.
1218 ///
1219 /// Returns raw Apache Arrow data via a channel for high-performance processing.
1220 /// Ideal for applications that need to work directly with columnar data.
1221 ///
1222 /// # Example
1223 /// ```no_run
1224 /// use hypersync_client::{Client, net_types::{Query, TransactionFilter, TransactionField}, StreamConfig};
1225 ///
1226 /// # async fn example() -> anyhow::Result<()> {
1227 /// let client = Client::builder()
1228 /// .chain_id(1)
1229 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
1230 /// .build()?;
1231 ///
1232 /// // Stream transaction data in Arrow format for analytics
1233 /// let query = Query::new()
1234 /// .from_block(19000000)
1235 /// .to_block_excl(19000100)
1236 /// .where_transactions(
1237 /// TransactionFilter::all()
1238 /// .and_contract_address(["0xA0b86a33E6411b87Fd9D3DF822C8698FC06BBe4c"])?
1239 /// )
1240 /// .select_transaction_fields([TransactionField::Hash, TransactionField::From, TransactionField::Value]);
1241 /// let mut receiver = client.stream_arrow(query, StreamConfig::default()).await?;
1242 ///
1243 /// while let Some(response) = receiver.recv().await {
1244 /// let response = response?;
1245 /// println!("Got {} Arrow batches for transactions", response.data.transactions.len());
1246 /// }
1247 /// # Ok(())
1248 /// # }
1249 /// ```
1250 pub async fn stream_arrow(
1251 &self,
1252 query: Query,
1253 config: StreamConfig,
1254 ) -> Result<mpsc::Receiver<Result<ArrowResponse>>> {
1255 stream::stream_arrow(self, query, config).await
1256 }
1257
1258 /// Executes query and returns the response in Arrow format along with
1259 /// rate limit information from the server.
1260 ///
1261 /// Unlike [`get_arrow`](Self::get_arrow), this method does **not** retry on
1262 /// HTTP 429 responses. Instead it returns
1263 /// [`RateLimitResponse::RateLimited`] so the caller can implement their own
1264 /// back-off. Other transient errors are still retried normally.
1265 pub async fn get_arrow_with_rate_limit(
1266 &self,
1267 query: &Query,
1268 ) -> Result<RateLimitResponse<ArrowResponseData>> {
1269 const WAIT_ON_RATE_LIMIT: bool = false;
1270 match self.get_arrow_with_size(query, WAIT_ON_RATE_LIMIT).await {
1271 Ok(result) => Ok(RateLimitResponse::Success {
1272 response: result.response,
1273 rate_limit: result.rate_limit,
1274 }),
1275 Err(e) => match e.downcast::<HyperSyncResponseError>() {
1276 Ok(HyperSyncResponseError::RateLimited { rate_limit }) => {
1277 Ok(RateLimitResponse::RateLimited(rate_limit))
1278 }
1279 Ok(other) => Err(other.into()),
1280 Err(e) => Err(e),
1281 },
1282 }
1283 }
1284
1285 /// Executes query and returns the response along with
1286 /// rate limit information from the server.
1287 ///
1288 /// Unlike [`get`](Self::get), this method does **not** retry on HTTP 429
1289 /// responses. Instead it returns
1290 /// [`RateLimitResponse::RateLimited`] so the caller can implement their own
1291 /// back-off. Other transient errors are still retried normally.
1292 pub async fn get_with_rate_limit(
1293 &self,
1294 query: &Query,
1295 ) -> Result<RateLimitResponse<ResponseData>> {
1296 match self.get_arrow_with_rate_limit(query).await? {
1297 RateLimitResponse::Success {
1298 response,
1299 rate_limit,
1300 } => {
1301 let converted =
1302 QueryResponse::try_from(&response).context("convert arrow response")?;
1303 Ok(RateLimitResponse::Success {
1304 response: converted,
1305 rate_limit,
1306 })
1307 }
1308 RateLimitResponse::RateLimited(info) => Ok(RateLimitResponse::RateLimited(info)),
1309 }
1310 }
1311
1312 /// Executes query and returns joined events along with rate limit
1313 /// information from the server.
1314 ///
1315 /// Unlike [`get_events`](Self::get_events), this method does **not** retry
1316 /// on HTTP 429 responses. Instead it returns
1317 /// [`RateLimitResponse::RateLimited`] so the caller can implement their own
1318 /// back-off. Other transient errors are still retried normally.
1319 pub async fn get_events_with_rate_limit(
1320 &self,
1321 mut query: Query,
1322 ) -> Result<RateLimitResponse<Vec<simple_types::Event>>> {
1323 let event_join_strategy = InternalEventJoinStrategy::from(&query.field_selection);
1324 event_join_strategy.add_join_fields_to_selection(&mut query.field_selection);
1325 match self.get_arrow_with_rate_limit(&query).await? {
1326 RateLimitResponse::Success {
1327 response,
1328 rate_limit,
1329 } => {
1330 let converted =
1331 EventResponse::try_from_arrow_response(&response, &event_join_strategy)?;
1332 Ok(RateLimitResponse::Success {
1333 response: converted,
1334 rate_limit,
1335 })
1336 }
1337 RateLimitResponse::RateLimited(info) => Ok(RateLimitResponse::RateLimited(info)),
1338 }
1339 }
1340
1341 /// Returns the most recently observed rate limit information, if any.
1342 ///
1343 /// Updated after every request (including inside streams). Returns `None`
1344 /// if no requests have been made yet or the server hasn't returned rate limit headers.
1345 pub fn rate_limit_info(&self) -> Option<RateLimitInfo> {
1346 self.inner
1347 .rate_limit_state
1348 .lock()
1349 .expect("rate_limit_state mutex poisoned")
1350 .as_ref()
1351 .map(|(info, _captured_at)| info.clone())
1352 }
1353
1354 /// Returns the current rate limit info if the client is known to be rate-limited
1355 /// and the reset window has not yet elapsed.
1356 fn get_proactive_rate_limit_info(&self) -> Option<RateLimitInfo> {
1357 let state = self
1358 .inner
1359 .rate_limit_state
1360 .lock()
1361 .expect("rate_limit_state mutex poisoned");
1362 match state.as_ref() {
1363 Some((info, captured_at)) if info.is_rate_limited() => {
1364 let remaining_wait = info.suggested_wait_secs().map(|secs| {
1365 let elapsed = captured_at.elapsed().as_secs();
1366 secs.saturating_sub(elapsed)
1367 });
1368 if remaining_wait.unwrap_or(0) > 0 {
1369 Some(info.clone())
1370 } else {
1371 None
1372 }
1373 }
1374 _ => None,
1375 }
1376 }
1377
1378 /// Waits until the current rate limit window resets, if the client is rate limited.
1379 ///
1380 /// Returns immediately if:
1381 /// - No rate limit information has been observed yet
1382 /// - There is remaining quota in the current window
1383 ///
1384 /// This method is useful for consumers who want to explicitly wait before making
1385 /// requests, for example when coordinating rate limits across multiple systems.
1386 pub async fn wait_for_rate_limit(&self) {
1387 if let Some(info) = self.get_proactive_rate_limit_info() {
1388 let secs = info.suggested_wait_secs().unwrap_or(0);
1389 if secs > 0 {
1390 log::warn!(
1391 "rate limit exhausted ({info}), proactively waiting {secs}s for window reset. To increase your rate limits, upgrade your plan at https://envio.dev/app/api-tokens. For more info: https://docs.envio.dev/docs/HyperSync/api-tokens"
1392 );
1393 tokio::time::sleep(Duration::from_secs(secs)).await;
1394 }
1395 }
1396 }
1397
1398 /// Updates the internally tracked rate limit state with the current timestamp.
1399 fn update_rate_limit_state(&self, rate_limit: &RateLimitInfo) {
1400 // Only update if the response actually contained rate limit headers
1401 if rate_limit.limit.is_some()
1402 || rate_limit.remaining.is_some()
1403 || rate_limit.reset_secs.is_some()
1404 {
1405 let mut state = self
1406 .inner
1407 .rate_limit_state
1408 .lock()
1409 .expect("rate_limit_state mutex poisoned");
1410 *state = Some((rate_limit.clone(), Instant::now()));
1411 }
1412 }
1413
1414 /// Getter for url field.
1415 ///
1416 /// # Example
1417 /// ```
1418 /// use hypersync_client::Client;
1419 ///
1420 /// let client = Client::builder()
1421 /// .chain_id(1)
1422 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1423 /// .build()
1424 /// .unwrap();
1425 ///
1426 /// println!("Client URL: {}", client.url());
1427 /// ```
1428 pub fn url(&self) -> &Url {
1429 &self.inner.url
1430 }
1431}
1432
1433/// Builder for creating a hypersync client with configuration options.
1434///
1435/// This builder provides a fluent API for configuring client settings like URL,
1436/// authentication, timeouts, and retry behavior.
1437///
1438/// # Example
1439/// ```
1440/// use hypersync_client::{Client, SerializationFormat};
1441///
1442/// let client = Client::builder()
1443/// .chain_id(1)
1444/// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1445/// .http_req_timeout_millis(30000)
1446/// .max_num_retries(3)
1447/// .build()
1448/// .unwrap();
1449/// ```
1450#[derive(Debug, Clone, Default)]
1451pub struct ClientBuilder(ClientConfig);
1452
1453impl ClientBuilder {
1454 /// Creates a new ClientBuilder with default configuration.
1455 pub fn new() -> Self {
1456 Self::default()
1457 }
1458
1459 /// Sets the chain ID and automatically configures the URL for the hypersync endpoint.
1460 ///
1461 /// This is a convenience method that sets the URL to `https://{chain_id}.hypersync.xyz`.
1462 ///
1463 /// # Arguments
1464 /// * `chain_id` - The blockchain chain ID (e.g., 1 for Ethereum mainnet)
1465 ///
1466 /// # Example
1467 /// ```
1468 /// use hypersync_client::Client;
1469 ///
1470 /// let client = Client::builder()
1471 /// .chain_id(1) // Ethereum mainnet
1472 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1473 /// .build()
1474 /// .unwrap();
1475 /// ```
1476 pub fn chain_id(mut self, chain_id: u64) -> Self {
1477 self.0.url = format!("https://{chain_id}.hypersync.xyz");
1478 self
1479 }
1480
1481 /// Sets a custom URL for the hypersync server.
1482 ///
1483 /// Use this method when you need to connect to a custom hypersync endpoint
1484 /// instead of the default public endpoints.
1485 ///
1486 /// # Arguments
1487 /// * `url` - The hypersync server URL
1488 ///
1489 /// # Example
1490 /// ```
1491 /// use hypersync_client::Client;
1492 ///
1493 /// let client = Client::builder()
1494 /// .url("https://my-custom-hypersync.example.com")
1495 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1496 /// .build()
1497 /// .unwrap();
1498 /// ```
1499 pub fn url<S: ToString>(mut self, url: S) -> Self {
1500 self.0.url = url.to_string();
1501 self
1502 }
1503
1504 /// Sets the api token for authentication.
1505 ///
1506 /// Required for accessing authenticated hypersync endpoints.
1507 ///
1508 /// # Arguments
1509 /// * `api_token` - The authentication token
1510 ///
1511 /// # Example
1512 /// ```
1513 /// use hypersync_client::Client;
1514 ///
1515 /// let client = Client::builder()
1516 /// .chain_id(1)
1517 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1518 /// .build()
1519 /// .unwrap();
1520 /// ```
1521 pub fn api_token<S: ToString>(mut self, api_token: S) -> Self {
1522 self.0.api_token = api_token.to_string();
1523 self
1524 }
1525
1526 /// Sets the HTTP request timeout in milliseconds.
1527 ///
1528 /// # Arguments
1529 /// * `http_req_timeout_millis` - Timeout in milliseconds (default: 30000)
1530 ///
1531 /// # Example
1532 /// ```
1533 /// use hypersync_client::Client;
1534 ///
1535 /// let client = Client::builder()
1536 /// .chain_id(1)
1537 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1538 /// .http_req_timeout_millis(60000) // 60 second timeout
1539 /// .build()
1540 /// .unwrap();
1541 /// ```
1542 pub fn http_req_timeout_millis(mut self, http_req_timeout_millis: u64) -> Self {
1543 self.0.http_req_timeout_millis = http_req_timeout_millis;
1544 self
1545 }
1546
1547 /// Sets the maximum number of retries for failed requests.
1548 ///
1549 /// # Arguments
1550 /// * `max_num_retries` - Maximum number of retries (default: 10)
1551 ///
1552 /// # Example
1553 /// ```
1554 /// use hypersync_client::Client;
1555 ///
1556 /// let client = Client::builder()
1557 /// .chain_id(1)
1558 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1559 /// .max_num_retries(5)
1560 /// .build()
1561 /// .unwrap();
1562 /// ```
1563 pub fn max_num_retries(mut self, max_num_retries: usize) -> Self {
1564 self.0.max_num_retries = max_num_retries;
1565 self
1566 }
1567
1568 /// Sets the backoff increment for retry delays.
1569 ///
1570 /// This value is added to the base delay on each retry attempt.
1571 ///
1572 /// # Arguments
1573 /// * `retry_backoff_ms` - Backoff increment in milliseconds (default: 500)
1574 ///
1575 /// # Example
1576 /// ```
1577 /// use hypersync_client::Client;
1578 ///
1579 /// let client = Client::builder()
1580 /// .chain_id(1)
1581 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1582 /// .retry_backoff_ms(1000) // 1 second backoff increment
1583 /// .build()
1584 /// .unwrap();
1585 /// ```
1586 pub fn retry_backoff_ms(mut self, retry_backoff_ms: u64) -> Self {
1587 self.0.retry_backoff_ms = retry_backoff_ms;
1588 self
1589 }
1590
1591 /// Sets the initial delay for retry attempts.
1592 ///
1593 /// # Arguments
1594 /// * `retry_base_ms` - Initial retry delay in milliseconds (default: 500)
1595 ///
1596 /// # Example
1597 /// ```
1598 /// use hypersync_client::Client;
1599 ///
1600 /// let client = Client::builder()
1601 /// .chain_id(1)
1602 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1603 /// .retry_base_ms(1000) // Start with 1 second delay
1604 /// .build()
1605 /// .unwrap();
1606 /// ```
1607 pub fn retry_base_ms(mut self, retry_base_ms: u64) -> Self {
1608 self.0.retry_base_ms = retry_base_ms;
1609 self
1610 }
1611
1612 /// Sets the maximum delay for retry attempts.
1613 ///
1614 /// The retry delay will not exceed this value, even with backoff increments.
1615 ///
1616 /// # Arguments
1617 /// * `retry_ceiling_ms` - Maximum retry delay in milliseconds (default: 10000)
1618 ///
1619 /// # Example
1620 /// ```
1621 /// use hypersync_client::Client;
1622 ///
1623 /// let client = Client::builder()
1624 /// .chain_id(1)
1625 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1626 /// .retry_ceiling_ms(30000) // Cap at 30 seconds
1627 /// .build()
1628 /// .unwrap();
1629 /// ```
1630 pub fn retry_ceiling_ms(mut self, retry_ceiling_ms: u64) -> Self {
1631 self.0.retry_ceiling_ms = retry_ceiling_ms;
1632 self
1633 }
1634
1635 /// Sets whether to proactively sleep when rate limited.
1636 ///
1637 /// When enabled (default), the client will wait for the rate limit window to reset
1638 /// before sending requests it knows will be rejected. Set to `false` to disable
1639 /// this behavior and handle rate limits yourself.
1640 ///
1641 /// # Arguments
1642 /// * `proactive_rate_limit_sleep` - Whether to enable proactive rate limit sleeping (default: true)
1643 pub fn proactive_rate_limit_sleep(mut self, proactive_rate_limit_sleep: bool) -> Self {
1644 self.0.proactive_rate_limit_sleep = proactive_rate_limit_sleep;
1645 self
1646 }
1647
1648 /// Sets the serialization format for client-server communication.
1649 ///
1650 /// # Arguments
1651 /// * `serialization_format` - The format to use (JSON or CapnProto)
1652 ///
1653 /// # Example
1654 /// ```
1655 /// use hypersync_client::{Client, SerializationFormat};
1656 ///
1657 /// let client = Client::builder()
1658 /// .chain_id(1)
1659 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1660 /// .serialization_format(SerializationFormat::Json)
1661 /// .build()
1662 /// .unwrap();
1663 /// ```
1664 pub fn serialization_format(mut self, serialization_format: SerializationFormat) -> Self {
1665 self.0.serialization_format = serialization_format;
1666 self
1667 }
1668
1669 /// Builds the client with the configured settings.
1670 ///
1671 /// # Returns
1672 /// * `Result<Client>` - The configured client or an error if configuration is invalid
1673 ///
1674 /// # Errors
1675 /// Returns an error if:
1676 /// * The URL is malformed
1677 /// * Required configuration is missing
1678 ///
1679 /// # Example
1680 /// ```
1681 /// use hypersync_client::Client;
1682 ///
1683 /// let client = Client::builder()
1684 /// .chain_id(1)
1685 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1686 /// .build()
1687 /// .unwrap();
1688 /// ```
1689 pub fn build(self) -> Result<Client> {
1690 if self.0.url.is_empty() {
1691 anyhow::bail!(
1692 "endpoint needs to be set, try using builder.chain_id(1) or\
1693 builder.url(\"https://eth.hypersync.xyz\") to set the endpoint"
1694 )
1695 }
1696 Client::new(self.0)
1697 }
1698}
1699
1700/// 200ms
1701const INITIAL_RECONNECT_DELAY: Duration = Duration::from_millis(200);
1702const MAX_RECONNECT_DELAY: Duration = Duration::from_secs(30);
1703/// Timeout for detecting dead connections. Server sends keepalive pings every 5s,
1704/// so we timeout after 15s (3x the ping interval).
1705const READ_TIMEOUT: Duration = Duration::from_secs(15);
1706
1707/// Events emitted by the height stream.
1708#[derive(Debug, Clone, PartialEq, Eq)]
1709pub enum HeightStreamEvent {
1710 /// Successfully connected or reconnected to the SSE stream.
1711 Connected,
1712 /// Received a height update from the server.
1713 Height(u64),
1714 /// Connection lost, will attempt to reconnect after the specified delay.
1715 Reconnecting {
1716 /// Duration to wait before attempting reconnection.
1717 delay: Duration,
1718 /// The error that caused the reconnection.
1719 error_msg: String,
1720 },
1721}
1722
1723enum InternalStreamEvent {
1724 Publish(HeightStreamEvent),
1725 Ping,
1726 Unknown(String),
1727}
1728
1729impl Client {
1730 fn get_es_stream(&self) -> Result<EventSource> {
1731 // Build the GET /height/sse request
1732 let mut url = self.inner.url.clone();
1733 url.path_segments_mut()
1734 .ok()
1735 .context("invalid base URL")?
1736 .extend(&["height", "sse"]);
1737
1738 let req = self
1739 .inner
1740 .http_client
1741 // Don't set timeout for SSE stream
1742 .request_no_timeout(Method::GET, url);
1743
1744 // Configure exponential backoff for library-level retries
1745 let retry_policy = ExponentialBackoff::new(
1746 INITIAL_RECONNECT_DELAY,
1747 2.0,
1748 Some(MAX_RECONNECT_DELAY),
1749 None, // unlimited retries
1750 );
1751
1752 // Turn the request into an EventSource stream with retries
1753 let mut es = reqwest_eventsource::EventSource::new(req)
1754 .context("unexpected error creating EventSource")?;
1755 es.set_retry_policy(Box::new(retry_policy));
1756 Ok(es)
1757 }
1758
1759 async fn next_height(event_source: &mut EventSource) -> Result<Option<InternalStreamEvent>> {
1760 let Some(res) = tokio::time::timeout(READ_TIMEOUT, event_source.next())
1761 .await
1762 .map_err(|d| anyhow::anyhow!("stream timed out after {d}"))?
1763 else {
1764 return Ok(None);
1765 };
1766
1767 let e = match res.context("failed response")? {
1768 Event::Open => InternalStreamEvent::Publish(HeightStreamEvent::Connected),
1769 Event::Message(event) => match event.event.as_str() {
1770 "height" => {
1771 let height = event
1772 .data
1773 .trim()
1774 .parse::<u64>()
1775 .context("parsing height from event data")?;
1776 InternalStreamEvent::Publish(HeightStreamEvent::Height(height))
1777 }
1778 "ping" => InternalStreamEvent::Ping,
1779 _ => InternalStreamEvent::Unknown(format!("unknown event: {:?}", event)),
1780 },
1781 };
1782
1783 Ok(Some(e))
1784 }
1785
1786 async fn stream_height_events(
1787 es: &mut EventSource,
1788 tx: &mpsc::Sender<HeightStreamEvent>,
1789 ) -> Result<bool> {
1790 let mut received_an_event = false;
1791 while let Some(event) = Self::next_height(es).await.context("failed next height")? {
1792 match event {
1793 InternalStreamEvent::Publish(event) => {
1794 received_an_event = true;
1795 if tx.send(event).await.is_err() {
1796 return Ok(received_an_event); // Receiver dropped, exit task
1797 }
1798 }
1799 InternalStreamEvent::Ping => (), // ignore pings
1800 InternalStreamEvent::Unknown(_event) => (), // ignore unknown events
1801 }
1802 }
1803 Ok(received_an_event)
1804 }
1805
1806 fn get_delay(consecutive_failures: u32) -> Duration {
1807 if consecutive_failures > 0 {
1808 /// helper function to calculate 2^x
1809 /// optimization using bit shifting
1810 const fn two_to_pow(x: u32) -> u32 {
1811 1 << x
1812 }
1813 // Exponential backoff: 200ms, 400ms, 800ms, ... up to 30s
1814 INITIAL_RECONNECT_DELAY
1815 .saturating_mul(two_to_pow(consecutive_failures - 1))
1816 .min(MAX_RECONNECT_DELAY)
1817 } else {
1818 // On zero consecutive failures, 0 delay
1819 Duration::from_millis(0)
1820 }
1821 }
1822
1823 async fn stream_height_events_with_retry(
1824 &self,
1825 tx: &mpsc::Sender<HeightStreamEvent>,
1826 ) -> Result<()> {
1827 let mut consecutive_failures = 0u32;
1828
1829 loop {
1830 // should always be able to creat a new es stream
1831 // something is wrong with the req builder otherwise
1832 let mut es = self.get_es_stream().context("get es stream")?;
1833
1834 let mut error = anyhow!("");
1835
1836 match Self::stream_height_events(&mut es, tx).await {
1837 Ok(received_an_event) => {
1838 if received_an_event {
1839 consecutive_failures = 0; // Reset after successful connection that then failed
1840 }
1841 log::trace!("Stream height exited");
1842 }
1843 Err(e) => {
1844 log::trace!("Stream height failed: {e:?}");
1845 error = e;
1846 }
1847 }
1848
1849 es.close();
1850
1851 // If the receiver is closed, exit the task
1852 if tx.is_closed() {
1853 break;
1854 }
1855
1856 let delay = Self::get_delay(consecutive_failures);
1857 log::trace!("Reconnecting in {:?}...", delay);
1858
1859 let error_msg = format!("{error:?}");
1860
1861 if tx
1862 .send(HeightStreamEvent::Reconnecting { delay, error_msg })
1863 .await
1864 .is_err()
1865 {
1866 return Ok(()); // Receiver dropped, exit task
1867 }
1868 tokio::time::sleep(delay).await;
1869
1870 // increment consecutive failures so that on the next try
1871 // it will start using back offs
1872 consecutive_failures += 1;
1873 }
1874
1875 Ok(())
1876 }
1877
1878 /// Streams archive height updates from the server via Server-Sent Events.
1879 ///
1880 /// Establishes a long-lived SSE connection to `/height/sse` that automatically reconnects
1881 /// on disconnection with exponential backoff (200ms → 400ms → ... → max 30s).
1882 ///
1883 /// The stream emits [`HeightStreamEvent`] to notify consumers of connection state changes
1884 /// and height updates. This allows applications to display connection status to users.
1885 ///
1886 /// # Returns
1887 /// Channel receiver yielding [`HeightStreamEvent`]s. The background task handles connection
1888 /// lifecycle and sends events through this channel.
1889 ///
1890 /// # Example
1891 /// ```
1892 /// # use hypersync_client::{Client, HeightStreamEvent};
1893 /// # async fn example() -> anyhow::Result<()> {
1894 /// let client = Client::builder()
1895 /// .url("https://eth.hypersync.xyz")
1896 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1897 /// .build()?;
1898 ///
1899 /// let mut rx = client.stream_height();
1900 ///
1901 /// while let Some(event) = rx.recv().await {
1902 /// match event {
1903 /// HeightStreamEvent::Connected => println!("Connected to stream"),
1904 /// HeightStreamEvent::Height(h) => println!("Height: {}", h),
1905 /// HeightStreamEvent::Reconnecting { delay, error_msg } => {
1906 /// println!("Reconnecting in {delay:?} due to error: {error_msg}")
1907 /// }
1908 /// }
1909 /// }
1910 /// # Ok(())
1911 /// # }
1912 /// ```
1913 pub fn stream_height(&self) -> mpsc::Receiver<HeightStreamEvent> {
1914 let (tx, rx) = mpsc::channel(16);
1915 let client = self.clone();
1916
1917 tokio::spawn(async move {
1918 if let Err(e) = client.stream_height_events_with_retry(&tx).await {
1919 log::error!("Stream height failed unexpectedly: {e:?}");
1920 }
1921 });
1922
1923 rx
1924 }
1925}
1926
1927fn check_simple_stream_params(config: &StreamConfig) -> Result<()> {
1928 if config.event_signature.is_some() {
1929 return Err(anyhow!(
1930 "config.event_signature can't be passed to simple type function. User is expected to \
1931 decode the logs using Decoder."
1932 ));
1933 }
1934 if config.column_mapping.is_some() {
1935 return Err(anyhow!(
1936 "config.column_mapping can't be passed to single type function. User is expected to \
1937 map values manually."
1938 ));
1939 }
1940
1941 Ok(())
1942}
1943
1944/// Used to indicate whether or not a retry should be attempted.
1945#[derive(Debug, thiserror::Error)]
1946pub enum HyperSyncResponseError {
1947 /// Means that the client should retry with a smaller block range.
1948 #[error("hypersync responded with 'payload too large' error")]
1949 PayloadTooLarge,
1950 /// Server responded with 429 Too Many Requests.
1951 #[error("rate limited by server. To increase your rate limits, upgrade your plan at https://envio.dev/app/api-tokens. For more info: https://docs.envio.dev/docs/HyperSync/api-tokens")]
1952 RateLimited {
1953 /// Rate limit information from the 429 response headers.
1954 rate_limit: RateLimitInfo,
1955 },
1956 /// Any other server error.
1957 #[error(transparent)]
1958 Other(#[from] anyhow::Error),
1959}
1960
1961/// Result of a single Arrow query execution, before retry logic.
1962struct ArrowImplResponse {
1963 /// The parsed Arrow response data.
1964 response: ArrowResponse,
1965 /// Size of the response body in bytes.
1966 response_bytes: u64,
1967 /// Rate limit information parsed from response headers.
1968 rate_limit: RateLimitInfo,
1969}
1970
1971#[cfg(test)]
1972mod tests {
1973 use super::*;
1974 #[test]
1975 fn test_get_delay() {
1976 assert_eq!(
1977 Client::get_delay(0),
1978 Duration::from_millis(0),
1979 "starts with 0 delay"
1980 );
1981 // powers of 2 backoff
1982 assert_eq!(Client::get_delay(1), Duration::from_millis(200));
1983 assert_eq!(Client::get_delay(2), Duration::from_millis(400));
1984 assert_eq!(Client::get_delay(3), Duration::from_millis(800));
1985 // maxes out at 30s
1986 assert_eq!(
1987 Client::get_delay(9),
1988 Duration::from_secs(30),
1989 "max delay is 30s"
1990 );
1991 assert_eq!(
1992 Client::get_delay(10),
1993 Duration::from_secs(30),
1994 "max delay is 30s"
1995 );
1996 }
1997
1998 #[tokio::test]
1999 #[ignore = "integration test requiring live hs server"]
2000 async fn test_http2_is_used() -> anyhow::Result<()> {
2001 let api_token = std::env::var("ENVIO_API_TOKEN")?;
2002 let client = reqwest::Client::builder()
2003 .no_gzip()
2004 .user_agent("hscr-test")
2005 .build()?;
2006
2007 let res = client
2008 .get("https://eth.hypersync.xyz/height")
2009 .bearer_auth(&api_token)
2010 .send()
2011 .await?;
2012
2013 assert_eq!(
2014 res.version(),
2015 reqwest::Version::HTTP_2,
2016 "expected HTTP/2 but got {:?}",
2017 res.version()
2018 );
2019 assert!(res.status().is_success());
2020 Ok(())
2021 }
2022
2023 #[tokio::test]
2024 #[ignore = "integration test with live hs server for height stream"]
2025 async fn test_stream_height_events() -> anyhow::Result<()> {
2026 let (tx, mut rx) = mpsc::channel(16);
2027 let handle = tokio::spawn(async move {
2028 let client = Client::builder()
2029 .url("https://monad-testnet.hypersync.xyz")
2030 .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
2031 .build()?;
2032 let mut es = client.get_es_stream().context("get es stream")?;
2033 Client::stream_height_events(&mut es, &tx).await
2034 });
2035
2036 let val = rx.recv().await;
2037 assert!(val.is_some());
2038 assert_eq!(val.unwrap(), HeightStreamEvent::Connected);
2039 let Some(HeightStreamEvent::Height(height)) = rx.recv().await else {
2040 panic!("should have received height")
2041 };
2042 let Some(HeightStreamEvent::Height(height2)) = rx.recv().await else {
2043 panic!("should have received height")
2044 };
2045 assert!(height2 > height);
2046 drop(rx);
2047
2048 let res = handle.await.expect("should have joined");
2049 let received_an_event =
2050 res.expect("should have ended the stream gracefully after dropping rx");
2051 assert!(received_an_event, "should have received an event");
2052
2053 Ok(())
2054 }
2055}