use std::{
num::NonZeroUsize,
pin::Pin,
task::{Context, Poll},
time::Duration,
};
use alloy::{
network::EthereumWallet,
primitives::ChainId,
providers::{
DynProvider, Provider, ProviderBuilder, WsConnect,
fillers::{BlobGasFiller, ChainIdFiller},
},
rpc::{client::RpcClient, json_rpc::RequestPacket},
transports::{
RpcError, TransportError, TransportErrorKind,
http::{
Http,
reqwest::{self, Url},
},
layers::{FallbackLayer, OrRetryPolicyFn, RateLimitRetryPolicy, RetryPolicy},
},
};
use backon::{ExponentialBuilder, Retryable as _};
use serde::Deserialize;
use tower::{Layer, Service, ServiceBuilder};
use crate::Environment;
pub mod erc165;
/// Pair of type-erased providers handed out by [`RpcProviderBuilder::build`].
///
/// One provider is built on top of the HTTP transports, the other on top of the
/// WebSocket connection; see [`RpcProvider::http`] and
/// [`RpcProvider::subscriptions`] for access.
#[derive(Clone)]
pub struct RpcProvider {
    /// Provider connected through the HTTP client built in `build`.
    http_provider: DynProvider,
    /// Provider connected through `connect_ws` in `build`.
    ws_provider: DynProvider,
}
/// Deserializable settings from which an [`RpcProviderBuilder`] is created.
///
/// Durations are (de)serialized through `humantime_serde`.
#[derive(Debug, Clone, Deserialize)]
#[non_exhaustive]
pub struct RpcProviderConfig {
    /// HTTP RPC endpoints. [`RpcProviderBuilder::with_config`] asserts that
    /// this list is not empty.
    pub http_urls: Vec<Url>,
    /// WebSocket RPC endpoint.
    pub ws_url: Url,
    /// Optional chain id forwarded to the chain-id filler; `None` when omitted.
    #[serde(default)]
    pub chain_id: Option<ChainId>,
    /// Timeout applied to the HTTP client; 10 seconds when omitted.
    #[serde(
        default = "RpcProviderConfig::default_timeout",
        with = "humantime_serde"
    )]
    pub timeout: Duration,
    /// Optional poll interval set on the RPC client; `None` when omitted, in
    /// which case the client keeps whatever interval it would use otherwise.
    #[serde(default, with = "humantime_serde")]
    pub confirmations_poll_interval: Option<Duration>,
    /// Backoff settings for retried requests; falls back to
    /// `RetryPolicyConfig::default()` when omitted.
    #[serde(default)]
    pub retry_policy_config: RetryPolicyConfig,
}
/// Backoff parameters used when retrying RPC requests.
///
/// The values feed the exponential backoff constructed by the retry layer in
/// this module. Durations are (de)serialized through `humantime_serde`.
#[derive(Debug, Clone, Deserialize)]
#[non_exhaustive]
pub struct RetryPolicyConfig {
    /// Minimum delay handed to the backoff builder; 1 second when omitted.
    #[serde(
        default = "RetryPolicyConfig::default_min_delay",
        with = "humantime_serde"
    )]
    pub min_delay: Duration,
    /// Maximum delay handed to the backoff builder; 8 seconds when omitted.
    #[serde(
        default = "RetryPolicyConfig::default_max_delay",
        with = "humantime_serde"
    )]
    pub max_delay: Duration,
    /// Maximum number of retries handed to the backoff builder; 5 when omitted.
    #[serde(default = "RetryPolicyConfig::default_max_times")]
    pub max_times: usize,
}
impl RpcProviderConfig {
    /// Creates a configuration for the given endpoints with every other
    /// setting at its default: the default timeout, the default retry policy,
    /// no chain id and no explicit poll interval.
    #[must_use]
    pub fn with_default_values(http_urls: Vec<Url>, ws_url: Url) -> Self {
        let timeout = Self::default_timeout();
        let retry_policy_config = RetryPolicyConfig::default();

        Self {
            http_urls,
            ws_url,
            chain_id: None,
            timeout,
            confirmations_poll_interval: None,
            retry_policy_config,
        }
    }

    /// Default for `timeout` (also used as the serde default): 10 seconds.
    fn default_timeout() -> Duration {
        Duration::from_secs(10)
    }
}
impl RetryPolicyConfig {
    /// Default for `min_delay` (also used as the serde default): 1 second.
    fn default_min_delay() -> Duration {
        Duration::from_secs(1)
    }

    /// Default for `max_delay` (also used as the serde default): 8 seconds.
    fn default_max_delay() -> Duration {
        Duration::from_secs(8)
    }

    /// Default for `max_times` (also used as the serde default): 5.
    fn default_max_times() -> usize {
        5
    }

    /// Builds a policy in which every field carries its default value.
    fn with_default_values() -> Self {
        let min_delay = Self::default_min_delay();
        let max_delay = Self::default_max_delay();
        let max_times = Self::default_max_times();

        Self {
            min_delay,
            max_delay,
            max_times,
        }
    }
}
impl Default for RetryPolicyConfig {
fn default() -> Self {
Self::with_default_values()
}
}
/// Builder that assembles an [`RpcProvider`].
///
/// Start from [`RpcProviderBuilder::with_config`] or
/// [`RpcProviderBuilder::with_default_values`], adjust individual settings with
/// the chained setters, then call [`RpcProviderBuilder::build`].
pub struct RpcProviderBuilder {
    // --- endpoints ---
    /// HTTP RPC endpoints; each one becomes an HTTP transport in `build`.
    http_urls: Vec<Url>,
    /// WebSocket RPC endpoint used for the second provider.
    ws_rpc_url: Url,

    // --- HTTP client behaviour ---
    /// Timeout applied to the HTTP client.
    timeout: Duration,
    /// Backoff parameters for the retry layer.
    retry_policy_config: RetryPolicyConfig,
    /// Poll interval set on the RPC client when present.
    confirmations_poll_interval: Option<Duration>,
    /// Passed as the `is_local` flag when the RPC client is created.
    is_local: bool,

    // --- provider fillers ---
    /// Chain id handed to the chain-id filler.
    chain_id: Option<ChainId>,
    /// Wallet attached to the HTTP provider when present.
    wallet: Option<EthereumWallet>,
}
impl From<RpcProviderConfig> for RpcProviderBuilder {
fn from(value: RpcProviderConfig) -> Self {
Self::from(&value)
}
}
impl From<&RpcProviderConfig> for RpcProviderBuilder {
fn from(value: &RpcProviderConfig) -> Self {
Self::with_config(value)
}
}
impl RpcProviderBuilder {
#[must_use]
pub fn with_config(config: &RpcProviderConfig) -> Self {
assert!(!config.http_urls.is_empty(), "http URLs must not be empty");
Self {
http_urls: config.http_urls.clone(),
ws_rpc_url: config.ws_url.clone(),
retry_policy_config: config.retry_policy_config.clone(),
timeout: config.timeout,
chain_id: config.chain_id,
is_local: false,
wallet: None,
confirmations_poll_interval: config.confirmations_poll_interval,
}
}
#[must_use]
pub fn with_default_values(http_urls: Vec<Url>, ws_url: Url) -> Self {
Self::with_config(&RpcProviderConfig::with_default_values(http_urls, ws_url))
}
#[must_use]
pub fn environment(mut self, environment: Environment) -> Self {
self.is_local = environment.is_dev();
self
}
#[must_use]
pub fn http_timeout(mut self, timeout: Duration) -> Self {
self.timeout = timeout;
self
}
#[must_use]
pub fn confirmations_poll_interval(mut self, confirmations_poll_interval: Duration) -> Self {
self.confirmations_poll_interval = Some(confirmations_poll_interval);
self
}
#[must_use]
pub fn chain_id(mut self, chain_id: ChainId) -> Self {
self.chain_id = Some(chain_id);
self
}
#[must_use]
pub fn retry_policy(mut self, retry_policy_config: RetryPolicyConfig) -> Self {
self.retry_policy_config = retry_policy_config;
self
}
#[must_use]
pub fn wallet(mut self, wallet: EthereumWallet) -> Self {
self.wallet = Some(wallet);
self
}
pub async fn build(self) -> Result<RpcProvider, TransportError> {
let Self {
http_urls,
retry_policy_config,
chain_id,
timeout,
is_local,
wallet,
ws_rpc_url,
confirmations_poll_interval,
} = self;
let reqwest = reqwest::ClientBuilder::new()
.timeout(timeout)
.build()
.expect("Failed to build reqwest HTTP client");
let transports = http_urls
.into_iter()
.map(|url| Http::with_client(reqwest.clone(), url))
.collect::<Vec<_>>();
let transport_count =
NonZeroUsize::try_from(transports.len()).expect("Checked non-empty in with_config");
let fallback_layer = FallbackLayer::default().with_active_transport_count(transport_count);
let retry_policy =
RateLimitRetryPolicy::default().or(|error: &TransportError| match error {
RpcError::Transport(TransportErrorKind::HttpError(e)) => {
matches!(e.status, 408 | 502 | 504)
}
RpcError::Transport(TransportErrorKind::Custom(_)) => true,
_ => false,
});
let retry_layer = RetryLayer::new(retry_policy, &retry_policy_config);
let transport = ServiceBuilder::new()
.layer(retry_layer)
.layer(fallback_layer)
.service(transports);
let client = RpcClient::builder().transport(transport, is_local);
let client = if let Some(confirmations_poll_interval) = confirmations_poll_interval {
client.with_poll_interval(confirmations_poll_interval)
} else {
client
};
let http_provider_builder = ProviderBuilder::new()
.filler(ChainIdFiller::new(chain_id))
.filler(BlobGasFiller::default())
.with_simple_nonce_management()
.with_gas_estimation();
let http_provider = if let Some(wallet) = wallet {
http_provider_builder
.wallet(wallet)
.connect_client(client)
.erased()
} else {
http_provider_builder.connect_client(client).erased()
};
let ws_provider = ProviderBuilder::new()
.connect_ws(WsConnect::new(ws_rpc_url))
.await?
.erased();
Ok(RpcProvider {
http_provider,
ws_provider,
})
}
}
impl RpcProvider {
#[must_use]
pub fn http(&self) -> DynProvider {
self.http_provider.clone()
}
#[must_use]
pub fn subscriptions(&self) -> DynProvider {
self.ws_provider.clone()
}
}
/// Tower layer that wraps a service in a [`RetryService`].
#[derive(Debug, Clone)]
struct RetryLayer {
    /// Decides whether a failed request is retried and may hint at a delay.
    policy: OrRetryPolicyFn,
    /// Exponential backoff configuration copied into each wrapped service.
    backoff: ExponentialBuilder,
}
impl RetryLayer {
    /// Creates a layer from a retry policy and the backoff parameters in
    /// `config`.
    ///
    /// The backoff is exponential with jitter, bounded by `config.min_delay`,
    /// `config.max_delay` and `config.max_times`.
    pub fn new(policy: OrRetryPolicyFn, config: &RetryPolicyConfig) -> Self {
        Self {
            policy,
            backoff: ExponentialBuilder::default()
                .with_min_delay(config.min_delay)
                .with_max_delay(config.max_delay)
                .with_max_times(config.max_times)
                .with_jitter(),
        }
    }
}
impl<S> Layer<S> for RetryLayer {
    type Service = RetryService<S>;

    /// Wraps `inner` in a [`RetryService`] that carries this layer's policy
    /// and backoff configuration.
    fn layer(&self, inner: S) -> Self::Service {
        let Self { policy, backoff } = self;
        RetryService {
            inner,
            policy: policy.clone(),
            backoff: *backoff,
        }
    }
}
/// Service produced by [`RetryLayer`]: forwards requests to `inner` and
/// retries failed ones according to `policy` and `backoff`.
#[derive(Debug, Clone)]
struct RetryService<S> {
    /// The wrapped service that actually performs the request.
    inner: S,
    /// Decides whether a failed request is retried and may hint at a delay.
    policy: OrRetryPolicyFn,
    /// Exponential backoff configuration used between attempts.
    backoff: ExponentialBuilder,
}
/// Retrying `Service` implementation: each request is sent through the inner
/// service and re-sent with exponential backoff while the policy says the
/// error is retryable.
impl<S> Service<RequestPacket> for RetryService<S>
where
    S: Service<
            RequestPacket,
            Response = alloy::rpc::json_rpc::ResponsePacket,
            Error = TransportError,
        > + Clone
        + Send
        + Sync
        + 'static,
    S::Future: Send,
{
    type Response = alloy::rpc::json_rpc::ResponsePacket;
    type Error = TransportError;
    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

    /// Readiness is delegated to the inner service.
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.inner.poll_ready(cx)
    }

    /// Sends `request`, retrying it according to the policy and backoff.
    ///
    /// NOTE(review): `poll_ready` drives `self.inner`, but every attempt below
    /// runs on a fresh clone of this service. tower's `Service` documentation
    /// warns that a clone does not necessarily share the original's readiness;
    /// confirm the inner service is always ready, or hand the readied instance
    /// to the first attempt.
    fn call(&mut self, request: RequestPacket) -> Self::Future {
        let service = self.clone();
        let backoff = self.backoff;
        let policy = self.policy.clone();
        Box::pin(async move {
            // Each attempt consumes its own clone of the service and request.
            (|| service.clone().call_and_parse_error(request.clone()))
                .retry(backoff)
                .sleep(tokio::time::sleep)
                // Only errors the policy accepts are retried.
                .when(|e| policy.should_retry(e))
                .notify(|_, duration| tracing::debug!("Retrying RPC request after: {duration:?}"))
                // When the backoff proposes a delay, prefer the policy's hint
                // for this error and fall back to the proposed delay; a `None`
                // delay is passed through unchanged.
                .adjust(|e, dur| dur.and_then(|d| policy.backoff_hint(e).or(Some(d))))
                .await
        })
    }
}
impl<S> RetryService<S>
where
    S: Service<
            RequestPacket,
            Response = alloy::rpc::json_rpc::ResponsePacket,
            Error = TransportError,
        > + Clone
        + Send
        + Sync
        + 'static,
    S::Future: Send,
{
    /// Performs a single attempt against the inner service.
    ///
    /// Transport failures are propagated as-is. A response for which
    /// `as_error()` yields a payload is turned into
    /// `TransportError::ErrorResp`, so the retry policy gets to inspect it;
    /// any other response is returned unchanged.
    async fn call_and_parse_error(
        mut self,
        request: RequestPacket,
    ) -> Result<alloy::rpc::json_rpc::ResponsePacket, RpcError<TransportErrorKind>> {
        let response = self.inner.call(request).await?;
        match response.as_error() {
            Some(payload) => Err(TransportError::ErrorResp(payload.clone())),
            None => Ok(response),
        }
    }
}
#[cfg(test)]
pub(crate) mod tests {
    use alloy::node_bindings::{Anvil, AnvilInstance};

    use crate::{
        Environment,
        web3::{RpcProvider, RpcProviderBuilder, RpcProviderConfig},
    };

    /// Whether the fixture's HTTP provider should be given the Anvil wallet.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub(crate) enum WithWallet {
        Yes,
        No,
    }

    /// Spawns a local Anvil node and builds an [`RpcProvider`] pointing at it.
    ///
    /// The Anvil instance is returned alongside the provider so the caller
    /// controls how long the instance is kept around.
    pub(crate) async fn fixture(with_wallet: WithWallet) -> (AnvilInstance, RpcProvider) {
        let anvil = Anvil::new().spawn();

        let config = RpcProviderConfig::with_default_values(
            vec![anvil.endpoint_url()],
            anvil.ws_endpoint_url(),
        );
        let builder = RpcProviderBuilder::with_config(&config).environment(Environment::Dev);
        let builder = match with_wallet {
            WithWallet::Yes => {
                builder.wallet(anvil.wallet().expect("anvil should have a wallet"))
            }
            WithWallet::No => builder,
        };

        let rpc_provider = builder
            .chain_id(31_337)
            .build()
            .await
            .expect("Should be able to spawn on local anvil");

        (anvil, rpc_provider)
    }
}