use anyhow::Result;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use url::Url;
use crate::ColumnMapping;
/// Client configuration: target URL, API token, HTTP timeout, retry tuning,
/// serialization format and rate-limit behaviour.
///
/// Every field has a serde default, so any field may be omitted when
/// deserializing. The defaults for `url` and `api_token` are empty strings,
/// which [`ClientConfig::validate`] rejects.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ClientConfig {
/// URL as a string. `validate` requires it to be non-empty and accepted by
/// `Url::parse`.
#[serde(default)]
pub url: String,
/// API token. `validate` requires it to be non-empty and parseable as a UUID.
#[serde(default)]
pub api_token: String,
/// HTTP request timeout in milliseconds. Defaults to
/// [`ClientConfig::default_http_req_timeout_millis`]; `validate` rejects `0`.
#[serde(default = "ClientConfig::default_http_req_timeout_millis")]
pub http_req_timeout_millis: u64,
/// Maximum number of retries. Defaults to
/// [`ClientConfig::default_max_num_retries`].
// NOTE(review): whether this is counted per request is decided by the retry
// code, which is not visible in this file.
#[serde(default = "ClientConfig::default_max_num_retries")]
pub max_num_retries: usize,
/// Retry backoff in milliseconds. Defaults to
/// [`ClientConfig::default_retry_backoff_ms`].
// NOTE(review): how backoff, base and ceiling combine into an actual delay is
// defined by the retry code, not here - confirm before relying on it.
#[serde(default = "ClientConfig::default_retry_backoff_ms")]
pub retry_backoff_ms: u64,
/// Retry base in milliseconds. Defaults to
/// [`ClientConfig::default_retry_base_ms`].
#[serde(default = "ClientConfig::default_retry_base_ms")]
pub retry_base_ms: u64,
/// Retry ceiling in milliseconds. Defaults to
/// [`ClientConfig::default_retry_ceiling_ms`].
#[serde(default = "ClientConfig::default_retry_ceiling_ms")]
pub retry_ceiling_ms: u64,
/// Serialization format. Defaults to `SerializationFormat::default()`.
#[serde(default)]
pub serialization_format: SerializationFormat,
/// Rate-limit sleep flag. Defaults to
/// [`ClientConfig::default_proactive_rate_limit_sleep`].
// NOTE(review): presumably makes the client sleep before it would hit a rate
// limit - confirm against the request path.
#[serde(default = "ClientConfig::default_proactive_rate_limit_sleep")]
pub proactive_rate_limit_sleep: bool,
}
impl Default for ClientConfig {
fn default() -> Self {
Self {
url: String::default(),
api_token: String::default(),
http_req_timeout_millis: Self::default_http_req_timeout_millis(),
max_num_retries: Self::default_max_num_retries(),
retry_backoff_ms: Self::default_retry_backoff_ms(),
retry_base_ms: Self::default_retry_base_ms(),
retry_ceiling_ms: Self::default_retry_ceiling_ms(),
serialization_format: SerializationFormat::default(),
proactive_rate_limit_sleep: Self::default_proactive_rate_limit_sleep(),
}
}
}
impl ClientConfig {
pub const fn default_http_req_timeout_millis() -> u64 {
30_000
}
pub const fn default_max_num_retries() -> usize {
12
}
pub const fn default_retry_backoff_ms() -> u64 {
500
}
pub const fn default_retry_base_ms() -> u64 {
200
}
pub const fn default_retry_ceiling_ms() -> u64 {
5_000
}
pub const fn default_proactive_rate_limit_sleep() -> bool {
true
}
pub fn validate(&self) -> Result<()> {
if self.url.is_empty() {
anyhow::bail!("url is required");
}
if Url::parse(&self.url).is_err() {
anyhow::bail!("url is malformed");
}
if self.api_token.is_empty() {
anyhow::bail!("api_token is required - get one from https://envio.dev/app/api-tokens");
}
if uuid::Uuid::parse_str(self.api_token.as_str()).is_err() {
anyhow::bail!("api_token is malformed - make sure its a token from https://envio.dev/app/api-tokens");
}
if self.http_req_timeout_millis == 0 {
anyhow::bail!("http_req_timeout_millis must be greater than 0");
}
Ok(())
}
}
/// Serialization format selected through `ClientConfig::serialization_format`.
///
/// The `Default` impl for this enum picks `CapnProto` with
/// `should_cache_queries: true`.
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
pub enum SerializationFormat {
/// JSON; carries no extra options.
Json,
/// Cap'n Proto, with one option.
CapnProto {
// NOTE(review): presumably toggles caching of queries when this format is
// used - the consuming code is not visible here, confirm there.
should_cache_queries: bool,
},
}
impl Default for SerializationFormat {
fn default() -> Self {
Self::CapnProto {
should_cache_queries: true,
}
}
}
// Options for streaming. Every field may be omitted when deserializing: the
// `Option` fields become `None`, `hex_output` becomes `HexOutput::default()`,
// and the remaining fields come from the `StreamConfig::default_*` helpers.
//
// Plain `//` comments are used on purpose: this type derives `JsonSchema`, and
// schemars copies `///` doc comments into the generated schema descriptions.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct StreamConfig {
// Optional column mapping (`crate::ColumnMapping`); `None` when omitted.
pub column_mapping: Option<ColumnMapping>,
// Optional event signature string; `None` when omitted.
// NOTE(review): presumably used to decode logs - confirm against the stream code.
pub event_signature: Option<String>,
// Hex output mode; defaults to `HexOutput::default()`.
#[serde(default)]
pub hex_output: HexOutput,
// Defaults to `StreamConfig::default_batch_size()`.
// NOTE(review): presumably the starting batch size, adjusted between
// `min_batch_size` and `max_batch_size` at runtime - confirm.
#[serde(default = "StreamConfig::default_batch_size")]
pub batch_size: u64,
// Defaults to `StreamConfig::default_max_batch_size()`.
#[serde(default = "StreamConfig::default_max_batch_size")]
pub max_batch_size: u64,
// Defaults to `StreamConfig::default_min_batch_size()`.
#[serde(default = "StreamConfig::default_min_batch_size")]
pub min_batch_size: u64,
// Defaults to `StreamConfig::default_concurrency()`.
// NOTE(review): presumably the number of requests run concurrently - confirm.
#[serde(default = "StreamConfig::default_concurrency")]
pub concurrency: usize,
// Optional limits; each is `None` when omitted.
// NOTE(review): presumably per-request caps on the number of blocks,
// transactions, logs and traces - confirm where they are consumed.
#[serde(default)]
pub max_num_blocks: Option<usize>,
#[serde(default)]
pub max_num_transactions: Option<usize>,
#[serde(default)]
pub max_num_logs: Option<usize>,
#[serde(default)]
pub max_num_traces: Option<usize>,
// Defaults to `StreamConfig::default_response_bytes_ceiling()`.
// NOTE(review): ceiling/floor presumably are response-size thresholds (bytes)
// that drive batch-size adjustment - confirm against the stream code.
#[serde(default = "StreamConfig::default_response_bytes_ceiling")]
pub response_bytes_ceiling: u64,
// Defaults to `StreamConfig::default_response_bytes_floor()`.
#[serde(default = "StreamConfig::default_response_bytes_floor")]
pub response_bytes_floor: u64,
// Defaults to `StreamConfig::default_reverse()`.
// NOTE(review): presumably streams data in reverse order when set - confirm.
#[serde(default = "StreamConfig::default_reverse")]
pub reverse: bool,
}
// Hex output mode used by `StreamConfig::hex_output`. `NoEncode` is the
// default variant (marked with `#[default]`).
//
// Plain `//` comments are used on purpose: this type derives `JsonSchema`, and
// schemars copies `///` doc comments into the generated schema descriptions.
// NOTE(review): judging by the names, this selects whether values are
// hex-encoded and whether a prefix is added - confirm against the encoder.
#[derive(Default, Clone, Copy, Debug, Serialize, Deserialize, PartialEq, JsonSchema)]
pub enum HexOutput {
// Default variant.
#[default]
NoEncode,
Prefixed,
NonPrefixed,
}
impl Default for StreamConfig {
fn default() -> Self {
Self {
column_mapping: None,
event_signature: None,
hex_output: HexOutput::default(),
batch_size: Self::default_batch_size(),
max_batch_size: Self::default_max_batch_size(),
min_batch_size: Self::default_min_batch_size(),
concurrency: Self::default_concurrency(),
max_num_blocks: None,
max_num_transactions: None,
max_num_logs: None,
max_num_traces: None,
response_bytes_ceiling: Self::default_response_bytes_ceiling(),
response_bytes_floor: Self::default_response_bytes_floor(),
reverse: Self::default_reverse(),
}
}
}
/// Default-value helpers referenced by the `#[serde(default = "...")]`
/// attributes on `StreamConfig` and by its `Default` impl.
impl StreamConfig {
    /// Default for `batch_size`.
    pub const fn default_batch_size() -> u64 {
        1_000
    }

    /// Default for `max_batch_size`.
    pub const fn default_max_batch_size() -> u64 {
        200_000
    }

    /// Default for `min_batch_size`.
    pub const fn default_min_batch_size() -> u64 {
        200
    }

    /// Default for `concurrency`.
    pub const fn default_concurrency() -> usize {
        10
    }

    /// Default for `response_bytes_ceiling`.
    pub const fn default_response_bytes_ceiling() -> u64 {
        500_000
    }

    /// Default for `response_bytes_floor`.
    pub const fn default_response_bytes_floor() -> u64 {
        250_000
    }

    /// Default for `reverse`.
    pub const fn default_reverse() -> bool {
        false
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Walks every rejection branch of `ClientConfig::validate`, plus one
    /// accepted configuration.
    #[test]
    fn test_validate() {
        let valid_cfg = ClientConfig {
            url: "https://hypersync.xyz".into(),
            api_token: "00000000-0000-0000-0000-000000000000".to_string(),
            ..Default::default()
        };
        assert!(valid_cfg.validate().is_ok(), "valid config");

        let cfg = ClientConfig {
            url: "https://hypersync.xyz".to_string(),
            api_token: "not a uuid".to_string(),
            ..Default::default()
        };
        assert!(cfg.validate().is_err(), "invalid uuid");

        let cfg = ClientConfig {
            url: "https://hypersync.xyz".to_string(),
            ..Default::default()
        };
        assert!(cfg.validate().is_err(), "missing api token");

        let cfg = ClientConfig {
            api_token: "00000000-0000-0000-0000-000000000000".to_string(),
            ..Default::default()
        };
        assert!(cfg.validate().is_err(), "missing url");

        // Non-empty but unparseable URL: covers the `Url::parse` branch, which
        // the "missing url" case above never reaches.
        let cfg = ClientConfig {
            url: "not a url".to_string(),
            api_token: "00000000-0000-0000-0000-000000000000".to_string(),
            ..Default::default()
        };
        assert!(cfg.validate().is_err(), "malformed url");

        // Consumes `valid_cfg`, so this case stays last.
        let cfg = ClientConfig {
            http_req_timeout_millis: 0,
            ..valid_cfg
        };
        assert!(
            cfg.validate().is_err(),
            "http_req_timeout_millis must be greater than 0"
        );
    }

    /// Pins the values produced by `StreamConfig::default()`.
    #[test]
    fn test_stream_config_defaults() {
        let default_config = StreamConfig::default();
        assert_eq!(default_config.concurrency, 10);
        assert_eq!(default_config.batch_size, 1000);
        assert_eq!(default_config.max_batch_size, 200_000);
        assert_eq!(default_config.min_batch_size, 200);
        assert_eq!(default_config.response_bytes_ceiling, 500_000);
        assert_eq!(default_config.response_bytes_floor, 250_000);
        assert!(!default_config.reverse);
        assert_eq!(default_config.hex_output, HexOutput::NoEncode);
        assert!(default_config.column_mapping.is_none());
        assert!(default_config.event_signature.is_none());
        assert!(default_config.max_num_blocks.is_none());
        assert!(default_config.max_num_transactions.is_none());
        assert!(default_config.max_num_logs.is_none());
        assert!(default_config.max_num_traces.is_none());
    }

    /// Round-trips the default config through JSON and checks that omitted
    /// fields fall back to their defaults.
    #[test]
    fn test_stream_config_serde() {
        let default_config = StreamConfig::default();
        let json = serde_json::to_string(&default_config).unwrap();
        let deserialized: StreamConfig = serde_json::from_str(&json).unwrap();
        assert_eq!(deserialized.concurrency, default_config.concurrency);
        assert_eq!(deserialized.batch_size, default_config.batch_size);
        assert_eq!(deserialized.reverse, default_config.reverse);

        // Only two fields supplied: the rest must come from the serde defaults.
        let partial_json = r#"{"reverse": true, "batch_size": 500}"#;
        let partial_config: StreamConfig = serde_json::from_str(partial_json).unwrap();
        assert!(partial_config.reverse);
        assert_eq!(partial_config.batch_size, 500);
        assert_eq!(partial_config.concurrency, 10);
        assert_eq!(partial_config.max_batch_size, 200_000);
    }
}