hypersync_client/lib.rs
1#![deny(missing_docs)]
2//! # HyperSync Client
3//!
4//! A high-performance Rust client for the HyperSync protocol, enabling efficient retrieval
5//! of blockchain data including blocks, transactions, logs, and traces.
6//!
7//! ## Features
8//!
9//! - **High-performance streaming**: Parallel data fetching with automatic retries
10//! - **Flexible querying**: Rich query builder API for precise data selection
11//! - **Multiple data formats**: Support for Arrow, Parquet, and simple Rust types
12//! - **Event decoding**: Automatic ABI decoding for smart contract events
13//! - **Real-time updates**: Live height streaming via Server-Sent Events
14//! - **Production ready**: Built-in rate limiting, retries, and error handling
15//!
16//! ## Quick Start
17//!
18//! ```no_run
19//! use hypersync_client::{Client, net_types::{Query, LogFilter, LogField}, StreamConfig};
20//!
21//! #[tokio::main]
22//! async fn main() -> anyhow::Result<()> {
23//! // Create a client for Ethereum mainnet
24//! let client = Client::builder()
25//! .chain_id(1)
26//! .api_token(std::env::var("ENVIO_API_TOKEN")?)
27//! .build()?;
28//!
29//! // Query ERC20 transfer events from USDC contract
30//! let query = Query::new()
31//! .from_block(19000000)
32//! .to_block_excl(19001000)
33//! .where_logs(
34//! LogFilter::all()
//!             .and_address(["0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"])?
36//! .and_topic0(["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"])?
37//! )
38//! .select_log_fields([LogField::Address, LogField::Topic1, LogField::Topic2, LogField::Data]);
39//!
40//! // Get all data in one response
41//! let response = client.get(&query).await?;
42//! println!("Retrieved {} blocks", response.data.blocks.len());
43//!
44//! // Or stream data for large ranges
45//! let mut receiver = client.stream(query, StreamConfig::default()).await?;
46//! while let Some(response) = receiver.recv().await {
47//! let response = response?;
48//! println!("Streaming: got blocks up to {}", response.next_block);
49//! }
50//!
51//! Ok(())
52//! }
53//! ```
54//!
55//! ## Main Types
56//!
57//! - [`Client`] - Main client for interacting with HyperSync servers
58//! - [`net_types::Query`] - Query builder for specifying what data to fetch
59//! - [`StreamConfig`] - Configuration for streaming operations
60//! - [`QueryResponse`] - Response containing blocks, transactions, logs, and traces
61//! - [`ArrowResponse`] - Response in Apache Arrow format for high-performance processing
62//!
63//! ## Authentication
64//!
65//! You'll need a HyperSync API token to access the service. Get one from
66//! [https://envio.dev/app/api-tokens](https://envio.dev/app/api-tokens).
67//!
68//! ## Examples
69//!
70//! See the `examples/` directory for more detailed usage patterns including:
71//! - ERC20 token transfers
72//! - Wallet transaction history
73//! - Event decoding and filtering
74//! - Real-time data streaming
75use std::{sync::Arc, time::Duration};
76
77use anyhow::{anyhow, Context, Result};
78use futures::StreamExt;
79use hypersync_net_types::{hypersync_net_types_capnp, ArchiveHeight, ChainId, Query};
80use polars_arrow::{array::Array, record_batch::RecordBatchT as Chunk};
81use reqwest::{header, Method};
82use reqwest_eventsource::retry::ExponentialBackoff;
83use reqwest_eventsource::{Event, EventSource};
84
85mod column_mapping;
86mod config;
87mod decode;
88mod decode_call;
89mod from_arrow;
90mod parquet_out;
91mod parse_response;
92pub mod preset_query;
93mod rayon_async;
94pub mod simple_types;
95mod stream;
96#[cfg(feature = "ethers")]
97pub mod to_ethers;
98mod types;
99mod util;
100
101pub use from_arrow::FromArrow;
102pub use hypersync_format as format;
103pub use hypersync_net_types as net_types;
104pub use hypersync_schema as schema;
105
106use parse_response::parse_query_response;
107use tokio::sync::mpsc;
108use types::{EventResponse, ResponseData};
109use url::Url;
110
111pub use column_mapping::{ColumnMapping, DataType};
112pub use config::HexOutput;
113pub use config::{ClientConfig, SerializationFormat, StreamConfig};
114pub use decode::Decoder;
115pub use decode_call::CallDecoder;
116pub use types::{ArrowBatch, ArrowResponse, ArrowResponseData, QueryResponse};
117
118use crate::parse_response::read_query_response;
119use crate::simple_types::InternalEventJoinStrategy;
120
/// A record batch whose columns are boxed, dynamically typed Arrow arrays.
type ArrowChunk = Chunk<Box<dyn Array>>;
122
123/// Internal client state to handle http requests and retries.
124#[derive(Debug)]
125struct ClientInner {
126 /// Initialized reqwest instance for client url.
127 http_client: reqwest::Client,
128 /// HyperSync server URL.
129 url: Url,
130 /// HyperSync server api token.
131 api_token: String,
132 /// Number of retries to attempt before returning error.
133 max_num_retries: usize,
134 /// Milliseconds that would be used for retry backoff increasing.
135 retry_backoff_ms: u64,
136 /// Initial wait time for request backoff.
137 retry_base_ms: u64,
138 /// Ceiling time for request backoff.
139 retry_ceiling_ms: u64,
140 /// Query serialization format to use for HTTP requests.
141 serialization_format: SerializationFormat,
142}
143
/// Client to handle http requests and retries.
///
/// All state is held behind an [`Arc`], so every clone of a `Client` shares
/// the same internal state.
#[derive(Clone, Debug)]
pub struct Client {
    /// Shared, reference-counted internal state.
    inner: Arc<ClientInner>,
}
149
150impl Client {
151 /// Creates a new client with the given configuration.
152 ///
153 /// Configuration must include the `url` and `api_token` fields.
154 /// # Example
155 /// ```
156 /// use hypersync_client::{Client, ClientConfig};
157 ///
158 /// let config = ClientConfig {
159 /// url: "https://eth.hypersync.xyz".to_string(),
160 /// api_token: std::env::var("ENVIO_API_TOKEN")?,
161 /// ..Default::default()
162 /// };
163 /// let client = Client::new(config)?;
164 /// # Ok::<(), anyhow::Error>(())
165 /// ```
166 ///
167 /// # Errors
168 /// This method fails if the config is invalid.
169 pub fn new(cfg: ClientConfig) -> Result<Self> {
170 // hscr stands for hypersync client rust
171 cfg.validate().context("invalid ClientConfig")?;
172 let user_agent = format!("hscr/{}", env!("CARGO_PKG_VERSION"));
173 Self::new_internal(cfg, user_agent)
174 }
175
    /// Creates a new client builder.
    ///
    /// Returns a fresh [`ClientBuilder`]; finish the chain with `build()` to
    /// obtain a [`Client`], as shown below.
    ///
    /// # Example
    /// ```
    /// use hypersync_client::Client;
    ///
    /// let client = Client::builder()
    ///     .chain_id(1)
    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
    ///     .build()
    ///     .unwrap();
    /// ```
    pub fn builder() -> ClientBuilder {
        ClientBuilder::new()
    }
191
192 #[doc(hidden)]
193 pub fn new_with_agent(cfg: ClientConfig, user_agent: impl Into<String>) -> Result<Self> {
194 // Creates a new client with the given configuration and custom user agent.
195 // This is intended for use by language bindings (Python, Node.js) and HyperIndex.
196 Self::new_internal(cfg, user_agent.into())
197 }
198
199 /// Internal constructor that takes both config and user agent.
200 fn new_internal(cfg: ClientConfig, user_agent: String) -> Result<Self> {
201 let http_client = reqwest::Client::builder()
202 .no_gzip()
203 .timeout(Duration::from_millis(cfg.http_req_timeout_millis))
204 .user_agent(user_agent)
205 .build()
206 .unwrap();
207
208 let url = Url::parse(&cfg.url).context("url is malformed")?;
209
210 Ok(Self {
211 inner: Arc::new(ClientInner {
212 http_client,
213 url,
214 api_token: cfg.api_token,
215 max_num_retries: cfg.max_num_retries,
216 retry_backoff_ms: cfg.retry_backoff_ms,
217 retry_base_ms: cfg.retry_base_ms,
218 retry_ceiling_ms: cfg.retry_ceiling_ms,
219 serialization_format: cfg.serialization_format,
220 }),
221 })
222 }
223
224 /// Retrieves blocks, transactions, traces, and logs through a stream using the provided
225 /// query and stream configuration.
226 ///
227 /// ### Implementation
228 /// Runs multiple queries simultaneously based on config.concurrency.
229 ///
230 /// Each query runs until it reaches query.to, server height, any max_num_* query param,
231 /// or execution timed out by server.
232 ///
233 /// # ⚠️ Important Warning
234 ///
235 /// This method will continue executing until the query has run to completion from beginning
236 /// to the end of the block range defined in the query. For heavy queries with large block
237 /// ranges or high data volumes, consider:
238 ///
239 /// - Use [`stream()`](Self::stream) to interact with each streamed chunk individually
240 /// - Use [`get()`](Self::get) which returns a `next_block` that can be paginated for the next query
241 /// - Break large queries into smaller block ranges
242 ///
243 /// # Example
244 /// ```no_run
245 /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField}, StreamConfig};
246 ///
247 /// # async fn example() -> anyhow::Result<()> {
248 /// let client = Client::builder()
249 /// .chain_id(1)
250 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
251 /// .build()?;
252 ///
253 /// // Query ERC20 transfer events
254 /// let query = Query::new()
255 /// .from_block(19000000)
256 /// .to_block_excl(19000010)
257 /// .where_logs(
258 /// LogFilter::all()
259 /// .and_topic0(["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"])?
260 /// )
261 /// .select_log_fields([LogField::Address, LogField::Data]);
262 /// let response = client.collect(query, StreamConfig::default()).await?;
263 ///
264 /// println!("Collected {} events", response.data.logs.len());
265 /// # Ok(())
266 /// # }
267 /// ```
268 pub async fn collect(&self, query: Query, config: StreamConfig) -> Result<QueryResponse> {
269 check_simple_stream_params(&config)?;
270
271 let mut recv = stream::stream_arrow(self, query, config)
272 .await
273 .context("start stream")?;
274
275 let mut data = ResponseData::default();
276 let mut archive_height = None;
277 let mut next_block = 0;
278 let mut total_execution_time = 0;
279
280 while let Some(res) = recv.recv().await {
281 let res = res.context("get response")?;
282 let res: QueryResponse = QueryResponse::from(&res);
283
284 for batch in res.data.blocks {
285 data.blocks.push(batch);
286 }
287 for batch in res.data.transactions {
288 data.transactions.push(batch);
289 }
290 for batch in res.data.logs {
291 data.logs.push(batch);
292 }
293 for batch in res.data.traces {
294 data.traces.push(batch);
295 }
296
297 archive_height = res.archive_height;
298 next_block = res.next_block;
299 total_execution_time += res.total_execution_time
300 }
301
302 Ok(QueryResponse {
303 archive_height,
304 next_block,
305 total_execution_time,
306 data,
307 rollback_guard: None,
308 })
309 }
310
311 /// Retrieves events through a stream using the provided query and stream configuration.
312 ///
313 /// # ⚠️ Important Warning
314 ///
315 /// This method will continue executing until the query has run to completion from beginning
316 /// to the end of the block range defined in the query. For heavy queries with large block
317 /// ranges or high data volumes, consider:
318 ///
319 /// - Use [`stream_events()`](Self::stream_events) to interact with each streamed chunk individually
320 /// - Use [`get_events()`](Self::get_events) which returns a `next_block` that can be paginated for the next query
321 /// - Break large queries into smaller block ranges
322 ///
323 /// # Example
324 /// ```no_run
325 /// use hypersync_client::{Client, net_types::{Query, TransactionFilter, TransactionField}, StreamConfig};
326 ///
327 /// # async fn example() -> anyhow::Result<()> {
328 /// let client = Client::builder()
329 /// .chain_id(1)
330 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
331 /// .build()?;
332 ///
333 /// // Query transactions to a specific address
334 /// let query = Query::new()
335 /// .from_block(19000000)
336 /// .to_block_excl(19000100)
337 /// .where_transactions(
338 /// TransactionFilter::all()
339 /// .and_to(["0xA0b86a33E6411b87Fd9D3DF822C8698FC06BBe4c"])?
340 /// )
341 /// .select_transaction_fields([TransactionField::Hash, TransactionField::From, TransactionField::Value]);
342 /// let response = client.collect_events(query, StreamConfig::default()).await?;
343 ///
344 /// println!("Collected {} events", response.data.len());
345 /// # Ok(())
346 /// # }
347 /// ```
348 pub async fn collect_events(
349 &self,
350 mut query: Query,
351 config: StreamConfig,
352 ) -> Result<EventResponse> {
353 check_simple_stream_params(&config)?;
354
355 let event_join_strategy = InternalEventJoinStrategy::from(&query.field_selection);
356 event_join_strategy.add_join_fields_to_selection(&mut query.field_selection);
357
358 let mut recv = stream::stream_arrow(self, query, config)
359 .await
360 .context("start stream")?;
361
362 let mut data = Vec::new();
363 let mut archive_height = None;
364 let mut next_block = 0;
365 let mut total_execution_time = 0;
366
367 while let Some(res) = recv.recv().await {
368 let res = res.context("get response")?;
369 let res: QueryResponse = QueryResponse::from(&res);
370 let events = event_join_strategy.join_from_response_data(res.data);
371
372 data.extend(events);
373
374 archive_height = res.archive_height;
375 next_block = res.next_block;
376 total_execution_time += res.total_execution_time
377 }
378
379 Ok(EventResponse {
380 archive_height,
381 next_block,
382 total_execution_time,
383 data,
384 rollback_guard: None,
385 })
386 }
387
388 /// Retrieves blocks, transactions, traces, and logs in Arrow format through a stream using
389 /// the provided query and stream configuration.
390 ///
391 /// Returns data in Apache Arrow format for high-performance columnar processing.
392 /// Useful for analytics workloads or when working with Arrow-compatible tools.
393 ///
394 /// # ⚠️ Important Warning
395 ///
396 /// This method will continue executing until the query has run to completion from beginning
397 /// to the end of the block range defined in the query. For heavy queries with large block
398 /// ranges or high data volumes, consider:
399 ///
400 /// - Use [`stream_arrow()`](Self::stream_arrow) to interact with each streamed chunk individually
401 /// - Use [`get_arrow()`](Self::get_arrow) which returns a `next_block` that can be paginated for the next query
402 /// - Break large queries into smaller block ranges
403 ///
404 /// # Example
405 /// ```no_run
406 /// use hypersync_client::{Client, net_types::{Query, BlockFilter, BlockField}, StreamConfig};
407 ///
408 /// # async fn example() -> anyhow::Result<()> {
409 /// let client = Client::builder()
410 /// .chain_id(1)
411 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
412 /// .build()?;
413 ///
414 /// // Get block data in Arrow format for analytics
415 /// let query = Query::new()
416 /// .from_block(19000000)
417 /// .to_block_excl(19000100)
418 /// .include_all_blocks()
419 /// .select_block_fields([BlockField::Number, BlockField::Timestamp, BlockField::GasUsed]);
420 /// let response = client.collect_arrow(query, StreamConfig::default()).await?;
421 ///
422 /// println!("Retrieved {} Arrow batches for blocks", response.data.blocks.len());
423 /// # Ok(())
424 /// # }
425 /// ```
426 pub async fn collect_arrow(&self, query: Query, config: StreamConfig) -> Result<ArrowResponse> {
427 let mut recv = stream::stream_arrow(self, query, config)
428 .await
429 .context("start stream")?;
430
431 let mut data = ArrowResponseData::default();
432 let mut archive_height = None;
433 let mut next_block = 0;
434 let mut total_execution_time = 0;
435
436 while let Some(res) = recv.recv().await {
437 let res = res.context("get response")?;
438
439 for batch in res.data.blocks {
440 data.blocks.push(batch);
441 }
442 for batch in res.data.transactions {
443 data.transactions.push(batch);
444 }
445 for batch in res.data.logs {
446 data.logs.push(batch);
447 }
448 for batch in res.data.traces {
449 data.traces.push(batch);
450 }
451 for batch in res.data.decoded_logs {
452 data.decoded_logs.push(batch);
453 }
454
455 archive_height = res.archive_height;
456 next_block = res.next_block;
457 total_execution_time += res.total_execution_time
458 }
459
460 Ok(ArrowResponse {
461 archive_height,
462 next_block,
463 total_execution_time,
464 data,
465 rollback_guard: None,
466 })
467 }
468
    /// Writes parquet file getting data through a stream using the provided path, query,
    /// and stream configuration.
    ///
    /// Streams data directly to a Parquet file for efficient storage and later analysis.
    /// Perfect for data exports or ETL pipelines.
    ///
    /// This is a thin wrapper: all of the work is delegated to
    /// `parquet_out::collect_parquet`, and its result is returned unchanged.
    ///
    /// # ⚠️ Important Warning
    ///
    /// This method will continue executing until the query has run to completion from beginning
    /// to the end of the block range defined in the query. For heavy queries with large block
    /// ranges or high data volumes, consider:
    ///
    /// - Use [`stream_arrow()`](Self::stream_arrow) and write to Parquet incrementally
    /// - Use [`get_arrow()`](Self::get_arrow) with pagination and append to Parquet files
    /// - Break large queries into smaller block ranges
    ///
    /// # Example
    /// ```no_run
    /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField}, StreamConfig};
    ///
    /// # async fn example() -> anyhow::Result<()> {
    /// let client = Client::builder()
    ///     .chain_id(1)
    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
    ///     .build()?;
    ///
    /// // Export all DEX trades to Parquet for analysis
    /// let query = Query::new()
    ///     .from_block(19000000)
    ///     .to_block_excl(19010000)
    ///     .where_logs(
    ///         LogFilter::all()
    ///             .and_topic0(["0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822"])?
    ///     )
    ///     .select_log_fields([LogField::Address, LogField::Data, LogField::BlockNumber]);
    /// client.collect_parquet("./trades.parquet", query, StreamConfig::default()).await?;
    ///
    /// println!("Trade data exported to trades.parquet");
    /// # Ok(())
    /// # }
    /// ```
    pub async fn collect_parquet(
        &self,
        path: &str,
        query: Query,
        config: StreamConfig,
    ) -> Result<()> {
        parquet_out::collect_parquet(self, path, query, config).await
    }
518
519 /// Internal implementation of getting chain_id from server
520 async fn get_chain_id_impl(&self) -> Result<u64> {
521 let mut url = self.inner.url.clone();
522 let mut segments = url.path_segments_mut().ok().context("get path segments")?;
523 segments.push("chain_id");
524 std::mem::drop(segments);
525 let req = self
526 .inner
527 .http_client
528 .request(Method::GET, url)
529 .bearer_auth(&self.inner.api_token);
530
531 let res = req.send().await.context("execute http req")?;
532
533 let status = res.status();
534 if !status.is_success() {
535 return Err(anyhow!("http response status code {}", status));
536 }
537
538 let chain_id: ChainId = res.json().await.context("read response body json")?;
539
540 Ok(chain_id.chain_id)
541 }
542
543 /// Internal implementation of getting height from server
544 async fn get_height_impl(&self, http_timeout_override: Option<Duration>) -> Result<u64> {
545 let mut url = self.inner.url.clone();
546 let mut segments = url.path_segments_mut().ok().context("get path segments")?;
547 segments.push("height");
548 std::mem::drop(segments);
549 let mut req = self
550 .inner
551 .http_client
552 .request(Method::GET, url)
553 .bearer_auth(&self.inner.api_token);
554
555 if let Some(http_timeout_override) = http_timeout_override {
556 req = req.timeout(http_timeout_override);
557 }
558
559 let res = req.send().await.context("execute http req")?;
560
561 let status = res.status();
562 if !status.is_success() {
563 return Err(anyhow!("http response status code {}", status));
564 }
565
566 let height: ArchiveHeight = res.json().await.context("read response body json")?;
567
568 Ok(height.height.unwrap_or(0))
569 }
570
571 /// Get the chain_id from the server with retries.
572 ///
573 /// # Example
574 /// ```no_run
575 /// use hypersync_client::Client;
576 ///
577 /// # async fn example() -> anyhow::Result<()> {
578 /// let client = Client::builder()
579 /// .chain_id(1)
580 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
581 /// .build()?;
582 ///
583 /// let chain_id = client.get_chain_id().await?;
584 /// println!("Connected to chain ID: {}", chain_id);
585 /// # Ok(())
586 /// # }
587 /// ```
588 pub async fn get_chain_id(&self) -> Result<u64> {
589 let mut base = self.inner.retry_base_ms;
590
591 let mut err = anyhow!("");
592
593 for _ in 0..self.inner.max_num_retries + 1 {
594 match self.get_chain_id_impl().await {
595 Ok(res) => return Ok(res),
596 Err(e) => {
597 log::error!(
598 "failed to get chain_id from server, retrying... The error was: {e:?}"
599 );
600 err = err.context(format!("{e:?}"));
601 }
602 }
603
604 let base_ms = Duration::from_millis(base);
605 let jitter = Duration::from_millis(fastrange_rs::fastrange_64(
606 rand::random(),
607 self.inner.retry_backoff_ms,
608 ));
609
610 tokio::time::sleep(base_ms + jitter).await;
611
612 base = std::cmp::min(
613 base + self.inner.retry_backoff_ms,
614 self.inner.retry_ceiling_ms,
615 );
616 }
617
618 Err(err)
619 }
620
621 /// Get the height of from server with retries.
622 ///
623 /// # Example
624 /// ```no_run
625 /// use hypersync_client::Client;
626 ///
627 /// # async fn example() -> anyhow::Result<()> {
628 /// let client = Client::builder()
629 /// .chain_id(1)
630 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
631 /// .build()?;
632 ///
633 /// let height = client.get_height().await?;
634 /// println!("Current block height: {}", height);
635 /// # Ok(())
636 /// # }
637 /// ```
638 pub async fn get_height(&self) -> Result<u64> {
639 let mut base = self.inner.retry_base_ms;
640
641 let mut err = anyhow!("");
642
643 for _ in 0..self.inner.max_num_retries + 1 {
644 match self.get_height_impl(None).await {
645 Ok(res) => return Ok(res),
646 Err(e) => {
647 log::error!(
648 "failed to get height from server, retrying... The error was: {e:?}"
649 );
650 err = err.context(format!("{e:?}"));
651 }
652 }
653
654 let base_ms = Duration::from_millis(base);
655 let jitter = Duration::from_millis(fastrange_rs::fastrange_64(
656 rand::random(),
657 self.inner.retry_backoff_ms,
658 ));
659
660 tokio::time::sleep(base_ms + jitter).await;
661
662 base = std::cmp::min(
663 base + self.inner.retry_backoff_ms,
664 self.inner.retry_ceiling_ms,
665 );
666 }
667
668 Err(err)
669 }
670
    /// Get the height of the Client instance for health checks.
    ///
    /// Makes exactly one height request with no retries. When `http_req_timeout` is
    /// `Some`, it overrides, for this request only, the http timeout configured when
    /// the client was created.
    ///
    /// # Errors
    /// Returns the error of the single underlying request if it fails.
    ///
    /// # Example
    /// ```no_run
    /// use hypersync_client::Client;
    /// use std::time::Duration;
    ///
    /// # async fn example() -> anyhow::Result<()> {
    /// let client = Client::builder()
    ///     .chain_id(1)
    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
    ///     .build()?;
    ///
    /// // Quick health check with 5 second timeout
    /// let height = client.health_check(Some(Duration::from_secs(5))).await?;
    /// println!("Server is healthy at block: {}", height);
    /// # Ok(())
    /// # }
    /// ```
    pub async fn health_check(&self, http_req_timeout: Option<Duration>) -> Result<u64> {
        self.get_height_impl(http_req_timeout).await
    }
695
696 /// Executes query with retries and returns the response.
697 ///
698 /// # Example
699 /// ```no_run
700 /// use hypersync_client::{Client, net_types::{Query, BlockFilter, BlockField}};
701 ///
702 /// # async fn example() -> anyhow::Result<()> {
703 /// let client = Client::builder()
704 /// .chain_id(1)
705 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
706 /// .build()?;
707 ///
708 /// // Query all blocks from a specific range
709 /// let query = Query::new()
710 /// .from_block(19000000)
711 /// .to_block_excl(19000010)
712 /// .include_all_blocks()
713 /// .select_block_fields([BlockField::Number, BlockField::Hash, BlockField::Timestamp]);
714 /// let response = client.get(&query).await?;
715 ///
716 /// println!("Retrieved {} blocks", response.data.blocks.len());
717 /// # Ok(())
718 /// # }
719 /// ```
720 pub async fn get(&self, query: &Query) -> Result<QueryResponse> {
721 let arrow_response = self.get_arrow(query).await.context("get data")?;
722 Ok(QueryResponse::from(&arrow_response))
723 }
724
725 /// Add block, transaction and log fields selection to the query, executes it with retries
726 /// and returns the response.
727 ///
728 /// This method automatically joins blocks, transactions, and logs into unified events,
729 /// making it easier to work with related blockchain data.
730 ///
731 /// # Example
732 /// ```no_run
733 /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField, TransactionField}};
734 ///
735 /// # async fn example() -> anyhow::Result<()> {
736 /// let client = Client::builder()
737 /// .chain_id(1)
738 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
739 /// .build()?;
740 ///
741 /// // Query ERC20 transfers with transaction context
742 /// let query = Query::new()
743 /// .from_block(19000000)
744 /// .to_block_excl(19000010)
745 /// .where_logs(
746 /// LogFilter::all()
747 /// .and_topic0(["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"])?
748 /// )
749 /// .select_log_fields([LogField::Address, LogField::Data])
750 /// .select_transaction_fields([TransactionField::Hash, TransactionField::From]);
751 /// let response = client.get_events(query).await?;
752 ///
753 /// println!("Retrieved {} joined events", response.data.len());
754 /// # Ok(())
755 /// # }
756 /// ```
757 pub async fn get_events(&self, mut query: Query) -> Result<EventResponse> {
758 let event_join_strategy = InternalEventJoinStrategy::from(&query.field_selection);
759 event_join_strategy.add_join_fields_to_selection(&mut query.field_selection);
760 let arrow_response = self.get_arrow(&query).await.context("get data")?;
761 Ok(EventResponse::from_arrow_response(
762 &arrow_response,
763 &event_join_strategy,
764 ))
765 }
766
767 /// Executes query once and returns the result in (Arrow, size) format using JSON serialization.
768 async fn get_arrow_impl_json(&self, query: &Query) -> Result<(ArrowResponse, u64)> {
769 let mut url = self.inner.url.clone();
770 let mut segments = url.path_segments_mut().ok().context("get path segments")?;
771 segments.push("query");
772 segments.push("arrow-ipc");
773 std::mem::drop(segments);
774 let req = self
775 .inner
776 .http_client
777 .request(Method::POST, url)
778 .bearer_auth(&self.inner.api_token);
779
780 let res = req.json(&query).send().await.context("execute http req")?;
781
782 let status = res.status();
783 if !status.is_success() {
784 let text = res.text().await.context("read text to see error")?;
785
786 return Err(anyhow!(
787 "http response status code {}, err body: {}",
788 status,
789 text
790 ));
791 }
792
793 let bytes = res.bytes().await.context("read response body bytes")?;
794
795 let res = tokio::task::block_in_place(|| {
796 parse_query_response(&bytes).context("parse query response")
797 })?;
798
799 Ok((res, bytes.len().try_into().unwrap()))
800 }
801
802 fn should_cache_queries(&self) -> bool {
803 matches!(
804 self.inner.serialization_format,
805 SerializationFormat::CapnProto {
806 should_cache_queries: true
807 }
808 )
809 }
810
811 /// Executes query once and returns the result in (Arrow, size) format using Cap'n Proto serialization.
812 async fn get_arrow_impl_capnp(&self, query: &Query) -> Result<(ArrowResponse, u64)> {
813 let mut url = self.inner.url.clone();
814 let mut segments = url.path_segments_mut().ok().context("get path segments")?;
815 segments.push("query");
816 segments.push("arrow-ipc");
817 segments.push("capnp");
818 std::mem::drop(segments);
819
820 let should_cache = self.should_cache_queries();
821
822 if should_cache {
823 let query_with_id = {
824 let mut message = capnp::message::Builder::new_default();
825 let mut request_builder =
826 message.init_root::<hypersync_net_types_capnp::request::Builder>();
827
828 request_builder.build_query_id_from_query(query)?;
829 let mut query_with_id = Vec::new();
830 capnp::serialize_packed::write_message(&mut query_with_id, &message)?;
831 query_with_id
832 };
833
834 let mut req = self
835 .inner
836 .http_client
837 .request(Method::POST, url.clone())
838 .bearer_auth(&self.inner.api_token);
839 req = req.header("content-type", "application/x-capnp");
840
841 let res = req
842 .body(query_with_id)
843 .send()
844 .await
845 .context("execute http req")?;
846
847 let status = res.status();
848 if status.is_success() {
849 let bytes = res.bytes().await.context("read response body bytes")?;
850
851 let mut opts = capnp::message::ReaderOptions::new();
852 opts.nesting_limit(i32::MAX).traversal_limit_in_words(None);
853 let message_reader = capnp::serialize_packed::read_message(bytes.as_ref(), opts)
854 .context("create message reader")?;
855 let query_response = message_reader
856 .get_root::<hypersync_net_types_capnp::cached_query_response::Reader>()
857 .context("get cached_query_response root")?;
858 match query_response.get_either().which()? {
859 hypersync_net_types_capnp::cached_query_response::either::Which::QueryResponse(
860 query_response,
861 ) => {
862 let res = tokio::task::block_in_place(|| {
863 let res = query_response?;
864 read_query_response(&res).context("parse query response cached")
865 })?;
866 return Ok((res, bytes.len().try_into().unwrap()));
867 }
868 hypersync_net_types_capnp::cached_query_response::either::Which::NotCached(()) => {
869 log::trace!("query was not cached, retrying with full query");
870 }
871 }
872 } else {
873 let text = res.text().await.context("read text to see error")?;
874 log::warn!(
875 "Failed cache query, will retry full query. {}, err body: {}",
876 status,
877 text
878 );
879 }
880 };
881
882 let full_query_bytes = {
883 let mut message = capnp::message::Builder::new_default();
884 let mut query_builder =
885 message.init_root::<hypersync_net_types_capnp::request::Builder>();
886
887 query_builder.build_full_query_from_query(query, should_cache)?;
888 let mut bytes = Vec::new();
889 capnp::serialize_packed::write_message(&mut bytes, &message)?;
890 bytes
891 };
892
893 let mut req = self
894 .inner
895 .http_client
896 .request(Method::POST, url)
897 .bearer_auth(&self.inner.api_token);
898 req = req.header("content-type", "application/x-capnp");
899
900 let res = req
901 .header("content-type", "application/x-capnp")
902 .body(full_query_bytes)
903 .send()
904 .await
905 .context("execute http req")?;
906
907 let status = res.status();
908 if !status.is_success() {
909 let text = res.text().await.context("read text to see error")?;
910
911 return Err(anyhow!(
912 "http response status code {}, err body: {}",
913 status,
914 text
915 ));
916 }
917
918 let bytes = res.bytes().await.context("read response body bytes")?;
919
920 let res = tokio::task::block_in_place(|| {
921 parse_query_response(&bytes).context("parse query response")
922 })?;
923
924 Ok((res, bytes.len().try_into().unwrap()))
925 }
926
927 /// Executes query once and returns the result in (Arrow, size) format.
928 async fn get_arrow_impl(&self, query: &Query) -> Result<(ArrowResponse, u64)> {
929 match self.inner.serialization_format {
930 SerializationFormat::Json => self.get_arrow_impl_json(query).await,
931 SerializationFormat::CapnProto { .. } => self.get_arrow_impl_capnp(query).await,
932 }
933 }
934
935 /// Executes query with retries and returns the response in Arrow format.
936 pub async fn get_arrow(&self, query: &Query) -> Result<ArrowResponse> {
937 self.get_arrow_with_size(query).await.map(|res| res.0)
938 }
939
940 /// Internal implementation for get_arrow.
941 async fn get_arrow_with_size(&self, query: &Query) -> Result<(ArrowResponse, u64)> {
942 let mut base = self.inner.retry_base_ms;
943
944 let mut err = anyhow!("");
945
946 for _ in 0..self.inner.max_num_retries + 1 {
947 match self.get_arrow_impl(query).await {
948 Ok(res) => return Ok(res),
949 Err(e) => {
950 log::error!(
951 "failed to get arrow data from server, retrying... The error was: {e:?}"
952 );
953 err = err.context(format!("{e:?}"));
954 }
955 }
956
957 let base_ms = Duration::from_millis(base);
958 let jitter = Duration::from_millis(fastrange_rs::fastrange_64(
959 rand::random(),
960 self.inner.retry_backoff_ms,
961 ));
962
963 tokio::time::sleep(base_ms + jitter).await;
964
965 base = std::cmp::min(
966 base + self.inner.retry_backoff_ms,
967 self.inner.retry_ceiling_ms,
968 );
969 }
970
971 Err(err)
972 }
973
974 /// Spawns task to execute query and return data via a channel.
975 ///
976 /// # Example
977 /// ```no_run
978 /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField}, StreamConfig};
979 ///
980 /// # async fn example() -> anyhow::Result<()> {
981 /// let client = Client::builder()
982 /// .chain_id(1)
983 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
984 /// .build()?;
985 ///
986 /// // Stream all ERC20 transfer events
987 /// let query = Query::new()
988 /// .from_block(19000000)
989 /// .where_logs(
990 /// LogFilter::all()
991 /// .and_topic0(["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"])?
992 /// )
993 /// .select_log_fields([LogField::Address, LogField::Topic1, LogField::Topic2, LogField::Data]);
994 /// let mut receiver = client.stream(query, StreamConfig::default()).await?;
995 ///
996 /// while let Some(response) = receiver.recv().await {
997 /// let response = response?;
998 /// println!("Got {} events up to block: {}", response.data.logs.len(), response.next_block);
999 /// }
1000 /// # Ok(())
1001 /// # }
1002 /// ```
1003 pub async fn stream(
1004 &self,
1005 query: Query,
1006 config: StreamConfig,
1007 ) -> Result<mpsc::Receiver<Result<QueryResponse>>> {
1008 check_simple_stream_params(&config)?;
1009
1010 let (tx, rx): (_, mpsc::Receiver<Result<QueryResponse>>) =
1011 mpsc::channel(config.concurrency);
1012
1013 let mut inner_rx = self
1014 .stream_arrow(query, config)
1015 .await
1016 .context("start inner stream")?;
1017
1018 tokio::spawn(async move {
1019 while let Some(resp) = inner_rx.recv().await {
1020 let is_err = resp.is_err();
1021 if tx
1022 .send(resp.map(|r| QueryResponse::from(&r)))
1023 .await
1024 .is_err()
1025 || is_err
1026 {
1027 return;
1028 }
1029 }
1030 });
1031
1032 Ok(rx)
1033 }
1034
1035 /// Add block, transaction and log fields selection to the query and spawns task to execute it,
1036 /// returning data via a channel.
1037 ///
1038 /// This method automatically joins blocks, transactions, and logs into unified events,
1039 /// then streams them via a channel for real-time processing.
1040 ///
1041 /// # Example
1042 /// ```no_run
1043 /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField, TransactionField}, StreamConfig};
1044 ///
1045 /// # async fn example() -> anyhow::Result<()> {
1046 /// let client = Client::builder()
1047 /// .chain_id(1)
1048 /// .api_token(std::env::var("ENVIO_API_TOKEN")?)
1049 /// .build()?;
1050 ///
1051 /// // Stream NFT transfer events with transaction context
1052 /// let query = Query::new()
1053 /// .from_block(19000000)
1054 /// .where_logs(
1055 /// LogFilter::all()
1056 /// .and_topic0(["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"])?
1057 /// )
1058 /// .select_log_fields([LogField::Address, LogField::Topic1, LogField::Topic2])
1059 /// .select_transaction_fields([TransactionField::Hash, TransactionField::From]);
1060 /// let mut receiver = client.stream_events(query, StreamConfig::default()).await?;
1061 ///
1062 /// while let Some(response) = receiver.recv().await {
1063 /// let response = response?;
1064 /// println!("Got {} joined events up to block: {}", response.data.len(), response.next_block);
1065 /// }
1066 /// # Ok(())
1067 /// # }
1068 /// ```
1069 pub async fn stream_events(
1070 &self,
1071 mut query: Query,
1072 config: StreamConfig,
1073 ) -> Result<mpsc::Receiver<Result<EventResponse>>> {
1074 check_simple_stream_params(&config)?;
1075
1076 let event_join_strategy = InternalEventJoinStrategy::from(&query.field_selection);
1077
1078 event_join_strategy.add_join_fields_to_selection(&mut query.field_selection);
1079
1080 let (tx, rx): (_, mpsc::Receiver<Result<EventResponse>>) =
1081 mpsc::channel(config.concurrency);
1082
1083 let mut inner_rx = self
1084 .stream_arrow(query, config)
1085 .await
1086 .context("start inner stream")?;
1087
1088 tokio::spawn(async move {
1089 while let Some(resp) = inner_rx.recv().await {
1090 let is_err = resp.is_err();
1091 if tx
1092 .send(
1093 resp.map(|r| EventResponse::from_arrow_response(&r, &event_join_strategy)),
1094 )
1095 .await
1096 .is_err()
1097 || is_err
1098 {
1099 return;
1100 }
1101 }
1102 });
1103
1104 Ok(rx)
1105 }
1106
/// Spawns task to execute query and return data via a channel in Arrow format.
///
/// Returns raw Apache Arrow data via a channel for high-performance processing.
/// Ideal for applications that need to work directly with columnar data.
///
/// This method is a thin wrapper: all of the work is delegated to `stream::stream_arrow`,
/// and its result (including any error) is returned unchanged.
///
/// # Example
/// ```no_run
/// use hypersync_client::{Client, net_types::{Query, TransactionFilter, TransactionField}, StreamConfig};
///
/// # async fn example() -> anyhow::Result<()> {
/// let client = Client::builder()
///     .chain_id(1)
///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
///     .build()?;
///
/// // Stream transaction data in Arrow format for analytics
/// let query = Query::new()
///     .from_block(19000000)
///     .to_block_excl(19000100)
///     .where_transactions(
///         TransactionFilter::all()
///             .and_contract_address(["0xA0b86a33E6411b87Fd9D3DF822C8698FC06BBe4c"])?
///     )
///     .select_transaction_fields([TransactionField::Hash, TransactionField::From, TransactionField::Value]);
/// let mut receiver = client.stream_arrow(query, StreamConfig::default()).await?;
///
/// while let Some(response) = receiver.recv().await {
///     let response = response?;
///     println!("Got {} Arrow batches for transactions", response.data.transactions.len());
/// }
/// # Ok(())
/// # }
/// ```
pub async fn stream_arrow(
    &self,
    query: Query,
    config: StreamConfig,
) -> Result<mpsc::Receiver<Result<ArrowResponse>>> {
    stream::stream_arrow(self, query, config).await
}
1147
/// Returns a reference to the URL stored in this client.
///
/// This is the base URL the client builds its request URLs from (for example, the
/// height stream appends `height/sse` to it).
///
/// # Example
/// ```
/// use hypersync_client::Client;
///
/// let client = Client::builder()
///     .chain_id(1)
///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
///     .build()
///     .unwrap();
///
/// println!("Client URL: {}", client.url());
/// ```
pub fn url(&self) -> &Url {
    &self.inner.url
}
1165}
1166
/// Builder for creating a hypersync client with configuration options.
///
/// This builder provides a fluent API for configuring client settings like URL,
/// authentication, timeouts, and retry behavior. It is a thin wrapper around a
/// `ClientConfig`: every setter overwrites one field of that config and
/// [`ClientBuilder::build`] passes the finished config to the client constructor.
///
/// # Example
/// ```
/// use hypersync_client::Client;
///
/// let client = Client::builder()
///     .chain_id(1)
///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
///     .http_req_timeout_millis(30000)
///     .max_num_retries(3)
///     .build()
///     .unwrap();
/// ```
#[derive(Debug, Clone, Default)]
pub struct ClientBuilder(ClientConfig);
1186
1187impl ClientBuilder {
1188 /// Creates a new ClientBuilder with default configuration.
1189 pub fn new() -> Self {
1190 Self::default()
1191 }
1192
1193 /// Sets the chain ID and automatically configures the URL for the hypersync endpoint.
1194 ///
1195 /// This is a convenience method that sets the URL to `https://{chain_id}.hypersync.xyz`.
1196 ///
1197 /// # Arguments
1198 /// * `chain_id` - The blockchain chain ID (e.g., 1 for Ethereum mainnet)
1199 ///
1200 /// # Example
1201 /// ```
1202 /// use hypersync_client::Client;
1203 ///
1204 /// let client = Client::builder()
1205 /// .chain_id(1) // Ethereum mainnet
1206 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1207 /// .build()
1208 /// .unwrap();
1209 /// ```
1210 pub fn chain_id(mut self, chain_id: u64) -> Self {
1211 self.0.url = format!("https://{chain_id}.hypersync.xyz");
1212 self
1213 }
1214
1215 /// Sets a custom URL for the hypersync server.
1216 ///
1217 /// Use this method when you need to connect to a custom hypersync endpoint
1218 /// instead of the default public endpoints.
1219 ///
1220 /// # Arguments
1221 /// * `url` - The hypersync server URL
1222 ///
1223 /// # Example
1224 /// ```
1225 /// use hypersync_client::Client;
1226 ///
1227 /// let client = Client::builder()
1228 /// .url("https://my-custom-hypersync.example.com")
1229 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1230 /// .build()
1231 /// .unwrap();
1232 /// ```
1233 pub fn url<S: ToString>(mut self, url: S) -> Self {
1234 self.0.url = url.to_string();
1235 self
1236 }
1237
1238 /// Sets the api token for authentication.
1239 ///
1240 /// Required for accessing authenticated hypersync endpoints.
1241 ///
1242 /// # Arguments
1243 /// * `api_token` - The authentication token
1244 ///
1245 /// # Example
1246 /// ```
1247 /// use hypersync_client::Client;
1248 ///
1249 /// let client = Client::builder()
1250 /// .chain_id(1)
1251 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1252 /// .build()
1253 /// .unwrap();
1254 /// ```
1255 pub fn api_token<S: ToString>(mut self, api_token: S) -> Self {
1256 self.0.api_token = api_token.to_string();
1257 self
1258 }
1259
1260 /// Sets the HTTP request timeout in milliseconds.
1261 ///
1262 /// # Arguments
1263 /// * `http_req_timeout_millis` - Timeout in milliseconds (default: 30000)
1264 ///
1265 /// # Example
1266 /// ```
1267 /// use hypersync_client::Client;
1268 ///
1269 /// let client = Client::builder()
1270 /// .chain_id(1)
1271 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1272 /// .http_req_timeout_millis(60000) // 60 second timeout
1273 /// .build()
1274 /// .unwrap();
1275 /// ```
1276 pub fn http_req_timeout_millis(mut self, http_req_timeout_millis: u64) -> Self {
1277 self.0.http_req_timeout_millis = http_req_timeout_millis;
1278 self
1279 }
1280
1281 /// Sets the maximum number of retries for failed requests.
1282 ///
1283 /// # Arguments
1284 /// * `max_num_retries` - Maximum number of retries (default: 10)
1285 ///
1286 /// # Example
1287 /// ```
1288 /// use hypersync_client::Client;
1289 ///
1290 /// let client = Client::builder()
1291 /// .chain_id(1)
1292 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1293 /// .max_num_retries(5)
1294 /// .build()
1295 /// .unwrap();
1296 /// ```
1297 pub fn max_num_retries(mut self, max_num_retries: usize) -> Self {
1298 self.0.max_num_retries = max_num_retries;
1299 self
1300 }
1301
1302 /// Sets the backoff increment for retry delays.
1303 ///
1304 /// This value is added to the base delay on each retry attempt.
1305 ///
1306 /// # Arguments
1307 /// * `retry_backoff_ms` - Backoff increment in milliseconds (default: 500)
1308 ///
1309 /// # Example
1310 /// ```
1311 /// use hypersync_client::Client;
1312 ///
1313 /// let client = Client::builder()
1314 /// .chain_id(1)
1315 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1316 /// .retry_backoff_ms(1000) // 1 second backoff increment
1317 /// .build()
1318 /// .unwrap();
1319 /// ```
1320 pub fn retry_backoff_ms(mut self, retry_backoff_ms: u64) -> Self {
1321 self.0.retry_backoff_ms = retry_backoff_ms;
1322 self
1323 }
1324
1325 /// Sets the initial delay for retry attempts.
1326 ///
1327 /// # Arguments
1328 /// * `retry_base_ms` - Initial retry delay in milliseconds (default: 500)
1329 ///
1330 /// # Example
1331 /// ```
1332 /// use hypersync_client::Client;
1333 ///
1334 /// let client = Client::builder()
1335 /// .chain_id(1)
1336 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1337 /// .retry_base_ms(1000) // Start with 1 second delay
1338 /// .build()
1339 /// .unwrap();
1340 /// ```
1341 pub fn retry_base_ms(mut self, retry_base_ms: u64) -> Self {
1342 self.0.retry_base_ms = retry_base_ms;
1343 self
1344 }
1345
1346 /// Sets the maximum delay for retry attempts.
1347 ///
1348 /// The retry delay will not exceed this value, even with backoff increments.
1349 ///
1350 /// # Arguments
1351 /// * `retry_ceiling_ms` - Maximum retry delay in milliseconds (default: 10000)
1352 ///
1353 /// # Example
1354 /// ```
1355 /// use hypersync_client::Client;
1356 ///
1357 /// let client = Client::builder()
1358 /// .chain_id(1)
1359 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1360 /// .retry_ceiling_ms(30000) // Cap at 30 seconds
1361 /// .build()
1362 /// .unwrap();
1363 /// ```
1364 pub fn retry_ceiling_ms(mut self, retry_ceiling_ms: u64) -> Self {
1365 self.0.retry_ceiling_ms = retry_ceiling_ms;
1366 self
1367 }
1368
1369 /// Sets the serialization format for client-server communication.
1370 ///
1371 /// # Arguments
1372 /// * `serialization_format` - The format to use (JSON or CapnProto)
1373 ///
1374 /// # Example
1375 /// ```
1376 /// use hypersync_client::{Client, SerializationFormat};
1377 ///
1378 /// let client = Client::builder()
1379 /// .chain_id(1)
1380 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1381 /// .serialization_format(SerializationFormat::Json)
1382 /// .build()
1383 /// .unwrap();
1384 /// ```
1385 pub fn serialization_format(mut self, serialization_format: SerializationFormat) -> Self {
1386 self.0.serialization_format = serialization_format;
1387 self
1388 }
1389
1390 /// Builds the client with the configured settings.
1391 ///
1392 /// # Returns
1393 /// * `Result<Client>` - The configured client or an error if configuration is invalid
1394 ///
1395 /// # Errors
1396 /// Returns an error if:
1397 /// * The URL is malformed
1398 /// * Required configuration is missing
1399 ///
1400 /// # Example
1401 /// ```
1402 /// use hypersync_client::Client;
1403 ///
1404 /// let client = Client::builder()
1405 /// .chain_id(1)
1406 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1407 /// .build()
1408 /// .unwrap();
1409 /// ```
1410 pub fn build(self) -> Result<Client> {
1411 if self.0.url.is_empty() {
1412 anyhow::bail!(
1413 "endpoint needs to be set, try using builder.chain_id(1) or\
1414 builder.url(\"https://eth.hypersync.xyz\") to set the endpoint"
1415 )
1416 }
1417 Client::new(self.0)
1418 }
1419}
1420
/// Base delay (200ms) for height-stream reconnect backoff; it is doubled for every
/// further consecutive failure until it reaches [`MAX_RECONNECT_DELAY`].
const INITIAL_RECONNECT_DELAY: Duration = Duration::from_millis(200);
/// Upper bound (30s) for the height-stream reconnect delay.
const MAX_RECONNECT_DELAY: Duration = Duration::from_secs(30);
/// Timeout for detecting dead connections. Server sends keepalive pings every 5s,
/// so we timeout after 15s (3x the ping interval).
const READ_TIMEOUT: Duration = Duration::from_secs(15);
1427
/// Events emitted by the height stream.
///
/// These are the values delivered through the channel returned by
/// `Client::stream_height`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum HeightStreamEvent {
    /// Successfully connected or reconnected to the SSE stream.
    Connected,
    /// Received a height update from the server.
    ///
    /// The value is parsed from the data of an SSE `height` event.
    Height(u64),
    /// Connection lost, will attempt to reconnect after the specified delay.
    ///
    /// This event is sent before the stream task starts waiting for `delay`.
    Reconnecting {
        /// Duration to wait before attempting reconnection.
        delay: Duration,
    },
}
1441
/// Classification of a single item read from the SSE height stream, before deciding
/// whether it is forwarded to the consumer.
enum InternalStreamEvent {
    /// An event that is forwarded to the consumer of the height stream.
    Publish(HeightStreamEvent),
    /// An SSE `ping` event; it is not forwarded.
    Ping,
    /// An SSE event with an unrecognised name; carries a description of the event.
    Unknown(String),
}
1447
1448impl Client {
1449 fn get_es_stream(&self) -> Result<EventSource> {
1450 // Build the GET /height/sse request
1451 let mut url = self.inner.url.clone();
1452 url.path_segments_mut()
1453 .ok()
1454 .context("invalid base URL")?
1455 .extend(&["height", "sse"]);
1456
1457 let req = self
1458 .inner
1459 .http_client
1460 .get(url)
1461 .header(header::ACCEPT, "text/event-stream")
1462 .bearer_auth(&self.inner.api_token);
1463
1464 // Configure exponential backoff for library-level retries
1465 let retry_policy = ExponentialBackoff::new(
1466 INITIAL_RECONNECT_DELAY,
1467 2.0,
1468 Some(MAX_RECONNECT_DELAY),
1469 None, // unlimited retries
1470 );
1471
1472 // Turn the request into an EventSource stream with retries
1473 let mut es = reqwest_eventsource::EventSource::new(req)
1474 .context("unexpected error creating EventSource")?;
1475 es.set_retry_policy(Box::new(retry_policy));
1476 Ok(es)
1477 }
1478
1479 async fn next_height(event_source: &mut EventSource) -> Result<Option<InternalStreamEvent>> {
1480 let Some(res) = tokio::time::timeout(READ_TIMEOUT, event_source.next())
1481 .await
1482 .map_err(|d| anyhow::anyhow!("stream timed out after {d}"))?
1483 else {
1484 return Ok(None);
1485 };
1486
1487 let e = match res.context("failed response")? {
1488 Event::Open => InternalStreamEvent::Publish(HeightStreamEvent::Connected),
1489 Event::Message(event) => match event.event.as_str() {
1490 "height" => {
1491 let height = event
1492 .data
1493 .trim()
1494 .parse::<u64>()
1495 .context("parsing height from event data")?;
1496 InternalStreamEvent::Publish(HeightStreamEvent::Height(height))
1497 }
1498 "ping" => InternalStreamEvent::Ping,
1499 _ => InternalStreamEvent::Unknown(format!("unknown event: {:?}", event)),
1500 },
1501 };
1502
1503 Ok(Some(e))
1504 }
1505
1506 async fn stream_height_events(
1507 es: &mut EventSource,
1508 tx: &mpsc::Sender<HeightStreamEvent>,
1509 ) -> Result<bool> {
1510 let mut received_an_event = false;
1511 while let Some(event) = Self::next_height(es).await.context("failed next height")? {
1512 match event {
1513 InternalStreamEvent::Publish(event) => {
1514 received_an_event = true;
1515 if tx.send(event).await.is_err() {
1516 return Ok(received_an_event); // Receiver dropped, exit task
1517 }
1518 }
1519 InternalStreamEvent::Ping => (), // ignore pings
1520 InternalStreamEvent::Unknown(_event) => (), // ignore unknown events
1521 }
1522 }
1523 Ok(received_an_event)
1524 }
1525
1526 fn get_delay(consecutive_failures: u32) -> Duration {
1527 if consecutive_failures > 0 {
1528 /// helper function to calculate 2^x
1529 /// optimization using bit shifting
1530 const fn two_to_pow(x: u32) -> u32 {
1531 1 << x
1532 }
1533 // Exponential backoff: 200ms, 400ms, 800ms, ... up to 30s
1534 INITIAL_RECONNECT_DELAY
1535 .saturating_mul(two_to_pow(consecutive_failures - 1))
1536 .min(MAX_RECONNECT_DELAY)
1537 } else {
1538 // On zero consecutive failures, 0 delay
1539 Duration::from_millis(0)
1540 }
1541 }
1542
1543 async fn stream_height_events_with_retry(
1544 &self,
1545 tx: &mpsc::Sender<HeightStreamEvent>,
1546 ) -> Result<()> {
1547 let mut consecutive_failures = 0u32;
1548
1549 loop {
1550 // should always be able to creat a new es stream
1551 // something is wrong with the req builder otherwise
1552 let mut es = self.get_es_stream().context("get es stream")?;
1553
1554 match Self::stream_height_events(&mut es, tx).await {
1555 Ok(received_an_event) => {
1556 if received_an_event {
1557 consecutive_failures = 0; // Reset after successful connection that then failed
1558 }
1559 log::trace!("Stream height exited");
1560 }
1561 Err(e) => {
1562 log::trace!("Stream height failed: {e:?}");
1563 }
1564 }
1565
1566 es.close();
1567
1568 // If the receiver is closed, exit the task
1569 if tx.is_closed() {
1570 break;
1571 }
1572
1573 let delay = Self::get_delay(consecutive_failures);
1574 log::trace!("Reconnecting in {:?}...", delay);
1575
1576 if tx
1577 .send(HeightStreamEvent::Reconnecting { delay })
1578 .await
1579 .is_err()
1580 {
1581 return Ok(()); // Receiver dropped, exit task
1582 }
1583 tokio::time::sleep(delay).await;
1584
1585 // increment consecutive failures so that on the next try
1586 // it will start using back offs
1587 consecutive_failures += 1;
1588 }
1589
1590 Ok(())
1591 }
1592
1593 /// Streams archive height updates from the server via Server-Sent Events.
1594 ///
1595 /// Establishes a long-lived SSE connection to `/height/sse` that automatically reconnects
1596 /// on disconnection with exponential backoff (200ms → 400ms → ... → max 30s).
1597 ///
1598 /// The stream emits [`HeightStreamEvent`] to notify consumers of connection state changes
1599 /// and height updates. This allows applications to display connection status to users.
1600 ///
1601 /// # Returns
1602 /// Channel receiver yielding [`HeightStreamEvent`]s. The background task handles connection
1603 /// lifecycle and sends events through this channel.
1604 ///
1605 /// # Example
1606 /// ```
1607 /// # use hypersync_client::{Client, HeightStreamEvent};
1608 /// # async fn example() -> anyhow::Result<()> {
1609 /// let client = Client::builder()
1610 /// .url("https://eth.hypersync.xyz")
1611 /// .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1612 /// .build()?;
1613 ///
1614 /// let mut rx = client.stream_height();
1615 ///
1616 /// while let Some(event) = rx.recv().await {
1617 /// match event {
1618 /// HeightStreamEvent::Connected => println!("Connected to stream"),
1619 /// HeightStreamEvent::Height(h) => println!("Height: {}", h),
1620 /// HeightStreamEvent::Reconnecting { delay } => {
1621 /// println!("Reconnecting in {:?}...", delay)
1622 /// }
1623 /// }
1624 /// }
1625 /// # Ok(())
1626 /// # }
1627 /// ```
1628 pub fn stream_height(&self) -> mpsc::Receiver<HeightStreamEvent> {
1629 let (tx, rx) = mpsc::channel(16);
1630 let client = self.clone();
1631
1632 tokio::spawn(async move {
1633 if let Err(e) = client.stream_height_events_with_retry(&tx).await {
1634 log::error!("Stream height failed unexpectedly: {e:?}");
1635 }
1636 });
1637
1638 rx
1639 }
1640}
1641
1642fn check_simple_stream_params(config: &StreamConfig) -> Result<()> {
1643 if config.event_signature.is_some() {
1644 return Err(anyhow!(
1645 "config.event_signature can't be passed to simple type function. User is expected to \
1646 decode the logs using Decoder."
1647 ));
1648 }
1649 if config.column_mapping.is_some() {
1650 return Err(anyhow!(
1651 "config.column_mapping can't be passed to single type function. User is expected to \
1652 map values manually."
1653 ));
1654 }
1655
1656 Ok(())
1657}
1658
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks the reconnect delay for zero failures, the doubling phase and the 30s cap.
    #[test]
    fn test_get_delay() {
        let no_failures = Client::get_delay(0);
        assert_eq!(no_failures, Duration::from_millis(0), "starts with 0 delay");

        // powers of 2 backoff
        for (failures, expected_ms) in [(1, 200), (2, 400), (3, 800)] {
            assert_eq!(
                Client::get_delay(failures),
                Duration::from_millis(expected_ms)
            );
        }

        // maxes out at 30s
        for failures in [9, 10] {
            assert_eq!(
                Client::get_delay(failures),
                Duration::from_secs(30),
                "max delay is 30s"
            );
        }
    }

    /// Connects to a live server, expects `Connected` followed by two increasing heights,
    /// then drops the receiver and expects the stream task to finish gracefully.
    #[tokio::test]
    #[ignore = "integration test with live hs server for height stream"]
    async fn test_stream_height_events() -> anyhow::Result<()> {
        let (tx, mut rx) = mpsc::channel(16);
        let stream_task = tokio::spawn(async move {
            let client = Client::builder()
                .url("https://monad-testnet.hypersync.xyz")
                .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
                .build()?;
            let mut es = client.get_es_stream().context("get es stream")?;
            Client::stream_height_events(&mut es, &tx).await
        });

        let first_event = rx.recv().await;
        assert!(first_event.is_some());
        assert_eq!(first_event.unwrap(), HeightStreamEvent::Connected);

        let Some(HeightStreamEvent::Height(first_height)) = rx.recv().await else {
            panic!("should have received height")
        };
        let Some(HeightStreamEvent::Height(second_height)) = rx.recv().await else {
            panic!("should have received height")
        };
        assert!(second_height > first_height);

        // Dropping the receiver makes the next send fail, which ends the stream task.
        drop(rx);

        let received_an_event = stream_task
            .await
            .expect("should have joined")
            .expect("should have ended the stream gracefully after dropping rx");
        assert!(received_an_event, "should have received an event");

        Ok(())
    }
}