Skip to main content

hypersync_client/
lib.rs

1#![deny(missing_docs)]
2// README.md is a symlink (ln -s ../README.md README.md) to the root workspace
3// README. This is needed so that both `cargo doc` and `cargo package` work:
4// - `cargo doc`: include_str! resolves the symlink to the root README.
5// - `cargo package`: cargo copies the symlink target into the tarball, so
6//   include_str! finds README.md at the package root during verification.
7#![doc = include_str!("../README.md")]
8use std::time::Instant;
9use std::{sync::Arc, time::Duration};
10
11use anyhow::{anyhow, Context, Result};
12use futures::StreamExt;
13use hypersync_net_types::{hypersync_net_types_capnp, ArchiveHeight, ChainId, Query};
14use reqwest::{Method, StatusCode};
15use reqwest_eventsource::retry::ExponentialBackoff;
16use reqwest_eventsource::{Event, EventSource};
17
18pub mod arrow_reader;
19mod column_mapping;
20mod config;
21mod decode;
22mod decode_call;
23mod from_arrow;
24mod parquet_out;
25mod parse_response;
26pub mod preset_query;
27mod rate_limit;
28mod rayon_async;
29pub mod simple_types;
30mod stream;
31mod types;
32mod util;
33
34pub use hypersync_format as format;
35pub use hypersync_net_types as net_types;
36pub use hypersync_schema as schema;
37
38use parse_response::parse_query_response;
39use tokio::sync::mpsc;
40use types::ResponseData;
41use url::Url;
42
43pub use column_mapping::{ColumnMapping, DataType};
44pub use config::HexOutput;
45pub use config::{ClientConfig, SerializationFormat, StreamConfig};
46pub use decode::Decoder;
47pub use decode_call::CallDecoder;
48pub use rate_limit::RateLimitInfo;
49pub use types::{
50    ArrowResponse, ArrowResponseData, EventResponse, QueryResponse, QueryResponseWithRateLimit,
51};
52
53use crate::parse_response::read_query_response;
54use crate::simple_types::InternalEventJoinStrategy;
55
/// Wrapper around `reqwest::Client` that rebuilds the inner client
/// periodically so DNS is re-resolved (see `max_connection_age`).
#[derive(Debug)]
struct HttpClientWrapper {
    /// Mutable state that needs to be refreshed periodically.
    inner: std::sync::Mutex<HttpClientWrapperInner>,
    /// HyperSync server api token, sent as a bearer token on every request.
    api_token: String,
    /// Standard timeout for http requests.
    timeout: Duration,
    /// Pools are never idle since polling often happens on the client,
    /// so connections never time out and DNS lookups never happen again.
    /// This is problematic if the underlying IP address changes during
    /// failovers on hypersync. Since reqwest doesn't implement this we
    /// simply recreate the client whenever it exceeds this age.
    max_connection_age: Duration,
    /// For refreshing the client: the rebuilt client must keep the same UA.
    user_agent: String,
}
73
/// The refreshable part of `HttpClientWrapper`.
#[derive(Debug)]
struct HttpClientWrapperInner {
    /// Initialized reqwest instance for client url.
    client: reqwest::Client,
    /// Timestamp when the client was created; used to decide when to rebuild.
    created_at: Instant,
}
81
82impl HttpClientWrapper {
83    fn new(user_agent: String, api_token: String, timeout: Duration) -> Self {
84        let client = reqwest::Client::builder()
85            .no_gzip()
86            .user_agent(&user_agent)
87            .build()
88            .unwrap();
89
90        Self {
91            inner: std::sync::Mutex::new(HttpClientWrapperInner {
92                client,
93                created_at: Instant::now(),
94            }),
95            api_token,
96            timeout,
97            max_connection_age: Duration::from_secs(60),
98            user_agent,
99        }
100    }
101
102    fn get_client(&self) -> reqwest::Client {
103        let mut inner = self.inner.lock().expect("HttpClientWrapper mutex poisoned");
104
105        // Check if client needs refresh due to age
106        if inner.created_at.elapsed() > self.max_connection_age {
107            // Recreate client to force new DNS lookup for failover scenarios
108            inner.client = reqwest::Client::builder()
109                .no_gzip()
110                .user_agent(&self.user_agent)
111                .build()
112                .unwrap();
113            inner.created_at = Instant::now();
114        }
115
116        // Clone is cheap for reqwest::Client (it's Arc-wrapped internally)
117        inner.client.clone()
118    }
119
120    fn request(&self, method: Method, url: Url) -> reqwest::RequestBuilder {
121        let client = self.get_client();
122        client
123            .request(method, url)
124            .timeout(self.timeout)
125            .bearer_auth(&self.api_token)
126    }
127
128    fn request_no_timeout(&self, method: Method, url: Url) -> reqwest::RequestBuilder {
129        let client = self.get_client();
130        client.request(method, url).bearer_auth(&self.api_token)
131    }
132}
133
/// Internal client state to handle http requests and retries.
///
/// Shared behind an `Arc` by `Client`; interior mutability is confined to
/// the mutex fields, so `&self` methods can be called concurrently.
#[derive(Debug)]
struct ClientInner {
    /// Wrapper that owns (and periodically rebuilds) the reqwest client.
    http_client: HttpClientWrapper,
    /// HyperSync server URL.
    url: Url,
    /// Number of retries to attempt before returning error.
    max_num_retries: usize,
    /// Milliseconds that would be used for retry backoff increasing.
    retry_backoff_ms: u64,
    /// Initial wait time for request backoff.
    retry_base_ms: u64,
    /// Ceiling time for request backoff.
    retry_ceiling_ms: u64,
    /// Query serialization format to use for HTTP requests.
    serialization_format: SerializationFormat,
    /// Most recently observed rate limit info from the server, paired with the
    /// instant it was captured so elapsed time can be subtracted from `reset_secs`.
    rate_limit_state: std::sync::Mutex<Option<(RateLimitInfo, Instant)>>,
    /// Whether to proactively sleep when the rate limit is exhausted.
    proactive_rate_limit_sleep: bool,
}
156
/// Client to handle http requests and retries.
///
/// Cheap to clone: all state lives behind an `Arc`.
#[derive(Clone, Debug)]
pub struct Client {
    /// Shared state; cloning `Client` only bumps the refcount.
    inner: Arc<ClientInner>,
}
162
163impl Client {
164    /// Creates a new client with the given configuration.
165    ///
166    /// Configuration must include the `url` and `api_token` fields.
167    /// # Example
168    /// ```
169    /// use hypersync_client::{Client, ClientConfig};
170    ///
171    /// let config = ClientConfig {
172    ///     url: "https://eth.hypersync.xyz".to_string(),
173    ///     api_token: std::env::var("ENVIO_API_TOKEN")?,
174    ///     ..Default::default()
175    /// };
176    /// let client = Client::new(config)?;
177    /// # Ok::<(), anyhow::Error>(())
178    /// ```
179    ///
180    /// # Errors
181    /// This method fails if the config is invalid.
182    pub fn new(cfg: ClientConfig) -> Result<Self> {
183        // hscr stands for hypersync client rust
184        cfg.validate().context("invalid ClientConfig")?;
185        let user_agent = format!("hscr/{}", env!("CARGO_PKG_VERSION"));
186        Self::new_internal(cfg, user_agent)
187    }
188
189    /// Creates a new client builder.
190    ///
191    /// # Example
192    /// ```
193    /// use hypersync_client::Client;
194    ///
195    /// let client = Client::builder()
196    ///     .chain_id(1)
197    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
198    ///     .build()
199    ///     .unwrap();
200    /// ```
    pub fn builder() -> ClientBuilder {
        // Thin convenience wrapper; see `ClientBuilder` for available options.
        ClientBuilder::new()
    }
204
    #[doc(hidden)]
    pub fn new_with_agent(cfg: ClientConfig, user_agent: impl Into<String>) -> Result<Self> {
        // Creates a new client with the given configuration and custom user agent.
        // This is intended for use by language bindings (Python, Node.js) and HyperIndex.
        // NOTE(review): unlike `new`, this does not call `cfg.validate()` —
        // confirm whether bindings are expected to pre-validate the config.
        Self::new_internal(cfg, user_agent.into())
    }
211
212    /// Internal constructor that takes both config and user agent.
213    fn new_internal(cfg: ClientConfig, user_agent: String) -> Result<Self> {
214        let http_client = HttpClientWrapper::new(
215            user_agent,
216            cfg.api_token,
217            Duration::from_millis(cfg.http_req_timeout_millis),
218        );
219
220        let url = Url::parse(&cfg.url).context("url is malformed")?;
221
222        Ok(Self {
223            inner: Arc::new(ClientInner {
224                http_client,
225                url,
226                max_num_retries: cfg.max_num_retries,
227                retry_backoff_ms: cfg.retry_backoff_ms,
228                retry_base_ms: cfg.retry_base_ms,
229                retry_ceiling_ms: cfg.retry_ceiling_ms,
230                serialization_format: cfg.serialization_format,
231                rate_limit_state: std::sync::Mutex::new(None),
232                proactive_rate_limit_sleep: cfg.proactive_rate_limit_sleep,
233            }),
234        })
235    }
236
237    /// Retrieves blocks, transactions, traces, and logs through a stream using the provided
238    /// query and stream configuration.
239    ///
240    /// ### Implementation
241    /// Runs multiple queries simultaneously based on config.concurrency.
242    ///
243    /// Each query runs until it reaches query.to, server height, any max_num_* query param,
244    /// or execution timed out by server.
245    ///
246    /// # ⚠️ Important Warning
247    ///
248    /// This method will continue executing until the query has run to completion from beginning
249    /// to the end of the block range defined in the query. For heavy queries with large block
250    /// ranges or high data volumes, consider:
251    ///
252    /// - Use [`stream()`](Self::stream) to interact with each streamed chunk individually
253    /// - Use [`get()`](Self::get) which returns a `next_block` that can be paginated for the next query
254    /// - Break large queries into smaller block ranges
255    ///
256    /// # Example
257    /// ```no_run
258    /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField}, StreamConfig};
259    ///
260    /// # async fn example() -> anyhow::Result<()> {
261    /// let client = Client::builder()
262    ///     .chain_id(1)
263    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
264    ///     .build()?;
265    ///
266    /// // Query ERC20 transfer events
267    /// let query = Query::new()
268    ///     .from_block(19000000)
269    ///     .to_block_excl(19000010)
270    ///     .where_logs(
271    ///         LogFilter::all()
272    ///             .and_topic0(["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"])?
273    ///     )
274    ///     .select_log_fields([LogField::Address, LogField::Data]);
275    /// let response = client.collect(query, StreamConfig::default()).await?;
276    ///
277    /// println!("Collected {} events", response.data.logs.len());
278    /// # Ok(())
279    /// # }
280    /// ```
281    pub async fn collect(&self, query: Query, config: StreamConfig) -> Result<QueryResponse> {
282        check_simple_stream_params(&config)?;
283
284        let mut recv = stream::stream_arrow(self, query, config)
285            .await
286            .context("start stream")?;
287
288        let mut data = ResponseData::default();
289        let mut archive_height = None;
290        let mut next_block = 0;
291        let mut total_execution_time = 0;
292
293        while let Some(res) = recv.recv().await {
294            let res = res.context("get response")?;
295            let res: QueryResponse =
296                QueryResponse::try_from(&res).context("convert arrow response")?;
297
298            for batch in res.data.blocks {
299                data.blocks.push(batch);
300            }
301            for batch in res.data.transactions {
302                data.transactions.push(batch);
303            }
304            for batch in res.data.logs {
305                data.logs.push(batch);
306            }
307            for batch in res.data.traces {
308                data.traces.push(batch);
309            }
310
311            archive_height = res.archive_height;
312            next_block = res.next_block;
313            total_execution_time += res.total_execution_time
314        }
315
316        Ok(QueryResponse {
317            archive_height,
318            next_block,
319            total_execution_time,
320            data,
321            rollback_guard: None,
322        })
323    }
324
325    /// Retrieves events through a stream using the provided query and stream configuration.
326    ///
327    /// # ⚠️ Important Warning
328    ///
329    /// This method will continue executing until the query has run to completion from beginning
330    /// to the end of the block range defined in the query. For heavy queries with large block
331    /// ranges or high data volumes, consider:
332    ///
333    /// - Use [`stream_events()`](Self::stream_events) to interact with each streamed chunk individually
334    /// - Use [`get_events()`](Self::get_events) which returns a `next_block` that can be paginated for the next query
335    /// - Break large queries into smaller block ranges
336    ///
337    /// # Example
338    /// ```no_run
339    /// use hypersync_client::{Client, net_types::{Query, TransactionFilter, TransactionField}, StreamConfig};
340    ///
341    /// # async fn example() -> anyhow::Result<()> {
342    /// let client = Client::builder()
343    ///     .chain_id(1)
344    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
345    ///     .build()?;
346    ///
347    /// // Query transactions to a specific address
348    /// let query = Query::new()
349    ///     .from_block(19000000)
350    ///     .to_block_excl(19000100)
351    ///     .where_transactions(
352    ///         TransactionFilter::all()
353    ///             .and_to(["0xA0b86a33E6411b87Fd9D3DF822C8698FC06BBe4c"])?
354    ///     )
355    ///     .select_transaction_fields([TransactionField::Hash, TransactionField::From, TransactionField::Value]);
356    /// let response = client.collect_events(query, StreamConfig::default()).await?;
357    ///
358    /// println!("Collected {} events", response.data.len());
359    /// # Ok(())
360    /// # }
361    /// ```
362    pub async fn collect_events(
363        &self,
364        mut query: Query,
365        config: StreamConfig,
366    ) -> Result<EventResponse> {
367        check_simple_stream_params(&config)?;
368
369        let event_join_strategy = InternalEventJoinStrategy::from(&query.field_selection);
370        event_join_strategy.add_join_fields_to_selection(&mut query.field_selection);
371
372        let mut recv = stream::stream_arrow(self, query, config)
373            .await
374            .context("start stream")?;
375
376        let mut data = Vec::new();
377        let mut archive_height = None;
378        let mut next_block = 0;
379        let mut total_execution_time = 0;
380
381        while let Some(res) = recv.recv().await {
382            let res = res.context("get response")?;
383            let res: QueryResponse =
384                QueryResponse::try_from(&res).context("convert arrow response")?;
385            let events = event_join_strategy.join_from_response_data(res.data);
386
387            data.extend(events);
388
389            archive_height = res.archive_height;
390            next_block = res.next_block;
391            total_execution_time += res.total_execution_time
392        }
393
394        Ok(EventResponse {
395            archive_height,
396            next_block,
397            total_execution_time,
398            data,
399            rollback_guard: None,
400        })
401    }
402
403    /// Retrieves blocks, transactions, traces, and logs in Arrow format through a stream using
404    /// the provided query and stream configuration.
405    ///
406    /// Returns data in Apache Arrow format for high-performance columnar processing.
407    /// Useful for analytics workloads or when working with Arrow-compatible tools.
408    ///
409    /// # ⚠️ Important Warning
410    ///
411    /// This method will continue executing until the query has run to completion from beginning
412    /// to the end of the block range defined in the query. For heavy queries with large block
413    /// ranges or high data volumes, consider:
414    ///
415    /// - Use [`stream_arrow()`](Self::stream_arrow) to interact with each streamed chunk individually
416    /// - Use [`get_arrow()`](Self::get_arrow) which returns a `next_block` that can be paginated for the next query
417    /// - Break large queries into smaller block ranges
418    ///
419    /// # Example
420    /// ```no_run
421    /// use hypersync_client::{Client, net_types::{Query, BlockFilter, BlockField}, StreamConfig};
422    ///
423    /// # async fn example() -> anyhow::Result<()> {
424    /// let client = Client::builder()
425    ///     .chain_id(1)
426    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
427    ///     .build()?;
428    ///
429    /// // Get block data in Arrow format for analytics
430    /// let query = Query::new()
431    ///     .from_block(19000000)
432    ///     .to_block_excl(19000100)
433    ///     .include_all_blocks()
434    ///     .select_block_fields([BlockField::Number, BlockField::Timestamp, BlockField::GasUsed]);
435    /// let response = client.collect_arrow(query, StreamConfig::default()).await?;
436    ///
437    /// println!("Retrieved {} Arrow batches for blocks", response.data.blocks.len());
438    /// # Ok(())
439    /// # }
440    /// ```
441    pub async fn collect_arrow(&self, query: Query, config: StreamConfig) -> Result<ArrowResponse> {
442        let mut recv = stream::stream_arrow(self, query, config)
443            .await
444            .context("start stream")?;
445
446        let mut data = ArrowResponseData::default();
447        let mut archive_height = None;
448        let mut next_block = 0;
449        let mut total_execution_time = 0;
450
451        while let Some(res) = recv.recv().await {
452            let res = res.context("get response")?;
453
454            for batch in res.data.blocks {
455                data.blocks.push(batch);
456            }
457            for batch in res.data.transactions {
458                data.transactions.push(batch);
459            }
460            for batch in res.data.logs {
461                data.logs.push(batch);
462            }
463            for batch in res.data.traces {
464                data.traces.push(batch);
465            }
466            for batch in res.data.decoded_logs {
467                data.decoded_logs.push(batch);
468            }
469
470            archive_height = res.archive_height;
471            next_block = res.next_block;
472            total_execution_time += res.total_execution_time
473        }
474
475        Ok(ArrowResponse {
476            archive_height,
477            next_block,
478            total_execution_time,
479            data,
480            rollback_guard: None,
481        })
482    }
483
484    /// Writes parquet file getting data through a stream using the provided path, query,
485    /// and stream configuration.
486    ///
487    /// Streams data directly to a Parquet file for efficient storage and later analysis.
488    /// Perfect for data exports or ETL pipelines.
489    ///
490    /// # ⚠️ Important Warning
491    ///
492    /// This method will continue executing until the query has run to completion from beginning
493    /// to the end of the block range defined in the query. For heavy queries with large block
494    /// ranges or high data volumes, consider:
495    ///
496    /// - Use [`stream_arrow()`](Self::stream_arrow) and write to Parquet incrementally
497    /// - Use [`get_arrow()`](Self::get_arrow) with pagination and append to Parquet files
498    /// - Break large queries into smaller block ranges
499    ///
500    /// # Example
501    /// ```no_run
502    /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField}, StreamConfig};
503    ///
504    /// # async fn example() -> anyhow::Result<()> {
505    /// let client = Client::builder()
506    ///     .chain_id(1)
507    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
508    ///     .build()?;
509    ///
510    /// // Export all DEX trades to Parquet for analysis
511    /// let query = Query::new()
512    ///     .from_block(19000000)
513    ///     .to_block_excl(19010000)
514    ///     .where_logs(
515    ///         LogFilter::all()
516    ///             .and_topic0(["0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822"])?
517    ///     )
518    ///     .select_log_fields([LogField::Address, LogField::Data, LogField::BlockNumber]);
519    /// client.collect_parquet("./trades.parquet", query, StreamConfig::default()).await?;
520    ///
521    /// println!("Trade data exported to trades.parquet");
522    /// # Ok(())
523    /// # }
524    /// ```
    pub async fn collect_parquet(
        &self,
        path: &str,
        query: Query,
        config: StreamConfig,
    ) -> Result<()> {
        // Thin wrapper; streaming and Parquet writing logic live in `parquet_out`.
        parquet_out::collect_parquet(self, path, query, config).await
    }
533
534    /// Internal implementation of getting chain_id from server
535    async fn get_chain_id_impl(&self) -> Result<u64> {
536        let mut url = self.inner.url.clone();
537        let mut segments = url.path_segments_mut().ok().context("get path segments")?;
538        segments.push("chain_id");
539        std::mem::drop(segments);
540        let req = self.inner.http_client.request(Method::GET, url);
541
542        let res = req.send().await.context("execute http req")?;
543
544        let status = res.status();
545        if !status.is_success() {
546            return Err(anyhow!("http response status code {}", status));
547        }
548
549        let chain_id: ChainId = res.json().await.context("read response body json")?;
550
551        Ok(chain_id.chain_id)
552    }
553
554    /// Internal implementation of getting height from server
555    async fn get_height_impl(&self, http_timeout_override: Option<Duration>) -> Result<u64> {
556        let mut url = self.inner.url.clone();
557        let mut segments = url.path_segments_mut().ok().context("get path segments")?;
558        segments.push("height");
559        std::mem::drop(segments);
560        let mut req = self.inner.http_client.request(Method::GET, url);
561        if let Some(http_timeout_override) = http_timeout_override {
562            req = req.timeout(http_timeout_override);
563        }
564
565        let res = req.send().await.context("execute http req")?;
566
567        let status = res.status();
568        if !status.is_success() {
569            return Err(anyhow!("http response status code {}", status));
570        }
571
572        let height: ArchiveHeight = res.json().await.context("read response body json")?;
573
574        Ok(height.height.unwrap_or(0))
575    }
576
577    /// Get the chain_id from the server with retries.
578    ///
579    /// # Example
580    /// ```no_run
581    /// use hypersync_client::Client;
582    ///
583    /// # async fn example() -> anyhow::Result<()> {
584    /// let client = Client::builder()
585    ///     .chain_id(1)
586    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
587    ///     .build()?;
588    ///
589    /// let chain_id = client.get_chain_id().await?;
590    /// println!("Connected to chain ID: {}", chain_id);
591    /// # Ok(())
592    /// # }
593    /// ```
594    pub async fn get_chain_id(&self) -> Result<u64> {
595        let mut base = self.inner.retry_base_ms;
596
597        let mut err = anyhow!("");
598
599        for _ in 0..self.inner.max_num_retries + 1 {
600            match self.get_chain_id_impl().await {
601                Ok(res) => return Ok(res),
602                Err(e) => {
603                    log::error!(
604                        "failed to get chain_id from server, retrying... The error was: {e:?}"
605                    );
606                    err = err.context(format!("{e:?}"));
607                }
608            }
609
610            let base_ms = Duration::from_millis(base);
611            let jitter = Duration::from_millis(fastrange_rs::fastrange_64(
612                rand::random(),
613                self.inner.retry_backoff_ms,
614            ));
615
616            tokio::time::sleep(base_ms + jitter).await;
617
618            base = std::cmp::min(
619                base + self.inner.retry_backoff_ms,
620                self.inner.retry_ceiling_ms,
621            );
622        }
623
624        Err(err)
625    }
626
627    /// Get the height of from server with retries.
628    ///
629    /// # Example
630    /// ```no_run
631    /// use hypersync_client::Client;
632    ///
633    /// # async fn example() -> anyhow::Result<()> {
634    /// let client = Client::builder()
635    ///     .chain_id(1)
636    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
637    ///     .build()?;
638    ///
639    /// let height = client.get_height().await?;
640    /// println!("Current block height: {}", height);
641    /// # Ok(())
642    /// # }
643    /// ```
644    pub async fn get_height(&self) -> Result<u64> {
645        let mut base = self.inner.retry_base_ms;
646
647        let mut err = anyhow!("");
648
649        for _ in 0..self.inner.max_num_retries + 1 {
650            match self.get_height_impl(None).await {
651                Ok(res) => return Ok(res),
652                Err(e) => {
653                    log::error!(
654                        "failed to get height from server, retrying... The error was: {e:?}"
655                    );
656                    err = err.context(format!("{e:?}"));
657                }
658            }
659
660            let base_ms = Duration::from_millis(base);
661            let jitter = Duration::from_millis(fastrange_rs::fastrange_64(
662                rand::random(),
663                self.inner.retry_backoff_ms,
664            ));
665
666            tokio::time::sleep(base_ms + jitter).await;
667
668            base = std::cmp::min(
669                base + self.inner.retry_backoff_ms,
670                self.inner.retry_ceiling_ms,
671            );
672        }
673
674        Err(err)
675    }
676
677    /// Get the height of the Client instance for health checks.
678    ///
679    /// Doesn't do any retries and the `http_req_timeout` parameter will override the http timeout config set when creating the client.
680    ///
681    /// # Example
682    /// ```no_run
683    /// use hypersync_client::Client;
684    /// use std::time::Duration;
685    ///
686    /// # async fn example() -> anyhow::Result<()> {
687    /// let client = Client::builder()
688    ///     .chain_id(1)
689    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
690    ///     .build()?;
691    ///
692    /// // Quick health check with 5 second timeout
693    /// let height = client.health_check(Some(Duration::from_secs(5))).await?;
694    /// println!("Server is healthy at block: {}", height);
695    /// # Ok(())
696    /// # }
697    /// ```
    pub async fn health_check(&self, http_req_timeout: Option<Duration>) -> Result<u64> {
        // Single attempt, no retries; the optional timeout overrides the configured one.
        self.get_height_impl(http_req_timeout).await
    }
701
702    /// Executes query with retries and returns the response.
703    ///
704    /// # Example
705    /// ```no_run
706    /// use hypersync_client::{Client, net_types::{Query, BlockFilter, BlockField}};
707    ///
708    /// # async fn example() -> anyhow::Result<()> {
709    /// let client = Client::builder()
710    ///     .chain_id(1)
711    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
712    ///     .build()?;
713    ///
714    /// // Query all blocks from a specific range
715    /// let query = Query::new()
716    ///     .from_block(19000000)
717    ///     .to_block_excl(19000010)
718    ///     .include_all_blocks()
719    ///     .select_block_fields([BlockField::Number, BlockField::Hash, BlockField::Timestamp]);
720    /// let response = client.get(&query).await?;
721    ///
722    /// println!("Retrieved {} blocks", response.data.blocks.len());
723    /// # Ok(())
724    /// # }
725    /// ```
726    pub async fn get(&self, query: &Query) -> Result<QueryResponse> {
727        let arrow_response = self.get_arrow(query).await.context("get data")?;
728        let converted =
729            QueryResponse::try_from(&arrow_response).context("convert arrow response")?;
730        Ok(converted)
731    }
732
733    /// Add block, transaction and log fields selection to the query, executes it with retries
734    /// and returns the response.
735    ///
736    /// This method automatically joins blocks, transactions, and logs into unified events,
737    /// making it easier to work with related blockchain data.
738    ///
739    /// # Example
740    /// ```no_run
741    /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField, TransactionField}};
742    ///
743    /// # async fn example() -> anyhow::Result<()> {
744    /// let client = Client::builder()
745    ///     .chain_id(1)
746    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
747    ///     .build()?;
748    ///
749    /// // Query ERC20 transfers with transaction context
750    /// let query = Query::new()
751    ///     .from_block(19000000)
752    ///     .to_block_excl(19000010)
753    ///     .where_logs(
754    ///         LogFilter::all()
755    ///             .and_topic0(["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"])?
756    ///     )
757    ///     .select_log_fields([LogField::Address, LogField::Data])
758    ///     .select_transaction_fields([TransactionField::Hash, TransactionField::From]);
759    /// let response = client.get_events(query).await?;
760    ///
761    /// println!("Retrieved {} joined events", response.data.len());
762    /// # Ok(())
763    /// # }
764    /// ```
765    pub async fn get_events(&self, mut query: Query) -> Result<EventResponse> {
766        let event_join_strategy = InternalEventJoinStrategy::from(&query.field_selection);
767        event_join_strategy.add_join_fields_to_selection(&mut query.field_selection);
768        let arrow_response = self.get_arrow(&query).await.context("get data")?;
769        EventResponse::try_from_arrow_response(&arrow_response, &event_join_strategy)
770    }
771
    /// Executes query once and returns the result using JSON serialization.
    ///
    /// Maps HTTP 413 / 429 to dedicated error variants so the caller can
    /// react (shrink the query / back off); any other non-success status
    /// becomes `HyperSyncResponseError::Other` carrying the response body text.
    async fn get_arrow_impl_json(
        &self,
        query: &Query,
    ) -> std::result::Result<ArrowImplResponse, HyperSyncResponseError> {
        let mut url = self.inner.url.clone();
        let mut segments = url.path_segments_mut().ok().context("get path segments")?;
        segments.push("query");
        segments.push("arrow-ipc");
        // Drop the guard to release the mutable borrow of `url`.
        std::mem::drop(segments);
        let req = self.inner.http_client.request(Method::POST, url);

        let res = req.json(&query).send().await.context("execute http req")?;

        let status = res.status();
        // Capture rate limit info before the response body is consumed below.
        let rate_limit = RateLimitInfo::from_response(&res);

        if status == StatusCode::PAYLOAD_TOO_LARGE {
            return Err(HyperSyncResponseError::PayloadTooLarge);
        }
        if status == StatusCode::TOO_MANY_REQUESTS {
            return Err(HyperSyncResponseError::RateLimited { rate_limit });
        }
        if !status.is_success() {
            let text = res.text().await.context("read text to see error")?;

            return Err(HyperSyncResponseError::Other(anyhow!(
                "http response status code {}, err body: {}",
                status,
                text
            )));
        }

        let bytes = res.bytes().await.context("read response body bytes")?;

        // Parsing runs under block_in_place to keep CPU-bound work from
        // stalling other tasks on this worker thread.
        // NOTE(review): block_in_place panics on a current-thread tokio
        // runtime — confirm callers always use a multi-thread runtime.
        let res = tokio::task::block_in_place(|| {
            parse_query_response(&bytes).context("parse query response")
        })?;

        Ok(ArrowImplResponse {
            response: res,
            response_bytes: bytes.len().try_into().unwrap(),
            rate_limit,
        })
    }
817
818    fn should_cache_queries(&self) -> bool {
819        matches!(
820            self.inner.serialization_format,
821            SerializationFormat::CapnProto {
822                should_cache_queries: true
823            }
824        )
825    }
826
    /// Executes query once and returns the result using Cap'n Proto serialization.
    ///
    /// When query caching is enabled, a first request carrying only a query id
    /// is sent; if the server replies with a cached response it is returned
    /// directly, otherwise (cache miss or non-success status) the method falls
    /// through and sends the full query.
    async fn get_arrow_impl_capnp(
        &self,
        query: &Query,
    ) -> std::result::Result<ArrowImplResponse, HyperSyncResponseError> {
        // Build the request URL: {base}/query/arrow-ipc/capnp
        let mut url = self.inner.url.clone();
        let mut segments = url.path_segments_mut().ok().context("get path segments")?;
        segments.push("query");
        segments.push("arrow-ipc");
        segments.push("capnp");
        // Drop the mutable borrow so `url` can be used below.
        std::mem::drop(segments);

        let should_cache = self.should_cache_queries();

        if should_cache {
            // Phase 1: serialize a request containing only the query id (a
            // cheap cache lookup on the server side).
            let query_with_id = {
                let mut message = capnp::message::Builder::new_default();
                let mut request_builder =
                    message.init_root::<hypersync_net_types_capnp::request::Builder>();

                request_builder
                    .build_query_id_from_query(query)
                    .context("build query id")?;
                let mut query_with_id = Vec::new();
                capnp::serialize_packed::write_message(&mut query_with_id, &message)
                    .context("write capnp message")?;
                query_with_id
            };

            // `url` is cloned because it may be reused for the full query below.
            let req = self.inner.http_client.request(Method::POST, url.clone());

            let res = req
                .body(query_with_id)
                .send()
                .await
                .context("execute http req")?;

            let status = res.status();
            let rate_limit = RateLimitInfo::from_response(&res);

            // These two statuses abort immediately; other failures only log
            // and fall through to the full-query path.
            if status == StatusCode::PAYLOAD_TOO_LARGE {
                return Err(HyperSyncResponseError::PayloadTooLarge);
            }
            if status == StatusCode::TOO_MANY_REQUESTS {
                return Err(HyperSyncResponseError::RateLimited { rate_limit });
            }
            if status.is_success() {
                let bytes = res.bytes().await.context("read response body bytes")?;

                // Lift capnp's default traversal limits; responses can be large.
                let mut opts = capnp::message::ReaderOptions::new();
                opts.nesting_limit(i32::MAX).traversal_limit_in_words(None);
                let message_reader = capnp::serialize_packed::read_message(bytes.as_ref(), opts)
                    .context("create message reader")?;
                let query_response = message_reader
                    .get_root::<hypersync_net_types_capnp::cached_query_response::Reader>()
                    .context("get cached_query_response root")?;
                // Either the server had the query cached (return its response)
                // or it did not (fall through to send the full query).
                match query_response.get_either().which().context("get either")? {
                hypersync_net_types_capnp::cached_query_response::either::Which::QueryResponse(
                    query_response,
                ) => {
                    let res = tokio::task::block_in_place(|| {
                        let res = query_response?;
                        read_query_response(&res).context("parse query response cached")
                    })?;
                    return Ok(ArrowImplResponse {
                        response: res,
                        response_bytes: bytes.len().try_into().unwrap(),
                        rate_limit,
                    });
                }
                hypersync_net_types_capnp::cached_query_response::either::Which::NotCached(()) => {
                    log::trace!("query was not cached, retrying with full query");
                }
            }
            } else {
                // Best-effort path: a failed cache probe is not fatal, we
                // simply retry with the full query.
                let text = res.text().await.context("read text to see error")?;
                log::warn!(
                    "Failed cache query, will retry full query. {}, err body: {}",
                    status,
                    text
                );
            }
        };

        // Phase 2: serialize and send the complete query. `should_cache`
        // tells the server whether to store it for future id-only lookups.
        let full_query_bytes = {
            let mut message = capnp::message::Builder::new_default();
            let mut query_builder =
                message.init_root::<hypersync_net_types_capnp::request::Builder>();

            query_builder
                .build_full_query_from_query(query, should_cache)
                .context("build full query")?;
            let mut bytes = Vec::new();
            capnp::serialize_packed::write_message(&mut bytes, &message)
                .context("write full query capnp message")?;
            bytes
        };

        let req = self.inner.http_client.request(Method::POST, url);

        let res = req
            .body(full_query_bytes)
            .send()
            .await
            .context("execute http req")?;

        let status = res.status();
        let rate_limit = RateLimitInfo::from_response(&res);

        if status == StatusCode::PAYLOAD_TOO_LARGE {
            return Err(HyperSyncResponseError::PayloadTooLarge);
        }
        if status == StatusCode::TOO_MANY_REQUESTS {
            return Err(HyperSyncResponseError::RateLimited { rate_limit });
        }
        if !status.is_success() {
            let text = res.text().await.context("read text to see error")?;

            return Err(HyperSyncResponseError::Other(anyhow!(
                "http response status code {}, err body: {}",
                status,
                text
            )));
        }

        let bytes = res.bytes().await.context("read response body bytes")?;

        // Parsing is CPU-bound; tell tokio this worker thread may block.
        let res = tokio::task::block_in_place(|| {
            parse_query_response(&bytes).context("parse query response")
        })?;

        Ok(ArrowImplResponse {
            response: res,
            response_bytes: bytes.len().try_into().unwrap(),
            rate_limit,
        })
    }
964
965    /// Executes query once and returns the result, handling payload-too-large by halving block range.
966    async fn get_arrow_impl(
967        &self,
968        query: &Query,
969    ) -> std::result::Result<ArrowImplResponse, HyperSyncResponseError> {
970        let mut query = query.clone();
971        loop {
972            let res = match self.inner.serialization_format {
973                SerializationFormat::Json => self.get_arrow_impl_json(&query).await,
974                SerializationFormat::CapnProto { .. } => self.get_arrow_impl_capnp(&query).await,
975            };
976            match res {
977                Ok(res) => return Ok(res),
978                Err(
979                    e @ (HyperSyncResponseError::Other(_)
980                    | HyperSyncResponseError::RateLimited { .. }),
981                ) => return Err(e),
982                Err(HyperSyncResponseError::PayloadTooLarge) => {
983                    let block_range = if let Some(to_block) = query.to_block {
984                        let current = to_block - query.from_block;
985                        if current < 2 {
986                            return Err(HyperSyncResponseError::Other(anyhow!(
987                                "Payload is too large and query is using the minimum block range."
988                            )));
989                        }
990                        // Half the current block range
991                        current / 2
992                    } else {
993                        200
994                    };
995                    let to_block = query.from_block + block_range;
996                    query.to_block = Some(to_block);
997
998                    log::trace!(
999                        "Payload is too large, retrying with block range from: {} to: {}",
1000                        query.from_block,
1001                        to_block
1002                    );
1003                }
1004            }
1005        }
1006    }
1007
1008    /// Executes query with retries and returns the response in Arrow format.
1009    pub async fn get_arrow(&self, query: &Query) -> Result<ArrowResponse> {
1010        self.get_arrow_with_size(query)
1011            .await
1012            .map(|res| res.response)
1013    }
1014
1015    /// Internal implementation for get_arrow.
1016    async fn get_arrow_with_size(&self, query: &Query) -> Result<ArrowImplResponse> {
1017        let mut base = self.inner.retry_base_ms;
1018
1019        let mut err = anyhow!("");
1020
1021        // Proactive throttling: if we know we're rate limited, wait before sending
1022        if self.inner.proactive_rate_limit_sleep {
1023            self.wait_for_rate_limit().await;
1024        }
1025
1026        for _ in 0..self.inner.max_num_retries + 1 {
1027            match self.get_arrow_impl(query).await {
1028                Ok(res) => {
1029                    self.update_rate_limit_state(&res.rate_limit);
1030                    return Ok(res);
1031                }
1032                Err(HyperSyncResponseError::RateLimited { rate_limit }) => {
1033                    self.update_rate_limit_state(&rate_limit);
1034                    let wait_secs = rate_limit.suggested_wait_secs().unwrap_or(1) + 1;
1035                    log::warn!(
1036                        "rate limited by server ({rate_limit}), waiting {wait_secs}s before retry. To increase your rate limits, upgrade your plan at https://app.envio.dev/api-tokens. For more info: https://docs.envio.dev/docs/HyperSync/api-tokens"
1037                    );
1038                    err = err.context(format!("rate limited by server ({rate_limit}). To increase your rate limits, upgrade your plan at https://app.envio.dev/api-tokens"));
1039                    tokio::time::sleep(Duration::from_secs(wait_secs)).await;
1040                    continue;
1041                }
1042                Err(HyperSyncResponseError::Other(e)) => {
1043                    log::error!(
1044                        "failed to get arrow data from server, retrying... The error was: {e:?}"
1045                    );
1046                    err = err.context(format!("{e:?}"));
1047                }
1048                Err(HyperSyncResponseError::PayloadTooLarge) => {
1049                    // This shouldn't happen since get_arrow_impl handles it, but just in case
1050                    log::error!("unexpected PayloadTooLarge from get_arrow_impl, retrying...");
1051                    err = err.context("unexpected PayloadTooLarge");
1052                }
1053            }
1054
1055            let base_ms = Duration::from_millis(base);
1056            let jitter = Duration::from_millis(fastrange_rs::fastrange_64(
1057                rand::random(),
1058                self.inner.retry_backoff_ms,
1059            ));
1060
1061            tokio::time::sleep(base_ms + jitter).await;
1062
1063            base = std::cmp::min(
1064                base + self.inner.retry_backoff_ms,
1065                self.inner.retry_ceiling_ms,
1066            );
1067        }
1068
1069        Err(err)
1070    }
1071
1072    /// Spawns task to execute query and return data via a channel.
1073    ///
1074    /// # Example
1075    /// ```no_run
1076    /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField}, StreamConfig};
1077    ///
1078    /// # async fn example() -> anyhow::Result<()> {
1079    /// let client = Client::builder()
1080    ///     .chain_id(1)
1081    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
1082    ///     .build()?;
1083    ///
1084    /// // Stream all ERC20 transfer events
1085    /// let query = Query::new()
1086    ///     .from_block(19000000)
1087    ///     .where_logs(
1088    ///         LogFilter::all()
1089    ///             .and_topic0(["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"])?
1090    ///     )
1091    ///     .select_log_fields([LogField::Address, LogField::Topic1, LogField::Topic2, LogField::Data]);
1092    /// let mut receiver = client.stream(query, StreamConfig::default()).await?;
1093    ///
1094    /// while let Some(response) = receiver.recv().await {
1095    ///     let response = response?;
1096    ///     println!("Got {} events up to block: {}", response.data.logs.len(), response.next_block);
1097    /// }
1098    /// # Ok(())
1099    /// # }
1100    /// ```
1101    pub async fn stream(
1102        &self,
1103        query: Query,
1104        config: StreamConfig,
1105    ) -> Result<mpsc::Receiver<Result<QueryResponse>>> {
1106        check_simple_stream_params(&config)?;
1107
1108        let (tx, rx): (_, mpsc::Receiver<Result<QueryResponse>>) =
1109            mpsc::channel(config.concurrency);
1110
1111        let mut inner_rx = self
1112            .stream_arrow(query, config)
1113            .await
1114            .context("start inner stream")?;
1115
1116        tokio::spawn(async move {
1117            while let Some(resp) = inner_rx.recv().await {
1118                let msg = resp
1119                    .context("inner receiver")
1120                    .and_then(|r| QueryResponse::try_from(&r));
1121                let is_err = msg.is_err();
1122                if tx.send(msg).await.is_err() || is_err {
1123                    return;
1124                }
1125            }
1126        });
1127
1128        Ok(rx)
1129    }
1130
1131    /// Add block, transaction and log fields selection to the query and spawns task to execute it,
1132    /// returning data via a channel.
1133    ///
1134    /// This method automatically joins blocks, transactions, and logs into unified events,
1135    /// then streams them via a channel for real-time processing.
1136    ///
1137    /// # Example
1138    /// ```no_run
1139    /// use hypersync_client::{Client, net_types::{Query, LogFilter, LogField, TransactionField}, StreamConfig};
1140    ///
1141    /// # async fn example() -> anyhow::Result<()> {
1142    /// let client = Client::builder()
1143    ///     .chain_id(1)
1144    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
1145    ///     .build()?;
1146    ///
1147    /// // Stream NFT transfer events with transaction context
1148    /// let query = Query::new()
1149    ///     .from_block(19000000)
1150    ///     .where_logs(
1151    ///         LogFilter::all()
1152    ///             .and_topic0(["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"])?
1153    ///     )
1154    ///     .select_log_fields([LogField::Address, LogField::Topic1, LogField::Topic2])
1155    ///     .select_transaction_fields([TransactionField::Hash, TransactionField::From]);
1156    /// let mut receiver = client.stream_events(query, StreamConfig::default()).await?;
1157    ///
1158    /// while let Some(response) = receiver.recv().await {
1159    ///     let response = response?;
1160    ///     println!("Got {} joined events up to block: {}", response.data.len(), response.next_block);
1161    /// }
1162    /// # Ok(())
1163    /// # }
1164    /// ```
1165    pub async fn stream_events(
1166        &self,
1167        mut query: Query,
1168        config: StreamConfig,
1169    ) -> Result<mpsc::Receiver<Result<EventResponse>>> {
1170        check_simple_stream_params(&config)?;
1171
1172        let event_join_strategy = InternalEventJoinStrategy::from(&query.field_selection);
1173
1174        event_join_strategy.add_join_fields_to_selection(&mut query.field_selection);
1175
1176        let (tx, rx): (_, mpsc::Receiver<Result<EventResponse>>) =
1177            mpsc::channel(config.concurrency);
1178
1179        let mut inner_rx = self
1180            .stream_arrow(query, config)
1181            .await
1182            .context("start inner stream")?;
1183
1184        tokio::spawn(async move {
1185            while let Some(resp) = inner_rx.recv().await {
1186                let msg = resp
1187                    .context("inner receiver")
1188                    .and_then(|r| EventResponse::try_from_arrow_response(&r, &event_join_strategy));
1189                let is_err = msg.is_err();
1190                if tx.send(msg).await.is_err() || is_err {
1191                    return;
1192                }
1193            }
1194        });
1195
1196        Ok(rx)
1197    }
1198
1199    /// Spawns task to execute query and return data via a channel in Arrow format.
1200    ///
1201    /// Returns raw Apache Arrow data via a channel for high-performance processing.
1202    /// Ideal for applications that need to work directly with columnar data.
1203    ///
1204    /// # Example
1205    /// ```no_run
1206    /// use hypersync_client::{Client, net_types::{Query, TransactionFilter, TransactionField}, StreamConfig};
1207    ///
1208    /// # async fn example() -> anyhow::Result<()> {
1209    /// let client = Client::builder()
1210    ///     .chain_id(1)
1211    ///     .api_token(std::env::var("ENVIO_API_TOKEN")?)
1212    ///     .build()?;
1213    ///
1214    /// // Stream transaction data in Arrow format for analytics
1215    /// let query = Query::new()
1216    ///     .from_block(19000000)
1217    ///     .to_block_excl(19000100)
1218    ///     .where_transactions(
1219    ///         TransactionFilter::all()
1220    ///             .and_contract_address(["0xA0b86a33E6411b87Fd9D3DF822C8698FC06BBe4c"])?
1221    ///     )
1222    ///     .select_transaction_fields([TransactionField::Hash, TransactionField::From, TransactionField::Value]);
1223    /// let mut receiver = client.stream_arrow(query, StreamConfig::default()).await?;
1224    ///
1225    /// while let Some(response) = receiver.recv().await {
1226    ///     let response = response?;
1227    ///     println!("Got {} Arrow batches for transactions", response.data.transactions.len());
1228    /// }
1229    /// # Ok(())
1230    /// # }
1231    /// ```
1232    pub async fn stream_arrow(
1233        &self,
1234        query: Query,
1235        config: StreamConfig,
1236    ) -> Result<mpsc::Receiver<Result<ArrowResponse>>> {
1237        stream::stream_arrow(self, query, config).await
1238    }
1239
1240    /// Executes query with retries and returns the response in Arrow format along with
1241    /// rate limit information from the server.
1242    ///
1243    /// This is useful for consumers that want to inspect rate limit headers and implement
1244    /// their own rate limiting logic in external systems.
1245    pub async fn get_arrow_with_rate_limit(
1246        &self,
1247        query: &Query,
1248    ) -> Result<QueryResponseWithRateLimit<ArrowResponseData>> {
1249        let result = self.get_arrow_with_size(query).await?;
1250        Ok(QueryResponseWithRateLimit {
1251            response: result.response,
1252            rate_limit: result.rate_limit,
1253        })
1254    }
1255
1256    /// Executes query with retries and returns the response along with
1257    /// rate limit information from the server.
1258    ///
1259    /// This is useful for consumers that want to inspect rate limit headers and implement
1260    /// their own rate limiting logic in external systems.
1261    pub async fn get_with_rate_limit(
1262        &self,
1263        query: &Query,
1264    ) -> Result<QueryResponseWithRateLimit<ResponseData>> {
1265        let result = self.get_arrow_with_rate_limit(query).await?;
1266        let converted =
1267            QueryResponse::try_from(&result.response).context("convert arrow response")?;
1268        Ok(QueryResponseWithRateLimit {
1269            response: converted,
1270            rate_limit: result.rate_limit,
1271        })
1272    }
1273
1274    /// Returns the most recently observed rate limit information, if any.
1275    ///
1276    /// Updated after every request (including inside streams). Returns `None`
1277    /// if no requests have been made yet or the server hasn't returned rate limit headers.
1278    pub fn rate_limit_info(&self) -> Option<RateLimitInfo> {
1279        self.inner
1280            .rate_limit_state
1281            .lock()
1282            .expect("rate_limit_state mutex poisoned")
1283            .as_ref()
1284            .map(|(info, _captured_at)| info.clone())
1285    }
1286
1287    /// Waits until the current rate limit window resets, if the client is rate limited.
1288    ///
1289    /// Returns immediately if:
1290    /// - No rate limit information has been observed yet
1291    /// - There is remaining quota in the current window
1292    ///
1293    /// This method is useful for consumers who want to explicitly wait before making
1294    /// requests, for example when coordinating rate limits across multiple systems.
1295    pub async fn wait_for_rate_limit(&self) {
1296        let wait_info = {
1297            let state = self
1298                .inner
1299                .rate_limit_state
1300                .lock()
1301                .expect("rate_limit_state mutex poisoned");
1302            match state.as_ref() {
1303                Some((info, captured_at)) if info.is_rate_limited() => {
1304                    info.suggested_wait_secs().map(|secs| {
1305                        let elapsed = captured_at.elapsed().as_secs();
1306                        let remaining_wait = secs.saturating_sub(elapsed);
1307                        (remaining_wait, info.clone())
1308                    })
1309                }
1310                _ => None,
1311            }
1312        };
1313        if let Some((secs, info)) = wait_info {
1314            if secs > 0 {
1315                log::warn!(
1316                    "rate limit exhausted ({info}), proactively waiting {secs}s for window reset. To increase your rate limits, upgrade your plan at https://app.envio.dev/api-tokens. For more info: https://docs.envio.dev/docs/HyperSync/api-tokens"
1317                );
1318                tokio::time::sleep(Duration::from_secs(secs)).await;
1319            }
1320        }
1321    }
1322
1323    /// Updates the internally tracked rate limit state with the current timestamp.
1324    fn update_rate_limit_state(&self, rate_limit: &RateLimitInfo) {
1325        // Only update if the response actually contained rate limit headers
1326        if rate_limit.limit.is_some()
1327            || rate_limit.remaining.is_some()
1328            || rate_limit.reset_secs.is_some()
1329        {
1330            let mut state = self
1331                .inner
1332                .rate_limit_state
1333                .lock()
1334                .expect("rate_limit_state mutex poisoned");
1335            *state = Some((rate_limit.clone(), Instant::now()));
1336        }
1337    }
1338
    /// Getter for url field.
    ///
    /// Returns a reference to the base URL this client sends requests to.
    ///
    /// # Example
    /// ```
    /// use hypersync_client::Client;
    ///
    /// let client = Client::builder()
    ///     .chain_id(1)
    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
    ///     .build()
    ///     .unwrap();
    ///
    /// println!("Client URL: {}", client.url());
    /// ```
    pub fn url(&self) -> &Url {
        &self.inner.url
    }
1356}
1357
/// Builder for creating a hypersync client with configuration options.
///
/// This builder provides a fluent API for configuring client settings like URL,
/// authentication, timeouts, and retry behavior.
///
/// # Example
/// ```
/// use hypersync_client::{Client, SerializationFormat};
///
/// let client = Client::builder()
///     .chain_id(1)
///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
///     .http_req_timeout_millis(30000)
///     .max_num_retries(3)
///     .build()
///     .unwrap();
/// ```
// Thin newtype over `ClientConfig`: every setter mutates the wrapped config
// and `build` hands it to `Client::new`.
#[derive(Debug, Clone, Default)]
pub struct ClientBuilder(ClientConfig);
1377
1378impl ClientBuilder {
1379    /// Creates a new ClientBuilder with default configuration.
1380    pub fn new() -> Self {
1381        Self::default()
1382    }
1383
1384    /// Sets the chain ID and automatically configures the URL for the hypersync endpoint.
1385    ///
1386    /// This is a convenience method that sets the URL to `https://{chain_id}.hypersync.xyz`.
1387    ///
1388    /// # Arguments
1389    /// * `chain_id` - The blockchain chain ID (e.g., 1 for Ethereum mainnet)
1390    ///
1391    /// # Example
1392    /// ```
1393    /// use hypersync_client::Client;
1394    ///
1395    /// let client = Client::builder()
1396    ///     .chain_id(1) // Ethereum mainnet
1397    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1398    ///     .build()
1399    ///     .unwrap();
1400    /// ```
1401    pub fn chain_id(mut self, chain_id: u64) -> Self {
1402        self.0.url = format!("https://{chain_id}.hypersync.xyz");
1403        self
1404    }
1405
1406    /// Sets a custom URL for the hypersync server.
1407    ///
1408    /// Use this method when you need to connect to a custom hypersync endpoint
1409    /// instead of the default public endpoints.
1410    ///
1411    /// # Arguments
1412    /// * `url` - The hypersync server URL
1413    ///
1414    /// # Example
1415    /// ```
1416    /// use hypersync_client::Client;
1417    ///
1418    /// let client = Client::builder()
1419    ///     .url("https://my-custom-hypersync.example.com")
1420    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1421    ///     .build()
1422    ///     .unwrap();
1423    /// ```
1424    pub fn url<S: ToString>(mut self, url: S) -> Self {
1425        self.0.url = url.to_string();
1426        self
1427    }
1428
1429    /// Sets the api token for authentication.
1430    ///
1431    /// Required for accessing authenticated hypersync endpoints.
1432    ///
1433    /// # Arguments
1434    /// * `api_token` - The authentication token
1435    ///
1436    /// # Example
1437    /// ```
1438    /// use hypersync_client::Client;
1439    ///
1440    /// let client = Client::builder()
1441    ///     .chain_id(1)
1442    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1443    ///     .build()
1444    ///     .unwrap();
1445    /// ```
1446    pub fn api_token<S: ToString>(mut self, api_token: S) -> Self {
1447        self.0.api_token = api_token.to_string();
1448        self
1449    }
1450
1451    /// Sets the HTTP request timeout in milliseconds.
1452    ///
1453    /// # Arguments
1454    /// * `http_req_timeout_millis` - Timeout in milliseconds (default: 30000)
1455    ///
1456    /// # Example
1457    /// ```
1458    /// use hypersync_client::Client;
1459    ///
1460    /// let client = Client::builder()
1461    ///     .chain_id(1)
1462    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1463    ///     .http_req_timeout_millis(60000) // 60 second timeout
1464    ///     .build()
1465    ///     .unwrap();
1466    /// ```
1467    pub fn http_req_timeout_millis(mut self, http_req_timeout_millis: u64) -> Self {
1468        self.0.http_req_timeout_millis = http_req_timeout_millis;
1469        self
1470    }
1471
1472    /// Sets the maximum number of retries for failed requests.
1473    ///
1474    /// # Arguments
1475    /// * `max_num_retries` - Maximum number of retries (default: 10)
1476    ///
1477    /// # Example
1478    /// ```
1479    /// use hypersync_client::Client;
1480    ///
1481    /// let client = Client::builder()
1482    ///     .chain_id(1)
1483    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1484    ///     .max_num_retries(5)
1485    ///     .build()
1486    ///     .unwrap();
1487    /// ```
1488    pub fn max_num_retries(mut self, max_num_retries: usize) -> Self {
1489        self.0.max_num_retries = max_num_retries;
1490        self
1491    }
1492
1493    /// Sets the backoff increment for retry delays.
1494    ///
1495    /// This value is added to the base delay on each retry attempt.
1496    ///
1497    /// # Arguments
1498    /// * `retry_backoff_ms` - Backoff increment in milliseconds (default: 500)
1499    ///
1500    /// # Example
1501    /// ```
1502    /// use hypersync_client::Client;
1503    ///
1504    /// let client = Client::builder()
1505    ///     .chain_id(1)
1506    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1507    ///     .retry_backoff_ms(1000) // 1 second backoff increment
1508    ///     .build()
1509    ///     .unwrap();
1510    /// ```
1511    pub fn retry_backoff_ms(mut self, retry_backoff_ms: u64) -> Self {
1512        self.0.retry_backoff_ms = retry_backoff_ms;
1513        self
1514    }
1515
1516    /// Sets the initial delay for retry attempts.
1517    ///
1518    /// # Arguments
1519    /// * `retry_base_ms` - Initial retry delay in milliseconds (default: 500)
1520    ///
1521    /// # Example
1522    /// ```
1523    /// use hypersync_client::Client;
1524    ///
1525    /// let client = Client::builder()
1526    ///     .chain_id(1)
1527    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1528    ///     .retry_base_ms(1000) // Start with 1 second delay
1529    ///     .build()
1530    ///     .unwrap();
1531    /// ```
1532    pub fn retry_base_ms(mut self, retry_base_ms: u64) -> Self {
1533        self.0.retry_base_ms = retry_base_ms;
1534        self
1535    }
1536
1537    /// Sets the maximum delay for retry attempts.
1538    ///
1539    /// The retry delay will not exceed this value, even with backoff increments.
1540    ///
1541    /// # Arguments
1542    /// * `retry_ceiling_ms` - Maximum retry delay in milliseconds (default: 10000)
1543    ///
1544    /// # Example
1545    /// ```
1546    /// use hypersync_client::Client;
1547    ///
1548    /// let client = Client::builder()
1549    ///     .chain_id(1)
1550    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1551    ///     .retry_ceiling_ms(30000) // Cap at 30 seconds
1552    ///     .build()
1553    ///     .unwrap();
1554    /// ```
1555    pub fn retry_ceiling_ms(mut self, retry_ceiling_ms: u64) -> Self {
1556        self.0.retry_ceiling_ms = retry_ceiling_ms;
1557        self
1558    }
1559
1560    /// Sets whether to proactively sleep when rate limited.
1561    ///
1562    /// When enabled (default), the client will wait for the rate limit window to reset
1563    /// before sending requests it knows will be rejected. Set to `false` to disable
1564    /// this behavior and handle rate limits yourself.
1565    ///
1566    /// # Arguments
1567    /// * `proactive_rate_limit_sleep` - Whether to enable proactive rate limit sleeping (default: true)
1568    pub fn proactive_rate_limit_sleep(mut self, proactive_rate_limit_sleep: bool) -> Self {
1569        self.0.proactive_rate_limit_sleep = proactive_rate_limit_sleep;
1570        self
1571    }
1572
1573    /// Sets the serialization format for client-server communication.
1574    ///
1575    /// # Arguments
1576    /// * `serialization_format` - The format to use (JSON or CapnProto)
1577    ///
1578    /// # Example
1579    /// ```
1580    /// use hypersync_client::{Client, SerializationFormat};
1581    ///
1582    /// let client = Client::builder()
1583    ///     .chain_id(1)
1584    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1585    ///     .serialization_format(SerializationFormat::Json)
1586    ///     .build()
1587    ///     .unwrap();
1588    /// ```
1589    pub fn serialization_format(mut self, serialization_format: SerializationFormat) -> Self {
1590        self.0.serialization_format = serialization_format;
1591        self
1592    }
1593
1594    /// Builds the client with the configured settings.
1595    ///
1596    /// # Returns
1597    /// * `Result<Client>` - The configured client or an error if configuration is invalid
1598    ///
1599    /// # Errors
1600    /// Returns an error if:
1601    /// * The URL is malformed
1602    /// * Required configuration is missing
1603    ///
1604    /// # Example
1605    /// ```
1606    /// use hypersync_client::Client;
1607    ///
1608    /// let client = Client::builder()
1609    ///     .chain_id(1)
1610    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1611    ///     .build()
1612    ///     .unwrap();
1613    /// ```
1614    pub fn build(self) -> Result<Client> {
1615        if self.0.url.is_empty() {
1616            anyhow::bail!(
1617                "endpoint needs to be set, try using builder.chain_id(1) or\
1618                builder.url(\"https://eth.hypersync.xyz\") to set the endpoint"
1619            )
1620        }
1621        Client::new(self.0)
1622    }
1623}
1624
/// Initial delay before the first reconnect attempt (200ms); the retry logic
/// doubles this per consecutive failure.
const INITIAL_RECONNECT_DELAY: Duration = Duration::from_millis(200);
/// Upper bound on the exponential reconnect backoff.
const MAX_RECONNECT_DELAY: Duration = Duration::from_secs(30);
/// Timeout for detecting dead connections. Server sends keepalive pings every 5s,
/// so we timeout after 15s (3x the ping interval).
const READ_TIMEOUT: Duration = Duration::from_secs(15);
1631
/// Events emitted by the height stream.
///
/// Emitted by [`Client::stream_height`] so consumers can observe height
/// updates as well as connection lifecycle changes.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum HeightStreamEvent {
    /// Successfully connected or reconnected to the SSE stream.
    Connected,
    /// Received a height update from the server.
    Height(u64),
    /// Connection lost, will attempt to reconnect after the specified delay.
    Reconnecting {
        /// Duration to wait before attempting reconnection.
        delay: Duration,
        /// The error that caused the reconnection (Debug-formatted). May be
        /// empty when the stream ended without an explicit error.
        error_msg: String,
    },
}
1647
/// Internal classification of raw SSE events before forwarding to consumers.
enum InternalStreamEvent {
    /// An event that should be forwarded to the consumer channel.
    Publish(HeightStreamEvent),
    /// Server keepalive; not forwarded.
    Ping,
    /// Unrecognized event type (with a debug description); not forwarded.
    Unknown(String),
}
1653
1654impl Client {
1655    fn get_es_stream(&self) -> Result<EventSource> {
1656        // Build the GET /height/sse request
1657        let mut url = self.inner.url.clone();
1658        url.path_segments_mut()
1659            .ok()
1660            .context("invalid base URL")?
1661            .extend(&["height", "sse"]);
1662
1663        let req = self
1664            .inner
1665            .http_client
1666            // Don't set timeout for SSE stream
1667            .request_no_timeout(Method::GET, url);
1668
1669        // Configure exponential backoff for library-level retries
1670        let retry_policy = ExponentialBackoff::new(
1671            INITIAL_RECONNECT_DELAY,
1672            2.0,
1673            Some(MAX_RECONNECT_DELAY),
1674            None, // unlimited retries
1675        );
1676
1677        // Turn the request into an EventSource stream with retries
1678        let mut es = reqwest_eventsource::EventSource::new(req)
1679            .context("unexpected error creating EventSource")?;
1680        es.set_retry_policy(Box::new(retry_policy));
1681        Ok(es)
1682    }
1683
1684    async fn next_height(event_source: &mut EventSource) -> Result<Option<InternalStreamEvent>> {
1685        let Some(res) = tokio::time::timeout(READ_TIMEOUT, event_source.next())
1686            .await
1687            .map_err(|d| anyhow::anyhow!("stream timed out after {d}"))?
1688        else {
1689            return Ok(None);
1690        };
1691
1692        let e = match res.context("failed response")? {
1693            Event::Open => InternalStreamEvent::Publish(HeightStreamEvent::Connected),
1694            Event::Message(event) => match event.event.as_str() {
1695                "height" => {
1696                    let height = event
1697                        .data
1698                        .trim()
1699                        .parse::<u64>()
1700                        .context("parsing height from event data")?;
1701                    InternalStreamEvent::Publish(HeightStreamEvent::Height(height))
1702                }
1703                "ping" => InternalStreamEvent::Ping,
1704                _ => InternalStreamEvent::Unknown(format!("unknown event: {:?}", event)),
1705            },
1706        };
1707
1708        Ok(Some(e))
1709    }
1710
1711    async fn stream_height_events(
1712        es: &mut EventSource,
1713        tx: &mpsc::Sender<HeightStreamEvent>,
1714    ) -> Result<bool> {
1715        let mut received_an_event = false;
1716        while let Some(event) = Self::next_height(es).await.context("failed next height")? {
1717            match event {
1718                InternalStreamEvent::Publish(event) => {
1719                    received_an_event = true;
1720                    if tx.send(event).await.is_err() {
1721                        return Ok(received_an_event); // Receiver dropped, exit task
1722                    }
1723                }
1724                InternalStreamEvent::Ping => (), // ignore pings
1725                InternalStreamEvent::Unknown(_event) => (), // ignore unknown events
1726            }
1727        }
1728        Ok(received_an_event)
1729    }
1730
1731    fn get_delay(consecutive_failures: u32) -> Duration {
1732        if consecutive_failures > 0 {
1733            /// helper function to calculate 2^x
1734            /// optimization using bit shifting
1735            const fn two_to_pow(x: u32) -> u32 {
1736                1 << x
1737            }
1738            // Exponential backoff: 200ms, 400ms, 800ms, ... up to 30s
1739            INITIAL_RECONNECT_DELAY
1740                .saturating_mul(two_to_pow(consecutive_failures - 1))
1741                .min(MAX_RECONNECT_DELAY)
1742        } else {
1743            // On zero consecutive failures, 0 delay
1744            Duration::from_millis(0)
1745        }
1746    }
1747
    /// Runs the SSE height stream in a loop, reconnecting with exponential
    /// backoff until the receiver side of `tx` is dropped.
    ///
    /// Each iteration opens a fresh [`EventSource`], forwards events into `tx`
    /// via [`Self::stream_height_events`], and — if the receiver is still
    /// alive — publishes a [`HeightStreamEvent::Reconnecting`] before sleeping
    /// and trying again.
    async fn stream_height_events_with_retry(
        &self,
        tx: &mpsc::Sender<HeightStreamEvent>,
    ) -> Result<()> {
        let mut consecutive_failures = 0u32;

        loop {
            // should always be able to create a new es stream
            // something is wrong with the req builder otherwise
            let mut es = self.get_es_stream().context("get es stream")?;

            // Placeholder, replaced with the real error when the stream fails.
            // On a graceful stream end this stays empty, so the Reconnecting
            // event carries an empty error_msg.
            let mut error = anyhow!("");

            match Self::stream_height_events(&mut es, tx).await {
                Ok(received_an_event) => {
                    if received_an_event {
                        consecutive_failures = 0; // Reset after successful connection that then failed
                    }
                    log::trace!("Stream height exited");
                }
                Err(e) => {
                    log::trace!("Stream height failed: {e:?}");
                    error = e;
                }
            }

            es.close();

            // If the receiver is closed, exit the task
            if tx.is_closed() {
                break;
            }

            let delay = Self::get_delay(consecutive_failures);
            log::trace!("Reconnecting in {:?}...", delay);

            let error_msg = format!("{error:?}");

            if tx
                .send(HeightStreamEvent::Reconnecting { delay, error_msg })
                .await
                .is_err()
            {
                return Ok(()); // Receiver dropped, exit task
            }
            tokio::time::sleep(delay).await;

            // increment consecutive failures so that on the next try
            // it will start using back offs
            consecutive_failures += 1;
        }

        Ok(())
    }
1802
1803    /// Streams archive height updates from the server via Server-Sent Events.
1804    ///
1805    /// Establishes a long-lived SSE connection to `/height/sse` that automatically reconnects
1806    /// on disconnection with exponential backoff (200ms → 400ms → ... → max 30s).
1807    ///
1808    /// The stream emits [`HeightStreamEvent`] to notify consumers of connection state changes
1809    /// and height updates. This allows applications to display connection status to users.
1810    ///
1811    /// # Returns
1812    /// Channel receiver yielding [`HeightStreamEvent`]s. The background task handles connection
1813    /// lifecycle and sends events through this channel.
1814    ///
1815    /// # Example
1816    /// ```
1817    /// # use hypersync_client::{Client, HeightStreamEvent};
1818    /// # async fn example() -> anyhow::Result<()> {
1819    /// let client = Client::builder()
1820    ///     .url("https://eth.hypersync.xyz")
1821    ///     .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
1822    ///     .build()?;
1823    ///
1824    /// let mut rx = client.stream_height();
1825    ///
1826    /// while let Some(event) = rx.recv().await {
1827    ///     match event {
1828    ///         HeightStreamEvent::Connected => println!("Connected to stream"),
1829    ///         HeightStreamEvent::Height(h) => println!("Height: {}", h),
1830    ///         HeightStreamEvent::Reconnecting { delay, error_msg } => {
1831    ///             println!("Reconnecting in {delay:?} due to error: {error_msg}")
1832    ///         }
1833    ///     }
1834    /// }
1835    /// # Ok(())
1836    /// # }
1837    /// ```
1838    pub fn stream_height(&self) -> mpsc::Receiver<HeightStreamEvent> {
1839        let (tx, rx) = mpsc::channel(16);
1840        let client = self.clone();
1841
1842        tokio::spawn(async move {
1843            if let Err(e) = client.stream_height_events_with_retry(&tx).await {
1844                log::error!("Stream height failed unexpectedly: {e:?}");
1845            }
1846        });
1847
1848        rx
1849    }
1850}
1851
1852fn check_simple_stream_params(config: &StreamConfig) -> Result<()> {
1853    if config.event_signature.is_some() {
1854        return Err(anyhow!(
1855            "config.event_signature can't be passed to simple type function. User is expected to \
1856             decode the logs using Decoder."
1857        ));
1858    }
1859    if config.column_mapping.is_some() {
1860        return Err(anyhow!(
1861            "config.column_mapping can't be passed to single type function. User is expected to \
1862             map values manually."
1863        ));
1864    }
1865
1866    Ok(())
1867}
1868
/// Used to indicate whether or not a retry should be attempted.
///
/// Lets callers distinguish retryable server responses (payload too large,
/// rate limiting) from other failures.
#[derive(Debug, thiserror::Error)]
pub enum HyperSyncResponseError {
    /// Means that the client should retry with a smaller block range.
    #[error("hypersync responded with 'payload too large' error")]
    PayloadTooLarge,
    /// Server responded with 429 Too Many Requests.
    #[error("rate limited by server. To increase your rate limits, upgrade your plan at https://app.envio.dev/api-tokens. For more info: https://docs.envio.dev/docs/HyperSync/api-tokens")]
    RateLimited {
        /// Rate limit information from the 429 response headers.
        rate_limit: RateLimitInfo,
    },
    /// Any other server error.
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}
1885
/// Result of a single Arrow query execution, before retry logic.
struct ArrowImplResponse {
    /// The parsed Arrow response data.
    response: ArrowResponse,
    /// Size of the response body in bytes (useful for metrics/telemetry).
    response_bytes: u64,
    /// Rate limit information parsed from response headers.
    rate_limit: RateLimitInfo,
}
1895
#[cfg(test)]
mod tests {
    use super::*;
    // Verifies the reconnect backoff schedule: zero delay on the first
    // attempt, then powers-of-two growth from 200ms, capped at 30s.
    #[test]
    fn test_get_delay() {
        assert_eq!(
            Client::get_delay(0),
            Duration::from_millis(0),
            "starts with 0 delay"
        );
        // powers of 2 backoff
        assert_eq!(Client::get_delay(1), Duration::from_millis(200));
        assert_eq!(Client::get_delay(2), Duration::from_millis(400));
        assert_eq!(Client::get_delay(3), Duration::from_millis(800));
        // maxes out at 30s
        assert_eq!(
            Client::get_delay(9),
            Duration::from_secs(30),
            "max delay is 30s"
        );
        assert_eq!(
            Client::get_delay(10),
            Duration::from_secs(30),
            "max delay is 30s"
        );
    }

    // Sanity-checks that the live server negotiates HTTP/2 with a plain
    // reqwest client (gzip disabled to avoid middleware affecting the probe).
    #[tokio::test]
    #[ignore = "integration test requiring live hs server"]
    async fn test_http2_is_used() -> anyhow::Result<()> {
        let api_token = std::env::var("ENVIO_API_TOKEN")?;
        let client = reqwest::Client::builder()
            .no_gzip()
            .user_agent("hscr-test")
            .build()?;

        let res = client
            .get("https://eth.hypersync.xyz/height")
            .bearer_auth(&api_token)
            .send()
            .await?;

        assert_eq!(
            res.version(),
            reqwest::Version::HTTP_2,
            "expected HTTP/2 but got {:?}",
            res.version()
        );
        assert!(res.status().is_success());
        Ok(())
    }

    // End-to-end check of the SSE height stream against a live server:
    // expects Connected first, then increasing heights, and a graceful
    // shutdown once the receiver is dropped.
    #[tokio::test]
    #[ignore = "integration test with live hs server for height stream"]
    async fn test_stream_height_events() -> anyhow::Result<()> {
        let (tx, mut rx) = mpsc::channel(16);
        let handle = tokio::spawn(async move {
            let client = Client::builder()
                .url("https://monad-testnet.hypersync.xyz")
                .api_token(std::env::var("ENVIO_API_TOKEN").unwrap())
                .build()?;
            let mut es = client.get_es_stream().context("get es stream")?;
            Client::stream_height_events(&mut es, &tx).await
        });

        // The first event published after a successful connect is Connected.
        let val = rx.recv().await;
        assert!(val.is_some());
        assert_eq!(val.unwrap(), HeightStreamEvent::Connected);
        let Some(HeightStreamEvent::Height(height)) = rx.recv().await else {
            panic!("should have received height")
        };
        let Some(HeightStreamEvent::Height(height2)) = rx.recv().await else {
            panic!("should have received height")
        };
        // Heights reported by the server should be monotonically increasing.
        assert!(height2 > height);
        // Dropping the receiver should make the streaming task return Ok.
        drop(rx);

        let res = handle.await.expect("should have joined");
        let received_an_event =
            res.expect("should have ended the stream gracefully after dropping rx");
        assert!(received_an_event, "should have received an event");

        Ok(())
    }
}