1use anyhow::Result;
2use schemars::JsonSchema;
3use serde::{Deserialize, Serialize};
4use url::Url;
5
6use crate::ColumnMapping;
7
8#[derive(Debug, Clone, Deserialize, Serialize)]
10pub struct ClientConfig {
11 #[serde(default)]
13 pub url: String,
14 #[serde(default)]
16 pub api_token: String,
17 #[serde(default = "ClientConfig::default_http_req_timeout_millis")]
19 pub http_req_timeout_millis: u64,
20 #[serde(default = "ClientConfig::default_max_num_retries")]
22 pub max_num_retries: usize,
23 #[serde(default = "ClientConfig::default_retry_backoff_ms")]
25 pub retry_backoff_ms: u64,
26 #[serde(default = "ClientConfig::default_retry_base_ms")]
28 pub retry_base_ms: u64,
29 #[serde(default = "ClientConfig::default_retry_ceiling_ms")]
31 pub retry_ceiling_ms: u64,
32 #[serde(default)]
34 pub serialization_format: SerializationFormat,
35 #[serde(default = "ClientConfig::default_proactive_rate_limit_sleep")]
40 pub proactive_rate_limit_sleep: bool,
41}
42
43impl Default for ClientConfig {
44 fn default() -> Self {
45 Self {
46 url: String::default(),
47 api_token: String::default(),
48 http_req_timeout_millis: Self::default_http_req_timeout_millis(),
49 max_num_retries: Self::default_max_num_retries(),
50 retry_backoff_ms: Self::default_retry_backoff_ms(),
51 retry_base_ms: Self::default_retry_base_ms(),
52 retry_ceiling_ms: Self::default_retry_ceiling_ms(),
53 serialization_format: SerializationFormat::Json,
54 proactive_rate_limit_sleep: Self::default_proactive_rate_limit_sleep(),
55 }
56 }
57}
58
59impl ClientConfig {
60 pub const fn default_http_req_timeout_millis() -> u64 {
62 30_000
63 }
64
65 pub const fn default_max_num_retries() -> usize {
67 12
68 }
69
70 pub const fn default_retry_backoff_ms() -> u64 {
72 500
73 }
74
75 pub const fn default_retry_base_ms() -> u64 {
77 200
78 }
79
80 pub const fn default_retry_ceiling_ms() -> u64 {
82 5_000
83 }
84
85 pub const fn default_proactive_rate_limit_sleep() -> bool {
87 true
88 }
89 pub fn validate(&self) -> Result<()> {
91 if self.url.is_empty() {
92 anyhow::bail!("url is required");
93 }
94
95 if Url::parse(&self.url).is_err() {
97 anyhow::bail!("url is malformed");
98 }
99
100 if self.api_token.is_empty() {
101 anyhow::bail!("api_token is required - get one from https://envio.dev/app/api-tokens");
102 }
103 if uuid::Uuid::parse_str(self.api_token.as_str()).is_err() {
105 anyhow::bail!("api_token is malformed - make sure its a token from https://envio.dev/app/api-tokens");
106 }
107
108 if self.http_req_timeout_millis == 0 {
109 anyhow::bail!("http_req_timeout_millis must be greater than 0");
110 }
111
112 Ok(())
113 }
114}
115
116#[derive(Default, Clone, Copy, Debug, Serialize, Deserialize)]
118pub enum SerializationFormat {
119 #[default]
121 Json,
122 CapnProto {
124 should_cache_queries: bool,
126 },
127}
128
129#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
131pub struct StreamConfig {
132 pub column_mapping: Option<ColumnMapping>,
135 pub event_signature: Option<String>,
137 #[serde(default)]
139 pub hex_output: HexOutput,
140 #[serde(default = "StreamConfig::default_batch_size")]
142 pub batch_size: u64,
143 #[serde(default = "StreamConfig::default_max_batch_size")]
145 pub max_batch_size: u64,
146 #[serde(default = "StreamConfig::default_min_batch_size")]
148 pub min_batch_size: u64,
149 #[serde(default = "StreamConfig::default_concurrency")]
151 pub concurrency: usize,
152 #[serde(default)]
154 pub max_num_blocks: Option<usize>,
155 #[serde(default)]
157 pub max_num_transactions: Option<usize>,
158 #[serde(default)]
160 pub max_num_logs: Option<usize>,
161 #[serde(default)]
163 pub max_num_traces: Option<usize>,
164 #[serde(default = "StreamConfig::default_response_bytes_ceiling")]
166 pub response_bytes_ceiling: u64,
167 #[serde(default = "StreamConfig::default_response_bytes_floor")]
169 pub response_bytes_floor: u64,
170 #[serde(default = "StreamConfig::default_reverse")]
172 pub reverse: bool,
173}
174
175#[derive(Default, Clone, Copy, Debug, Serialize, Deserialize, PartialEq, JsonSchema)]
177pub enum HexOutput {
178 #[default]
180 NoEncode,
181 Prefixed,
183 NonPrefixed,
185}
186
187impl Default for StreamConfig {
188 fn default() -> Self {
189 Self {
190 column_mapping: None,
191 event_signature: None,
192 hex_output: HexOutput::default(),
193 batch_size: Self::default_batch_size(),
194 max_batch_size: Self::default_max_batch_size(),
195 min_batch_size: Self::default_min_batch_size(),
196 concurrency: Self::default_concurrency(),
197 max_num_blocks: None,
198 max_num_transactions: None,
199 max_num_logs: None,
200 max_num_traces: None,
201 response_bytes_ceiling: Self::default_response_bytes_ceiling(),
202 response_bytes_floor: Self::default_response_bytes_floor(),
203 reverse: Self::default_reverse(),
204 }
205 }
206}
207
208impl StreamConfig {
209 pub const fn default_concurrency() -> usize {
211 10
212 }
213
214 pub const fn default_batch_size() -> u64 {
216 1000
217 }
218
219 pub const fn default_max_batch_size() -> u64 {
221 200_000
222 }
223
224 pub const fn default_min_batch_size() -> u64 {
226 200
227 }
228
229 pub const fn default_response_bytes_ceiling() -> u64 {
231 500_000
232 }
233
234 pub const fn default_response_bytes_floor() -> u64 {
236 250_000
237 }
238
239 pub const fn default_reverse() -> bool {
241 false
242 }
243}
244
#[cfg(test)]
mod tests {
    use super::*;

    /// Well-formed endpoint shared by the validation cases.
    const VALID_URL: &str = "https://hypersync.xyz";
    /// Nil UUID: syntactically valid, so it satisfies the token format check.
    const NIL_TOKEN: &str = "00000000-0000-0000-0000-000000000000";

    /// Walks `ClientConfig::validate` through one accepted config and each
    /// rejection exercised here: bad token, missing token, missing url and a
    /// zero timeout.
    #[test]
    fn test_validate() {
        let valid_cfg = ClientConfig {
            url: VALID_URL.to_owned(),
            api_token: NIL_TOKEN.to_owned(),
            ..ClientConfig::default()
        };
        assert!(valid_cfg.validate().is_ok(), "valid config");

        let bad_token = ClientConfig {
            url: VALID_URL.to_owned(),
            api_token: "not a uuid".to_owned(),
            ..ClientConfig::default()
        };
        assert!(bad_token.validate().is_err(), "invalid uuid");

        let no_token = ClientConfig {
            url: VALID_URL.to_owned(),
            ..ClientConfig::default()
        };
        assert!(no_token.validate().is_err(), "missing api token");

        let no_url = ClientConfig {
            api_token: NIL_TOKEN.to_owned(),
            ..ClientConfig::default()
        };
        assert!(no_url.validate().is_err(), "missing url");

        // Otherwise-valid config, so only the timeout can trigger the error.
        let zero_timeout = ClientConfig {
            http_req_timeout_millis: 0,
            ..valid_cfg
        };
        assert!(
            zero_timeout.validate().is_err(),
            "http_req_timeout_millis must be greater than 0"
        );
    }

    /// Pins every field of `StreamConfig::default()` to its expected value.
    #[test]
    fn test_stream_config_defaults() {
        // Exhaustive destructuring: adding a field to `StreamConfig` forces
        // this test to be revisited.
        let StreamConfig {
            column_mapping,
            event_signature,
            hex_output,
            batch_size,
            max_batch_size,
            min_batch_size,
            concurrency,
            max_num_blocks,
            max_num_transactions,
            max_num_logs,
            max_num_traces,
            response_bytes_ceiling,
            response_bytes_floor,
            reverse,
        } = StreamConfig::default();

        assert_eq!(concurrency, 10);
        assert_eq!(batch_size, 1000);
        assert_eq!(max_batch_size, 200_000);
        assert_eq!(min_batch_size, 200);
        assert_eq!(response_bytes_ceiling, 500_000);
        assert_eq!(response_bytes_floor, 250_000);
        assert!(!reverse);
        assert_eq!(hex_output, HexOutput::NoEncode);
        assert!(column_mapping.is_none());
        assert!(event_signature.is_none());
        assert!(max_num_blocks.is_none());
        assert!(max_num_transactions.is_none());
        assert!(max_num_logs.is_none());
        assert!(max_num_traces.is_none());
    }

    /// Round-trips the defaults through JSON, then checks that a partial
    /// document keeps its explicit values and fills the rest from defaults.
    #[test]
    fn test_stream_config_serde() {
        let original = StreamConfig::default();
        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: StreamConfig = serde_json::from_str(&encoded).unwrap();

        assert_eq!(decoded.concurrency, original.concurrency);
        assert_eq!(decoded.batch_size, original.batch_size);
        assert_eq!(decoded.reverse, original.reverse);

        let partial: StreamConfig =
            serde_json::from_str(r#"{"reverse": true, "batch_size": 500}"#).unwrap();

        // Explicit values win.
        assert!(partial.reverse);
        assert_eq!(partial.batch_size, 500);
        // Omitted fields come from the serde defaults.
        assert_eq!(partial.concurrency, 10);
        assert_eq!(partial.max_batch_size, 200_000);
    }
}