Skip to main content

hypersync_client/
config.rs

1use anyhow::Result;
2use schemars::JsonSchema;
3use serde::{Deserialize, Serialize};
4use url::Url;
5
6use crate::ColumnMapping;
7
8/// Configuration for the hypersync client.
9#[derive(Debug, Clone, Deserialize, Serialize)]
10pub struct ClientConfig {
11    /// HyperSync server URL.
12    #[serde(default)]
13    pub url: String,
14    /// HyperSync server api token.
15    #[serde(default)]
16    pub api_token: String,
17    /// Milliseconds to wait for a response before timing out.
18    #[serde(default = "ClientConfig::default_http_req_timeout_millis")]
19    pub http_req_timeout_millis: u64,
20    /// Number of retries to attempt before returning error.
21    #[serde(default = "ClientConfig::default_max_num_retries")]
22    pub max_num_retries: usize,
23    /// Milliseconds that would be used for retry backoff increasing.
24    #[serde(default = "ClientConfig::default_retry_backoff_ms")]
25    pub retry_backoff_ms: u64,
26    /// Initial wait time for request backoff.
27    #[serde(default = "ClientConfig::default_retry_base_ms")]
28    pub retry_base_ms: u64,
29    /// Ceiling time for request backoff.
30    #[serde(default = "ClientConfig::default_retry_ceiling_ms")]
31    pub retry_ceiling_ms: u64,
32    /// Query serialization format to use for HTTP requests.
33    #[serde(default)]
34    pub serialization_format: SerializationFormat,
35    /// Whether to proactively sleep when the rate limit is exhausted instead of
36    /// sending requests that will be rejected with 429.
37    ///
38    /// Enabled by default. Set to `false` to opt out and handle rate limits yourself.
39    #[serde(default = "ClientConfig::default_proactive_rate_limit_sleep")]
40    pub proactive_rate_limit_sleep: bool,
41}
42
43impl Default for ClientConfig {
44    fn default() -> Self {
45        Self {
46            url: String::default(),
47            api_token: String::default(),
48            http_req_timeout_millis: Self::default_http_req_timeout_millis(),
49            max_num_retries: Self::default_max_num_retries(),
50            retry_backoff_ms: Self::default_retry_backoff_ms(),
51            retry_base_ms: Self::default_retry_base_ms(),
52            retry_ceiling_ms: Self::default_retry_ceiling_ms(),
53            serialization_format: SerializationFormat::default(),
54            proactive_rate_limit_sleep: Self::default_proactive_rate_limit_sleep(),
55        }
56    }
57}
58
59impl ClientConfig {
60    /// Default HTTP request timeout in milliseconds
61    pub const fn default_http_req_timeout_millis() -> u64 {
62        30_000
63    }
64
65    /// Default maximum number of retries
66    pub const fn default_max_num_retries() -> usize {
67        12
68    }
69
70    /// Default retry backoff in milliseconds
71    pub const fn default_retry_backoff_ms() -> u64 {
72        500
73    }
74
75    /// Default retry base time in milliseconds
76    pub const fn default_retry_base_ms() -> u64 {
77        200
78    }
79
80    /// Default retry ceiling time in milliseconds
81    pub const fn default_retry_ceiling_ms() -> u64 {
82        5_000
83    }
84
85    /// Default proactive rate limit sleep setting
86    pub const fn default_proactive_rate_limit_sleep() -> bool {
87        true
88    }
89    /// Validates the config
90    pub fn validate(&self) -> Result<()> {
91        if self.url.is_empty() {
92            anyhow::bail!("url is required");
93        }
94
95        // validate that url is a valid url
96        if Url::parse(&self.url).is_err() {
97            anyhow::bail!("url is malformed");
98        }
99
100        if self.api_token.is_empty() {
101            anyhow::bail!("api_token is required - get one from https://envio.dev/app/api-tokens");
102        }
103        // validate that api token is a uuid
104        if uuid::Uuid::parse_str(self.api_token.as_str()).is_err() {
105            anyhow::bail!("api_token is malformed - make sure its a token from https://envio.dev/app/api-tokens");
106        }
107
108        if self.http_req_timeout_millis == 0 {
109            anyhow::bail!("http_req_timeout_millis must be greater than 0");
110        }
111
112        Ok(())
113    }
114}
115
116/// Determines query serialization format for HTTP requests.
117#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
118pub enum SerializationFormat {
119    /// Use JSON serialization
120    Json,
121    /// Use Cap'n Proto binary serialization (default, with query caching enabled)
122    CapnProto {
123        /// Whether to use query caching
124        should_cache_queries: bool,
125    },
126}
127
128impl Default for SerializationFormat {
129    fn default() -> Self {
130        Self::CapnProto {
131            should_cache_queries: true,
132        }
133    }
134}
135
136/// Config for hypersync event streaming.
137#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
138pub struct StreamConfig {
139    /// Column mapping for stream function output.
140    /// It lets you map columns you want into the DataTypes you want.
141    pub column_mapping: Option<ColumnMapping>,
142    /// Event signature used to populate decode logs. Decode logs would be empty if set to None.
143    pub event_signature: Option<String>,
144    /// Determines formatting of binary columns numbers into utf8 hex.
145    #[serde(default)]
146    pub hex_output: HexOutput,
147    /// Initial, deliberately-overestimated batch size, used for the first wave of
148    /// requests and as a fallback before any response density has been measured.
149    #[serde(default = "StreamConfig::default_batch_size")]
150    pub batch_size: u64,
151    /// Optional hard upper cap on the number of blocks fetched in a single
152    /// request. `None` (the default) means no cap: an over-large request that the
153    /// server truncates simply leaves a gap that is backfilled, so overshoot is
154    /// self-correcting. Set it to bound blocks-per-chunk explicitly.
155    #[serde(default)]
156    pub max_batch_size: Option<u64>,
157    /// Hard lower clamp on the projected block count, to avoid tiny ranges.
158    #[serde(default = "StreamConfig::default_min_batch_size")]
159    pub min_batch_size: u64,
160    /// Number of async threads that would be spawned to execute different block ranges of queries.
161    /// `0` is an error, `1` streams sequentially, `>= 2` uses the projecting scheduler.
162    #[serde(default = "StreamConfig::default_concurrency")]
163    pub concurrency: usize,
164    /// Max number of blocks to fetch in a single request.
165    #[serde(default)]
166    pub max_num_blocks: Option<usize>,
167    /// Max number of transactions to fetch in a single request.
168    #[serde(default)]
169    pub max_num_transactions: Option<usize>,
170    /// Max number of logs to fetch in a single request.
171    #[serde(default)]
172    pub max_num_logs: Option<usize>,
173    /// Max number of traces to fetch in a single request.
174    #[serde(default)]
175    pub max_num_traces: Option<usize>,
176    /// Target response size in bytes. Each request's block span is projected from
177    /// the most recently observed byte-density to aim each response at this size.
178    #[serde(default = "StreamConfig::default_response_bytes_target")]
179    pub response_bytes_target: u64,
180    /// Optional cap on the bytes of fetched-but-undelivered chunks held in the
181    /// reorder buffer (consumer backpressure). `None` (the default) is
182    /// **adaptive**: it starts at `2 * concurrency * response_bytes_target` and
183    /// grows to `2 * concurrency * max(response_bytes_target, largest_response)`
184    /// so the pipeline stays full even for byte-heavy queries whose responses far
185    /// exceed the target (otherwise a single response could exceed the cap and
186    /// throttle look-ahead to near-sequential). Set an explicit value to bound
187    /// memory; an explicit cap is honoured verbatim and never grown. `Some(0)` is
188    /// valid and means "no look-ahead buffer": only the chunk delivery is
189    /// currently waiting on is fetched, so the stream runs effectively
190    /// sequentially with minimal memory (it still completes — the watermark chunk
191    /// is always allowed through).
192    #[serde(default)]
193    pub max_buffered_bytes: Option<u64>,
194    /// Stream data in reverse order
195    #[serde(default = "StreamConfig::default_reverse")]
196    pub reverse: bool,
197}
198
199/// Determines format of Binary column
200#[derive(Default, Clone, Copy, Debug, Serialize, Deserialize, PartialEq, JsonSchema)]
201pub enum HexOutput {
202    /// Binary column won't be formatted as hex
203    #[default]
204    NoEncode,
205    /// Binary column would be formatted as prefixed hex i.e. 0xdeadbeef
206    Prefixed,
207    /// Binary column would be formatted as non prefixed hex i.e. deadbeef
208    NonPrefixed,
209}
210
211impl Default for StreamConfig {
212    fn default() -> Self {
213        Self {
214            column_mapping: None,
215            event_signature: None,
216            hex_output: HexOutput::default(),
217            batch_size: Self::default_batch_size(),
218            max_batch_size: None,
219            min_batch_size: Self::default_min_batch_size(),
220            concurrency: Self::default_concurrency(),
221            max_num_blocks: None,
222            max_num_transactions: None,
223            max_num_logs: None,
224            max_num_traces: None,
225            response_bytes_target: Self::default_response_bytes_target(),
226            max_buffered_bytes: None,
227            reverse: Self::default_reverse(),
228        }
229    }
230}
231
232impl StreamConfig {
233    /// Default concurrency for stream processing
234    pub const fn default_concurrency() -> usize {
235        10
236    }
237
238    /// Default initial batch size
239    pub const fn default_batch_size() -> u64 {
240        1000
241    }
242
243    /// Default minimum batch size
244    pub const fn default_min_batch_size() -> u64 {
245        200
246    }
247
248    /// Default target response size in bytes that projection aims each response at
249    pub const fn default_response_bytes_target() -> u64 {
250        400_000
251    }
252
253    /// Default reverse streaming setting
254    pub const fn default_reverse() -> bool {
255        false
256    }
257
258    /// Preset for **dense** workloads: queries that match a lot of data per block
259    /// (busy contracts, all-logs, popular ERC-20 transfers).
260    ///
261    /// Such streams are throughput-bound and scale well with parallelism, so this
262    /// raises `concurrency` above the default. The default `response_bytes_target`
263    /// (400 KB) is already a good fit — benchmarking showed dense responses
264    /// plateau near that size, and pushing the target higher mostly adds
265    /// truncation/backfill rather than bigger responses.
266    ///
267    /// `max_buffered_bytes` is left unset so the adaptive default applies. If you
268    /// have plenty of rate-limit headroom you can push `concurrency` higher still.
269    pub fn dense() -> Self {
270        Self {
271            concurrency: 20,
272            response_bytes_target: Self::default_response_bytes_target(),
273            ..Self::default()
274        }
275    }
276
277    /// Preset for **sparse** workloads: selective queries over wide block ranges
278    /// (rare events, low-volume contracts) where most blocks match nothing.
279    ///
280    /// Here latency, not bytes, dominates, and benchmarking showed that *high*
281    /// concurrency actually hurts: extra workers just fragment a large empty
282    /// region into more (smaller) requests. So this keeps concurrency moderate and
283    /// raises `batch_size` so the first wave covers a lot of ground before
284    /// per-request projection kicks in — an over-estimate that self-corrects via
285    /// backfill if it hits a dense patch.
286    pub fn sparse() -> Self {
287        Self {
288            concurrency: Self::default_concurrency(),
289            batch_size: 20_000,
290            ..Self::default()
291        }
292    }
293
294    /// Preset for **archival / byte-heavy** workloads: full block + transaction
295    /// pulls (e.g. `include_all_blocks` with wide field selection) where each
296    /// response is many megabytes.
297    ///
298    /// These streams are bounded by the reorder buffer, not concurrency: a single
299    /// response can dwarf `response_bytes_target`, so the adaptive
300    /// `max_buffered_bytes` default (left unset here) is what keeps the pipeline
301    /// full — in benchmarks it roughly doubled throughput versus a buffer sized to
302    /// the target. Concurrency past ~10–15 gives little extra here.
303    pub fn archival() -> Self {
304        Self {
305            concurrency: 12,
306            ..Self::default()
307        }
308    }
309}
310
#[cfg(test)]
mod tests {
    use super::*;

    /// `ClientConfig::validate` accepts a well-formed config and rejects each
    /// variant that breaks exactly one rule.
    #[test]
    fn test_validate() {
        const URL: &str = "https://hypersync.xyz";
        const TOKEN: &str = "00000000-0000-0000-0000-000000000000";

        let valid_cfg = ClientConfig {
            url: URL.to_owned(),
            api_token: TOKEN.to_owned(),
            ..ClientConfig::default()
        };
        assert!(valid_cfg.validate().is_ok(), "valid config");

        // Each broken config below differs from `valid_cfg` in a single field.
        let bad_token = ClientConfig {
            api_token: "not a uuid".to_owned(),
            ..valid_cfg.clone()
        };
        assert!(bad_token.validate().is_err(), "invalid uuid");

        let no_token = ClientConfig {
            api_token: String::new(),
            ..valid_cfg.clone()
        };
        assert!(no_token.validate().is_err(), "missing api token");

        let no_url = ClientConfig {
            url: String::new(),
            ..valid_cfg.clone()
        };
        assert!(no_url.validate().is_err(), "missing url");

        let zero_timeout = ClientConfig {
            http_req_timeout_millis: 0,
            ..valid_cfg
        };
        assert!(
            zero_timeout.validate().is_err(),
            "http_req_timeout_millis must be greater than 0"
        );
    }

    /// `StreamConfig::default()` yields the documented baseline for every field.
    #[test]
    fn test_stream_config_defaults() {
        // Destructure exhaustively so adding a field forces this test to be
        // updated.
        let StreamConfig {
            column_mapping,
            event_signature,
            hex_output,
            batch_size,
            max_batch_size,
            min_batch_size,
            concurrency,
            max_num_blocks,
            max_num_transactions,
            max_num_logs,
            max_num_traces,
            response_bytes_target,
            max_buffered_bytes,
            reverse,
        } = StreamConfig::default();

        assert_eq!(concurrency, 10);
        assert_eq!(batch_size, 1000);
        assert_eq!(max_batch_size, None);
        assert_eq!(min_batch_size, 200);
        assert_eq!(response_bytes_target, 400_000);
        assert_eq!(max_buffered_bytes, None);
        assert!(!reverse);
        assert_eq!(hex_output, HexOutput::NoEncode);
        assert!(column_mapping.is_none());
        assert!(event_signature.is_none());
        assert!(max_num_blocks.is_none());
        assert!(max_num_transactions.is_none());
        assert!(max_num_logs.is_none());
        assert!(max_num_traces.is_none());
    }

    /// JSON round-trips, and keys missing from the input fall back to defaults.
    #[test]
    fn test_stream_config_serde() {
        // Round-trip of the default config.
        let original = StreamConfig::default();
        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: StreamConfig = serde_json::from_str(&encoded).unwrap();

        assert_eq!(decoded.concurrency, original.concurrency);
        assert_eq!(decoded.batch_size, original.batch_size);
        assert_eq!(decoded.reverse, original.reverse);

        // Partial input: only two keys given, everything else must default.
        let partial: StreamConfig =
            serde_json::from_str(r#"{"reverse": true, "batch_size": 500}"#).unwrap();

        assert!(partial.reverse);
        assert_eq!(partial.batch_size, 500);
        assert_eq!(partial.concurrency, 10);
        assert_eq!(partial.max_batch_size, None);
        assert_eq!(partial.response_bytes_target, 400_000);
        assert_eq!(partial.max_buffered_bytes, None);

        // Explicit values for the optional caps are picked up as given.
        let explicit: StreamConfig = serde_json::from_str(
            r#"{"max_batch_size": 50000, "response_bytes_target": 800000, "max_buffered_bytes": 1048576}"#,
        )
        .unwrap();
        assert_eq!(explicit.max_batch_size, Some(50_000));
        assert_eq!(explicit.response_bytes_target, 800_000);
        assert_eq!(explicit.max_buffered_bytes, Some(1_048_576));
    }

    /// Each preset changes only the knobs it advertises.
    #[test]
    fn test_stream_config_presets() {
        let dense = StreamConfig::dense();
        let sparse = StreamConfig::sparse();
        let archival = StreamConfig::archival();

        // Dense: more parallelism, default target, adaptive buffer.
        assert_eq!(dense.concurrency, 20);
        assert_eq!(dense.response_bytes_target, 400_000);
        assert_eq!(dense.max_buffered_bytes, None);

        // Sparse: moderate concurrency, big first wave.
        assert_eq!(sparse.concurrency, 10);
        assert_eq!(sparse.batch_size, 20_000);
        assert_eq!(sparse.max_buffered_bytes, None);

        // Archival: modest concurrency, relies on the adaptive buffer.
        assert_eq!(archival.concurrency, 12);
        assert_eq!(archival.max_buffered_bytes, None);

        // Everything a preset does not mention stays at the default.
        assert_eq!(dense.min_batch_size, StreamConfig::default_min_batch_size());
        assert!(!sparse.reverse);
    }
}