use crate::authentication::authenticate;
use crate::debug::chain_config::ChainConfigRequest;
use crate::debug::execution_witness::ExecutionWitnessRequest;
use crate::debug::execution_witness_by_hash::ExecutionWitnessByBlockHashRequest;
use crate::engine::blobs::{BlobsV2Request, BlobsV3Request};
use crate::engine::client_version::GetClientVersionV1Request;
use crate::engine::payload::{
GetPayloadV5Request, GetPayloadV6Request, NewPayloadV5Request, NewPayloadWithWitnessV5Request,
};
use crate::engine::{
ExchangeCapabilitiesRequest,
blobs::BlobsV1Request,
exchange_transition_config::ExchangeTransitionConfigV1Req,
fork_choice::{
ForkChoiceUpdatedV1, ForkChoiceUpdatedV2, ForkChoiceUpdatedV3, ForkChoiceUpdatedV4,
},
payload::{
GetPayloadBodiesByHashV1Request, GetPayloadBodiesByHashV2Request,
GetPayloadBodiesByRangeV1Request, GetPayloadBodiesByRangeV2Request, GetPayloadV1Request,
GetPayloadV2Request, GetPayloadV3Request, GetPayloadV4Request, NewPayloadV1Request,
NewPayloadV2Request, NewPayloadV3Request, NewPayloadV4Request,
},
};
use crate::eth::client::Config;
use crate::eth::{
account::{
GetBalanceRequest, GetCodeRequest, GetProofRequest, GetStorageAtRequest,
GetTransactionCountRequest,
},
block::{
BlockNumberRequest, GetBlobBaseFee, GetBlockByHashRequest, GetBlockByNumberRequest,
GetBlockReceiptsRequest, GetBlockTransactionCountRequest, GetRawBlockRequest,
GetRawHeaderRequest, GetRawReceipts,
},
block_access_list::BlockAccessListRequest,
client::{ChainId, Syncing},
fee_market::FeeHistoryRequest,
filter::{self, ActiveFilters, DeleteFilterRequest, FilterChangesRequest, NewFilterRequest},
gas_price::GasPrice,
gas_tip_estimator::GasTipEstimator,
logs::LogsFilter,
transaction::{
CallRequest, CreateAccessListRequest, EstimateGasRequest, GetRawTransaction,
GetTransactionByBlockHashAndIndexRequest, GetTransactionByBlockNumberAndIndexRequest,
GetTransactionByHashRequest, GetTransactionReceiptRequest,
},
};
use crate::subscription_manager::{SubscriptionManager, SubscriptionManagerProtocol};
use crate::tracing::{TraceBlockByNumberRequest, TraceTransactionRequest};
use crate::types::transaction::SendRawTransactionRequest;
use crate::utils::{
RpcErr, RpcErrorMetadata, RpcErrorResponse, RpcNamespace, RpcRequest, RpcRequestId,
RpcSuccessResponse,
};
use crate::{admin, net};
use crate::{eth, mempool};
use axum::extract::ws::{Message, WebSocket};
use axum::extract::{DefaultBodyLimit, State, WebSocketUpgrade};
use axum::{Json, Router, http::StatusCode, routing::post};
use axum_extra::{
TypedHeader,
headers::{Authorization, authorization::Bearer},
};
use bytes::Bytes;
use ethrex_blockchain::Blockchain;
use ethrex_blockchain::error::ChainError;
use ethrex_common::types::Block;
use ethrex_common::types::block_access_list::BlockAccessList;
use ethrex_common::types::block_execution_witness::ExecutionWitness;
use ethrex_metrics::rpc::{RpcOutcome, record_async_duration, record_rpc_outcome};
use ethrex_p2p::peer_handler::PeerHandler;
use ethrex_p2p::sync_manager::SyncManager;
use ethrex_p2p::types::Node;
use ethrex_p2p::types::NodeRecord;
use ethrex_storage::Store;
use serde::Deserialize;
use serde_json::Value;
use spawned_concurrency::tasks::ActorRef;
use std::{
collections::{HashMap, HashSet},
future::IntoFuture,
net::SocketAddr,
sync::{Arc, Mutex},
time::Duration,
};
use tokio::net::TcpListener;
use tokio::sync::{
Mutex as TokioMutex,
mpsc::{UnboundedSender, unbounded_channel},
oneshot,
};
use tokio::time::timeout;
use tower_http::cors::CorsLayer;
use tracing::{error, info, warn};
use tracing_subscriber::{EnvFilter, Registry, reload};
#[cfg(all(feature = "jemalloc_profiling", target_os = "linux"))]
use axum::response::IntoResponse;
/// Serves a jemalloc heap profile in pprof format.
///
/// # Errors
/// - `501 Not Implemented` when `jemalloc_pprof::PROF_CTL` is unset.
/// - `403 Forbidden` when profiling is not activated (see
///   `require_profiling_activated`).
/// - `500 Internal Server Error` when dumping the profile fails.
#[cfg(all(feature = "jemalloc_profiling", target_os = "linux"))]
pub async fn handle_get_heap() -> Result<impl IntoResponse, (StatusCode, String)> {
    let profiler = jemalloc_pprof::PROF_CTL.as_ref().ok_or_else(|| {
        (
            StatusCode::NOT_IMPLEMENTED,
            String::from("jemalloc profiling is not available"),
        )
    })?;

    // The guard is kept for the whole dump so the activation check and the
    // dump observe the same profiler state.
    let mut guard = profiler.lock().await;
    require_profiling_activated(&guard)?;

    let profile = match guard.dump_pprof() {
        Ok(profile) => profile,
        Err(err) => return Err((StatusCode::INTERNAL_SERVER_ERROR, err.to_string())),
    };
    Ok(profile)
}
/// Guard used by the heap-profiling endpoints: succeeds only when the
/// profiler reports itself as activated, otherwise yields a `403 Forbidden`
/// error tuple.
#[cfg(all(feature = "jemalloc_profiling", target_os = "linux"))]
fn require_profiling_activated(
    prof_ctl: &jemalloc_pprof::JemallocProfCtl,
) -> Result<(), (StatusCode, String)> {
    if !prof_ctl.activated() {
        return Err((
            axum::http::StatusCode::FORBIDDEN,
            "heap profiling not activated".into(),
        ));
    }
    Ok(())
}
/// Serves a jemalloc heap profile rendered as an SVG flamegraph
/// (`Content-Type: image/svg+xml`).
///
/// # Errors
/// - `501 Not Implemented` when `jemalloc_pprof::PROF_CTL` is unset.
/// - `403 Forbidden` when profiling is not activated.
/// - `500 Internal Server Error` when the dump or the response build fails.
#[cfg(all(feature = "jemalloc_profiling", target_os = "linux"))]
pub async fn handle_get_heap_flamegraph() -> Result<impl IntoResponse, (StatusCode, String)> {
    use axum::body::Body;
    use axum::http::header::CONTENT_TYPE;
    use axum::response::Response;

    let profiler = jemalloc_pprof::PROF_CTL.as_ref().ok_or_else(|| {
        (
            StatusCode::NOT_IMPLEMENTED,
            String::from("jemalloc profiling is not available"),
        )
    })?;

    let mut guard = profiler.lock().await;
    require_profiling_activated(&guard)?;

    let svg = match guard.dump_flamegraph() {
        Ok(svg) => svg,
        Err(err) => return Err((StatusCode::INTERNAL_SERVER_ERROR, err.to_string())),
    };

    let response = Response::builder()
        .header(CONTENT_TYPE, "image/svg+xml")
        .body(Body::from(svg));
    response.map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()))
}
/// Fallback for builds without jemalloc profiling (feature disabled or
/// non-Linux target): always answers `501 Not Implemented`.
#[cfg(not(all(feature = "jemalloc_profiling", target_os = "linux")))]
pub async fn handle_get_heap() -> Result<(), (StatusCode, String)> {
    let reason = String::from(
        "jemalloc profiling is not available (build with `ethrex-rpc/jemalloc_profiling`, it only works on linux)",
    );
    Err((StatusCode::NOT_IMPLEMENTED, reason))
}
/// Fallback for builds without jemalloc profiling (feature disabled or
/// non-Linux target): always answers `501 Not Implemented`.
#[cfg(not(all(feature = "jemalloc_profiling", target_os = "linux")))]
pub async fn handle_get_heap_flamegraph() -> Result<(), (StatusCode, String)> {
    let reason = String::from(
        "jemalloc profiling is not available (build with `ethrex-rpc/jemalloc_profiling`, it only works on linux)",
    );
    Err((StatusCode::NOT_IMPLEMENTED, reason))
}
/// Body of an incoming JSON-RPC call: either one request object or a batch.
///
/// The enum is `untagged`, so serde picks the variant from the JSON shape,
/// trying `Single` before `Multiple`.
#[derive(Deserialize)]
#[serde(untagged)]
pub enum RpcRequestWrapper {
/// A single request object.
Single(RpcRequest),
/// A batch of request objects.
Multiple(Vec<RpcRequest>),
}
/// Message consumed by the thread spawned in `start_block_executor`.
///
/// Tuple fields, in order:
/// 1. one-shot sender on which the execution result is reported back,
/// 2. the block to add,
/// 3. an optional block access list forwarded to the blockchain pipeline,
/// 4. whether an execution witness should be produced (`true`) or not.
type BlockWorkerMessage = (
oneshot::Sender<Result<Option<ExecutionWitness>, ChainError>>,
Block,
Option<BlockAccessList>,
bool,
);
/// Shared state handed (by clone) to every RPC handler.
#[derive(Clone)]
pub struct RpcApiContext {
/// Handle to the node's storage.
pub storage: Store,
/// Blockchain instance; also driven by the block executor thread.
pub blockchain: Arc<Blockchain>,
/// Filters used by the `eth_*Filter*` methods; periodically pruned by the
/// cleanup task spawned in `start_api`.
pub active_filters: ActiveFilters,
/// Sync manager, when one is available.
pub syncer: Option<Arc<SyncManager>>,
/// P2P peer handler, when one is available.
pub peer_handler: Option<PeerHandler>,
/// Static information about this node (JWT secret, identity, version...).
pub node_data: NodeData,
/// Shared gas tip estimator.
pub gas_tip_estimator: Arc<TokioMutex<GasTipEstimator>>,
/// Reload handle for the tracing filter; passed to `admin_setLogLevel`.
pub log_filter_handler: Option<reload::Handle<EnvFilter, Registry>>,
// NOTE(review): presumably the gas limit target used when building blocks — confirm.
pub gas_ceil: u64,
/// Sender side of the channel feeding the block executor thread.
pub block_worker_channel: UnboundedSender<BlockWorkerMessage>,
/// WebSocket configuration; `None` when the WS server is not enabled.
pub ws: Option<WebSocketConfig>,
/// Namespaces that `map_http_requests` is allowed to route; requests to
/// any other namespace are answered with `MethodNotFound`.
pub allowed_namespaces: Arc<HashSet<RpcNamespace>>,
}
/// Settings for the optional WebSocket server.
#[derive(Clone)]
pub struct WebSocketConfig {
/// Address the WS listener binds to.
pub addr: SocketAddr,
/// Actor used to register and remove `eth_subscribe` subscriptions.
pub subscription_manager: ActorRef<SubscriptionManager>,
}
/// Hand-written `Debug` that prints only a subset of the context: storage,
/// blockchain and gas ceiling in full, and the syncer / peer handler as an
/// opaque `Some("..")` / `None` marker. The remaining fields are omitted.
impl std::fmt::Debug for RpcApiContext {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Only presence is reported for these two; their contents are elided.
        let syncer = self.syncer.as_ref().map(|_| "..");
        let peer_handler = self.peer_handler.as_ref().map(|_| "..");

        f.debug_struct("RpcApiContext")
            .field("storage", &self.storage)
            .field("blockchain", &self.blockchain)
            .field("syncer", &syncer)
            .field("peer_handler", &peer_handler)
            .field("gas_ceil", &self.gas_ceil)
            .finish()
    }
}
/// Identification of the running client, with its display string computed
/// once at construction time.
#[derive(Debug, Clone)]
pub struct ClientVersion {
    pub name: String,
    pub version: String,
    pub branch: String,
    pub commit: String,
    pub os_arch: String,
    pub rustc_version: String,
    /// Cached `name/vVERSION-BRANCH-COMMIT/OS_ARCH/rustc-vRUSTC` string
    /// returned by `Display`.
    formatted: String,
}

impl ClientVersion {
    /// Builds a `ClientVersion` and pre-renders its display form as
    /// `{name}/v{version}-{branch}-{commit}/{os_arch}/rustc-v{rustc_version}`.
    pub fn new(
        name: String,
        version: String,
        branch: String,
        commit: String,
        os_arch: String,
        rustc_version: String,
    ) -> Self {
        let formatted =
            format!("{name}/v{version}-{branch}-{commit}/{os_arch}/rustc-v{rustc_version}");
        Self {
            name,
            version,
            branch,
            commit,
            os_arch,
            rustc_version,
            formatted,
        }
    }
}

impl std::fmt::Display for ClientVersion {
    /// Writes the string pre-rendered by [`ClientVersion::new`].
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.formatted)
    }
}
/// Static, per-node information exposed to RPC handlers.
#[derive(Debug, Clone)]
pub struct NodeData {
/// Secret used to authenticate Auth-RPC requests.
pub jwt_secret: Bytes,
/// This node's own P2P identity.
pub local_p2p_node: Node,
/// This node's own node record.
pub local_node_record: NodeRecord,
/// Client version; its display form is returned by `web3_clientVersion`.
pub client_version: ClientVersion,
// NOTE(review): presumably the extra-data bytes put in built block headers — confirm.
pub extra_data: Bytes,
}
/// Contract implemented by every typed RPC method.
///
/// Implementors provide [`RpcHandler::parse`] and [`RpcHandler::handle`];
/// the provided [`RpcHandler::call`] glues them together and records
/// duration and outcome metrics.
#[allow(async_fn_in_trait)]
pub trait RpcHandler: Sized {
    /// Builds the typed request from the raw JSON-RPC `params`.
    fn parse(params: &Option<Vec<Value>>) -> Result<Self, RpcErr>;

    /// Parses `req`, runs the handler and records metrics for it.
    ///
    /// A parse failure is returned immediately, before any metric is
    /// recorded. Metrics are labelled `"engine"` for engine-namespace
    /// methods and `"rpc"` for everything else.
    async fn call(req: &RpcRequest, context: RpcApiContext) -> Result<Value, RpcErr> {
        let request = Self::parse(&req.params)?;

        let namespace = if matches!(req.namespace(), Ok(RpcNamespace::Engine)) {
            "engine"
        } else {
            "rpc"
        };
        let method = req.method.as_str();

        let handler_future = async move { request.handle(context).await };
        let result = record_async_duration(namespace, method, handler_future).await;

        let outcome = match result.as_ref() {
            Err(err) => RpcOutcome::Error(get_error_kind(err)),
            Ok(_) => RpcOutcome::Success,
        };
        record_rpc_outcome(namespace, method, outcome);

        result
    }

    /// Executes the already-parsed request against `context`.
    async fn handle(&self, context: RpcApiContext) -> Result<Value, RpcErr>;
}
/// Maps an [`RpcErr`] to a static label named after its variant; used as the
/// error kind when recording RPC outcome metrics.
///
/// The match has no wildcard arm on purpose, so adding a variant to `RpcErr`
/// fails compilation here until a label is chosen.
fn get_error_kind(err: &RpcErr) -> &'static str {
match err {
RpcErr::MethodNotFound(_) => "MethodNotFound",
RpcErr::WrongParam(_) => "WrongParam",
RpcErr::BadParams(_) => "BadParams",
RpcErr::InvalidRequest(_) => "InvalidRequest",
RpcErr::MissingParam(_) => "MissingParam",
RpcErr::TooLargeRequest => "TooLargeRequest",
RpcErr::BadHexFormat(_) => "BadHexFormat",
RpcErr::UnsupportedFork(_) => "UnsupportedFork",
RpcErr::Internal(_) => "Internal",
RpcErr::Vm(_) => "Vm",
RpcErr::Revert { .. } => "Revert",
RpcErr::Halt { .. } => "Halt",
RpcErr::AuthenticationError(_) => "AuthenticationError",
RpcErr::InvalidForkChoiceState(_) => "InvalidForkChoiceState",
RpcErr::InvalidPayloadAttributes(_) => "InvalidPayloadAttributes",
RpcErr::TooDeepReorg(_) => "TooDeepReorg",
RpcErr::UnknownPayload(_) => "UnknownPayload",
RpcErr::InvalidProofFormat(_) => "InvalidProofFormat",
RpcErr::InvalidHeaderFormat(_) => "InvalidHeaderFormat",
RpcErr::InvalidPayload(_) => "InvalidPayload",
RpcErr::ProofGenerationUnavailable(_) => "ProofGenerationUnavailable",
}
}
/// Period of the filter cleanup task and the age passed to
/// `filter::clean_outdated_filters`: one second in test builds, five minutes
/// otherwise.
pub const FILTER_DURATION: Duration = Duration::from_secs(if cfg!(test) { 1 } else { 5 * 60 });
/// Spawns the dedicated `block_executor` OS thread and returns the sender
/// used to submit blocks to it.
///
/// The thread processes messages one at a time, in arrival order. For each
/// [`BlockWorkerMessage`] it adds the block through the blockchain pipeline
/// (producing an execution witness when requested) and reports the result on
/// the message's one-shot channel. The thread exits once every sender has
/// been dropped.
///
/// # Panics
/// Panics if the OS thread cannot be spawned.
pub fn start_block_executor(blockchain: Arc<Blockchain>) -> UnboundedSender<BlockWorkerMessage> {
    let (block_worker_channel, mut block_receiver) = unbounded_channel::<BlockWorkerMessage>();
    std::thread::Builder::new()
        .name("block_executor".to_string())
        .spawn(move || {
            while let Some((notify, block, bal, make_witness)) = block_receiver.blocking_recv() {
                // Immediately-invoked closure so `?` can be used while still
                // capturing the error in `result` instead of leaving the loop.
                let result = (|| {
                    if make_witness {
                        let witness =
                            blockchain.add_block_pipeline_with_witness(block, bal.as_ref())?;
                        Ok(Some(witness))
                    } else {
                        blockchain.add_block_pipeline(block, bal.as_ref())?;
                        Ok(None)
                    }
                })();
                // The caller may have gone away; that is only worth a log line.
                let _ = notify
                    .send(result)
                    .inspect_err(|_| tracing::error!("failed to notify caller"));
            }
        })
        .expect("Failed to spawn block_executor thread");
    block_worker_channel
}
/// Starts the JSON-RPC servers and runs them until shutdown.
///
/// Sets up, in order:
/// 1. the block executor thread and the shared [`RpcApiContext`],
/// 2. a background task that prunes outdated filters every
///    [`FILTER_DURATION`],
/// 3. the public HTTP server on `http_addr` (JSON-RPC on `/`, heap profiling
///    under `/debug/pprof/allocs`),
/// 4. a watchdog that warns when no Auth-RPC request arrived for 30 seconds,
/// 5. the Auth-RPC server on `authrpc_addr` (256 MiB body limit),
/// 6. the WebSocket server, when `ws` is `Some`.
///
/// All servers shut down gracefully on [`shutdown_signal`].
///
/// # Errors
/// Returns `RpcErr::Internal` when any listener fails to bind. Errors raised
/// while the servers are running are logged, not returned.
#[allow(clippy::too_many_arguments)]
pub async fn start_api(
    http_addr: SocketAddr,
    ws: Option<WebSocketConfig>,
    authrpc_addr: SocketAddr,
    storage: Store,
    blockchain: Arc<Blockchain>,
    jwt_secret: Bytes,
    local_p2p_node: Node,
    local_node_record: NodeRecord,
    syncer: SyncManager,
    peer_handler: PeerHandler,
    client_version: ClientVersion,
    log_filter_handler: Option<reload::Handle<EnvFilter, Registry>>,
    gas_ceil: u64,
    extra_data: String,
    allowed_namespaces: HashSet<RpcNamespace>,
) -> Result<(), RpcErr> {
    let active_filters = Arc::new(Mutex::new(HashMap::new()));
    let block_worker_channel = start_block_executor(blockchain.clone());
    let service_context = RpcApiContext {
        storage,
        blockchain,
        active_filters: active_filters.clone(),
        syncer: Some(Arc::new(syncer)),
        peer_handler: Some(peer_handler),
        node_data: NodeData {
            jwt_secret,
            local_p2p_node,
            local_node_record,
            client_version,
            extra_data: extra_data.into(),
        },
        gas_tip_estimator: Arc::new(TokioMutex::new(GasTipEstimator::new())),
        log_filter_handler,
        gas_ceil,
        block_worker_channel,
        ws: ws.clone(),
        allowed_namespaces: Arc::new(allowed_namespaces),
    };

    // Periodically clean up the active filters for the filters endpoints.
    tokio::task::spawn(async move {
        let mut interval = tokio::time::interval(FILTER_DURATION);
        let filters = active_filters.clone();
        loop {
            interval.tick().await;
            tracing::debug!("Running filter clean task");
            filter::clean_outdated_filters(filters.clone(), FILTER_DURATION);
            tracing::debug!("Filter clean task complete");
        }
    });

    let cors = CorsLayer::permissive();
    let http_router = Router::new()
        .route("/debug/pprof/allocs", axum::routing::get(handle_get_heap))
        .route(
            "/debug/pprof/allocs/flamegraph",
            axum::routing::get(handle_get_heap_flamegraph),
        )
        .route("/", post(handle_http_request))
        .layer(cors.clone())
        .with_state(service_context.clone());
    let http_listener = TcpListener::bind(http_addr)
        .await
        .map_err(|error| RpcErr::Internal(error.to_string()))?;
    let http_server = axum::serve(http_listener, http_router)
        .with_graceful_shutdown(shutdown_signal())
        .into_future();
    info!("Starting HTTP server at {http_addr}");

    // Watchdog: every Auth-RPC request pings `timer_sender`; if 30 seconds
    // pass without a ping, a warning is logged.
    let (timer_sender, mut timer_receiver) = tokio::sync::watch::channel(());
    tokio::spawn(async move {
        loop {
            match timeout(Duration::from_secs(30), timer_receiver.changed()).await {
                // A request arrived in time; start a new 30 second window.
                Ok(Ok(())) => {}
                // Every sender is gone (the Auth-RPC router was dropped, e.g.
                // on shutdown or after a failed bind). `changed()` would keep
                // returning `Err` immediately, so stop instead of spinning.
                Ok(Err(_)) => break,
                Err(_) => {
                    warn!("No messages from the consensus layer. Is the consensus client running?");
                }
            }
        }
    });
    let authrpc_handler = move |ctx, auth, body| async move {
        let _ = timer_sender.send(());
        handle_authrpc_request(ctx, auth, body).await
    };
    let authrpc_router = Router::new()
        .route("/", post(authrpc_handler))
        .with_state(service_context.clone())
        .layer(DefaultBodyLimit::max(256 * 1024 * 1024));
    let authrpc_listener = TcpListener::bind(authrpc_addr)
        .await
        .map_err(|error| RpcErr::Internal(error.to_string()))?;
    let authrpc_server = axum::serve(authrpc_listener, authrpc_router)
        .with_graceful_shutdown(shutdown_signal())
        .into_future();
    info!("Starting Auth-RPC server at {authrpc_addr}");

    if let Some(ref ws_config) = ws {
        let ws_handler = |ws: WebSocketUpgrade, State(ctx): State<RpcApiContext>| async move {
            ws.on_upgrade(|mut socket| async move {
                handle_websocket(&mut socket, &ctx, |req| {
                    let c = ctx.clone();
                    async move { map_http_requests(&req, c).await }
                })
                .await;
            })
        };
        let ws_router = Router::new()
            .route("/", axum::routing::any(ws_handler))
            .layer(cors)
            .with_state(service_context);
        let ws_listener = TcpListener::bind(ws_config.addr)
            .await
            .map_err(|error| RpcErr::Internal(error.to_string()))?;
        let ws_server = axum::serve(ws_listener, ws_router)
            .with_graceful_shutdown(shutdown_signal())
            .into_future();
        info!("Starting WS server at {}", ws_config.addr);
        let _ = tokio::try_join!(authrpc_server, http_server, ws_server)
            .inspect_err(|e| error!("Error shutting down servers: {e:?}"));
    } else {
        let _ = tokio::try_join!(authrpc_server, http_server)
            .inspect_err(|e| error!("Error shutting down servers: {e:?}"));
    }
    Ok(())
}
/// Resolves once Ctrl+C is received; used as the graceful-shutdown trigger
/// for every server started by `start_api`.
///
/// # Panics
/// Panics if the Ctrl+C handler cannot be installed.
pub async fn shutdown_signal() {
tokio::signal::ctrl_c()
.await
.expect("failed to install Ctrl+C handler");
}
/// Maximum number of requests accepted in one JSON-RPC batch; larger batches
/// are rejected by `validate_batch` before any request is executed.
const MAX_BATCH_SIZE: usize = 1000;
fn null_id_error(err: RpcErr) -> Value {
let meta: RpcErrorMetadata = err.into();
serde_json::json!({
"jsonrpc": "2.0",
"id": Value::Null,
"error": meta,
})
}
/// Checks the batch-level constraints of a parsed request.
///
/// Returns `None` when the request may proceed (single requests and batches
/// of 1..=`MAX_BATCH_SIZE` entries). Otherwise returns the ready-to-send
/// `InvalidRequest` error response with a `null` id: one for an empty batch,
/// one for a batch exceeding `MAX_BATCH_SIZE`.
fn validate_batch(wrapper: &RpcRequestWrapper) -> Option<Value> {
    let batch_len = match wrapper {
        RpcRequestWrapper::Multiple(requests) => requests.len(),
        RpcRequestWrapper::Single(_) => return None,
    };

    let reason = match batch_len {
        0 => "empty batch is not a valid Request".to_string(),
        len if len > MAX_BATCH_SIZE => format!("batch too large: {len} > {MAX_BATCH_SIZE}"),
        _ => return None,
    };

    Some(null_id_error(RpcErr::InvalidRequest(reason)))
}
pub(crate) async fn handle_http_request(
State(service_context): State<RpcApiContext>,
body: String,
) -> Result<Json<Value>, StatusCode> {
let wrapper: RpcRequestWrapper = match serde_json::from_str(&body) {
Ok(w) => w,
Err(_) => {
return Ok(Json(
rpc_response(
RpcRequestId::String("".to_string()),
Err(RpcErr::BadParams("Invalid request body".to_string())),
)
.map_err(|_| StatusCode::BAD_REQUEST)?,
));
}
};
if let Some(err) = validate_batch(&wrapper) {
return Ok(Json(err));
}
let res = match wrapper {
RpcRequestWrapper::Single(request) => {
let res = map_http_requests(&request, service_context).await;
rpc_response(request.id, res).map_err(|_| StatusCode::BAD_REQUEST)?
}
RpcRequestWrapper::Multiple(requests) => {
let mut responses = Vec::with_capacity(requests.len());
for req in requests {
let res = map_http_requests(&req, service_context.clone()).await;
responses.push(rpc_response(req.id, res).map_err(|_| StatusCode::BAD_REQUEST)?);
}
serde_json::to_value(responses).map_err(|_| StatusCode::BAD_REQUEST)?
}
};
Ok(Json(res))
}
pub async fn handle_authrpc_request(
State(service_context): State<RpcApiContext>,
auth_header: Option<TypedHeader<Authorization<Bearer>>>,
body: String,
) -> Result<Json<Value>, StatusCode> {
let wrapper: RpcRequestWrapper = match serde_json::from_str(&body) {
Ok(w) => w,
Err(_) => {
return Ok(Json(null_id_error(RpcErr::InvalidRequest(
"could not parse JSON-RPC request body".to_string(),
))));
}
};
if let Some(err) = validate_batch(&wrapper) {
return Ok(Json(err));
}
if let Err(error) = authenticate(&service_context.node_data.jwt_secret, auth_header) {
let error_meta: RpcErrorMetadata = error.into();
let res = match wrapper {
RpcRequestWrapper::Single(req) => serde_json::json!({
"jsonrpc": "2.0",
"id": req.id,
"error": error_meta,
}),
RpcRequestWrapper::Multiple(requests) => {
let mut responses = Vec::with_capacity(requests.len());
for req in requests {
responses.push(serde_json::json!({
"jsonrpc": "2.0",
"id": req.id,
"error": error_meta.clone(),
}));
}
serde_json::to_value(responses).map_err(|_| StatusCode::BAD_REQUEST)?
}
};
return Ok(Json(res));
}
let res = match wrapper {
RpcRequestWrapper::Single(req) => {
let res = map_authrpc_requests(&req, service_context).await;
rpc_response(req.id, res).map_err(|_| StatusCode::BAD_REQUEST)?
}
RpcRequestWrapper::Multiple(requests) => {
let mut responses = Vec::with_capacity(requests.len());
for req in requests {
let res = map_authrpc_requests(&req, service_context.clone()).await;
responses.push(rpc_response(req.id, res).map_err(|_| StatusCode::BAD_REQUEST)?);
}
serde_json::to_value(responses).map_err(|_| StatusCode::BAD_REQUEST)?
}
};
Ok(Json(res))
}
/// Drives one WebSocket connection until it closes.
///
/// Two event sources are multiplexed:
/// - incoming frames from the client: text frames are treated as JSON-RPC
///   payloads and answered via `handle_ws_request`; close frames and receive
///   errors end the loop; every other frame type is ignored;
/// - outgoing subscription notifications queued on the per-connection
///   channel, which are forwarded to the client as text frames.
///
/// Requests other than `eth_subscribe` / `eth_unsubscribe` are delegated to
/// `route_request`.
///
/// When the loop ends, every subscription created on this connection is
/// removed from the subscription manager.
pub async fn handle_websocket<F, Fut, E>(
socket: &mut WebSocket,
context: &RpcApiContext,
route_request: F,
) where
F: Fn(RpcRequest) -> Fut,
Fut: std::future::Future<Output = Result<Value, E>>,
E: Into<RpcErrorMetadata>,
{
// Per-connection channel: the sender is handed out on subscribe, and the
// receiver is drained below.
let (out_tx, mut out_rx) = tokio::sync::mpsc::channel::<String>(
crate::subscription_manager::SUBSCRIBER_CHANNEL_CAPACITY,
);
// Ids of the subscriptions owned by this connection, for cleanup on exit.
let mut subscription_ids: Vec<String> = Vec::new();
loop {
tokio::select! {
msg = socket.recv() => {
// `None` means the stream is finished.
let Some(msg) = msg else { break };
let body = match msg {
Ok(Message::Text(text)) => text.to_string(),
Ok(Message::Close(_)) => break,
Ok(_) => continue,
Err(_) => break,
};
let response = handle_ws_request(
&body, context, &out_tx, &mut subscription_ids, &route_request,
).await;
// A failed send means the peer is gone; stop serving it.
if let Some(resp) = response
&& socket.send(Message::Text(resp.into())).await.is_err()
{
break;
}
}
Some(msg) = out_rx.recv() => {
if socket.send(Message::Text(msg.into())).await.is_err() {
break;
}
}
}
}
// Best-effort cleanup: unsubscribe failures are ignored.
if let Some(ws) = &context.ws {
for id in subscription_ids {
let _ = ws.subscription_manager.unsubscribe(id).await;
}
}
}
async fn handle_ws_request<F, Fut, E>(
body: &str,
context: &RpcApiContext,
out_tx: &tokio::sync::mpsc::Sender<String>,
subscription_ids: &mut Vec<String>,
route_request: &F,
) -> Option<String>
where
F: Fn(RpcRequest) -> Fut,
Fut: std::future::Future<Output = Result<Value, E>>,
E: Into<RpcErrorMetadata>,
{
let parsed: Value = match serde_json::from_str(body) {
Ok(v) => v,
Err(_) => return Some(ws_error_response(None, -32700, "Parse error")),
};
let wrapper: RpcRequestWrapper = match serde_json::from_value(parsed) {
Ok(w) => w,
Err(_) => return Some(ws_error_response(None, -32600, "Invalid Request")),
};
match wrapper {
RpcRequestWrapper::Single(req) => {
let resp =
process_ws_request(req, context, out_tx, subscription_ids, route_request).await?;
Some(resp.to_string())
}
RpcRequestWrapper::Multiple(reqs) => {
if reqs.is_empty() {
return Some(ws_error_response(None, -32600, "Invalid Request"));
}
let mut responses = Vec::with_capacity(reqs.len());
for req in reqs {
if let Some(resp) =
process_ws_request(req, context, out_tx, subscription_ids, route_request).await
{
responses.push(resp);
}
}
if responses.is_empty() {
None
} else {
serde_json::to_string(&responses).ok()
}
}
}
}
/// Handles a single request received over WebSocket.
///
/// `eth_subscribe` and `eth_unsubscribe` are served here (and rejected with
/// `MethodNotFound` when the `eth` namespace is not allowed); every other
/// method is delegated to `route_request`. Returns `None` only when the
/// response could not be serialized.
async fn process_ws_request<F, Fut, E>(
    req: RpcRequest,
    context: &RpcApiContext,
    out_tx: &tokio::sync::mpsc::Sender<String>,
    subscription_ids: &mut Vec<String>,
    route_request: &F,
) -> Option<Value>
where
    F: Fn(RpcRequest) -> Fut,
    Fut: std::future::Future<Output = Result<Value, E>>,
    E: Into<RpcErrorMetadata>,
{
    let is_subscribe = req.method == "eth_subscribe";
    let is_unsubscribe = req.method == "eth_unsubscribe";

    // Anything that is not a subscription call goes through the regular router.
    if !(is_subscribe || is_unsubscribe) {
        let id = req.id.clone();
        let outcome = route_request(req).await;
        return rpc_response(id, outcome).ok();
    }

    if !context.allowed_namespaces.contains(&RpcNamespace::Eth) {
        let denied: Result<Value, RpcErr> = Err(RpcErr::MethodNotFound(req.method.clone()));
        return rpc_response(req.id, denied).ok();
    }

    let outcome = if is_subscribe {
        handle_eth_subscribe(&req, context, out_tx, subscription_ids).await
    } else {
        handle_eth_unsubscribe(&req, context, subscription_ids).await
    };
    rpc_response(req.id, outcome).ok()
}
/// Serializes a JSON-RPC 2.0 error response with the given `code` and
/// `message`. The `id` is `null` when none is supplied or when the supplied
/// id cannot be serialized.
fn ws_error_response(id: Option<RpcRequestId>, code: i32, message: &str) -> String {
    let id = id.map_or(Value::Null, |request_id| {
        serde_json::to_value(request_id).unwrap_or(Value::Null)
    });
    let response = serde_json::json!({
        "jsonrpc": "2.0",
        "id": id,
        "error": { "code": code, "message": message },
    });
    response.to_string()
}
/// Implements `eth_subscribe` for one WebSocket connection.
///
/// On success the new subscription id is appended to `subscription_ids` and
/// returned as a JSON string. Only the `newHeads` subscription type is
/// supported.
///
/// # Errors
/// Checked in this order:
/// - `BadParams` when the first parameter is missing or not a string,
/// - `BadParams` when the connection already holds
///   `MAX_SUBSCRIPTIONS_PER_CONNECTION` subscriptions,
/// - `BadParams` for an unsupported subscription type,
/// - `Internal` when the WS server is not enabled, the subscription manager
///   call fails, or the manager reports that the global cap is reached.
pub async fn handle_eth_subscribe(
    req: &crate::utils::RpcRequest,
    context: &RpcApiContext,
    out_tx: &tokio::sync::mpsc::Sender<String>,
    subscription_ids: &mut Vec<String>,
) -> Result<Value, RpcErr> {
    use crate::subscription_manager::MAX_SUBSCRIPTIONS_PER_CONNECTION;

    let requested_type = req
        .params
        .as_deref()
        .and_then(|params| params.first())
        .and_then(Value::as_str);
    let Some(sub_type) = requested_type else {
        return Err(RpcErr::BadParams(
            "eth_subscribe requires a subscription type parameter".to_string(),
        ));
    };

    if subscription_ids.len() >= MAX_SUBSCRIPTIONS_PER_CONNECTION {
        return Err(RpcErr::BadParams(format!(
            "Too many subscriptions (max {MAX_SUBSCRIPTIONS_PER_CONNECTION})"
        )));
    }

    if sub_type != "newHeads" {
        return Err(RpcErr::BadParams(format!(
            "Unsupported subscription type: {sub_type}"
        )));
    }

    let Some(ws) = context.ws.as_ref() else {
        return Err(RpcErr::Internal("WebSocket server not enabled".to_string()));
    };
    let assigned = ws
        .subscription_manager
        .subscribe(out_tx.clone())
        .await
        .map_err(|e| RpcErr::Internal(format!("Subscription failed: {e}")))?;
    let Some(id) = assigned else {
        return Err(RpcErr::Internal(
            "Global subscription cap reached".to_string(),
        ));
    };

    subscription_ids.push(id.clone());
    Ok(Value::String(id))
}
/// Implements `eth_unsubscribe` for one WebSocket connection.
///
/// Returns `true` when the subscription belonged to this connection and the
/// subscription manager confirmed its removal; `false` otherwise (unknown
/// id, WS not enabled, or the manager call failed). The id is dropped from
/// `subscription_ids` only when the removal was confirmed.
///
/// # Errors
/// `BadParams` when the first parameter is missing or not a string.
pub async fn handle_eth_unsubscribe(
    req: &crate::utils::RpcRequest,
    context: &RpcApiContext,
    subscription_ids: &mut Vec<String>,
) -> Result<Value, RpcErr> {
    let requested_id = req
        .params
        .as_deref()
        .and_then(|params| params.first())
        .and_then(Value::as_str);
    let Some(requested_id) = requested_id else {
        return Err(RpcErr::BadParams(
            "eth_unsubscribe requires a subscription ID parameter".to_string(),
        ));
    };
    let sub_id = requested_id.to_string();

    // Only subscriptions created on this connection may be removed through it.
    let Some(index) = subscription_ids.iter().position(|id| *id == sub_id) else {
        return Ok(Value::Bool(false));
    };

    let removed = match &context.ws {
        Some(ws) => ws
            .subscription_manager
            .unsubscribe(sub_id)
            .await
            .unwrap_or(false),
        None => false,
    };

    if removed {
        subscription_ids.swap_remove(index);
    }
    Ok(Value::Bool(removed))
}
/// Routes a request arriving on the public HTTP / WS endpoints to the
/// handler group of its namespace.
///
/// # Errors
/// - whatever `req.namespace()` reports when the namespace cannot be
///   determined,
/// - `MethodNotFound` when the namespace is not in
///   `context.allowed_namespaces`,
/// - `MethodNotFound` for any `engine` method, since the engine namespace is
///   never served here,
/// - otherwise the error of the namespace-specific router.
pub async fn map_http_requests(req: &RpcRequest, context: RpcApiContext) -> Result<Value, RpcErr> {
    let namespace = req.namespace()?;

    let is_allowed = context.allowed_namespaces.contains(&namespace);
    if !is_allowed {
        return Err(RpcErr::MethodNotFound(req.method.clone()));
    }

    match namespace {
        RpcNamespace::Engine => Err(RpcErr::MethodNotFound(req.method.clone())),
        RpcNamespace::Eth => map_eth_requests(req, context).await,
        RpcNamespace::Admin => map_admin_requests(req, context).await,
        RpcNamespace::Debug => map_debug_requests(req, context).await,
        RpcNamespace::Net => map_net_requests(req, context).await,
        RpcNamespace::Web3 => map_web3_requests(req, context),
        RpcNamespace::Mempool => map_mempool_requests(req, context),
    }
}
/// Routes a request arriving on the authenticated endpoint. Only the
/// `engine` and `eth` namespaces are served; anything else, including a
/// namespace that cannot be determined, yields `MethodNotFound`.
pub async fn map_authrpc_requests(
    req: &RpcRequest,
    context: RpcApiContext,
) -> Result<Value, RpcErr> {
    let Ok(namespace) = req.namespace() else {
        return Err(RpcErr::MethodNotFound(req.method.clone()));
    };
    match namespace {
        RpcNamespace::Engine => map_engine_requests(req, context).await,
        RpcNamespace::Eth => map_eth_requests(req, context).await,
        _ => Err(RpcErr::MethodNotFound(req.method.clone())),
    }
}
pub async fn map_eth_requests(req: &RpcRequest, context: RpcApiContext) -> Result<Value, RpcErr> {
match req.method.as_str() {
"eth_chainId" => ChainId::call(req, context).await,
"eth_syncing" => Syncing::call(req, context).await,
"eth_getBlockByNumber" => GetBlockByNumberRequest::call(req, context).await,
"eth_getBlockByHash" => GetBlockByHashRequest::call(req, context).await,
"eth_getBalance" => GetBalanceRequest::call(req, context).await,
"eth_getCode" => GetCodeRequest::call(req, context).await,
"eth_getStorageAt" => GetStorageAtRequest::call(req, context).await,
"eth_getBlockTransactionCountByNumber" => {
GetBlockTransactionCountRequest::call(req, context).await
}
"eth_getBlockTransactionCountByHash" => {
GetBlockTransactionCountRequest::call(req, context).await
}
"eth_getTransactionByBlockNumberAndIndex" => {
GetTransactionByBlockNumberAndIndexRequest::call(req, context).await
}
"eth_getTransactionByBlockHashAndIndex" => {
GetTransactionByBlockHashAndIndexRequest::call(req, context).await
}
"eth_getBlockReceipts" => GetBlockReceiptsRequest::call(req, context).await,
"eth_getBlockAccessList" => BlockAccessListRequest::call(req, context).await,
"eth_getTransactionByHash" => GetTransactionByHashRequest::call(req, context).await,
"eth_getTransactionReceipt" => GetTransactionReceiptRequest::call(req, context).await,
"eth_createAccessList" => CreateAccessListRequest::call(req, context).await,
"eth_blockNumber" => BlockNumberRequest::call(req, context).await,
"eth_call" => CallRequest::call(req, context).await,
"eth_blobBaseFee" => GetBlobBaseFee::call(req, context).await,
"eth_getTransactionCount" => GetTransactionCountRequest::call(req, context).await,
"eth_feeHistory" => FeeHistoryRequest::call(req, context).await,
"eth_estimateGas" => EstimateGasRequest::call(req, context).await,
"eth_getLogs" => LogsFilter::call(req, context).await,
"eth_newFilter" => {
NewFilterRequest::stateful_call(req, context.storage, context.active_filters).await
}
"eth_uninstallFilter" => {
DeleteFilterRequest::stateful_call(req, context.storage, context.active_filters)
}
"eth_getFilterChanges" => {
FilterChangesRequest::stateful_call(req, context.storage, context.active_filters).await
}
"eth_sendRawTransaction" => SendRawTransactionRequest::call(req, context).await,
"eth_getProof" => GetProofRequest::call(req, context).await,
"eth_gasPrice" => GasPrice::call(req, context).await,
"eth_maxPriorityFeePerGas" => {
eth::max_priority_fee::MaxPriorityFee::call(req, context).await
}
"eth_config" => Config::call(req, context).await,
unknown_eth_method => Err(RpcErr::MethodNotFound(unknown_eth_method.to_owned())),
}
}
/// Dispatches a `debug_*` method to its handler. Unknown methods yield
/// `MethodNotFound` carrying the method name.
pub async fn map_debug_requests(req: &RpcRequest, context: RpcApiContext) -> Result<Value, RpcErr> {
match req.method.as_str() {
"debug_getRawHeader" => GetRawHeaderRequest::call(req, context).await,
"debug_getRawBlock" => GetRawBlockRequest::call(req, context).await,
"debug_getRawTransaction" => GetRawTransaction::call(req, context).await,
"debug_getRawReceipts" => GetRawReceipts::call(req, context).await,
"debug_executionWitness" => ExecutionWitnessRequest::call(req, context).await,
"debug_executionWitnessByBlockHash" => {
ExecutionWitnessByBlockHashRequest::call(req, context).await
}
"debug_chainConfig" => ChainConfigRequest::call(req, context).await,
"debug_traceTransaction" => TraceTransactionRequest::call(req, context).await,
"debug_traceBlockByNumber" => TraceBlockByNumberRequest::call(req, context).await,
unknown_debug_method => Err(RpcErr::MethodNotFound(unknown_debug_method.to_owned())),
}
}
/// Dispatches an `engine_*` method to its handler. Unknown methods yield
/// `MethodNotFound` carrying the method name.
pub async fn map_engine_requests(
req: &RpcRequest,
context: RpcApiContext,
) -> Result<Value, RpcErr> {
match req.method.as_str() {
"engine_exchangeCapabilities" => ExchangeCapabilitiesRequest::call(req, context).await,
"engine_forkchoiceUpdatedV1" => ForkChoiceUpdatedV1::call(req, context).await,
"engine_forkchoiceUpdatedV2" => ForkChoiceUpdatedV2::call(req, context).await,
"engine_forkchoiceUpdatedV3" => ForkChoiceUpdatedV3::call(req, context).await,
"engine_forkchoiceUpdatedV4" => ForkChoiceUpdatedV4::call(req, context).await,
// NOTE(review): the newPayload V3+ futures are boxed, presumably to keep
// this function's own future small — confirm.
"engine_newPayloadWithWitnessV5" => {
Box::pin(NewPayloadWithWitnessV5Request::call(req, context)).await
}
"engine_newPayloadV5" => Box::pin(NewPayloadV5Request::call(req, context)).await,
"engine_newPayloadV4" => Box::pin(NewPayloadV4Request::call(req, context)).await,
"engine_newPayloadV3" => Box::pin(NewPayloadV3Request::call(req, context)).await,
"engine_newPayloadV2" => NewPayloadV2Request::call(req, context).await,
"engine_newPayloadV1" => NewPayloadV1Request::call(req, context).await,
"engine_exchangeTransitionConfigurationV1" => {
ExchangeTransitionConfigV1Req::call(req, context).await
}
"engine_getPayloadV6" => GetPayloadV6Request::call(req, context).await,
"engine_getPayloadV5" => GetPayloadV5Request::call(req, context).await,
"engine_getPayloadV4" => GetPayloadV4Request::call(req, context).await,
"engine_getPayloadV3" => GetPayloadV3Request::call(req, context).await,
"engine_getPayloadV2" => GetPayloadV2Request::call(req, context).await,
"engine_getPayloadV1" => GetPayloadV1Request::call(req, context).await,
"engine_getPayloadBodiesByHashV1" => {
GetPayloadBodiesByHashV1Request::call(req, context).await
}
"engine_getPayloadBodiesByRangeV1" => {
GetPayloadBodiesByRangeV1Request::call(req, context).await
}
"engine_getPayloadBodiesByHashV2" => {
GetPayloadBodiesByHashV2Request::call(req, context).await
}
"engine_getPayloadBodiesByRangeV2" => {
GetPayloadBodiesByRangeV2Request::call(req, context).await
}
"engine_getBlobsV1" => BlobsV1Request::call(req, context).await,
"engine_getBlobsV2" => BlobsV2Request::call(req, context).await,
"engine_getBlobsV3" => BlobsV3Request::call(req, context).await,
"engine_getClientVersionV1" => GetClientVersionV1Request::call(req, context).await,
unknown_engine_method => Err(RpcErr::MethodNotFound(unknown_engine_method.to_owned())),
}
}
/// Dispatches an `admin_*` method to the matching function in the `admin`
/// module. Unknown methods yield `MethodNotFound` carrying the method name.
pub async fn map_admin_requests(
req: &RpcRequest,
mut context: RpcApiContext,
) -> Result<Value, RpcErr> {
match req.method.as_str() {
"admin_nodeInfo" => admin::node_info(context.storage, &context.node_data).await,
"admin_peers" => admin::peers(&mut context).await,
"admin_peerScores" => admin::peer_scores(&mut context).await,
"admin_syncStatus" => admin::sync_status(&mut context).await,
"admin_setLogLevel" => admin::set_log_level(req, &context.log_filter_handler),
"admin_addPeer" => admin::add_peer(&mut context, req).await,
unknown_admin_method => Err(RpcErr::MethodNotFound(unknown_admin_method.to_owned())),
}
}
/// Dispatches a `web3_*` method. Only `web3_clientVersion` is served, which
/// returns the display form of the node's `ClientVersion`; anything else
/// yields `MethodNotFound`.
pub fn map_web3_requests(req: &RpcRequest, context: RpcApiContext) -> Result<Value, RpcErr> {
match req.method.as_str() {
"web3_clientVersion" => Ok(Value::String(context.node_data.client_version.to_string())),
unknown_web3_method => Err(RpcErr::MethodNotFound(unknown_web3_method.to_owned())),
}
}
/// Dispatches a `net_*` method to the matching function in the `net` module.
/// Unknown methods yield `MethodNotFound` carrying the method name.
pub async fn map_net_requests(req: &RpcRequest, contex: RpcApiContext) -> Result<Value, RpcErr> {
match req.method.as_str() {
"net_version" => net::version(req, contex),
"net_peerCount" => net::peer_count(req, contex).await,
unknown_net_method => Err(RpcErr::MethodNotFound(unknown_net_method.to_owned())),
}
}
/// Dispatches a `txpool_*` method to the matching function in the `mempool`
/// module. Unknown methods yield `MethodNotFound` carrying the method name.
pub fn map_mempool_requests(req: &RpcRequest, contex: RpcApiContext) -> Result<Value, RpcErr> {
match req.method.as_str() {
"txpool_content" => mempool::content(contex),
"txpool_contentFrom" => mempool::content_from(&req.params, contex),
"txpool_status" => mempool::status(contex),
"txpool_inspect" => mempool::inspect(contex),
unknown_mempool_method => Err(RpcErr::MethodNotFound(unknown_mempool_method.to_owned())),
}
}
pub fn rpc_response<E>(id: RpcRequestId, res: Result<Value, E>) -> Result<Value, RpcErr>
where
E: Into<RpcErrorMetadata>,
{
Ok(match res {
Ok(result) => serde_json::to_value(RpcSuccessResponse {
id,
jsonrpc: "2.0".to_string(),
result,
}),
Err(error) => serde_json::to_value(RpcErrorResponse {
id,
jsonrpc: "2.0".to_string(),
error: error.into(),
}),
}?)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::test_utils::default_context_with_storage;
use ethrex_common::{
H160,
types::{BlockHeader, ChainConfig, Genesis},
};
use ethrex_crypto::keccak::keccak_hash;
use ethrex_storage::{EngineType, Store};
use std::io::BufReader;
use std::str::FromStr;
use std::{fs::File, path::Path};
// With `allowed_namespaces` built from `crate::DEFAULT_HTTP_API`, a
// `debug_traceTransaction` call must come back as `MethodNotFound` and the
// error must carry the requested method name.
#[tokio::test]
async fn http_api_allowlist_blocks_debug_namespace_by_default() {
    let payload = r#"{"jsonrpc":"2.0","method":"debug_traceTransaction","params":["0x0"],"id":1}"#;
    let request: RpcRequest = serde_json::from_str(payload).unwrap();

    let mut store = Store::new("temp.db", EngineType::InMemory).expect("Failed to create test DB");
    store
        .set_chain_config(&example_chain_config())
        .await
        .unwrap();

    let mut ctx = default_context_with_storage(store).await;
    ctx.allowed_namespaces = Arc::new(crate::DEFAULT_HTTP_API.iter().copied().collect());

    let outcome = map_http_requests(&request, ctx).await;
    if let Err(RpcErr::MethodNotFound(method)) = &outcome {
        assert_eq!(method, "debug_traceTransaction");
    } else {
        panic!("expected MethodNotFound, got {outcome:?}");
    }
}
// With `allowed_namespaces` built from `crate::DEFAULT_HTTP_API`, one method
// from each of the `eth`, `net` and `web3` namespaces must get past routing,
// i.e. must not fail with `MethodNotFound` (other outcomes are accepted).
#[tokio::test]
async fn http_api_allowlist_default_routes_standard_namespaces() {
    let mut store = Store::new("temp.db", EngineType::InMemory).expect("Failed to create test DB");
    store
        .set_chain_config(&example_chain_config())
        .await
        .unwrap();

    let mut ctx = default_context_with_storage(store).await;
    ctx.allowed_namespaces = Arc::new(crate::DEFAULT_HTTP_API.iter().copied().collect());

    let routed_methods = ["eth_chainId", "net_version", "web3_clientVersion"];
    for method in routed_methods {
        let payload = format!(r#"{{"jsonrpc":"2.0","method":"{method}","params":[],"id":1}}"#);
        let request: RpcRequest = serde_json::from_str(&payload).unwrap();
        // The context is consumed by the dispatcher, so each call gets a clone.
        let result = map_http_requests(&request, ctx.clone()).await;
        let was_routed = !matches!(result, Err(RpcErr::MethodNotFound(_)));
        assert!(
            was_routed,
            "default allowlist should route {method}, got {result:?}"
        );
    }
}
// Verifies that an `eth_subscribe` request handed to `process_ws_request` is
// answered with a JSON-RPC error whose code is -32601 when `RpcNamespace::Eth`
// is absent from the context's allowed namespaces, and that no subscription
// id gets recorded.
#[tokio::test]
async fn ws_subscribe_blocked_when_eth_namespace_disabled() {
let body = r#"{"jsonrpc":"2.0","method":"eth_subscribe","params":["newHeads"],"id":1}"#;
let request: RpcRequest = serde_json::from_str(body).unwrap();
let mut storage =
Store::new("temp.db", EngineType::InMemory).expect("Failed to create test DB");
storage
.set_chain_config(&example_chain_config())
.await
.unwrap();
let mut context = default_context_with_storage(storage).await;
// Start from the test helper's namespace set and drop `Eth` from it.
let mut without_eth: HashSet<RpcNamespace> = crate::test_utils::all_namespaces_for_tests();
without_eth.remove(&RpcNamespace::Eth);
context.allowed_namespaces = Arc::new(without_eth);
// Outbound channel; the receiver is kept alive for the test but never read.
let (out_tx, _out_rx) = tokio::sync::mpsc::channel::<String>(1);
let mut subscription_ids: Vec<String> = Vec::new();
// Routing callback that must never run in this scenario. The trailing `Ok`
// is unreachable and only pins the closure's output type.
let route_request = |_req: RpcRequest| async move {
panic!(
"route_request must not be called for eth_subscribe when the namespace is disabled"
);
#[allow(unreachable_code)]
Ok::<Value, RpcErr>(Value::Null)
};
let response = process_ws_request(
request,
&context,
&out_tx,
&mut subscription_ids,
&route_request,
)
.await
.expect("process_ws_request should return an error response");
// The rejection is reported inside the JSON-RPC envelope, not as a Rust error.
let err = response.get("error").expect("expected error field");
assert_eq!(
err.get("code").and_then(|v| v.as_i64()),
Some(-32601),
"expected MethodNotFound (-32601), got {response}"
);
assert!(
subscription_ids.is_empty(),
"no subscription should have been registered"
);
}
// Even when `RpcNamespace::Engine` is explicitly present in the allowed set,
// an `engine_*` call sent through `map_http_requests` must be reported as
// `MethodNotFound`.
#[tokio::test]
async fn engine_namespace_rejected_on_http() {
    let payload = r#"{"jsonrpc":"2.0","method":"engine_forkchoiceUpdatedV3","params":[],"id":1}"#;
    let request: RpcRequest = serde_json::from_str(payload).unwrap();

    let mut store = Store::new("temp.db", EngineType::InMemory).expect("Failed to create test DB");
    store
        .set_chain_config(&example_chain_config())
        .await
        .unwrap();

    let mut namespaces: HashSet<RpcNamespace> = crate::test_utils::all_namespaces_for_tests();
    namespaces.insert(RpcNamespace::Engine);

    let mut ctx = default_context_with_storage(store).await;
    ctx.allowed_namespaces = Arc::new(namespaces);

    let result = map_http_requests(&request, ctx).await;
    assert!(matches!(result, Err(RpcErr::MethodNotFound(_))));
}
// Normalizes an expected-response JSON string by round-tripping it through
// `RpcSuccessResponse`, so it can be compared with what `rpc_response` emits.
// Panics if the text does not parse as a success response.
fn to_rpc_response_success_value(str: &str) -> serde_json::Value {
    let parsed: RpcSuccessResponse = serde_json::from_str(str).unwrap();
    serde_json::to_value(parsed).unwrap()
}
// End-to-end check of `admin_nodeInfo`: dispatches the request against an
// in-memory store configured with `example_chain_config()` and compares the
// serialized JSON-RPC response with a hand-written expected document.
#[tokio::test]
async fn admin_nodeinfo_request() {
let body = r#"{"jsonrpc":"2.0", "method":"admin_nodeInfo", "params":[], "id":1}"#;
let request: RpcRequest = serde_json::from_str(body).unwrap();
let mut storage =
Store::new("temp.db", EngineType::InMemory).expect("Failed to create test DB");
storage
.set_chain_config(&example_chain_config())
.await
.unwrap();
let context = default_context_with_storage(storage).await;
// Capture the node identity before `context` is moved into the dispatcher;
// both values are needed to build the expected response below.
let local_p2p_node = context.node_data.local_p2p_node.clone();
let enr_url = context.node_data.local_node_record.enr_url().unwrap();
let result = map_http_requests(&request, context).await;
let rpc_response = rpc_response(request.id, result).unwrap();
// Expected `blobSchedule` section. `example_chain_config()` does not set a
// blob schedule explicitly, so these are presumably the default values —
// TODO confirm against `ChainConfig`'s `Default` impl.
let blob_schedule = serde_json::json!({
"cancun": { "baseFeeUpdateFraction": 3338477, "max": 6, "target": 3, },
"prague": { "baseFeeUpdateFraction": 5007716, "max": 9, "target": 6, },
"osaka": { "baseFeeUpdateFraction": 5007716, "max": 9, "target": 6, },
"bpo1": { "baseFeeUpdateFraction": 8346193, "max": 15, "target": 10, },
"bpo2": { "baseFeeUpdateFraction": 11684671, "max": 21, "target": 14, },
});
// Hash of a default block header, expected for both `genesis` and `head`.
let default_hash = BlockHeader::default().hash();
let json = serde_json::json!({
"jsonrpc": "2.0",
"id": 1,
"result": {
"enode": "enode://d860a01f9722d78051619d1e2351aba3f43f943f6f00718d1b9baa4101932a1f5011f16bb2b1bb35db20d6fe28fa0bf09636d26a87d31de9ec6203eeedb1f666@127.0.0.1:30303",
"enr": enr_url,
"id": hex::encode(keccak_hash(local_p2p_node.public_key)),
"ip": "127.0.0.1",
"listenAddr": "127.0.0.1:30303",
"name": "ethrex/v0.1.0-test-abcd1234/x86_64-unknown-linux/rustc-v1.70.0",
"ports": {
"discovery": 30303,
"listener": 30303
},
"protocols": {
"eth": {
"network": 3151908,
"genesis": default_hash,
"config": {
"chainId": 3151908,
"homesteadBlock": 0,
"daoForkBlock": null,
"daoForkSupport": false,
"eip150Block": 0,
"eip155Block": 0,
"eip158Block": 0,
"byzantiumBlock": 0,
"constantinopleBlock": 0,
"petersburgBlock": 0,
"istanbulBlock": 0,
"muirGlacierBlock": null,
"berlinBlock": 0,
"londonBlock": 0,
"arrowGlacierBlock": null,
"grayGlacierBlock": null,
"mergeNetsplitBlock": 0,
"shanghaiTime": 0,
"cancunTime": 0,
"pragueTime": 1718232101,
"verkleTime": null,
"osakaTime": null,
"bpo1Time": null,
"bpo2Time": null,
"bpo3Time": null,
"bpo4Time": null,
"bpo5Time": null,
"amsterdamTime": null,
"terminalTotalDifficulty": "0x0",
"terminalTotalDifficultyPassed": true,
"blobSchedule": blob_schedule,
"depositContractAddress": H160::from_str("0x00000000219ab540356cbb839cbe05303d7705fa").unwrap(),
"enableVerkleAtGenesis": false,
},
"head": default_hash,
}
},
}
});
// Round-trip the expected document through `RpcSuccessResponse` and compare
// the serialized strings of both sides.
let expected_response = to_rpc_response_success_value(&json.to_string());
assert_eq!(rpc_response.to_string(), expected_response.to_string())
}
// Loads the `execution-api.json` genesis fixture through a buffered reader.
// The path is relative, so it is resolved against the working directory of
// the test process. Panics if the file cannot be opened or deserialized.
fn read_execution_api_genesis_file() -> Genesis {
    let genesis_path = "../../../fixtures/genesis/execution-api.json";
    let reader = File::open(genesis_path)
        .map(BufReader::new)
        .expect("Failed to open genesis file");
    serde_json::from_reader(reader).expect("Failed to deserialize genesis file")
}
// `eth_createAccessList` for a plain value transfer, run against the
// execution-api genesis state, must report an empty access list and
// `gasUsed` of `0x5208`.
#[tokio::test]
async fn create_access_list_simple_transfer() {
    let payload = r#"{"jsonrpc":"2.0","id":1,"method":"eth_createAccessList","params":[{"from":"0x0c2c51a0990aee1d73c1228de158688341557508","nonce":"0x0","to":"0x0100000000000000000000000000000000000000","value":"0xa"},"0x00"]}"#;
    let request: RpcRequest = serde_json::from_str(payload).unwrap();

    let mut store = Store::new("temp.db", EngineType::InMemory).expect("Failed to create test DB");
    store
        .add_initial_state(read_execution_api_genesis_file())
        .await
        .expect("Failed to add genesis block to DB");
    let ctx = default_context_with_storage(store).await;

    let outcome = map_http_requests(&request, ctx).await;
    let actual = rpc_response(request.id, outcome).unwrap();

    let expected = to_rpc_response_success_value(
        r#"{"jsonrpc":"2.0","id":1,"result":{"accessList":[],"gasUsed":"0x5208"}}"#,
    );
    assert_eq!(actual.to_string(), expected.to_string());
}
// Chain configuration shared by the tests in this module: chain id 3151908,
// every block-numbered fork listed below enabled from block 0, Shanghai and
// Cancun at timestamp 0, Prague at timestamp 1718232101, and a terminal total
// difficulty of 0 that is flagged as already passed. Fields not listed keep
// their `Default` values.
fn example_chain_config() -> ChainConfig {
    let deposit_contract_address =
        H160::from_str("0x00000000219ab540356cbb839cbe05303d7705fa").unwrap();

    ChainConfig {
        chain_id: 3151908_u64,

        // Block-numbered forks.
        homestead_block: Some(0),
        eip150_block: Some(0),
        eip155_block: Some(0),
        eip158_block: Some(0),
        byzantium_block: Some(0),
        constantinople_block: Some(0),
        petersburg_block: Some(0),
        istanbul_block: Some(0),
        berlin_block: Some(0),
        london_block: Some(0),
        merge_netsplit_block: Some(0),

        // Timestamp-based forks.
        shanghai_time: Some(0),
        cancun_time: Some(0),
        prague_time: Some(1718232101),

        terminal_total_difficulty: Some(0),
        terminal_total_difficulty_passed: true,
        deposit_contract_address,
        ..Default::default()
    }
}
#[tokio::test]
async fn admin_nodeinfo_large_terminal_total_difficulty() {
let mainnet_ttd: u128 = 58_750_000_000_000_000_000_000;
let body = r#"{"jsonrpc":"2.0", "method":"admin_nodeInfo", "params":[], "id":1}"#;
let request: RpcRequest = serde_json::from_str(body).unwrap();
let mut storage =
Store::new("temp.db", EngineType::InMemory).expect("Failed to create test DB");
let mut config = example_chain_config();
config.terminal_total_difficulty = Some(mainnet_ttd);
storage.set_chain_config(&config).await.unwrap();
let context = default_context_with_storage(storage).await;
let result = map_http_requests(&request, context).await;
assert!(
result.is_ok(),
"admin_nodeInfo should not fail with large terminal_total_difficulty"
);
let value = result.unwrap();
let ttd = value
.pointer("/protocols/eth/config/terminalTotalDifficulty")
.expect("terminalTotalDifficulty should be present in response");
assert_eq!(ttd.as_str().unwrap(), "0xc70d808a128d7380000");
}
// `net_version` must answer with the configured chain id rendered as a
// decimal string.
#[tokio::test]
async fn net_version_test() {
    let payload = r#"{"jsonrpc":"2.0","method":"net_version","params":[],"id":67}"#;
    let request: RpcRequest = serde_json::from_str(payload).expect("serde serialization failed");

    let mut store = Store::new("temp.db", EngineType::InMemory).expect("Failed to create test DB");
    store
        .set_chain_config(&example_chain_config())
        .await
        .unwrap();
    // Read the chain id back from the store before the store is moved into the context.
    let chain_id = store.get_chain_config().chain_id.to_string();
    let ctx = default_context_with_storage(store).await;

    let outcome = map_http_requests(&request, ctx).await;
    let actual = rpc_response(request.id, outcome).unwrap();

    let expected_text = format!(r#"{{"id":67,"jsonrpc": "2.0","result": "{chain_id}"}}"#);
    let expected = to_rpc_response_success_value(&expected_text);
    assert_eq!(actual.to_string(), expected.to_string());
}
// End-to-end check of `eth_config`: builds an in-memory store from the hoodi
// genesis file, dispatches the request and compares the serialized JSON-RPC
// response with a hand-written expected document holding `current`, `next`
// and `last` entries.
// NOTE(review): per the test name, `current` is presumably the Cancun fork
// and `next` the scheduled Prague fork — confirm against the hoodi genesis.
#[tokio::test]
async fn eth_config_request_cancun_with_prague_scheduled() {
let body = r#"{"jsonrpc":"2.0", "method":"eth_config", "params":[], "id":1}"#;
let request: RpcRequest = serde_json::from_str(body).unwrap();
// The genesis path is relative, so it is resolved against the working
// directory of the test process.
let storage = Store::new_from_genesis(
Path::new("temp.db"),
EngineType::InMemory,
"../../../cmd/ethrex/networks/hoodi/genesis.json",
)
.await
.expect("Failed to create test DB");
let context = default_context_with_storage(storage).await;
let result = map_http_requests(&request, context).await;
let rpc_response = rpc_response(request.id, result).unwrap();
// Expected response. Each entry lists its activation time, blob schedule,
// chain id, fork id, precompile addresses and system contract addresses.
let json = serde_json::json!({
"id": 1,
"jsonrpc": "2.0",
"result": {
"current": {
"activationTime": 0,
"blobSchedule": {
"baseFeeUpdateFraction": 3338477,
"max": 6,
"target": 3
},
"chainId": "0x88bb0",
"forkId": "0xbef71d30",
"precompiles": {
"BLAKE2F": "0x0000000000000000000000000000000000000009",
"BN254_ADD": "0x0000000000000000000000000000000000000006",
"BN254_MUL": "0x0000000000000000000000000000000000000007",
"BN254_PAIRING": "0x0000000000000000000000000000000000000008",
"ECREC": "0x0000000000000000000000000000000000000001",
"ID": "0x0000000000000000000000000000000000000004",
"KZG_POINT_EVALUATION": "0x000000000000000000000000000000000000000a",
"MODEXP": "0x0000000000000000000000000000000000000005",
"RIPEMD160": "0x0000000000000000000000000000000000000003",
"SHA256": "0x0000000000000000000000000000000000000002"
},
"systemContracts": {
"BEACON_ROOTS_ADDRESS": "0x000f3df6d732807ef1319fb7b8bb8522d0beac02"
}
},
"next": {
"activationTime": 1742999832,
"blobSchedule": {
"baseFeeUpdateFraction": 5007716,
"max": 9,
"target": 6
},
"chainId": "0x88bb0",
"forkId": "0x0929e24e",
"precompiles": {
"BLAKE2F": "0x0000000000000000000000000000000000000009",
"BLS12_G1ADD": "0x000000000000000000000000000000000000000b",
"BLS12_G1MSM": "0x000000000000000000000000000000000000000c",
"BLS12_G2ADD": "0x000000000000000000000000000000000000000d",
"BLS12_G2MSM": "0x000000000000000000000000000000000000000e",
"BLS12_MAP_FP2_TO_G2": "0x0000000000000000000000000000000000000011",
"BLS12_MAP_FP_TO_G1": "0x0000000000000000000000000000000000000010",
"BLS12_PAIRING_CHECK": "0x000000000000000000000000000000000000000f",
"BN254_ADD": "0x0000000000000000000000000000000000000006",
"BN254_MUL": "0x0000000000000000000000000000000000000007",
"BN254_PAIRING": "0x0000000000000000000000000000000000000008",
"ECREC": "0x0000000000000000000000000000000000000001",
"ID": "0x0000000000000000000000000000000000000004",
"KZG_POINT_EVALUATION": "0x000000000000000000000000000000000000000a",
"MODEXP": "0x0000000000000000000000000000000000000005",
"RIPEMD160": "0x0000000000000000000000000000000000000003",
"SHA256": "0x0000000000000000000000000000000000000002"
},
"systemContracts": {
"BEACON_ROOTS_ADDRESS": "0x000f3df6d732807ef1319fb7b8bb8522d0beac02",
"CONSOLIDATION_REQUEST_PREDEPLOY_ADDRESS": "0x0000bbddc7ce488642fb579f8b00f3a590007251",
"DEPOSIT_CONTRACT_ADDRESS": "0x00000000219ab540356cbb839cbe05303d7705fa",
"HISTORY_STORAGE_ADDRESS": "0x0000f90827f1c53a10cb7a02335b175320002935",
"WITHDRAWAL_REQUEST_PREDEPLOY_ADDRESS": "0x00000961ef480eb55e80d19ad83579a64c007002"
}
},
"last": {
"activationTime": 1762955544,
"blobSchedule": {
"baseFeeUpdateFraction": 11684671,
"max": 21,
"target": 14,
},
"chainId": "0x88bb0",
"forkId": "0x23aa1351",
"precompiles": {
"BLAKE2F": "0x0000000000000000000000000000000000000009",
"BLS12_G1ADD": "0x000000000000000000000000000000000000000b",
"BLS12_G1MSM": "0x000000000000000000000000000000000000000c",
"BLS12_G2ADD": "0x000000000000000000000000000000000000000d",
"BLS12_G2MSM": "0x000000000000000000000000000000000000000e",
"BLS12_MAP_FP2_TO_G2": "0x0000000000000000000000000000000000000011",
"BLS12_MAP_FP_TO_G1": "0x0000000000000000000000000000000000000010",
"BLS12_PAIRING_CHECK": "0x000000000000000000000000000000000000000f",
"BN254_ADD": "0x0000000000000000000000000000000000000006",
"BN254_MUL": "0x0000000000000000000000000000000000000007",
"BN254_PAIRING": "0x0000000000000000000000000000000000000008",
"ECREC": "0x0000000000000000000000000000000000000001",
"ID": "0x0000000000000000000000000000000000000004",
"KZG_POINT_EVALUATION": "0x000000000000000000000000000000000000000a",
"MODEXP": "0x0000000000000000000000000000000000000005",
"P256VERIFY":"0x0000000000000000000000000000000000000100",
"RIPEMD160": "0x0000000000000000000000000000000000000003",
"SHA256": "0x0000000000000000000000000000000000000002"
},
"systemContracts": {
"BEACON_ROOTS_ADDRESS": "0x000f3df6d732807ef1319fb7b8bb8522d0beac02",
"CONSOLIDATION_REQUEST_PREDEPLOY_ADDRESS": "0x0000bbddc7ce488642fb579f8b00f3a590007251",
"DEPOSIT_CONTRACT_ADDRESS": "0x00000000219ab540356cbb839cbe05303d7705fa",
"HISTORY_STORAGE_ADDRESS": "0x0000f90827f1c53a10cb7a02335b175320002935",
"WITHDRAWAL_REQUEST_PREDEPLOY_ADDRESS": "0x00000961ef480eb55e80d19ad83579a64c007002"
}
},
}
});
// Round-trip the expected document through `RpcSuccessResponse` and compare
// the serialized strings of both sides.
let expected_response = to_rpc_response_success_value(&json.to_string());
assert_eq!(rpc_response.to_string(), expected_response.to_string())
}
}