hypersync_client/
config.rs

1use anyhow::Result;
2use serde::{Deserialize, Serialize};
3use url::Url;
4
5use crate::ColumnMapping;
6
7/// Configuration for the hypersync client.
8#[derive(Debug, Clone, Deserialize, Serialize)]
9pub struct ClientConfig {
10    /// HyperSync server URL.
11    #[serde(default)]
12    pub url: String,
13    /// HyperSync server api token.
14    #[serde(default)]
15    pub api_token: String,
16    /// Milliseconds to wait for a response before timing out.
17    #[serde(default = "ClientConfig::default_http_req_timeout_millis")]
18    pub http_req_timeout_millis: u64,
19    /// Number of retries to attempt before returning error.
20    #[serde(default = "ClientConfig::default_max_num_retries")]
21    pub max_num_retries: usize,
22    /// Milliseconds that would be used for retry backoff increasing.
23    #[serde(default = "ClientConfig::default_retry_backoff_ms")]
24    pub retry_backoff_ms: u64,
25    /// Initial wait time for request backoff.
26    #[serde(default = "ClientConfig::default_retry_base_ms")]
27    pub retry_base_ms: u64,
28    /// Ceiling time for request backoff.
29    #[serde(default = "ClientConfig::default_retry_ceiling_ms")]
30    pub retry_ceiling_ms: u64,
31    /// Query serialization format to use for HTTP requests.
32    #[serde(default)]
33    pub serialization_format: SerializationFormat,
34}
35
36impl Default for ClientConfig {
37    fn default() -> Self {
38        Self {
39            url: String::default(),
40            api_token: String::default(),
41            http_req_timeout_millis: Self::default_http_req_timeout_millis(),
42            max_num_retries: Self::default_max_num_retries(),
43            retry_backoff_ms: Self::default_retry_backoff_ms(),
44            retry_base_ms: Self::default_retry_base_ms(),
45            retry_ceiling_ms: Self::default_retry_ceiling_ms(),
46            serialization_format: SerializationFormat::Json,
47        }
48    }
49}
50
51impl ClientConfig {
52    /// Default HTTP request timeout in milliseconds
53    pub const fn default_http_req_timeout_millis() -> u64 {
54        30_000
55    }
56
57    /// Default maximum number of retries
58    pub const fn default_max_num_retries() -> usize {
59        12
60    }
61
62    /// Default retry backoff in milliseconds
63    pub const fn default_retry_backoff_ms() -> u64 {
64        500
65    }
66
67    /// Default retry base time in milliseconds
68    pub const fn default_retry_base_ms() -> u64 {
69        200
70    }
71
72    /// Default retry ceiling time in milliseconds
73    pub const fn default_retry_ceiling_ms() -> u64 {
74        5_000
75    }
76    /// Validates the config
77    pub fn validate(&self) -> Result<()> {
78        if self.url.is_empty() {
79            anyhow::bail!("url is required");
80        }
81
82        // validate that url is a valid url
83        if Url::parse(&self.url).is_err() {
84            anyhow::bail!("url is malformed");
85        }
86
87        if self.api_token.is_empty() {
88            anyhow::bail!("api_token is required - get one from https://envio.dev/app/api-tokens");
89        }
90        // validate that api token is a uuid
91        if uuid::Uuid::parse_str(self.api_token.as_str()).is_err() {
92            anyhow::bail!("api_token is malformed - make sure its a token from https://envio.dev/app/api-tokens");
93        }
94
95        if self.http_req_timeout_millis == 0 {
96            anyhow::bail!("http_req_timeout_millis must be greater than 0");
97        }
98
99        Ok(())
100    }
101}
102
103/// Determines query serialization format for HTTP requests.
104#[derive(Default, Clone, Copy, Debug, Serialize, Deserialize)]
105pub enum SerializationFormat {
106    /// Use JSON serialization (default)
107    #[default]
108    Json,
109    /// Use Cap'n Proto binary serialization
110    CapnProto {
111        /// Whether to use query caching
112        should_cache_queries: bool,
113    },
114}
115
116/// Config for hypersync event streaming.
117#[derive(Debug, Clone, Serialize, Deserialize)]
118pub struct StreamConfig {
119    /// Column mapping for stream function output.
120    /// It lets you map columns you want into the DataTypes you want.
121    pub column_mapping: Option<ColumnMapping>,
122    /// Event signature used to populate decode logs. Decode logs would be empty if set to None.
123    pub event_signature: Option<String>,
124    /// Determines formatting of binary columns numbers into utf8 hex.
125    #[serde(default)]
126    pub hex_output: HexOutput,
127    /// Initial batch size. Size would be adjusted based on response size during execution.
128    #[serde(default = "StreamConfig::default_batch_size")]
129    pub batch_size: u64,
130    /// Maximum batch size that could be used during dynamic adjustment.
131    #[serde(default = "StreamConfig::default_max_batch_size")]
132    pub max_batch_size: u64,
133    /// Minimum batch size that could be used during dynamic adjustment.
134    #[serde(default = "StreamConfig::default_min_batch_size")]
135    pub min_batch_size: u64,
136    /// Number of async threads that would be spawned to execute different block ranges of queries.
137    #[serde(default = "StreamConfig::default_concurrency")]
138    pub concurrency: usize,
139    /// Max number of blocks to fetch in a single request.
140    #[serde(default)]
141    pub max_num_blocks: Option<usize>,
142    /// Max number of transactions to fetch in a single request.
143    #[serde(default)]
144    pub max_num_transactions: Option<usize>,
145    /// Max number of logs to fetch in a single request.
146    #[serde(default)]
147    pub max_num_logs: Option<usize>,
148    /// Max number of traces to fetch in a single request.
149    #[serde(default)]
150    pub max_num_traces: Option<usize>,
151    /// Size of a response in bytes from which step size will be lowered
152    #[serde(default = "StreamConfig::default_response_bytes_ceiling")]
153    pub response_bytes_ceiling: u64,
154    /// Size of a response in bytes from which step size will be increased
155    #[serde(default = "StreamConfig::default_response_bytes_floor")]
156    pub response_bytes_floor: u64,
157    /// Stream data in reverse order
158    #[serde(default = "StreamConfig::default_reverse")]
159    pub reverse: bool,
160}
161
162/// Determines format of Binary column
163#[derive(Default, Clone, Copy, Debug, Serialize, Deserialize, PartialEq)]
164pub enum HexOutput {
165    /// Binary column won't be formatted as hex
166    #[default]
167    NoEncode,
168    /// Binary column would be formatted as prefixed hex i.e. 0xdeadbeef
169    Prefixed,
170    /// Binary column would be formatted as non prefixed hex i.e. deadbeef
171    NonPrefixed,
172}
173
174impl Default for StreamConfig {
175    fn default() -> Self {
176        Self {
177            column_mapping: None,
178            event_signature: None,
179            hex_output: HexOutput::default(),
180            batch_size: Self::default_batch_size(),
181            max_batch_size: Self::default_max_batch_size(),
182            min_batch_size: Self::default_min_batch_size(),
183            concurrency: Self::default_concurrency(),
184            max_num_blocks: None,
185            max_num_transactions: None,
186            max_num_logs: None,
187            max_num_traces: None,
188            response_bytes_ceiling: Self::default_response_bytes_ceiling(),
189            response_bytes_floor: Self::default_response_bytes_floor(),
190            reverse: Self::default_reverse(),
191        }
192    }
193}
194
195impl StreamConfig {
196    /// Default concurrency for stream processing
197    pub const fn default_concurrency() -> usize {
198        10
199    }
200
201    /// Default initial batch size
202    pub const fn default_batch_size() -> u64 {
203        1000
204    }
205
206    /// Default maximum batch size
207    pub const fn default_max_batch_size() -> u64 {
208        200_000
209    }
210
211    /// Default minimum batch size
212    pub const fn default_min_batch_size() -> u64 {
213        200
214    }
215
216    /// Default response bytes ceiling for dynamic batch adjustment
217    pub const fn default_response_bytes_ceiling() -> u64 {
218        500_000
219    }
220
221    /// Default response bytes floor for dynamic batch adjustment
222    pub const fn default_response_bytes_floor() -> u64 {
223        250_000
224    }
225
226    /// Default reverse streaming setting
227    pub const fn default_reverse() -> bool {
228        false
229    }
230}
231
#[cfg(test)]
mod tests {
    use super::*;

    /// Server URL accepted by `ClientConfig::validate`.
    const SERVER_URL: &str = "https://hypersync.xyz";
    /// Well-formed UUID used as a stand-in API token.
    const NIL_TOKEN: &str = "00000000-0000-0000-0000-000000000000";

    /// `validate` accepts a complete config and rejects each invalid variation.
    #[test]
    fn test_validate() {
        let well_formed = ClientConfig {
            url: SERVER_URL.to_owned(),
            api_token: NIL_TOKEN.to_owned(),
            ..Default::default()
        };
        assert!(well_formed.validate().is_ok(), "valid config");

        let bad_token = ClientConfig {
            url: SERVER_URL.to_owned(),
            api_token: String::from("not a uuid"),
            ..Default::default()
        };
        assert!(bad_token.validate().is_err(), "invalid uuid");

        let no_token = ClientConfig {
            url: SERVER_URL.to_owned(),
            ..Default::default()
        };
        assert!(no_token.validate().is_err(), "missing api token");

        let no_url = ClientConfig {
            api_token: NIL_TOKEN.to_owned(),
            ..Default::default()
        };
        assert!(no_url.validate().is_err(), "missing url");

        // Everything valid except the timeout.
        let zero_timeout = ClientConfig {
            http_req_timeout_millis: 0,
            ..well_formed
        };
        assert!(
            zero_timeout.validate().is_err(),
            "http_req_timeout_millis must be greater than 0"
        );
    }

    /// `StreamConfig::default()` yields the documented default for every field.
    #[test]
    fn test_stream_config_defaults() {
        let cfg = StreamConfig::default();

        // Tunables with explicit default functions.
        assert_eq!(cfg.concurrency, 10);
        assert_eq!(cfg.batch_size, 1000);
        assert_eq!(cfg.max_batch_size, 200_000);
        assert_eq!(cfg.min_batch_size, 200);
        assert_eq!(cfg.response_bytes_ceiling, 500_000);
        assert_eq!(cfg.response_bytes_floor, 250_000);
        assert!(!cfg.reverse);
        assert_eq!(cfg.hex_output, HexOutput::NoEncode);

        // Optional fields start out unset.
        assert!(cfg.column_mapping.is_none());
        assert!(cfg.event_signature.is_none());
        assert!(cfg.max_num_blocks.is_none());
        assert!(cfg.max_num_transactions.is_none());
        assert!(cfg.max_num_logs.is_none());
        assert!(cfg.max_num_traces.is_none());
    }

    /// `StreamConfig` survives a JSON round-trip and fills in missing fields.
    #[test]
    fn test_stream_config_serde() {
        // Round-trip the default config through JSON.
        let original = StreamConfig::default();
        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: StreamConfig = serde_json::from_str(&encoded).unwrap();

        assert_eq!(decoded.concurrency, original.concurrency);
        assert_eq!(decoded.batch_size, original.batch_size);
        assert_eq!(decoded.reverse, original.reverse);

        // Fields absent from the input must take their defaults.
        let sparse: StreamConfig =
            serde_json::from_str(r#"{"reverse": true, "batch_size": 500}"#).unwrap();

        assert!(sparse.reverse);
        assert_eq!(sparse.batch_size, 500);
        assert_eq!(sparse.concurrency, 10); // default
        assert_eq!(sparse.max_batch_size, 200_000); // default
    }
}