use std::time::Duration;
use alloy::rpc::json_rpc::ResponsePacket;
use crate::errors::PerpCityError;
/// Endpoint selection strategy carried by [`TransportConfig`].
///
/// The selection logic itself lives outside this file; the notes on each
/// variant are therefore limited to what the configuration layer shows.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum Strategy {
    /// Round-robin selection (presumably rotating through the configured
    /// endpoints in turn — confirm against the transport implementation).
    RoundRobin,
    /// Latency-based selection. This is the variant returned by
    /// `Strategy::default()`.
    #[default]
    LatencyBased,
    /// Hedged requests with the given `fan_out`.
    ///
    /// `TransportConfigBuilder::build` rejects a `fan_out` smaller than 2.
    Hedged { fan_out: usize },
}
/// Tunables for the per-endpoint circuit breaker.
///
/// The breaker state machine is not part of this file, so the field notes
/// describe intent as suggested by the names — confirm against the breaker
/// implementation before relying on exact semantics.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CircuitBreakerConfig {
    /// Failure count threshold (presumably the number of failures after
    /// which the breaker opens). Defaults to 3.
    pub failure_threshold: u32,
    /// Recovery timeout (presumably how long the breaker stays open before
    /// probing again). Defaults to 30 seconds.
    pub recovery_timeout: Duration,
    /// Maximum number of requests in the half-open state. Defaults to 1.
    pub half_open_max_requests: u32,
}

impl Default for CircuitBreakerConfig {
    /// Threshold of 3 failures, a 30 second recovery timeout and a single
    /// half-open request.
    fn default() -> Self {
        let recovery_timeout = Duration::from_secs(30);
        Self {
            failure_threshold: 3,
            recovery_timeout,
            half_open_max_requests: 1,
        }
    }
}
/// Retry settings for read requests.
///
/// How the retry loop consumes these values (for example whether
/// `base_delay` is scaled between attempts) is decided elsewhere and is not
/// visible in this file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ReadRetryConfig {
    /// Maximum number of retries. Defaults to 2.
    pub max_retries: u32,
    /// Base delay used when retrying. Defaults to 100 milliseconds.
    pub base_delay: Duration,
}

impl Default for ReadRetryConfig {
    /// Two retries with a 100 ms base delay.
    fn default() -> Self {
        let base_delay = Duration::from_millis(100);
        Self {
            max_retries: 2,
            base_delay,
        }
    }
}
/// Retry settings for write requests.
///
/// How the retry loop consumes these values (for example whether
/// `base_delay` is scaled between attempts) is decided elsewhere and is not
/// visible in this file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct WriteRetryConfig {
    /// Maximum number of retries. Defaults to 3.
    pub max_retries: u32,
    /// Base delay used when retrying. Defaults to 500 milliseconds.
    pub base_delay: Duration,
}

impl Default for WriteRetryConfig {
    /// Three retries with a 500 ms base delay.
    fn default() -> Self {
        let base_delay = Duration::from_millis(500);
        Self {
            max_retries: 3,
            base_delay,
        }
    }
}
impl WriteRetryConfig {
    /// Decides whether a write that produced `response` should be retried.
    ///
    /// Returns `true` exactly when `response.first_error_code()` yields a
    /// value; the code itself is not inspected and no field of `self` is
    /// consulted.
    ///
    /// NOTE(review): every error code is therefore treated as retriable,
    /// which may include failures that will never succeed on retry —
    /// confirm this is what callers expect.
    pub fn is_retriable(&self, response: &ResponsePacket) -> bool {
        let error_code = response.first_error_code();
        error_code.is_some()
    }
}
#[derive(Debug, Clone)]
pub struct TransportConfig {
pub shared_endpoints: Vec<String>,
pub read_endpoints: Vec<String>,
pub write_endpoints: Vec<String>,
pub ws_endpoint: Option<String>,
pub request_timeout: Duration,
pub strategy: Strategy,
pub circuit_breaker: CircuitBreakerConfig,
pub read_retry: ReadRetryConfig,
pub write_retry: WriteRetryConfig,
}
impl TransportConfig {
pub fn builder() -> TransportConfigBuilder {
TransportConfigBuilder::default()
}
}
#[derive(Debug, Clone)]
pub struct TransportConfigBuilder {
shared_endpoints: Vec<String>,
read_endpoints: Vec<String>,
write_endpoints: Vec<String>,
ws_endpoint: Option<String>,
request_timeout: Duration,
strategy: Strategy,
circuit_breaker: CircuitBreakerConfig,
read_retry: ReadRetryConfig,
write_retry: WriteRetryConfig,
}
impl Default for TransportConfigBuilder {
fn default() -> Self {
Self {
shared_endpoints: Vec::new(),
read_endpoints: Vec::new(),
write_endpoints: Vec::new(),
ws_endpoint: None,
request_timeout: Duration::from_secs(5),
strategy: Strategy::default(),
circuit_breaker: CircuitBreakerConfig::default(),
read_retry: ReadRetryConfig::default(),
write_retry: WriteRetryConfig::default(),
}
}
}
impl TransportConfigBuilder {
    /// Appends `url` to the shared endpoint pool. May be called repeatedly.
    pub fn shared_endpoint(mut self, url: impl Into<String>) -> Self {
        let endpoint: String = url.into();
        self.shared_endpoints.push(endpoint);
        self
    }

    /// Appends `url` to the read endpoint pool. May be called repeatedly.
    pub fn read_endpoint(mut self, url: impl Into<String>) -> Self {
        let endpoint: String = url.into();
        self.read_endpoints.push(endpoint);
        self
    }

    /// Appends `url` to the write endpoint pool. May be called repeatedly.
    pub fn write_endpoint(mut self, url: impl Into<String>) -> Self {
        let endpoint: String = url.into();
        self.write_endpoints.push(endpoint);
        self
    }

    /// Sets the WebSocket endpoint, replacing any previously set value.
    pub fn ws_endpoint(self, url: impl Into<String>) -> Self {
        Self {
            ws_endpoint: Some(url.into()),
            ..self
        }
    }

    /// Overrides the request timeout (5 seconds unless changed).
    pub fn request_timeout(self, timeout: Duration) -> Self {
        Self {
            request_timeout: timeout,
            ..self
        }
    }

    /// Overrides the endpoint selection strategy.
    pub fn strategy(self, strategy: Strategy) -> Self {
        Self { strategy, ..self }
    }

    /// Replaces the circuit breaker settings wholesale.
    pub fn circuit_breaker(self, config: CircuitBreakerConfig) -> Self {
        Self {
            circuit_breaker: config,
            ..self
        }
    }

    /// Replaces the read retry settings wholesale.
    pub fn read_retry(self, config: ReadRetryConfig) -> Self {
        Self {
            read_retry: config,
            ..self
        }
    }

    /// Replaces the write retry settings wholesale.
    pub fn write_retry(self, config: WriteRetryConfig) -> Self {
        Self {
            write_retry: config,
            ..self
        }
    }

    /// Validates the accumulated settings and produces a [`TransportConfig`].
    ///
    /// The WebSocket endpoint takes no part in the endpoint checks.
    ///
    /// # Errors
    ///
    /// Returns [`PerpCityError::InvalidConfig`] when, checked in this order:
    /// - no shared, read, or write endpoint has been added;
    /// - neither a shared nor a write endpoint exists (read endpoints alone
    ///   are not sufficient);
    /// - the strategy is [`Strategy::Hedged`] with a `fan_out` below 2.
    pub fn build(self) -> crate::Result<TransportConfig> {
        let Self {
            shared_endpoints,
            read_endpoints,
            write_endpoints,
            ws_endpoint,
            request_timeout,
            strategy,
            circuit_breaker,
            read_retry,
            write_retry,
        } = self;

        let nothing_configured = shared_endpoints.is_empty()
            && read_endpoints.is_empty()
            && write_endpoints.is_empty();
        if nothing_configured {
            return Err(PerpCityError::InvalidConfig {
                reason: "no endpoints configured".into(),
            });
        }

        // Writes need at least one endpoint from the shared or write pool.
        let writes_reachable = !(write_endpoints.is_empty() && shared_endpoints.is_empty());
        if !writes_reachable {
            return Err(PerpCityError::InvalidConfig {
                reason: "writes have no reachable endpoint: \
                         configure at least one shared or write endpoint"
                    .into(),
            });
        }

        if matches!(strategy, Strategy::Hedged { fan_out } if fan_out < 2) {
            return Err(PerpCityError::InvalidConfig {
                reason: "hedged strategy requires fan_out >= 2".into(),
            });
        }

        Ok(TransportConfig {
            shared_endpoints,
            read_endpoints,
            write_endpoints,
            ws_endpoint,
            request_timeout,
            strategy,
            circuit_breaker,
            read_retry,
            write_retry,
        })
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// One shared endpoint is enough to build; everything else keeps its
    /// default value.
    #[test]
    fn builder_defaults() {
        let built = TransportConfig::builder()
            .shared_endpoint("https://rpc1.example.com")
            .build()
            .unwrap();

        // Endpoint pools.
        assert_eq!(built.shared_endpoints.len(), 1);
        assert!(built.read_endpoints.is_empty());
        assert!(built.write_endpoints.is_empty());
        assert!(built.ws_endpoint.is_none());

        // Defaulted settings.
        assert_eq!(built.request_timeout, Duration::from_secs(5));
        assert_eq!(built.strategy, Strategy::LatencyBased);
        assert_eq!(built.circuit_breaker.failure_threshold, 3);
        assert_eq!(built.read_retry.max_retries, 2);
        assert_eq!(built.write_retry.max_retries, 3);
    }

    /// Every builder method is exercised and its value reaches the config.
    #[test]
    fn builder_all_options() {
        let breaker = CircuitBreakerConfig {
            failure_threshold: 5,
            recovery_timeout: Duration::from_secs(60),
            half_open_max_requests: 2,
        };
        let reads = ReadRetryConfig {
            max_retries: 5,
            base_delay: Duration::from_millis(50),
        };
        let writes = WriteRetryConfig {
            max_retries: 1,
            base_delay: Duration::from_millis(500),
        };

        let built = TransportConfig::builder()
            .shared_endpoint("https://rpc1.example.com")
            .shared_endpoint("https://rpc2.example.com")
            .read_endpoint("https://read.example.com")
            .write_endpoint("https://write.example.com")
            .ws_endpoint("wss://ws.example.com")
            .request_timeout(Duration::from_millis(500))
            .strategy(Strategy::Hedged { fan_out: 3 })
            .circuit_breaker(breaker)
            .read_retry(reads)
            .write_retry(writes)
            .build()
            .unwrap();

        assert_eq!(built.shared_endpoints.len(), 2);
        assert_eq!(built.read_endpoints.len(), 1);
        assert_eq!(built.write_endpoints.len(), 1);
        assert_eq!(built.ws_endpoint.as_deref(), Some("wss://ws.example.com"));
        assert_eq!(built.request_timeout, Duration::from_millis(500));
        assert_eq!(built.strategy, Strategy::Hedged { fan_out: 3 });
        assert_eq!(built.circuit_breaker.failure_threshold, 5);
        assert_eq!(built.read_retry.max_retries, 5);
        assert_eq!(built.write_retry.max_retries, 1);
    }

    /// A shared endpoint plus a dedicated read endpoint is a valid split.
    #[test]
    fn read_write_split() {
        let built = TransportConfig::builder()
            .shared_endpoint("https://alchemy.example.com")
            .read_endpoint("https://public.example.com")
            .build()
            .unwrap();

        assert_eq!(built.shared_endpoints.len(), 1);
        assert_eq!(built.read_endpoints.len(), 1);
        assert!(built.write_endpoints.is_empty());
    }

    /// Building with no endpoints at all is rejected.
    #[test]
    fn no_endpoints_errors() {
        assert!(TransportConfig::builder().build().is_err());
    }

    /// Read endpoints alone leave writes without a target, so build fails.
    #[test]
    fn read_only_endpoints_errors() {
        let outcome = TransportConfig::builder()
            .read_endpoint("https://read.example.com")
            .build();
        assert!(outcome.is_err());
    }

    /// A lone write endpoint passes validation.
    #[test]
    fn write_only_endpoints_ok() {
        let outcome = TransportConfig::builder()
            .write_endpoint("https://write.example.com")
            .build();
        assert!(outcome.is_ok());
    }

    /// Hedging with a fan-out of one is rejected.
    #[test]
    fn hedged_fan_out_one_errors() {
        let outcome = TransportConfig::builder()
            .shared_endpoint("https://rpc1.example.com")
            .strategy(Strategy::Hedged { fan_out: 1 })
            .build();
        assert!(outcome.is_err());
    }
}