Skip to main content

hypersync_client/
config.rs

1use anyhow::Result;
2use schemars::JsonSchema;
3use serde::{Deserialize, Serialize};
4use url::Url;
5
6use crate::ColumnMapping;
7
8/// Configuration for the hypersync client.
9#[derive(Debug, Clone, Deserialize, Serialize)]
10pub struct ClientConfig {
11    /// HyperSync server URL.
12    #[serde(default)]
13    pub url: String,
14    /// HyperSync server api token.
15    #[serde(default)]
16    pub api_token: String,
17    /// Milliseconds to wait for a response before timing out.
18    #[serde(default = "ClientConfig::default_http_req_timeout_millis")]
19    pub http_req_timeout_millis: u64,
20    /// Number of retries to attempt before returning error.
21    #[serde(default = "ClientConfig::default_max_num_retries")]
22    pub max_num_retries: usize,
23    /// Milliseconds that would be used for retry backoff increasing.
24    #[serde(default = "ClientConfig::default_retry_backoff_ms")]
25    pub retry_backoff_ms: u64,
26    /// Initial wait time for request backoff.
27    #[serde(default = "ClientConfig::default_retry_base_ms")]
28    pub retry_base_ms: u64,
29    /// Ceiling time for request backoff.
30    #[serde(default = "ClientConfig::default_retry_ceiling_ms")]
31    pub retry_ceiling_ms: u64,
32    /// Query serialization format to use for HTTP requests.
33    #[serde(default)]
34    pub serialization_format: SerializationFormat,
35}
36
37impl Default for ClientConfig {
38    fn default() -> Self {
39        Self {
40            url: String::default(),
41            api_token: String::default(),
42            http_req_timeout_millis: Self::default_http_req_timeout_millis(),
43            max_num_retries: Self::default_max_num_retries(),
44            retry_backoff_ms: Self::default_retry_backoff_ms(),
45            retry_base_ms: Self::default_retry_base_ms(),
46            retry_ceiling_ms: Self::default_retry_ceiling_ms(),
47            serialization_format: SerializationFormat::Json,
48        }
49    }
50}
51
52impl ClientConfig {
53    /// Default HTTP request timeout in milliseconds
54    pub const fn default_http_req_timeout_millis() -> u64 {
55        30_000
56    }
57
58    /// Default maximum number of retries
59    pub const fn default_max_num_retries() -> usize {
60        12
61    }
62
63    /// Default retry backoff in milliseconds
64    pub const fn default_retry_backoff_ms() -> u64 {
65        500
66    }
67
68    /// Default retry base time in milliseconds
69    pub const fn default_retry_base_ms() -> u64 {
70        200
71    }
72
73    /// Default retry ceiling time in milliseconds
74    pub const fn default_retry_ceiling_ms() -> u64 {
75        5_000
76    }
77    /// Validates the config
78    pub fn validate(&self) -> Result<()> {
79        if self.url.is_empty() {
80            anyhow::bail!("url is required");
81        }
82
83        // validate that url is a valid url
84        if Url::parse(&self.url).is_err() {
85            anyhow::bail!("url is malformed");
86        }
87
88        if self.api_token.is_empty() {
89            anyhow::bail!("api_token is required - get one from https://envio.dev/app/api-tokens");
90        }
91        // validate that api token is a uuid
92        if uuid::Uuid::parse_str(self.api_token.as_str()).is_err() {
93            anyhow::bail!("api_token is malformed - make sure its a token from https://envio.dev/app/api-tokens");
94        }
95
96        if self.http_req_timeout_millis == 0 {
97            anyhow::bail!("http_req_timeout_millis must be greater than 0");
98        }
99
100        Ok(())
101    }
102}
103
104/// Determines query serialization format for HTTP requests.
105#[derive(Default, Clone, Copy, Debug, Serialize, Deserialize)]
106pub enum SerializationFormat {
107    /// Use JSON serialization (default)
108    #[default]
109    Json,
110    /// Use Cap'n Proto binary serialization
111    CapnProto {
112        /// Whether to use query caching
113        should_cache_queries: bool,
114    },
115}
116
117/// Config for hypersync event streaming.
118#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
119pub struct StreamConfig {
120    /// Column mapping for stream function output.
121    /// It lets you map columns you want into the DataTypes you want.
122    pub column_mapping: Option<ColumnMapping>,
123    /// Event signature used to populate decode logs. Decode logs would be empty if set to None.
124    pub event_signature: Option<String>,
125    /// Determines formatting of binary columns numbers into utf8 hex.
126    #[serde(default)]
127    pub hex_output: HexOutput,
128    /// Initial batch size. Size would be adjusted based on response size during execution.
129    #[serde(default = "StreamConfig::default_batch_size")]
130    pub batch_size: u64,
131    /// Maximum batch size that could be used during dynamic adjustment.
132    #[serde(default = "StreamConfig::default_max_batch_size")]
133    pub max_batch_size: u64,
134    /// Minimum batch size that could be used during dynamic adjustment.
135    #[serde(default = "StreamConfig::default_min_batch_size")]
136    pub min_batch_size: u64,
137    /// Number of async threads that would be spawned to execute different block ranges of queries.
138    #[serde(default = "StreamConfig::default_concurrency")]
139    pub concurrency: usize,
140    /// Max number of blocks to fetch in a single request.
141    #[serde(default)]
142    pub max_num_blocks: Option<usize>,
143    /// Max number of transactions to fetch in a single request.
144    #[serde(default)]
145    pub max_num_transactions: Option<usize>,
146    /// Max number of logs to fetch in a single request.
147    #[serde(default)]
148    pub max_num_logs: Option<usize>,
149    /// Max number of traces to fetch in a single request.
150    #[serde(default)]
151    pub max_num_traces: Option<usize>,
152    /// Size of a response in bytes from which step size will be lowered
153    #[serde(default = "StreamConfig::default_response_bytes_ceiling")]
154    pub response_bytes_ceiling: u64,
155    /// Size of a response in bytes from which step size will be increased
156    #[serde(default = "StreamConfig::default_response_bytes_floor")]
157    pub response_bytes_floor: u64,
158    /// Stream data in reverse order
159    #[serde(default = "StreamConfig::default_reverse")]
160    pub reverse: bool,
161}
162
163/// Determines format of Binary column
164#[derive(Default, Clone, Copy, Debug, Serialize, Deserialize, PartialEq, JsonSchema)]
165pub enum HexOutput {
166    /// Binary column won't be formatted as hex
167    #[default]
168    NoEncode,
169    /// Binary column would be formatted as prefixed hex i.e. 0xdeadbeef
170    Prefixed,
171    /// Binary column would be formatted as non prefixed hex i.e. deadbeef
172    NonPrefixed,
173}
174
175impl Default for StreamConfig {
176    fn default() -> Self {
177        Self {
178            column_mapping: None,
179            event_signature: None,
180            hex_output: HexOutput::default(),
181            batch_size: Self::default_batch_size(),
182            max_batch_size: Self::default_max_batch_size(),
183            min_batch_size: Self::default_min_batch_size(),
184            concurrency: Self::default_concurrency(),
185            max_num_blocks: None,
186            max_num_transactions: None,
187            max_num_logs: None,
188            max_num_traces: None,
189            response_bytes_ceiling: Self::default_response_bytes_ceiling(),
190            response_bytes_floor: Self::default_response_bytes_floor(),
191            reverse: Self::default_reverse(),
192        }
193    }
194}
195
196impl StreamConfig {
197    /// Default concurrency for stream processing
198    pub const fn default_concurrency() -> usize {
199        10
200    }
201
202    /// Default initial batch size
203    pub const fn default_batch_size() -> u64 {
204        1000
205    }
206
207    /// Default maximum batch size
208    pub const fn default_max_batch_size() -> u64 {
209        200_000
210    }
211
212    /// Default minimum batch size
213    pub const fn default_min_batch_size() -> u64 {
214        200
215    }
216
217    /// Default response bytes ceiling for dynamic batch adjustment
218    pub const fn default_response_bytes_ceiling() -> u64 {
219        500_000
220    }
221
222    /// Default response bytes floor for dynamic batch adjustment
223    pub const fn default_response_bytes_floor() -> u64 {
224        250_000
225    }
226
227    /// Default reverse streaming setting
228    pub const fn default_reverse() -> bool {
229        false
230    }
231}
232
#[cfg(test)]
mod tests {
    use super::*;

    /// A URL that `Url::parse` accepts.
    const VALID_URL: &str = "https://hypersync.xyz";
    /// The nil UUID: syntactically valid, so it passes the token format check.
    const VALID_TOKEN: &str = "00000000-0000-0000-0000-000000000000";

    /// Builds a config that passes `ClientConfig::validate`; each case below
    /// overrides exactly one field to trigger a single failure.
    fn valid_config() -> ClientConfig {
        ClientConfig {
            url: VALID_URL.to_string(),
            api_token: VALID_TOKEN.to_string(),
            ..Default::default()
        }
    }

    #[test]
    fn test_validate() {
        assert!(valid_config().validate().is_ok(), "valid config");

        let cfg = ClientConfig {
            api_token: "not a uuid".to_string(),
            ..valid_config()
        };
        assert!(cfg.validate().is_err(), "invalid uuid");

        let cfg = ClientConfig {
            api_token: String::new(),
            ..valid_config()
        };
        assert!(cfg.validate().is_err(), "missing api token");

        let cfg = ClientConfig {
            url: String::new(),
            ..valid_config()
        };
        assert!(cfg.validate().is_err(), "missing url");

        // A non-empty string with no scheme must hit the "malformed" branch,
        // not the "required" one.
        let cfg = ClientConfig {
            url: "not a url".to_string(),
            ..valid_config()
        };
        let err = cfg.validate().expect_err("malformed url");
        assert_eq!(err.to_string(), "url is malformed");

        let cfg = ClientConfig {
            http_req_timeout_millis: 0,
            ..valid_config()
        };
        assert!(
            cfg.validate().is_err(),
            "http_req_timeout_millis must be greater than 0"
        );
    }

    #[test]
    fn test_stream_config_defaults() {
        let default_config = StreamConfig::default();

        // Tunables with explicit default functions.
        assert_eq!(default_config.concurrency, 10);
        assert_eq!(default_config.batch_size, 1000);
        assert_eq!(default_config.max_batch_size, 200_000);
        assert_eq!(default_config.min_batch_size, 200);
        assert_eq!(default_config.response_bytes_ceiling, 500_000);
        assert_eq!(default_config.response_bytes_floor, 250_000);
        assert!(!default_config.reverse);
        assert_eq!(default_config.hex_output, HexOutput::NoEncode);

        // Optional fields start unset.
        assert!(default_config.column_mapping.is_none());
        assert!(default_config.event_signature.is_none());
        assert!(default_config.max_num_blocks.is_none());
        assert!(default_config.max_num_transactions.is_none());
        assert!(default_config.max_num_logs.is_none());
        assert!(default_config.max_num_traces.is_none());
    }

    #[test]
    fn test_stream_config_serde() {
        // Round-trip the default config through JSON.
        let default_config = StreamConfig::default();
        let json = serde_json::to_string(&default_config).unwrap();
        let deserialized: StreamConfig = serde_json::from_str(&json).unwrap();

        assert_eq!(deserialized.concurrency, default_config.concurrency);
        assert_eq!(deserialized.batch_size, default_config.batch_size);
        assert_eq!(deserialized.reverse, default_config.reverse);

        // Fields missing from partial JSON must fall back to their defaults.
        let partial_json = r#"{"reverse": true, "batch_size": 500}"#;
        let partial_config: StreamConfig = serde_json::from_str(partial_json).unwrap();

        assert!(partial_config.reverse);
        assert_eq!(partial_config.batch_size, 500);
        assert_eq!(partial_config.concurrency, 10); // default
        assert_eq!(partial_config.max_batch_size, 200_000); // default
    }
}