1use anyhow::Result;
2use schemars::JsonSchema;
3use serde::{Deserialize, Serialize};
4use url::Url;
5
6use crate::ColumnMapping;
7
8#[derive(Debug, Clone, Deserialize, Serialize)]
10pub struct ClientConfig {
11 #[serde(default)]
13 pub url: String,
14 #[serde(default)]
16 pub api_token: String,
17 #[serde(default = "ClientConfig::default_http_req_timeout_millis")]
19 pub http_req_timeout_millis: u64,
20 #[serde(default = "ClientConfig::default_max_num_retries")]
22 pub max_num_retries: usize,
23 #[serde(default = "ClientConfig::default_retry_backoff_ms")]
25 pub retry_backoff_ms: u64,
26 #[serde(default = "ClientConfig::default_retry_base_ms")]
28 pub retry_base_ms: u64,
29 #[serde(default = "ClientConfig::default_retry_ceiling_ms")]
31 pub retry_ceiling_ms: u64,
32 #[serde(default)]
34 pub serialization_format: SerializationFormat,
35 #[serde(default = "ClientConfig::default_proactive_rate_limit_sleep")]
40 pub proactive_rate_limit_sleep: bool,
41}
42
43impl Default for ClientConfig {
44 fn default() -> Self {
45 Self {
46 url: String::default(),
47 api_token: String::default(),
48 http_req_timeout_millis: Self::default_http_req_timeout_millis(),
49 max_num_retries: Self::default_max_num_retries(),
50 retry_backoff_ms: Self::default_retry_backoff_ms(),
51 retry_base_ms: Self::default_retry_base_ms(),
52 retry_ceiling_ms: Self::default_retry_ceiling_ms(),
53 serialization_format: SerializationFormat::default(),
54 proactive_rate_limit_sleep: Self::default_proactive_rate_limit_sleep(),
55 }
56 }
57}
58
59impl ClientConfig {
60 pub const fn default_http_req_timeout_millis() -> u64 {
62 30_000
63 }
64
65 pub const fn default_max_num_retries() -> usize {
67 12
68 }
69
70 pub const fn default_retry_backoff_ms() -> u64 {
72 500
73 }
74
75 pub const fn default_retry_base_ms() -> u64 {
77 200
78 }
79
80 pub const fn default_retry_ceiling_ms() -> u64 {
82 5_000
83 }
84
85 pub const fn default_proactive_rate_limit_sleep() -> bool {
87 true
88 }
89 pub fn validate(&self) -> Result<()> {
91 if self.url.is_empty() {
92 anyhow::bail!("url is required");
93 }
94
95 if Url::parse(&self.url).is_err() {
97 anyhow::bail!("url is malformed");
98 }
99
100 if self.api_token.is_empty() {
101 anyhow::bail!("api_token is required - get one from https://envio.dev/app/api-tokens");
102 }
103 if uuid::Uuid::parse_str(self.api_token.as_str()).is_err() {
105 anyhow::bail!("api_token is malformed - make sure its a token from https://envio.dev/app/api-tokens");
106 }
107
108 if self.http_req_timeout_millis == 0 {
109 anyhow::bail!("http_req_timeout_millis must be greater than 0");
110 }
111
112 Ok(())
113 }
114}
115
116#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
118pub enum SerializationFormat {
119 Json,
121 CapnProto {
123 should_cache_queries: bool,
125 },
126}
127
128impl Default for SerializationFormat {
129 fn default() -> Self {
130 Self::CapnProto {
131 should_cache_queries: true,
132 }
133 }
134}
135
136#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
138pub struct StreamConfig {
139 pub column_mapping: Option<ColumnMapping>,
142 pub event_signature: Option<String>,
144 #[serde(default)]
146 pub hex_output: HexOutput,
147 #[serde(default = "StreamConfig::default_batch_size")]
150 pub batch_size: u64,
151 #[serde(default)]
156 pub max_batch_size: Option<u64>,
157 #[serde(default = "StreamConfig::default_min_batch_size")]
159 pub min_batch_size: u64,
160 #[serde(default = "StreamConfig::default_concurrency")]
163 pub concurrency: usize,
164 #[serde(default)]
166 pub max_num_blocks: Option<usize>,
167 #[serde(default)]
169 pub max_num_transactions: Option<usize>,
170 #[serde(default)]
172 pub max_num_logs: Option<usize>,
173 #[serde(default)]
175 pub max_num_traces: Option<usize>,
176 #[serde(default = "StreamConfig::default_response_bytes_target")]
179 pub response_bytes_target: u64,
180 #[serde(default)]
193 pub max_buffered_bytes: Option<u64>,
194 #[serde(default = "StreamConfig::default_reverse")]
196 pub reverse: bool,
197}
198
199#[derive(Default, Clone, Copy, Debug, Serialize, Deserialize, PartialEq, JsonSchema)]
201pub enum HexOutput {
202 #[default]
204 NoEncode,
205 Prefixed,
207 NonPrefixed,
209}
210
211impl Default for StreamConfig {
212 fn default() -> Self {
213 Self {
214 column_mapping: None,
215 event_signature: None,
216 hex_output: HexOutput::default(),
217 batch_size: Self::default_batch_size(),
218 max_batch_size: None,
219 min_batch_size: Self::default_min_batch_size(),
220 concurrency: Self::default_concurrency(),
221 max_num_blocks: None,
222 max_num_transactions: None,
223 max_num_logs: None,
224 max_num_traces: None,
225 response_bytes_target: Self::default_response_bytes_target(),
226 max_buffered_bytes: None,
227 reverse: Self::default_reverse(),
228 }
229 }
230}
231
232impl StreamConfig {
233 pub const fn default_concurrency() -> usize {
235 10
236 }
237
238 pub const fn default_batch_size() -> u64 {
240 1000
241 }
242
243 pub const fn default_min_batch_size() -> u64 {
245 200
246 }
247
248 pub const fn default_response_bytes_target() -> u64 {
250 400_000
251 }
252
253 pub const fn default_reverse() -> bool {
255 false
256 }
257
258 pub fn dense() -> Self {
270 Self {
271 concurrency: 20,
272 response_bytes_target: Self::default_response_bytes_target(),
273 ..Self::default()
274 }
275 }
276
277 pub fn sparse() -> Self {
287 Self {
288 concurrency: Self::default_concurrency(),
289 batch_size: 20_000,
290 ..Self::default()
291 }
292 }
293
294 pub fn archival() -> Self {
304 Self {
305 concurrency: 12,
306 ..Self::default()
307 }
308 }
309}
310
#[cfg(test)]
mod tests {
    use super::*;

    /// A URL that `Url::parse` accepts.
    const TEST_URL: &str = "https://hypersync.xyz";
    /// The nil UUID: well-formed, so it passes the token format check.
    const TEST_TOKEN: &str = "00000000-0000-0000-0000-000000000000";

    /// Runs `validate` on a config that must be rejected and returns the top-level
    /// error message, so each case can assert *which* check fired rather than only
    /// that some check did.
    fn validation_error(cfg: &ClientConfig, case: &str) -> String {
        match cfg.validate() {
            Ok(()) => panic!("{}: expected validation to fail", case),
            Err(err) => err.to_string(),
        }
    }

    /// Covers every branch of `ClientConfig::validate`.
    #[test]
    fn test_validate() {
        let valid_cfg = ClientConfig {
            url: TEST_URL.into(),
            api_token: TEST_TOKEN.to_string(),
            ..Default::default()
        };

        assert!(valid_cfg.validate().is_ok(), "valid config");

        let cfg = ClientConfig {
            url: TEST_URL.to_string(),
            api_token: "not a uuid".to_string(),
            ..Default::default()
        };

        assert!(
            validation_error(&cfg, "invalid uuid").contains("api_token is malformed"),
            "invalid uuid"
        );

        let cfg = ClientConfig {
            url: TEST_URL.to_string(),
            ..Default::default()
        };

        assert!(
            validation_error(&cfg, "missing api token").contains("api_token is required"),
            "missing api token"
        );

        let cfg = ClientConfig {
            api_token: TEST_TOKEN.to_string(),
            ..Default::default()
        };

        assert!(
            validation_error(&cfg, "missing url").contains("url is required"),
            "missing url"
        );

        // Non-empty but unparseable URL: no scheme, so it is relative without a base.
        let cfg = ClientConfig {
            url: "not a url".to_string(),
            api_token: TEST_TOKEN.to_string(),
            ..Default::default()
        };

        assert!(
            validation_error(&cfg, "malformed url").contains("url is malformed"),
            "malformed url"
        );

        // Consumes `valid_cfg`, so this case has to stay last.
        let cfg = ClientConfig {
            http_req_timeout_millis: 0,
            ..valid_cfg
        };

        assert!(
            validation_error(&cfg, "zero timeout").contains("http_req_timeout_millis"),
            "http_req_timeout_millis must be greater than 0"
        );
    }

    /// Pins every field of `StreamConfig::default()`.
    #[test]
    fn test_stream_config_defaults() {
        let default_config = StreamConfig::default();

        assert_eq!(default_config.concurrency, 10);
        assert_eq!(default_config.batch_size, 1000);
        assert_eq!(default_config.max_batch_size, None);
        assert_eq!(default_config.min_batch_size, 200);
        assert_eq!(default_config.response_bytes_target, 400_000);
        assert_eq!(default_config.max_buffered_bytes, None);
        assert!(!default_config.reverse);
        assert_eq!(default_config.hex_output, HexOutput::NoEncode);
        assert!(default_config.column_mapping.is_none());
        assert!(default_config.event_signature.is_none());
        assert!(default_config.max_num_blocks.is_none());
        assert!(default_config.max_num_transactions.is_none());
        assert!(default_config.max_num_logs.is_none());
        assert!(default_config.max_num_traces.is_none());
    }

    /// Round-trips the default config through JSON and checks that omitted fields are
    /// filled with defaults while explicit values are honoured.
    #[test]
    fn test_stream_config_serde() {
        let default_config = StreamConfig::default();
        let json = serde_json::to_string(&default_config).unwrap();
        let deserialized: StreamConfig = serde_json::from_str(&json).unwrap();

        assert_eq!(deserialized.concurrency, default_config.concurrency);
        assert_eq!(deserialized.batch_size, default_config.batch_size);
        assert_eq!(deserialized.reverse, default_config.reverse);

        // Only two fields given: the rest must come from the serde defaults.
        let partial_json = r#"{"reverse": true, "batch_size": 500}"#;
        let partial_config: StreamConfig = serde_json::from_str(partial_json).unwrap();

        assert!(partial_config.reverse);
        assert_eq!(partial_config.batch_size, 500);
        assert_eq!(partial_config.concurrency, 10);
        assert_eq!(partial_config.max_batch_size, None);
        assert_eq!(partial_config.response_bytes_target, 400_000);
        assert_eq!(partial_config.max_buffered_bytes, None);

        // Explicit values for the optional and tuned fields must be kept as given.
        let explicit_json = r#"{"max_batch_size": 50000, "response_bytes_target": 800000, "max_buffered_bytes": 1048576}"#;
        let explicit_config: StreamConfig = serde_json::from_str(explicit_json).unwrap();
        assert_eq!(explicit_config.max_batch_size, Some(50_000));
        assert_eq!(explicit_config.response_bytes_target, 800_000);
        assert_eq!(explicit_config.max_buffered_bytes, Some(1_048_576));
    }

    /// Pins the values each preset overrides and spot-checks that untouched fields
    /// still carry the defaults.
    #[test]
    fn test_stream_config_presets() {
        let dense = StreamConfig::dense();
        assert_eq!(dense.concurrency, 20);
        assert_eq!(dense.response_bytes_target, 400_000);
        assert_eq!(dense.max_buffered_bytes, None);

        let sparse = StreamConfig::sparse();
        assert_eq!(sparse.concurrency, 10);
        assert_eq!(sparse.batch_size, 20_000);
        assert_eq!(sparse.max_buffered_bytes, None);

        let archival = StreamConfig::archival();
        assert_eq!(archival.concurrency, 12);
        assert_eq!(archival.max_buffered_bytes, None);

        assert_eq!(dense.min_batch_size, StreamConfig::default_min_batch_size());
        assert!(!sparse.reverse);
    }
}