use alloy::transports::{RpcError, TransportErrorKind};
use thiserror::Error;
use tokio::{sync::broadcast::error::RecvError, time::error as TokioError};
/// Top-level error type of this module.
///
/// Values are also produced by the `From` conversions in this file: from
/// [`FailoverError`], from the transport-level `RpcError`, from tokio's
/// `Elapsed` timeout error, and from the broadcast channel's `RecvError`.
#[derive(Error, Debug)]
pub enum Error {
/// An operation timed out; this is what tokio's `Elapsed` and
/// `FailoverError::Timeout` convert into.
#[error("Operation timed out")]
Timeout,
/// Wraps the underlying transport-level RPC error.
#[error("RPC call failed after exhausting all retry attempts: {0}")]
RpcError(RpcError<TransportErrorKind>),
/// The requested block could not be found; produced when an RPC error
/// response is recognized by `is_block_not_found`.
#[error("Block not found")]
BlockNotFound,
/// The subscription channel was closed (`RecvError::Closed` or
/// `FailoverError::Closed`).
#[error("Subscription channel closed")]
Closed,
/// The subscription receiver lagged; carries the count reported by
/// `RecvError::Lagged`.
#[error("Subscription lagged behind by: {0}")]
Lagged(u64),
}
/// Error type that converts into [`Error`] via the `From<FailoverError>` impl
/// in this file.
///
/// NOTE(review): presumably used by the provider failover/retry machinery —
/// the producing code is not visible here; confirm against callers.
#[derive(Error, Debug)]
pub enum FailoverError {
/// The subscription channel was closed.
#[error("Subscription channel closed")]
Closed,
/// An operation timed out; this is what tokio's `Elapsed` converts into.
#[error("Operation timed out")]
Timeout,
/// Wraps the underlying transport-level RPC error.
#[error("RPC call failed after exhausting all retry attempts: {0}")]
RpcError(RpcError<TransportErrorKind>),
}
impl From<RpcError<TransportErrorKind>> for FailoverError {
fn from(err: RpcError<TransportErrorKind>) -> Self {
FailoverError::RpcError(err)
}
}
/// Converts a [`FailoverError`] into the module's top-level [`Error`].
///
/// `Closed` and `Timeout` map one-to-one. An RPC error that is an error
/// response recognized by `is_block_not_found` becomes
/// [`Error::BlockNotFound`]; every other RPC error is wrapped as
/// [`Error::RpcError`].
impl From<FailoverError> for Error {
    fn from(err: FailoverError) -> Self {
        // Handle the payload-free variants first so only the RPC error is left.
        let rpc_err = match err {
            FailoverError::Closed => return Self::Closed,
            FailoverError::Timeout => return Self::Timeout,
            FailoverError::RpcError(inner) => inner,
        };

        // Only a server error response carries a code/message pair to classify.
        let block_missing = matches!(
            &rpc_err,
            RpcError::ErrorResp(payload)
                if is_block_not_found(payload.code, payload.message.as_ref())
        );

        if block_missing {
            Self::BlockNotFound
        } else {
            Self::RpcError(rpc_err)
        }
    }
}
/// Maps tokio's timeout error to [`FailoverError::Timeout`]; the `Elapsed`
/// value itself is discarded.
impl From<TokioError::Elapsed> for FailoverError {
    fn from(_elapsed: TokioError::Elapsed) -> Self {
        Self::Timeout
    }
}
impl From<RpcError<TransportErrorKind>> for Error {
fn from(err: RpcError<TransportErrorKind>) -> Self {
Error::RpcError(err)
}
}
/// Maps tokio's timeout error to [`Error::Timeout`]; the `Elapsed` value
/// itself is discarded.
impl From<TokioError::Elapsed> for Error {
    fn from(_elapsed: TokioError::Elapsed) -> Self {
        Self::Timeout
    }
}
/// Translates a broadcast-channel receive error into [`Error`].
///
/// `Lagged` keeps its count; `Closed` maps to [`Error::Closed`].
impl From<RecvError> for Error {
    fn from(recv_err: RecvError) -> Self {
        // Kept as an exhaustive match so a new `RecvError` variant fails to compile here.
        match recv_err {
            RecvError::Lagged(skipped) => Self::Lagged(skipped),
            RecvError::Closed => Self::Closed,
        }
    }
}
/// Returns `true` when an RPC error response with the given `code` and
/// `message` should be retried.
///
/// An error is treated as retryable unless it is classified as a
/// block-not-found error or as an invalid-log-filter error.
pub(crate) fn is_retryable_error(code: i64, message: &str) -> bool {
    !is_block_not_found(code, message) && !is_invalid_log_filter(code, message)
}
/// Returns `true` when any of the client-specific detectors (`geth`, `besu`,
/// `anvil`) recognizes the `code`/`message` pair as a block-not-found error.
pub(crate) fn is_block_not_found(code: i64, message: &str) -> bool {
    // Evaluated in order; `any` stops at the first detector that matches.
    let detectors: [fn(i64, &str) -> bool; 3] =
        [geth::is_block_not_found, besu::is_block_not_found, anvil::is_block_not_found];

    detectors.iter().any(|detect| detect(code, message))
}
/// Returns `true` when the `code`/`message` pair is recognized as an
/// invalid-log-filter error.
///
/// Only the `geth` detector is consulted.
pub(crate) fn is_invalid_log_filter(code: i64, message: &str) -> bool {
    let recognized_by_geth = geth::is_invalid_log_filter(code, message);
    recognized_by_geth
}
/// Classification of error responses by the code and message strings listed
/// for the geth client.
mod geth {
    /// Error code that every message recognized in this module must carry.
    pub const DEFAULT_ERROR_CODE: i64 = -32000;

    /// Exact messages classified as "block not found".
    const BLOCK_NOT_FOUND_MESSAGES: [&str; 8] = [
        "pending block is not available",
        "finalized block not found",
        "safe block not found",
        "earliest header not found",
        "finalized header not found",
        "safe header not found",
        "header not found",
        "header for hash not found",
    ];

    /// Exact messages classified as "invalid log filter".
    const INVALID_LOG_FILTER_MESSAGES: [&str; 8] = [
        "invalid block range params",
        "block range extends beyond current head block",
        "can't specify fromBlock/toBlock with blockHash",
        "pending logs are not supported",
        "unknown block",
        "exceed max topics",
        "exceed max addresses or topics per search position",
        "filter not found",
    ];

    /// Returns `true` when `code` is [`DEFAULT_ERROR_CODE`] and `message` is
    /// either one of the known block-not-found messages or has the shape
    /// `block … not found` (starts with `block`, ends with `not found`).
    pub fn is_block_not_found(code: i64, message: &str) -> bool {
        let known_message = BLOCK_NOT_FOUND_MESSAGES.contains(&message);
        // Covers messages that embed a block number or hash between the two parts.
        let block_pattern = message.starts_with("block") && message.ends_with("not found");

        code == DEFAULT_ERROR_CODE && (known_message || block_pattern)
    }

    /// Returns `true` when `code` is [`DEFAULT_ERROR_CODE`] and `message` is
    /// exactly one of the known invalid-log-filter messages.
    pub fn is_invalid_log_filter(code: i64, message: &str) -> bool {
        if code != DEFAULT_ERROR_CODE {
            return false;
        }
        INVALID_LOG_FILTER_MESSAGES.contains(&message)
    }
}
/// Classification of error responses by the code and message listed for the
/// besu client.
mod besu {
    /// Error code that must accompany the `Unknown block` message.
    pub const UNKNOWN_BLOCK_ERROR_CODE: i64 = -39001;

    /// Returns `true` only for the exact pair
    /// ([`UNKNOWN_BLOCK_ERROR_CODE`], `"Unknown block"`).
    pub fn is_block_not_found(code: i64, message: &str) -> bool {
        code == UNKNOWN_BLOCK_ERROR_CODE && message == "Unknown block"
    }
}
/// Classification of error responses by the code and message fragment listed
/// for the anvil client.
mod anvil {
    /// Error code that must accompany a `BlockOutOfRangeError` message.
    pub const INVALID_PARAMS_ERROR_CODE: i64 = -32602;

    /// Returns `true` when `code` is [`INVALID_PARAMS_ERROR_CODE`] and
    /// `message` contains the substring `BlockOutOfRangeError`.
    pub fn is_block_not_found(code: i64, message: &str) -> bool {
        code == INVALID_PARAMS_ERROR_CODE && message.contains("BlockOutOfRangeError")
    }
}
/// Unit tests for the client-specific error classifiers and the aggregate
/// helpers built on top of them.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_geth_block_not_found() {
        assert!(geth::is_block_not_found(-32000, "pending block is not available"));
        assert!(geth::is_block_not_found(-32000, "finalized block not found"));
        assert!(geth::is_block_not_found(-32000, "safe block not found"));
        assert!(geth::is_block_not_found(-32000, "earliest header not found"));
        assert!(geth::is_block_not_found(-32000, "finalized header not found"));
        assert!(geth::is_block_not_found(-32000, "safe header not found"));
        assert!(geth::is_block_not_found(-32000, "header not found"));
        assert!(geth::is_block_not_found(-32000, "header for hash not found"));
        assert!(geth::is_block_not_found(-32000, "block 12345 not found"));
        assert!(geth::is_block_not_found(-32000, "block 0x1234 not found"));
        assert!(!geth::is_block_not_found(-32000, "some other error"));
        assert!(!geth::is_block_not_found(-32001, "header not found"));
    }

    #[test]
    fn test_geth_invalid_log_filter() {
        assert!(geth::is_invalid_log_filter(-32000, "invalid block range params"));
        assert!(geth::is_invalid_log_filter(
            -32000,
            "block range extends beyond current head block"
        ));
        assert!(geth::is_invalid_log_filter(
            -32000,
            "can't specify fromBlock/toBlock with blockHash"
        ));
        assert!(geth::is_invalid_log_filter(-32000, "pending logs are not supported"));
        assert!(geth::is_invalid_log_filter(-32000, "unknown block"));
        assert!(geth::is_invalid_log_filter(-32000, "exceed max topics"));
        assert!(geth::is_invalid_log_filter(
            -32000,
            "exceed max addresses or topics per search position"
        ));
        assert!(geth::is_invalid_log_filter(-32000, "filter not found"));
        assert!(!geth::is_invalid_log_filter(-32000, "some other error"));
        assert!(!geth::is_invalid_log_filter(-32001, "invalid block range params"));
    }

    #[test]
    fn test_besu_block_not_found() {
        assert!(besu::is_block_not_found(-39001, "Unknown block"));
        assert!(!besu::is_block_not_found(-39001, "some other error"));
        assert!(!besu::is_block_not_found(-32000, "Unknown block"));
    }

    // Previously uncovered: the anvil detector had no test of its own.
    #[test]
    fn test_anvil_block_not_found() {
        assert!(anvil::is_block_not_found(-32602, "BlockOutOfRangeError"));
        assert!(anvil::is_block_not_found(
            -32602,
            "BlockOutOfRangeError: block height is 10 but requested was 20"
        ));
        assert!(!anvil::is_block_not_found(-32602, "some other error"));
        assert!(!anvil::is_block_not_found(-32000, "BlockOutOfRangeError"));
    }

    // Previously uncovered: the aggregator must accept every client's variant.
    #[test]
    fn test_block_not_found_any_client() {
        assert!(is_block_not_found(-32000, "header not found"));
        assert!(is_block_not_found(-39001, "Unknown block"));
        assert!(is_block_not_found(-32602, "BlockOutOfRangeError"));
        assert!(!is_block_not_found(-32000, "some transient error"));
        assert!(!is_block_not_found(-32603, "internal error"));
    }

    #[test]
    fn test_should_retry_rpc_error() {
        assert!(!is_retryable_error(-32000, "header not found"));
        assert!(!is_retryable_error(-32000, "invalid block range params"));
        assert!(!is_retryable_error(-39001, "Unknown block"));
        assert!(!is_retryable_error(-32602, "BlockOutOfRangeError"));
        assert!(!is_retryable_error(-32000, "pending logs are not supported"));
        assert!(!is_retryable_error(-32000, "unknown block"));
        assert!(!is_retryable_error(-32000, "exceed max topics"));
        assert!(!is_retryable_error(-32000, "exceed max addresses or topics per search position"));
        assert!(!is_retryable_error(-32000, "filter not found"));
        assert!(is_retryable_error(-32000, "some transient error"));
        assert!(is_retryable_error(-32603, "internal error"));
    }
}