// ethl 0.1.14
//
// Tools for capturing, processing, archiving, and replaying Ethereum events.
// Documentation
use std::time::Duration;

use alloy::{
    network::Ethereum,
    providers::{DynProvider, Provider, ProviderBuilder, WsConnect},
};
use anyhow::Result;
use reqwest::Url;
use tracing::warn;

use crate::rpc::RpcError;
use crate::rpc::provider::Provider as RpcProvider;

/// Default value for `ProviderSettings::max_retries`.
pub const MAX_RETRIES: u32 = 5;
/// Default value for `ProviderSettings::base_backoff_secs`.
pub const BASE_BACKOFF_SECS: u64 = 2;
/// Default value, in seconds, for `ProviderSettings::http_connect_timeout_secs`.
pub const DEFAULT_HTTP_CONNECT_TIMEOUT_SECS: u64 = 5;
/// Default value, in seconds, for `ProviderSettings::http_request_timeout_secs`.
pub const DEFAULT_HTTP_REQUEST_TIMEOUT_SECS: u64 = 15;

/// Settings for a single HTTP RPC endpoint.
///
/// NOTE(review): this file only stores the numeric limits; how they are
/// enforced is decided by the code that consumes these settings.
#[derive(Debug, Clone)]
pub struct HttpRpcSettings {
    /// Endpoint URL.
    pub url: Url,
    /// Presumably the maximum number of concurrent requests — confirm against the consumer.
    pub max_concurrency: u32,
    /// Presumably the upper bound for the request batch size — confirm against the consumer.
    pub max_batch_size: u32,
    /// Presumably the starting batch size; `HttpRpcSettings::new` sets it to 500.
    pub init_batch_size: u32,
    /// Presumably a cap on logs returned per request; `HttpRpcSettings::new` sets it to 2_500.
    pub max_logs_per_request: u32,
}

impl HttpRpcSettings {
    pub fn new(url: String, max_concurrency: u32, max_batch_size: u32) -> Self {
        Self {
            url: url.parse().expect("Invalid URL"),
            max_concurrency,
            max_batch_size,
            max_logs_per_request: 2_500,
            init_batch_size: 500,
        }
    }

    pub fn host(&self) -> String {
        self.url.host_str().unwrap_or("unknown").to_string()
    }
}

/// Connection settings for a set of HTTP and WebSocket RPC endpoints.
#[derive(Debug, Clone)]
pub struct ProviderSettings {
    /// HTTP endpoints; `http_settings` and `connect_http` select one by index modulo the list length.
    pub http_providers: Vec<HttpRpcSettings>,
    /// WebSocket endpoints; `connect_ws` rotates through them after a failed attempt.
    pub wss_endpoints: Vec<Url>,
    /// Retry limit consulted by `connect_ws`.
    pub max_retries: u32,
    /// Base of the exponential backoff used by `connect_ws` (`base^attempt` seconds).
    pub base_backoff_secs: u64,
    /// Connect timeout, in seconds, applied to the HTTP client built by `connect_http`.
    pub http_connect_timeout_secs: u64,
    /// Whole-request timeout, in seconds, applied to the HTTP client built by `connect_http`.
    pub http_request_timeout_secs: u64,
}

impl Default for ProviderSettings {
    /// Returns settings with no endpoints configured and every tunable set to
    /// its module-level default constant.
    fn default() -> Self {
        Self {
            // Tunables first: each maps one-to-one onto a module constant.
            max_retries: MAX_RETRIES,
            base_backoff_secs: BASE_BACKOFF_SECS,
            http_connect_timeout_secs: DEFAULT_HTTP_CONNECT_TIMEOUT_SECS,
            http_request_timeout_secs: DEFAULT_HTTP_REQUEST_TIMEOUT_SECS,
            // Endpoint lists start empty.
            http_providers: Vec::new(),
            wss_endpoints: Vec::new(),
        }
    }
}

/// Builder-style options describing which RPC providers to configure.
///
/// Every field is optional; `ProviderSettings::build` turns the populated
/// fields into concrete HTTP and WebSocket endpoints.
#[derive(Debug, Clone, Default)]
pub struct ProviderOptions {
    /// API key for Ankr, if any.
    pub ankr_api_key: Option<String>,
    /// API key for Infura, if any.
    pub infura_api_key: Option<String>,
    /// API key for QuickNode, if any.
    pub quicknode_api_key: Option<String>,
    /// API key for Alchemy, if any.
    pub alchemy_api_key: Option<String>,
    /// Custom HTTP RPC URLs, in insertion order.
    pub rpc_urls: Option<Vec<String>>,
    /// Custom WebSocket URLs, in insertion order.
    pub ws_urls: Option<Vec<String>>,
}

impl ProviderOptions {
    /// Sets the Ankr API key, replacing any previous value.
    pub fn ankr_api_key(mut self, key: impl AsRef<str>) -> Self {
        self.ankr_api_key = Some(key.as_ref().to_owned());
        self
    }

    /// Sets the Infura API key, replacing any previous value.
    pub fn infura_api_key(mut self, key: impl AsRef<str>) -> Self {
        self.infura_api_key = Some(key.as_ref().to_owned());
        self
    }

    /// Sets the QuickNode API key, replacing any previous value.
    pub fn quicknode_api_key(mut self, key: impl AsRef<str>) -> Self {
        self.quicknode_api_key = Some(key.as_ref().to_owned());
        self
    }

    /// Sets the Alchemy API key, replacing any previous value.
    pub fn alchemy_api_key(mut self, key: impl AsRef<str>) -> Self {
        self.alchemy_api_key = Some(key.as_ref().to_owned());
        self
    }

    /// Appends a custom HTTP RPC URL, creating the list on first use.
    pub fn add_http(mut self, url: String) -> Self {
        // `get_or_insert_with` initialises the list lazily without an
        // `is_none()` check followed by `unwrap()`.
        self.rpc_urls.get_or_insert_with(Vec::new).push(url);
        self
    }

    /// Appends a custom WebSocket URL, creating the list on first use.
    pub fn add_websocket(mut self, url: String) -> Self {
        self.ws_urls.get_or_insert_with(Vec::new).push(url);
        self
    }
}

impl ProviderSettings {
    /// Builds settings containing a single HTTP provider for `rpc_url`.
    ///
    /// The provider is detected from the URL and supplies the per-endpoint
    /// settings; all other fields take their default values, so no WebSocket
    /// endpoints are configured.
    ///
    /// # Errors
    ///
    /// Returns an error when no known provider matches the URL.
    pub fn http(rpc_url: impl AsRef<str>) -> Result<Self> {
        let url = rpc_url.as_ref();
        let provider = match RpcProvider::detect(url) {
            Ok(detected) => detected,
            Err(_) => {
                return Err(anyhow::anyhow!(
                    "Unknown RPC provider for URL {url}. Provide explicit RPC settings."
                ));
            }
        };

        let mut settings = Self::default();
        settings.http_providers = vec![provider.rpc_settings(url)];
        Ok(settings)
    }

    pub fn with_max_retries(mut self, retries: u32) -> Self {
        self.max_retries = retries;
        self
    }

    pub fn with_base_backoff_secs(mut self, secs: u64) -> Self {
        self.base_backoff_secs = secs;
        self
    }

    pub fn with_http_connect_timeout_secs(mut self, secs: u64) -> Self {
        self.http_connect_timeout_secs = secs;
        self
    }

    pub fn with_http_request_timeout_secs(mut self, secs: u64) -> Self {
        self.http_request_timeout_secs = secs;
        self
    }

    /// Builds settings from `options` for the chain identified by `chain_id`.
    ///
    /// Endpoints are added in a fixed order: Ankr, Infura, QuickNode and
    /// Alchemy (each contributing one HTTP and one WebSocket endpoint when its
    /// API key is present), then the custom HTTP URLs, then the custom
    /// WebSocket URLs.
    ///
    /// # Errors
    ///
    /// Returns an error when a provider cannot produce a URL for the key and
    /// chain, when a custom HTTP URL matches no known provider, when a custom
    /// WebSocket URL fails to parse, or when the result would contain no HTTP
    /// provider or no WebSocket endpoint.
    pub fn build(options: ProviderOptions, chain_id: u64) -> Result<Self> {
        let mut settings = Self::default();

        // Order matters: it determines the index each endpoint ends up at.
        let keyed_providers = [
            (options.ankr_api_key, RpcProvider::Ankr),
            (options.infura_api_key, RpcProvider::Infura),
            (options.quicknode_api_key, RpcProvider::QuickNode),
            (options.alchemy_api_key, RpcProvider::Alchemy),
        ];
        for (api_key, provider) in keyed_providers {
            if let Some(key) = api_key {
                // Resolve both URLs before touching `settings`, so a failure
                // on either leaves nothing half-registered for this provider.
                let http_url = provider.http_url(&key, chain_id)?;
                let ws_url = provider.ws_url(&key, chain_id)?;
                settings
                    .http_providers
                    .push(provider.rpc_settings(http_url.as_str()));
                settings.wss_endpoints.push(ws_url);
            }
        }

        for url in options.rpc_urls.unwrap_or_default() {
            let provider = match RpcProvider::detect(&url) {
                Ok(detected) => detected,
                Err(_) => {
                    return Err(anyhow::anyhow!(
                        "Unknown RPC provider for URL {url}.  Provide explicit RPC settings."
                    ));
                }
            };
            settings.http_providers.push(provider.rpc_settings(&url));
        }

        for url in options.ws_urls.unwrap_or_default() {
            let endpoint = url
                .parse::<Url>()
                .map_err(|e| anyhow::anyhow!("Invalid WS URL {}: {}", url, e))?;
            settings.wss_endpoints.push(endpoint);
        }

        if settings.http_providers.is_empty() {
            return Err(anyhow::anyhow!(
                "At least one HTTP provider must be configured via API key or custom URL"
            ));
        }

        if settings.wss_endpoints.is_empty() {
            return Err(anyhow::anyhow!(
                "At least one WebSocket endpoint must be configured via API key or custom URL"
            ));
        }

        Ok(settings)
    }

    /// Creates settings from explicit endpoint lists, leaving every other
    /// field at its default value.
    pub fn new(http_providers: Vec<HttpRpcSettings>, wss_endpoints: Vec<Url>) -> Self {
        let mut settings = Self::default();
        settings.http_providers = http_providers;
        settings.wss_endpoints = wss_endpoints;
        settings
    }

    /// Returns the HTTP provider settings at `index`, wrapping around the
    /// provider list.
    ///
    /// # Panics
    ///
    /// Panics if `http_providers` is empty (remainder by zero).
    pub fn http_settings(&self, index: usize) -> &HttpRpcSettings {
        let providers = &self.http_providers;
        &providers[index % providers.len()]
    }

    /// Builds an HTTP provider for the endpoint at `index`, wrapping around
    /// the provider list, using the configured connect and request timeouts.
    ///
    /// # Panics
    ///
    /// Panics if `http_providers` is empty (remainder by zero), or if the
    /// reqwest client cannot be built.
    pub fn connect_http(&self, index: usize) -> DynProvider<Ethereum> {
        let settings = &self.http_providers[index % self.http_providers.len()];
        let connect_timeout = Duration::from_secs(self.http_connect_timeout_secs);
        let request_timeout = Duration::from_secs(self.http_request_timeout_secs);

        ProviderBuilder::new()
            .with_reqwest(settings.url.clone(), move |client_builder| {
                client_builder
                    .connect_timeout(connect_timeout)
                    .timeout(request_timeout)
                    .build()
                    .expect("failed to build reqwest client")
            })
            .erased()
    }

    /// Connects to a WebSocket endpoint, rotating to the next configured
    /// endpoint after each failed attempt.
    ///
    /// The first attempt targets `wss_endpoints[start_idx % len]`. After a
    /// failure the function sleeps for `base_backoff_secs^attempt` seconds
    /// (saturating on overflow) and moves on to the next endpoint. At least
    /// one attempt is always made, and the total number of attempts is bounded
    /// by `self.max_retries`.
    ///
    /// # Errors
    ///
    /// Returns `RpcError::ConnectionError` when no WebSocket endpoints are
    /// configured, or when the attempt limit is reached without a successful
    /// connection.
    pub async fn connect_ws(&self, start_idx: usize) -> Result<DynProvider<Ethereum>, RpcError> {
        // Without this guard the modulo below would panic on an empty list.
        if self.wss_endpoints.is_empty() {
            return Err(RpcError::ConnectionError(
                "WebSocket connection failed: no endpoints configured".to_string(),
            ));
        }

        let mut attempt: u32 = 0;
        let mut provider_idx = start_idx;

        // The loop is bounded by the configured `self.max_retries` (checked
        // after each failure) rather than the module-level default, so
        // `with_max_retries` is honoured for values above the default too.
        loop {
            let endpoint = &self.wss_endpoints[provider_idx % self.wss_endpoints.len()];
            let ws_conn = ProviderBuilder::new()
                .connect_ws(WsConnect::new(endpoint.clone()))
                .await;

            match ws_conn {
                Ok(ws) => return Ok(ws.erased()),
                Err(e) => {
                    let endpoint_host = endpoint.host_str().unwrap_or("unknown");
                    attempt += 1;
                    // Saturate instead of overflowing for large retry counts.
                    let backoff =
                        Duration::from_secs(self.base_backoff_secs.saturating_pow(attempt));
                    warn!(
                        "Failed to connect to WebSocket {}: {} (attempt {}/{}, backoff {:?})",
                        endpoint_host, e, attempt, self.max_retries, backoff
                    );

                    if attempt >= self.max_retries {
                        return Err(RpcError::ConnectionError(format!(
                            "Failed to connect to WebSocket {} after {}/{} attempts: {}",
                            endpoint_host, attempt, self.max_retries, e
                        )));
                    }

                    tokio::time::sleep(backoff).await;
                    // Only ever used modulo the endpoint count, so wrapping is harmless.
                    provider_idx = provider_idx.wrapping_add(1);
                }
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// A URL that matches no known provider must be rejected by `http`.
    #[test]
    fn test_http_returns_error_for_unknown_provider() {
        let err = ProviderSettings::http("https://unknown-provider.example.com/rpc")
            .expect_err("an unknown provider URL should be rejected");
        let message = err.to_string();
        assert!(
            message.contains("Unknown RPC provider"),
            "Error message should mention unknown provider, got: {}",
            message
        );
    }

    /// A custom HTTP URL that matches no known provider must make `build` fail.
    #[test]
    fn test_build_returns_error_for_unknown_rpc_url() {
        let options = ProviderOptions::default()
            .add_http("https://unknown-provider.example.com/rpc".to_string())
            .add_websocket("wss://eth-mainnet.g.alchemy.com/ws/v2/test".to_string());

        let err = ProviderSettings::build(options, 1)
            .expect_err("an unknown provider URL should be rejected");
        let message = err.to_string();
        assert!(
            message.contains("Unknown RPC provider"),
            "Error message should mention unknown provider, got: {}",
            message
        );
    }

    /// A chain ID the provider does not support must be reported with both
    /// the chain ID and the provider name.
    #[test]
    fn test_build_returns_error_for_unsupported_chain_id() {
        let options = ProviderOptions::default().infura_api_key("test-key");

        let message = ProviderSettings::build(options, 10200)
            .expect_err("an unsupported chain ID should be rejected")
            .to_string();

        assert!(
            message.contains("unsupported chain ID 10200"),
            "Error should mention unsupported chain ID, got: {}",
            message
        );
        assert!(
            message.contains("provider infura"),
            "Error should mention the provider, got: {}",
            message
        );
    }

    /// Default settings must pick up the module-level HTTP timeout constants.
    #[test]
    fn test_default_http_timeouts() {
        let ProviderSettings {
            http_connect_timeout_secs,
            http_request_timeout_secs,
            ..
        } = ProviderSettings::default();

        assert_eq!(http_connect_timeout_secs, DEFAULT_HTTP_CONNECT_TIMEOUT_SECS);
        assert_eq!(http_request_timeout_secs, DEFAULT_HTTP_REQUEST_TIMEOUT_SECS);
    }
}