Skip to main content

hypersync_client/
config.rs

1use anyhow::Result;
2use schemars::JsonSchema;
3use serde::{Deserialize, Serialize};
4use url::Url;
5
6use crate::ColumnMapping;
7
8/// Configuration for the hypersync client.
9#[derive(Debug, Clone, Deserialize, Serialize)]
10pub struct ClientConfig {
11    /// HyperSync server URL.
12    #[serde(default)]
13    pub url: String,
14    /// HyperSync server api token.
15    #[serde(default)]
16    pub api_token: String,
17    /// Milliseconds to wait for a response before timing out.
18    #[serde(default = "ClientConfig::default_http_req_timeout_millis")]
19    pub http_req_timeout_millis: u64,
20    /// Number of retries to attempt before returning error.
21    #[serde(default = "ClientConfig::default_max_num_retries")]
22    pub max_num_retries: usize,
23    /// Milliseconds that would be used for retry backoff increasing.
24    #[serde(default = "ClientConfig::default_retry_backoff_ms")]
25    pub retry_backoff_ms: u64,
26    /// Initial wait time for request backoff.
27    #[serde(default = "ClientConfig::default_retry_base_ms")]
28    pub retry_base_ms: u64,
29    /// Ceiling time for request backoff.
30    #[serde(default = "ClientConfig::default_retry_ceiling_ms")]
31    pub retry_ceiling_ms: u64,
32    /// Query serialization format to use for HTTP requests.
33    #[serde(default)]
34    pub serialization_format: SerializationFormat,
35    /// Whether to proactively sleep when the rate limit is exhausted instead of
36    /// sending requests that will be rejected with 429.
37    ///
38    /// Enabled by default. Set to `false` to opt out and handle rate limits yourself.
39    #[serde(default = "ClientConfig::default_proactive_rate_limit_sleep")]
40    pub proactive_rate_limit_sleep: bool,
41}
42
43impl Default for ClientConfig {
44    fn default() -> Self {
45        Self {
46            url: String::default(),
47            api_token: String::default(),
48            http_req_timeout_millis: Self::default_http_req_timeout_millis(),
49            max_num_retries: Self::default_max_num_retries(),
50            retry_backoff_ms: Self::default_retry_backoff_ms(),
51            retry_base_ms: Self::default_retry_base_ms(),
52            retry_ceiling_ms: Self::default_retry_ceiling_ms(),
53            serialization_format: SerializationFormat::Json,
54            proactive_rate_limit_sleep: Self::default_proactive_rate_limit_sleep(),
55        }
56    }
57}
58
59impl ClientConfig {
60    /// Default HTTP request timeout in milliseconds
61    pub const fn default_http_req_timeout_millis() -> u64 {
62        30_000
63    }
64
65    /// Default maximum number of retries
66    pub const fn default_max_num_retries() -> usize {
67        12
68    }
69
70    /// Default retry backoff in milliseconds
71    pub const fn default_retry_backoff_ms() -> u64 {
72        500
73    }
74
75    /// Default retry base time in milliseconds
76    pub const fn default_retry_base_ms() -> u64 {
77        200
78    }
79
80    /// Default retry ceiling time in milliseconds
81    pub const fn default_retry_ceiling_ms() -> u64 {
82        5_000
83    }
84
85    /// Default proactive rate limit sleep setting
86    pub const fn default_proactive_rate_limit_sleep() -> bool {
87        true
88    }
89    /// Validates the config
90    pub fn validate(&self) -> Result<()> {
91        if self.url.is_empty() {
92            anyhow::bail!("url is required");
93        }
94
95        // validate that url is a valid url
96        if Url::parse(&self.url).is_err() {
97            anyhow::bail!("url is malformed");
98        }
99
100        if self.api_token.is_empty() {
101            anyhow::bail!("api_token is required - get one from https://envio.dev/app/api-tokens");
102        }
103        // validate that api token is a uuid
104        if uuid::Uuid::parse_str(self.api_token.as_str()).is_err() {
105            anyhow::bail!("api_token is malformed - make sure its a token from https://envio.dev/app/api-tokens");
106        }
107
108        if self.http_req_timeout_millis == 0 {
109            anyhow::bail!("http_req_timeout_millis must be greater than 0");
110        }
111
112        Ok(())
113    }
114}
115
116/// Determines query serialization format for HTTP requests.
117#[derive(Default, Clone, Copy, Debug, Serialize, Deserialize)]
118pub enum SerializationFormat {
119    /// Use JSON serialization (default)
120    #[default]
121    Json,
122    /// Use Cap'n Proto binary serialization
123    CapnProto {
124        /// Whether to use query caching
125        should_cache_queries: bool,
126    },
127}
128
129/// Config for hypersync event streaming.
130#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
131pub struct StreamConfig {
132    /// Column mapping for stream function output.
133    /// It lets you map columns you want into the DataTypes you want.
134    pub column_mapping: Option<ColumnMapping>,
135    /// Event signature used to populate decode logs. Decode logs would be empty if set to None.
136    pub event_signature: Option<String>,
137    /// Determines formatting of binary columns numbers into utf8 hex.
138    #[serde(default)]
139    pub hex_output: HexOutput,
140    /// Initial batch size. Size would be adjusted based on response size during execution.
141    #[serde(default = "StreamConfig::default_batch_size")]
142    pub batch_size: u64,
143    /// Maximum batch size that could be used during dynamic adjustment.
144    #[serde(default = "StreamConfig::default_max_batch_size")]
145    pub max_batch_size: u64,
146    /// Minimum batch size that could be used during dynamic adjustment.
147    #[serde(default = "StreamConfig::default_min_batch_size")]
148    pub min_batch_size: u64,
149    /// Number of async threads that would be spawned to execute different block ranges of queries.
150    #[serde(default = "StreamConfig::default_concurrency")]
151    pub concurrency: usize,
152    /// Max number of blocks to fetch in a single request.
153    #[serde(default)]
154    pub max_num_blocks: Option<usize>,
155    /// Max number of transactions to fetch in a single request.
156    #[serde(default)]
157    pub max_num_transactions: Option<usize>,
158    /// Max number of logs to fetch in a single request.
159    #[serde(default)]
160    pub max_num_logs: Option<usize>,
161    /// Max number of traces to fetch in a single request.
162    #[serde(default)]
163    pub max_num_traces: Option<usize>,
164    /// Size of a response in bytes from which step size will be lowered
165    #[serde(default = "StreamConfig::default_response_bytes_ceiling")]
166    pub response_bytes_ceiling: u64,
167    /// Size of a response in bytes from which step size will be increased
168    #[serde(default = "StreamConfig::default_response_bytes_floor")]
169    pub response_bytes_floor: u64,
170    /// Stream data in reverse order
171    #[serde(default = "StreamConfig::default_reverse")]
172    pub reverse: bool,
173}
174
175/// Determines format of Binary column
176#[derive(Default, Clone, Copy, Debug, Serialize, Deserialize, PartialEq, JsonSchema)]
177pub enum HexOutput {
178    /// Binary column won't be formatted as hex
179    #[default]
180    NoEncode,
181    /// Binary column would be formatted as prefixed hex i.e. 0xdeadbeef
182    Prefixed,
183    /// Binary column would be formatted as non prefixed hex i.e. deadbeef
184    NonPrefixed,
185}
186
187impl Default for StreamConfig {
188    fn default() -> Self {
189        Self {
190            column_mapping: None,
191            event_signature: None,
192            hex_output: HexOutput::default(),
193            batch_size: Self::default_batch_size(),
194            max_batch_size: Self::default_max_batch_size(),
195            min_batch_size: Self::default_min_batch_size(),
196            concurrency: Self::default_concurrency(),
197            max_num_blocks: None,
198            max_num_transactions: None,
199            max_num_logs: None,
200            max_num_traces: None,
201            response_bytes_ceiling: Self::default_response_bytes_ceiling(),
202            response_bytes_floor: Self::default_response_bytes_floor(),
203            reverse: Self::default_reverse(),
204        }
205    }
206}
207
208impl StreamConfig {
209    /// Default concurrency for stream processing
210    pub const fn default_concurrency() -> usize {
211        10
212    }
213
214    /// Default initial batch size
215    pub const fn default_batch_size() -> u64 {
216        1000
217    }
218
219    /// Default maximum batch size
220    pub const fn default_max_batch_size() -> u64 {
221        200_000
222    }
223
224    /// Default minimum batch size
225    pub const fn default_min_batch_size() -> u64 {
226        200
227    }
228
229    /// Default response bytes ceiling for dynamic batch adjustment
230    pub const fn default_response_bytes_ceiling() -> u64 {
231        500_000
232    }
233
234    /// Default response bytes floor for dynamic batch adjustment
235    pub const fn default_response_bytes_floor() -> u64 {
236        250_000
237    }
238
239    /// Default reverse streaming setting
240    pub const fn default_reverse() -> bool {
241        false
242    }
243}
244
245#[cfg(test)]
246mod tests {
247    use super::*;
248
249    #[test]
250    fn test_validate() {
251        let valid_cfg = ClientConfig {
252            url: "https://hypersync.xyz".into(),
253            api_token: "00000000-0000-0000-0000-000000000000".to_string(),
254            ..Default::default()
255        };
256
257        assert!(valid_cfg.validate().is_ok(), "valid config");
258
259        let cfg = ClientConfig {
260            url: "https://hypersync.xyz".to_string(),
261            api_token: "not a uuid".to_string(),
262            ..Default::default()
263        };
264
265        assert!(cfg.validate().is_err(), "invalid uuid");
266
267        let cfg = ClientConfig {
268            url: "https://hypersync.xyz".to_string(),
269            ..Default::default()
270        };
271
272        assert!(cfg.validate().is_err(), "missing api token");
273
274        let cfg = ClientConfig {
275            api_token: "00000000-0000-0000-0000-000000000000".to_string(),
276            ..Default::default()
277        };
278
279        assert!(cfg.validate().is_err(), "missing url");
280        let cfg = ClientConfig {
281            http_req_timeout_millis: 0,
282            ..valid_cfg
283        };
284        assert!(
285            cfg.validate().is_err(),
286            "http_req_timeout_millis must be greater than 0"
287        );
288    }
289
290    #[test]
291    fn test_stream_config_defaults() {
292        let default_config = StreamConfig::default();
293
294        // Check that all defaults are applied correctly
295        assert_eq!(default_config.concurrency, 10);
296        assert_eq!(default_config.batch_size, 1000);
297        assert_eq!(default_config.max_batch_size, 200_000);
298        assert_eq!(default_config.min_batch_size, 200);
299        assert_eq!(default_config.response_bytes_ceiling, 500_000);
300        assert_eq!(default_config.response_bytes_floor, 250_000);
301        assert!(!default_config.reverse);
302        assert_eq!(default_config.hex_output, HexOutput::NoEncode);
303        assert!(default_config.column_mapping.is_none());
304        assert!(default_config.event_signature.is_none());
305        assert!(default_config.max_num_blocks.is_none());
306        assert!(default_config.max_num_transactions.is_none());
307        assert!(default_config.max_num_logs.is_none());
308        assert!(default_config.max_num_traces.is_none());
309    }
310
311    #[test]
312    fn test_stream_config_serde() {
313        // Test serialization of default config
314        let default_config = StreamConfig::default();
315        let json = serde_json::to_string(&default_config).unwrap();
316        let deserialized: StreamConfig = serde_json::from_str(&json).unwrap();
317
318        // Verify round-trip works
319        assert_eq!(deserialized.concurrency, default_config.concurrency);
320        assert_eq!(deserialized.batch_size, default_config.batch_size);
321        assert_eq!(deserialized.reverse, default_config.reverse);
322
323        // Test partial JSON (missing some fields should use defaults)
324        let partial_json = r#"{"reverse": true, "batch_size": 500}"#;
325        let partial_config: StreamConfig = serde_json::from_str(partial_json).unwrap();
326
327        assert!(partial_config.reverse);
328        assert_eq!(partial_config.batch_size, 500);
329        assert_eq!(partial_config.concurrency, 10); // should use default
330        assert_eq!(partial_config.max_batch_size, 200_000); // should use default
331    }
332}