use std::time::Duration;
use alloy::{
network::Ethereum,
providers::{DynProvider, Provider, ProviderBuilder, WsConnect},
};
use anyhow::Result;
use reqwest::Url;
use tracing::warn;
use crate::rpc::RpcError;
use crate::rpc::provider::Provider as RpcProvider;
pub const MAX_RETRIES: u32 = 5;
pub const BASE_BACKOFF_SECS: u64 = 2;
pub const DEFAULT_HTTP_CONNECT_TIMEOUT_SECS: u64 = 5;
pub const DEFAULT_HTTP_REQUEST_TIMEOUT_SECS: u64 = 15;
#[derive(Debug, Clone)]
pub struct HttpRpcSettings {
pub url: Url,
pub max_concurrency: u32,
pub max_batch_size: u32,
pub init_batch_size: u32,
pub max_logs_per_request: u32,
}
impl HttpRpcSettings {
pub fn new(url: String, max_concurrency: u32, max_batch_size: u32) -> Self {
Self {
url: url.parse().expect("Invalid URL"),
max_concurrency,
max_batch_size,
max_logs_per_request: 2_500,
init_batch_size: 500,
}
}
pub fn host(&self) -> String {
self.url.host_str().unwrap_or("unknown").to_string()
}
}
#[derive(Debug, Clone)]
pub struct ProviderSettings {
pub http_providers: Vec<HttpRpcSettings>,
pub wss_endpoints: Vec<Url>,
pub max_retries: u32,
pub base_backoff_secs: u64,
pub http_connect_timeout_secs: u64,
pub http_request_timeout_secs: u64,
}
impl Default for ProviderSettings {
fn default() -> Self {
Self {
http_providers: vec![],
wss_endpoints: vec![],
max_retries: MAX_RETRIES,
base_backoff_secs: BASE_BACKOFF_SECS,
http_connect_timeout_secs: DEFAULT_HTTP_CONNECT_TIMEOUT_SECS,
http_request_timeout_secs: DEFAULT_HTTP_REQUEST_TIMEOUT_SECS,
}
}
}
#[derive(Debug, Clone, Default)]
pub struct ProviderOptions {
pub ankr_api_key: Option<String>,
pub infura_api_key: Option<String>,
pub quicknode_api_key: Option<String>,
pub alchemy_api_key: Option<String>,
pub rpc_urls: Option<Vec<String>>,
pub ws_urls: Option<Vec<String>>,
}
impl ProviderOptions {
pub fn ankr_api_key(mut self, key: impl AsRef<str>) -> Self {
self.ankr_api_key = Some(key.as_ref().to_string());
self
}
pub fn infura_api_key(mut self, key: impl AsRef<str>) -> Self {
self.infura_api_key = Some(key.as_ref().to_string());
self
}
pub fn quicknode_api_key(mut self, key: impl AsRef<str>) -> Self {
self.quicknode_api_key = Some(key.as_ref().to_string());
self
}
pub fn alchemy_api_key(mut self, key: impl AsRef<str>) -> Self {
self.alchemy_api_key = Some(key.as_ref().to_string());
self
}
pub fn add_http(mut self, url: String) -> Self {
if self.rpc_urls.is_none() {
self.rpc_urls = Some(vec![]);
}
self.rpc_urls.as_mut().unwrap().push(url);
self
}
pub fn add_websocket(mut self, url: String) -> Self {
if self.ws_urls.is_none() {
self.ws_urls = Some(vec![]);
}
self.ws_urls.as_mut().unwrap().push(url);
self
}
}
impl ProviderSettings {
pub fn http(rpc_url: impl AsRef<str>) -> Result<Self> {
let url = rpc_url.as_ref();
let provider = RpcProvider::detect(url).map_err(|_| {
anyhow::anyhow!("Unknown RPC provider for URL {url}. Provide explicit RPC settings.")
})?;
let http_setting = provider.rpc_settings(url);
Ok(Self {
http_providers: vec![http_setting],
..Self::default()
})
}
pub fn with_max_retries(mut self, retries: u32) -> Self {
self.max_retries = retries;
self
}
pub fn with_base_backoff_secs(mut self, secs: u64) -> Self {
self.base_backoff_secs = secs;
self
}
pub fn with_http_connect_timeout_secs(mut self, secs: u64) -> Self {
self.http_connect_timeout_secs = secs;
self
}
pub fn with_http_request_timeout_secs(mut self, secs: u64) -> Self {
self.http_request_timeout_secs = secs;
self
}
pub fn build(options: ProviderOptions, chain_id: u64) -> Result<Self> {
let mut settings = Self::default();
if let Some(key) = options.ankr_api_key {
let provider = RpcProvider::Ankr;
let http_url = provider.http_url(&key, chain_id)?;
let ws_url = provider.ws_url(&key, chain_id)?;
settings
.http_providers
.push(provider.rpc_settings(http_url.as_str()));
settings.wss_endpoints.push(ws_url);
}
if let Some(key) = options.infura_api_key {
let provider = RpcProvider::Infura;
let http_url = provider.http_url(&key, chain_id)?;
let ws_url = provider.ws_url(&key, chain_id)?;
settings
.http_providers
.push(provider.rpc_settings(http_url.as_str()));
settings.wss_endpoints.push(ws_url);
}
if let Some(key) = options.quicknode_api_key {
let provider = RpcProvider::QuickNode;
let http_url = provider.http_url(&key, chain_id)?;
let ws_url = provider.ws_url(&key, chain_id)?;
settings
.http_providers
.push(provider.rpc_settings(http_url.as_str()));
settings.wss_endpoints.push(ws_url);
}
if let Some(key) = options.alchemy_api_key {
let provider = RpcProvider::Alchemy;
let http_url = provider.http_url(&key, chain_id)?;
let ws_url = provider.ws_url(&key, chain_id)?;
settings
.http_providers
.push(provider.rpc_settings(http_url.as_str()));
settings.wss_endpoints.push(ws_url);
}
if let Some(urls) = options.rpc_urls {
for url in urls {
let provider = RpcProvider::detect(&url).map_err(|_| {
anyhow::anyhow!(
"Unknown RPC provider for URL {url}. Provide explicit RPC settings."
)
})?;
settings.http_providers.push(provider.rpc_settings(&url));
}
}
if let Some(urls) = options.ws_urls {
for url in urls {
let parsed_url: Url = url
.parse()
.map_err(|e| anyhow::anyhow!("Invalid WS URL {}: {}", url, e))?;
settings.wss_endpoints.push(parsed_url);
}
}
if settings.http_providers.is_empty() {
return Err(anyhow::anyhow!(
"At least one HTTP provider must be configured via API key or custom URL"
));
}
if settings.wss_endpoints.is_empty() {
return Err(anyhow::anyhow!(
"At least one WebSocket endpoint must be configured via API key or custom URL"
));
}
Ok(settings)
}
pub fn new(http_providers: Vec<HttpRpcSettings>, wss_endpoints: Vec<Url>) -> Self {
Self {
http_providers,
wss_endpoints,
..Self::default()
}
}
pub fn http_settings(&self, index: usize) -> &HttpRpcSettings {
let index = index % self.http_providers.len();
&self.http_providers[index]
}
pub fn connect_http(&self, index: usize) -> DynProvider<Ethereum> {
let index = index % self.http_providers.len();
let provider = &self.http_providers[index];
ProviderBuilder::new()
.with_reqwest(provider.url.clone(), |client_builder| {
client_builder
.connect_timeout(Duration::from_secs(self.http_connect_timeout_secs))
.timeout(Duration::from_secs(self.http_request_timeout_secs))
.build()
.expect("failed to build reqwest client")
})
.erased()
}
pub async fn connect_ws(&self, start_idx: usize) -> Result<DynProvider<Ethereum>, RpcError> {
let mut attempt = 0;
let mut provider_idx = start_idx;
let mut last_endpoint_host: Option<String> = None;
while attempt < MAX_RETRIES {
let endpoint = &self.wss_endpoints[provider_idx % self.wss_endpoints.len()];
let ws_conn = ProviderBuilder::new()
.connect_ws(WsConnect::new(endpoint.clone()))
.await;
match ws_conn {
Ok(ws) => return Ok(ws.erased()),
Err(e) => {
let endpoint_host = endpoint.host_str().unwrap_or("unknown").to_string();
last_endpoint_host = Some(endpoint_host.clone());
attempt += 1;
let backoff = Duration::from_secs(self.base_backoff_secs.pow(attempt));
warn!(
"Failed to connect to WebSocket {}: {} (attempt {}/{}, backoff {:?})",
endpoint_host, e, attempt, self.max_retries, backoff
);
if attempt >= self.max_retries {
return Err(RpcError::ConnectionError(format!(
"Failed to connect to WebSocket {} after {}/{} attempts: {}",
endpoint_host, attempt, self.max_retries, e
)));
}
tokio::time::sleep(backoff).await;
provider_idx += 1;
}
}
}
Err(RpcError::ConnectionError(
match last_endpoint_host.as_deref() {
Some(host) => {
format!("WebSocket connection failed for {host}: max retries exceeded")
}
None => "WebSocket connection failed: max retries exceeded".to_string(),
},
))
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_http_returns_error_for_unknown_provider() {
let result = ProviderSettings::http("https://unknown-provider.example.com/rpc");
assert!(result.is_err());
let err = result.unwrap_err();
assert!(
err.to_string().contains("Unknown RPC provider"),
"Error message should mention unknown provider, got: {}",
err
);
}
#[test]
fn test_build_returns_error_for_unknown_rpc_url() {
let options = ProviderOptions::default()
.add_http("https://unknown-provider.example.com/rpc".to_string())
.add_websocket("wss://eth-mainnet.g.alchemy.com/ws/v2/test".to_string());
let result = ProviderSettings::build(options, 1);
assert!(result.is_err());
let err = result.unwrap_err();
assert!(
err.to_string().contains("Unknown RPC provider"),
"Error message should mention unknown provider, got: {}",
err
);
}
#[test]
fn test_build_returns_error_for_unsupported_chain_id() {
let options = ProviderOptions::default().infura_api_key("test-key");
let result = ProviderSettings::build(options, 10200);
assert!(result.is_err());
let err = result.unwrap_err().to_string();
assert!(
err.contains("unsupported chain ID 10200"),
"Error should mention unsupported chain ID, got: {}",
err
);
assert!(
err.contains("provider infura"),
"Error should mention the provider, got: {}",
err
);
}
#[test]
fn test_default_http_timeouts() {
let settings = ProviderSettings::default();
assert_eq!(
settings.http_connect_timeout_secs,
DEFAULT_HTTP_CONNECT_TIMEOUT_SECS
);
assert_eq!(
settings.http_request_timeout_secs,
DEFAULT_HTTP_REQUEST_TIMEOUT_SECS
);
}
}